#!/usr/bin/env python # CCS Computer Science # Common # # Douglas Thrift # # $Id$ import getpass from M2Crypto import SSL import os import socket import struct HANDSHAKE = ('CCSCSLAB', (494, 119)) HOSTS = map(lambda host: host + '.ccs.ucsb.edu', ('zweihander', 'katana')) CHANGE = 1 RESET = 2 ADD = 3 CHFN = 4 CHSH = 5 BOOT = 6 PORT = 2748 SSL_ROOT = '/ccs/ssl' class Error(Exception): def __init__(self, error): self.error = error def __str__(self): return self.error def __unicode__(self): return str(self) class HandshakeError(Error): def __init__(self): self.error = 'Invalid handshake.' class PasswordError(Error): pass _HOST = socket.gethostname().split('.', 1)[0] HOST = socket.getfqdn(_HOST) SYSTEMS = ( 'freebsd', 'linux', 'darwin', 'solaris', 'netbsd', 'debian', 'openbsd', 'dragonfly', ) class Shells(object): def __init__(self, *args): if args: assert len(SYSTEMS) == len(args) self.shells = dict(zip(SYSTEMS, args)) else: self.shells = dict((system, None) for system in SYSTEMS) def __cmp__(self, other): return cmp(self.shells, other.shells) def __len__(self): return len(self.shells) def __getitem__(self, system): return self.shells[system] def __setitem__(self, system, shell): self.shells[system] = shell def __iter__(self): for system in SYSTEMS: yield self.shells[system] SHELLS = ( ('sh', Shells( '/bin/sh', # FreeBSD '/bin/sh', # Linux '/bin/sh', # Darwin '/usr/bin/sh', # Solaris '/bin/sh', # NetBSD '/bin/sh', # Debian '/bin/sh', # OpenBSD '/bin/sh', # DragonFly )), ('csh', Shells( '/bin/csh', # FreeBSD '/bin/csh', # Linux '/bin/csh', # Darwin '/usr/bin/csh', # Solaris '/bin/csh', # NetBSD '/bin/csh', # Debian '/bin/csh', # OpenBSD '/bin/csh', # DragonFly )), ('tcsh', Shells( '/bin/tcsh', # FreeBSD '/bin/tcsh', # Linux '/bin/tcsh', # Darwin '/usr/bin/tcsh', # Solaris '/usr/pkg/bin/tcsh', # NetBSD '/usr/bin/tcsh', # Debian '/usr/local/bin/tcsh', # OpenBSD '/bin/tcsh', # DragonFly )), ('bash', Shells( '/usr/local/bin/bash', # FreeBSD '/bin/bash', # Linux '/bin/bash', # Darwin '/usr/local/bin/bash', # Solaris '/usr/pkg/bin/bash', # NetBSD '/bin/bash', # Debian '/usr/local/bin/bash', # OpenBSD '/usr/pkg/bin/bash', # DragonFly )), ('ksh', Shells( '/usr/local/bin/ksh', # FreeBSD '/bin/ksh', # Linux '/bin/ksh', # Darwin '/usr/bin/ksh', # Solaris '/bin/ksh', # NetBSD '/usr/bin/ksh', # Debian '/bin/ksh', # OpenBSD '/usr/pkg/bin/ksh', # DragonFly )), ('zsh', Shells( '/usr/local/bin/zsh', # FreeBSD '/bin/zsh', # Linux '/bin/zsh', # Darwin '/usr/bin/zsh', # Solaris '/usr/pkg/bin/zsh', # NetBSD '/usr/bin/zsh', # Debian '/usr/local/bin/zsh', # OpenBSD '/usr/pkg/bin/zsh', # DragonFly )), ('mksh', Shells( '/usr/local/bin/mksh', # FreeBSD '/bin/mksh', # Linux '/sw/bin/mksh', # Darwin '/usr/local/bin/mksh', # Solaris '/usr/pkg/bin/mksh', # NetBSD '/bin/mksh', # Debian '/usr/local/bin/mksh', # OpenBSD '/usr/pkg/bin/mksh', # DragonFly )), ('custom', Shells()), ) class _Encode(object): def __init__(self, function): self.function = function def __call__(self, *args): def encode(value): if isinstance(value, basestring): return value.encode('ascii') elif isinstance(value, Shells): return Shells(*map(encode, value)) else: return value return self.function(*map(encode, args)) def _context(): context = SSL.Context() context.set_verify(SSL.verify_peer | SSL.verify_fail_if_no_peer_cert, 2) context.load_cert(certfile = os.path.join(SSL_ROOT, _HOST + '.crt'), keyfile = os.path.join(SSL_ROOT, _HOST + '.key')) cacert = os.path.join(SSL_ROOT, 'ccscert.pem') if not os.path.exists(cacert): cacert = os.path.join(SSL_ROOT, 'ccscert.crt') context.load_verify_locations(cafile = cacert) return context def _command(command, host): connection = SSL.Connection(_context()) connection.connect((host, PORT)) input, output = connection.makefile('rb', -1), connection.makefile('wb', 0) output.write(HANDSHAKE[0]) output.flush() if struct.unpack('