Source code for yubiotp.otp

"""
Implementation of the Yubico OTP algorithm. This can generate and parse OTP
structures.

>>> from binascii import unhexlify
>>> key = b'0123456789abcdef'
>>> otp = OTP(unhexlify(b'0123456789ab'), 5, 0x0153f8, 0, 0x1234)
>>> _ = repr(otp)  # coverage
>>> _ = str(otp)  # coverage
>>> token = encode_otp(otp, key, b'cclngiuv')
>>> token == b'cclngiuvttkhthcilurtkerbjnnkljfkjccklkhl'
True
>>> public_id, otp2 = decode_otp(token, key)
>>> public_id == b'cclngiuv'
True
>>> otp2 == otp
True
"""
from binascii import hexlify
from datetime import datetime
from random import randrange
from struct import pack, unpack

from Crypto.Cipher import AES

from .crc import crc16, verify_crc16
from .modhex import is_modhex, modhex, unmodhex

__all__ = ['decode_otp', 'encode_otp', 'OTP', 'YubiKey', 'CRCError']


class CRCError(ValueError):
    """
    Signals that a decrypted token failed checksum validation.

    This is a :exc:`ValueError` subclass, so callers that already handle
    ``ValueError`` will also catch it.
    """
def decode_otp(token, key):
    """
    Split a modhex-encoded Yubico OTP token into its public ID and its
    decrypted, unpacked :class:`OTP` structure.

    The last 32 modhex characters of the token are treated as the encrypted
    OTP; whatever precedes them is returned untouched as the public ID.

    :param bytes token: A modhex-encoded buffer, as generated by a YubiKey
        device. Decoded, this should consist of 0-16 bytes of public ID
        followed by 16 bytes of encrypted OTP data.
    :param bytes key: A 16-byte AES key as a binary string.

    :returns: The public ID in its modhex-encoded form and the OTP structure.
    :rtype: (bytes, :class:`OTP`)

    :raises: ``ValueError`` if the key is not 16 bytes long or the string
        can not be decoded.
    :raises: :exc:`CRCError` if the checksum on the decrypted data is
        incorrect.
    """
    if len(key) != 16:
        raise ValueError('Key must be exactly 16 bytes')

    # Public ID is the (possibly empty) prefix; the encrypted OTP is always
    # the trailing 32 modhex characters.
    public_id = token[:-32]
    encrypted_modhex = token[-32:]

    ciphertext = unmodhex(encrypted_modhex)
    cipher = AES.new(key, AES.MODE_ECB)
    plaintext = cipher.decrypt(ciphertext)

    return (public_id, OTP.unpack(plaintext))
def encode_otp(otp, key, public_id=b''):
    """
    Pack an :class:`OTP` structure, encrypt it with the given key and return
    the result as a modhex-encoded token prefixed by the public ID.

    :param otp: The OTP structure.
    :type otp: :class:`OTP`
    :param bytes key: A 16-byte AES key as a binary string.
    :param bytes public_id: An optional public id, modhex-encoded. This can
        be at most 32 modhex characters.

    :returns: ``public_id`` followed by the modhex-encoded encrypted OTP.

    :raises: ValueError if any parameters are out of range.
    """
    # Validate arguments up front, in order: key, then public_id content,
    # then public_id length.
    if len(key) != 16:
        raise ValueError('Key must be exactly 16 bytes')
    if not is_modhex(public_id):
        raise ValueError('public_id must be a valid modhex string')
    if len(public_id) > 32:
        raise ValueError('public_id may be no longer than 32 modhex characters')

    plaintext = otp.pack()
    cipher = AES.new(key, AES.MODE_ECB)
    ciphertext = cipher.encrypt(plaintext)

    return public_id + modhex(ciphertext)
class OTP(object):
    """
    A single YubiKey OTP. This is typically instantiated by parsing an
    encoded OTP.

    :param bytes uid: The private ID as a 6-byte binary string.
    :param int session: The non-volatile usage counter.
    :param int timestamp: An integer in [0..2^24), i.e. a 24-bit value.
    :param int counter: The volatile usage counter.
    :param int rand: An arbitrary number in [0..2^16), i.e. a 16-bit value.
    """

    def __init__(self, uid, session, timestamp, counter, rand):
        self.uid = uid
        self.session = session
        self.timestamp = timestamp
        self.counter = counter
        self.rand = rand

    def __repr__(self):
        return 'OTP({self.uid!r}, {self.session!r}, {self.timestamp!r}, {self.counter!r}, {self.rand!r})'.format(
            self=self
        )

    def __str__(self):
        # hexlify() returns bytes on Python 3; decode it so the UID is shown
        # as plain hex digits rather than as a b'...' literal.
        uid_hex = hexlify(self.uid).decode('ascii')

        return 'OTP: {0} {1}/{2} (0x{3:x}/0x{4:x})'.format(
            uid_hex, self.session, self.counter, self.timestamp, self.rand
        )

    def __eq__(self, other):
        # Only instances of exactly the same class can compare equal.
        if self.__class__ is not other.__class__:
            return False

        self_props = (self.uid, self.session, self.timestamp, self.counter, self.rand)
        other_props = (
            other.uid,
            other.session,
            other.timestamp,
            other.counter,
            other.rand,
        )

        return self_props == other_props

    def __ne__(self, other):
        # Python 2 does not derive != from ==, so spell it out to keep the
        # two operators consistent on every interpreter.
        return not self.__eq__(other)

    def pack(self):
        """
        Returns the OTP packed into a binary string, ready to be encrypted
        and encoded.

        The layout is little-endian with no padding: 6-byte uid, 16-bit
        session, 24-bit timestamp, 8-bit counter, 16-bit rand, followed by a
        16-bit checksum over the preceding 14 bytes.

        :rtype: bytes
        """
        fields = (
            self.uid,
            self.session,
            # The 24-bit timestamp is written as its low byte followed by
            # its upper 16 bits.
            self.timestamp & 0xFF,
            (self.timestamp >> 8) & 0xFFFF,
            self.counter,
            self.rand,
        )

        buf = pack('<6s H BH B H', *fields)

        # The stored checksum is the one's complement of the CRC, truncated
        # to 16 bits.
        crc = ~crc16(buf) & 0xFFFF
        buf += pack('<H', crc)

        return buf

    @classmethod
    def unpack(cls, buf):
        """
        Parse a packed OTP. This is the complement to :meth:`pack` so the
        buffer should be a decoded, decrypted OTP buffer.

        :param bytes buf: A packed OTP structure.

        :returns: The parsed OTP.
        :rtype: :class:`OTP`

        :raises: :exc:`CRCError` if the buffer does not pass crc validation.
        """
        if not verify_crc16(buf):
            raise CRCError('OTP checksum is invalid')

        # The trailing checksum has already been verified above; it is
        # unpacked only to consume its two bytes.
        uid, session, t1, t2, counter, rand, _crc = unpack('<6s H BH B H H', buf)

        # Reassemble the 24-bit timestamp from its low byte and upper 16 bits.
        timestamp = (t2 << 8) | t1

        return cls(uid, session, timestamp, counter, rand)
class YubiKey(object):
    """
    A simulated YubiKey device. This can be used to generate a sequence of
    Yubico OTP tokens.

    :param bytes uid: The private ID as a 6-byte binary string.
    :param int session: The non-volatile usage counter. It is the caller's
        responsibility to persist this. Note that this may increment if the
        volatile counter wraps, so you should only increment and persist this
        after you have finished generating tokens. Values above 0x7FFF are
        clamped to 0x7FFF.
    :param int counter: The volatile session counter. This defaults to 0 at
        init time, but the caller can override this. Values above 0xFF are
        clamped to 0xFF.
    """

    def __init__(self, uid, session, counter=0):
        self.uid = uid
        self.session = min(session, 0x7FFF)
        self.counter = min(counter, 0xFF)

        self._init_timestamp()

    def generate(self):
        """
        Return a new OTP object, as if the user had pressed the YubiKey
        button. The volatile counter is advanced afterwards.

        :rtype: :class:`OTP`
        """
        otp = OTP(
            self.uid,
            self.session,
            self._timestamp(),
            self.counter,
            # randrange() excludes its upper bound, so 0x10000 is needed to
            # cover the full 16-bit range including 0xFFFF.
            randrange(0x10000),
        )

        self._increment_counter()

        return otp

    def _init_timestamp(self):
        """
        Pick a random starting value for the timestamp and remember the
        moment this device was created.
        """
        self._timestamp_base = randrange(0x00FFFF)
        self._timestamp_start = datetime.now()

    def _timestamp(self):
        """
        Returns the current timestamp value, based on the number of whole
        seconds since the object was created. The value advances by 8 per
        elapsed second and wraps around as a 24-bit counter.
        """
        delta = datetime.now() - self._timestamp_start
        elapsed_seconds = delta.days * 86400 + delta.seconds

        # Wrap at 2**24 so every 24-bit value, including 0xFFFFFF, is
        # reachable before the counter rolls over to zero.
        return (self._timestamp_base + (elapsed_seconds * 8)) % 0x1000000

    def _increment_counter(self):
        """
        Advance the volatile counter; when it is already at 0xFF, bump the
        session counter and restart the volatile counter at zero.
        """
        if self.counter >= 0xFF:
            self._increment_session()
            self.counter = 0
        else:
            self.counter += 1

    def _increment_session(self):
        """
        Advance the session counter, saturating at 0x7FFF.
        """
        self.session = min(self.session + 1, 0x7FFF)