82c6a243 |
#!/usr/bin/env python3
|
0c7825ae |
# Copyright © 2020-2022 Xavier G. <xavier.yamltab@kindwolf.org>
|
82c6a243 |
# This work is free. You can redistribute it and/or modify it under the
# terms of the Do What The Fuck You Want To Public License, Version 2,
# as published by Sam Hocevar. See the COPYING file for more details.
from io import BufferedReader, BytesIO
import os
|
9cd5654c |
import re
|
82c6a243 |
import sys
import json
import yaml
import struct
import argparse
|
3994a7e2 |
from datetime import datetime, timezone
|
acada7f7 |
from binascii import hexlify, unhexlify
|
82c6a243 |
# Documents used as reference to implement the keytab format:
# [1] https://web.mit.edu/kerberos/krb5-1.12/doc/formats/keytab_file_format.html
# [2] https://github.com/krb5/krb5/blob/master/src/lib/krb5/keytab/kt_file.c#L892
# [3] https://github.com/krb5/krb5/blob/master/src/include/krb5/krb5.hin#L230
DATA_LAYOUT_RAW = 0  # keytab as returned by parse_keytab()
DATA_LAYOUT_FULL = 1  # raw layout processed by enrich_keytab()
DATA_LAYOUT_SIMPLE = 2  # full layout processed by simplify_keytab()
# Layout names accepted by --data-layout, mapped to the constants above.
# keytab_data() compares layouts numerically, hence their ordering:
DATA_LAYOUTS = {'raw': DATA_LAYOUT_RAW, 'full': DATA_LAYOUT_FULL, 'simple': DATA_LAYOUT_SIMPLE}
|
b0337068 |
# strftime()/strptime() format used for the "date" field of entries:
DATE_TIME_FORMAT = '%Y-%m-%dT%H:%M:%S%z'
|
82c6a243 |
# First byte of a binary keytab; main() compares the first input byte to it
# to tell keytabs apart from YAML/JSON input:
KEYTAB_FIRST_BYTE = 0x05
# Prefix for struct's format strings, defining big-endian byte order:
BIG_ENDIAN='>'
|
113604e7 |
# Prefix for struct's format strings, defining little-endian byte order:
LITTLE_ENDIAN='<'
# Prefix for struct's format strings, defining native byte order (with
# standard sizes and no alignment):
NATIVE_ENDIANNESS='='
|
82c6a243 |
# Byte order used by the parse_*() functions unless the caller provides one:
DEFAULT_PREFIX=BIG_ENDIAN
# Encoding used for realm/component strings and for hexadecimal dumps:
DEFAULT_ENCODING='ascii'
# verbose() only prints messages whose level does not exceed this value;
# main() overrides it with the --verbose count:
VERBOSITY=1
# Cache of struct.calcsize() results, indexed by format string (see unpack()):
CALCSIZE={}
|
3994a7e2 |
# Timestamp substituted for 'now' dates; computed once, on first use, by
# simple_timestamp_to_full():
now = None
|
82c6a243 |
# Principal name types: symbolic name -> numeric value, as stored in the
# name_type_raw field of principals.
# The following table is based on [3]:
NAME_TYPES = {
'KRB5_NT_UNKNOWN': 0,
'KRB5_NT_PRINCIPAL': 1,
'KRB5_NT_SRV_INST': 2,
'KRB5_NT_SRV_HST': 3,
'KRB5_NT_SRV_XHST': 4,
'KRB5_NT_UID': 5,
'KRB5_NT_X500_PRINCIPAL': 6,
'KRB5_NT_SMTP_NAME': 7,
'KRB5_NT_ENTERPRISE_PRINCIPAL': 10,
'KRB5_NT_WELLKNOWN': 11,
'KRB5_NT_MS_PRINCIPAL': -128,
'KRB5_NT_MS_PRINCIPAL_AND_ID': -129,
'KRB5_NT_ENT_PRINCIPAL_AND_ID': -130,
}
# Encryption types: symbolic name -> numeric value, as stored in the
# enctype_raw field of entries.
# NOTE(review): int_to_enc_type() falls back to 'ENCTYPE_UNKNOWN', which is
# not a key of this table (the closest one is 'UNKNOWN') -- confirm intent.
ENC_TYPES = {
'NULL': 0,
'DES_CBC_CRC': 1,
'DES_CBC_MD4': 2,
'DES_CBC_MD5': 3,
'DES_CBC_RAW': 4,
'DES3_CBC_SHA': 5,
'DES3_CBC_RAW': 6,
'DES_HMAC_SHA1': 8,
'DSA_SHA1_CMS': 9,
'MD5_RSA_CMS': 10,
'SHA1_RSA_CMS': 11,
'RC2_CBC_ENV': 12,
'RSA_ENV': 13,
'RSA_ES_OAEP_ENV': 14,
'DES3_CBC_ENV': 15,
'DES3_CBC_SHA1': 16,
'AES128_CTS_HMAC_SHA1_96': 17,
'AES256_CTS_HMAC_SHA1_96': 18,
'AES128_CTS_HMAC_SHA256_128': 19,
'AES256_CTS_HMAC_SHA384_192': 20,
'ARCFOUR_HMAC': 23,
'ARCFOUR_HMAC_EXP': 24,
'CAMELLIA128_CTS_CMAC': 25,
'CAMELLIA256_CTS_CMAC': 26,
'UNKNOWN': 511,
}
class KeytabParsingError(Exception):
    """
    Raised when input data cannot be parsed.

    Two calling conventions are supported:
    - KeytabParsingError(data, size, frmt) reports a short read: data holds
      the bytes actually read, size the number of bytes that were expected
      and frmt the struct format string being unpacked;
    - KeytabParsingError(message) carries a free-form error message.
    """
    MESSAGE = 'Parsing error: expected {size} bytes to unpack {format} but read {length} bytes instead: {data}'

    def __init__(self, data, size=None, frmt=None):
        # size and frmt are optional so that the class can also be raised
        # with a plain message, as done elsewhere in this file.
        self.data = data
        self.size = size
        self.format = frmt

    def __str__(self):
        if self.size is None and self.format is None:
            # Free-form message: there is no short read to describe.
            return str(self.data)
        return self.MESSAGE.format(
            data=self.data,
            size=self.size,
            format=self.format,
            length=len(self.data),
        )
|
acada7f7 |
class KeytabComposingError(Exception):
    """
    Raised when the provided data cannot be turned into a binary keytab.
    """
|
82c6a243 |
def lookup(lookup_value, dictionary, default):
    """
    Reverse lookup: return the first key of dictionary whose value equals
    lookup_value, or default if there is none.
    """
    matching_names = (name for name, value in dictionary.items() if value == lookup_value)
    return next(matching_names, default)
def int_to_name_type(lookup_value):
    """
    Return the symbolic name type matching the given numeric value, or
    'KRB5_NT_UNKNOWN' if NAME_TYPES does not list it.
    """
    return lookup(lookup_value, dictionary=NAME_TYPES, default='KRB5_NT_UNKNOWN')
def int_to_enc_type(lookup_value):
    """
    Return the symbolic encryption type matching the given numeric value, or
    'ENCTYPE_UNKNOWN' if ENC_TYPES does not list it.
    """
    return lookup(lookup_value, dictionary=ENC_TYPES, default='ENCTYPE_UNKNOWN')
def principal_to_spn(principal):
    """
    Build the "comp1[/comp2[:comp3]]@realm" representation of a principal.

    Return None when the principal is not of type KRB5_NT_PRINCIPAL, when it
    does not have 1 to 3 components or when any of its components is empty.
    """
    if principal.get('name_type_raw') != NAME_TYPES['KRB5_NT_PRINCIPAL']:
        return None
    components = principal['components']
    if not 1 <= len(components) <= 3:
        return None
    if not all(components):
        return None
    first, *others = components
    spn = first
    # The second component is introduced by a slash, the third one by a colon:
    for separator, component in zip(('/', ':'), others):
        spn += separator + component
    return spn + '@' + principal['realm']
def verbose(level, msg, *args, **kwargs):
    """
    Write msg, formatted with the extra arguments, to standard error, unless
    level exceeds the current VERBOSITY.
    """
    if level > VERBOSITY:
        return
    sys.stderr.write(msg.format(*args, **kwargs) + '\n')
def unpack(buf, prefix, format):
    """
    Read from buf the bytes described by prefix + format and unpack them.

    Return the unpacked value if there is exactly one, a tuple of values
    otherwise. Raise KeytabParsingError if buf yields fewer bytes than needed.
    """
    actual_format = prefix + format
    try:
        size = CALCSIZE[actual_format]
    except KeyError:
        # First use of this format: compute its size once and cache it.
        size = CALCSIZE[actual_format] = struct.calcsize(actual_format)
    data = buf.read(size)
    if len(data) < size:
        raise KeytabParsingError(data, size, actual_format)
    results = struct.unpack(actual_format, data)
    if len(results) == 1:
        return results[0]
    return results
def parse_data(buf, prefix=DEFAULT_PREFIX):
    """
    Read a 16-bit unsigned length from buf, then read and return that many
    bytes (fewer if buf runs out of data).
    """
    size = unpack(buf, prefix, 'H')
    payload = buf.read(size)
    return payload
def parse_str(buf, prefix=DEFAULT_PREFIX, encoding=DEFAULT_ENCODING):
    """
    Read length-prefixed data from buf (see parse_data()) and decode it.
    """
    raw = parse_data(buf, prefix)
    return raw.decode(encoding)
|
113604e7 |
def parse_principal(buf, prefix=DEFAULT_PREFIX, version=2):
    """
    Parse a principal from buf.

    Return a dict holding 'realm', 'components' and, except for version 1
    keytabs, 'name_type_raw'.
    """
    is_v1 = version == 1
    # [1] states "count of components (32 bits)" but [2] says int16:
    component_count = unpack(buf, prefix, 'H')
    if is_v1:
        # [1] states "[includes realm in version 1]"
        component_count -= 1
    realm = parse_str(buf, prefix)
    components = [parse_str(buf, prefix) for _ in range(component_count)]
    principal = {'realm': realm, 'components': components}
    if not is_v1:
        # [1] states "[omitted in version 1]" and [3] states int32:
        principal['name_type_raw'] = unpack(buf, prefix, 'i')
    return principal
|
113604e7 |
def parse_entry(buf, prefix=DEFAULT_PREFIX, version=2):
    """
    Parse a keytab entry from buf: principal, timestamp, 8-bit kvno,
    encryption type, key length and key.
    """
    entry = {'principal': parse_principal(buf, prefix, version)}
    timestamp, kvno, enctype_raw, key_length = unpack(buf, prefix, 'IBHH')
    entry.update(
        timestamp=timestamp,
        kvno=kvno,
        enctype_raw=enctype_raw,
        key_length=key_length,
    )
    entry['key'] = buf.read(key_length)
    return entry
|
113604e7 |
def parse_record(buf, prefix=DEFAULT_PREFIX, version=2):
    """
    Parse a record from buf: an entry followed by a tail made of whatever
    bytes remain in buf.
    """
    entry = parse_entry(buf, prefix, version)
    tail = buf.read()
    return {'type': 'record', 'entry': entry, 'tail': tail}
def parse_keytab(buf, args):
    """
    Parse a binary keytab from buf.

    Return a dict holding the keytab 'version' and its 'records'; each record
    is either a parsed record or a hole, along with its signed 'length'.
    Exit with status 1 if the keytab version is neither 1 nor 2.
    """
    version = buf.read(2)[1]
    verbose(2, 'keytab v{}', version)
    if version == 2:
        # Version 2 always uses big-endian byte order:
        prefix = BIG_ENDIAN
    elif version == 1:
        # Version 1 uses native byte order, unless told otherwise:
        enforced = {'big': BIG_ENDIAN, 'little': LITTLE_ENDIAN}
        prefix = enforced.get(args.v1_endianness, NATIVE_ENDIANNESS)
    else:
        verbose(1, 'Unknown keytab version: v{}', version)
        sys.exit(1)
    records = []
    keytab = {'version': version, 'records': records}
    while True:
        try:
            record_length = unpack(buf, prefix, 'i')
        except KeytabParsingError as kpe:
            # No data at all simply means the end of the file was reached.
            if len(kpe.data):
                verbose(1, 'Premature end of file? Got {} as record length.', kpe.data)
            break
        if not record_length:
            break
        verbose(3, 'Record #{} of length {}', len(records) + 1, record_length)
        raw_record = buf.read(abs(record_length))
        if record_length > 0:
            record = parse_record(BufferedReader(BytesIO(raw_record)), prefix, version)
        else:
            # A negative length denotes a hole:
            record = {'type': 'hole', 'data': raw_record}
        record['length'] = record_length
        records.append(record)
    return keytab
def enrich_keytab(keytab):
    """
    Enrich records, in place, with extra information suitable for human
    readers: date, name type, SPN, encryption type and actual kvno.
    Return the keytab.
    """
    for record in keytab['records']:
        if 'entry' not in record:
            continue
        entry = record['entry']
        principal = entry['principal']
        moment = datetime.fromtimestamp(entry['timestamp'], tz=timezone.utc)
        entry['date'] = moment.strftime(DATE_TIME_FORMAT)
        if 'name_type_raw' in principal:
            principal['name_type'] = int_to_name_type(principal['name_type_raw'])
        spn = principal_to_spn(principal)
        if spn:
            entry['spn'] = spn
        entry['enctype'] = int_to_enc_type(entry['enctype_raw'])
        # [1] states: Some implementations of Kerberos recognize a 32-bit key version at the end of
        # an entry, if the record length is at least 4 bytes longer than the entry and the value of
        # those 32 bits is not 0. If present, this key version supersedes the 8-bit key version.
        tail = record.get('tail', b'')
        if len(tail) >= 4:
            (tail_kvno,) = struct.unpack('>I', tail[0:4])
            if tail_kvno or not entry['kvno']:
                entry['actual_kvno'] = tail_kvno
                entry['tail_kvno'] = tail_kvno
        if 'actual_kvno' not in entry:
            entry['actual_kvno'] = entry['kvno']
    return keytab
def simplify_keytab(keytab):
    """
    Return a simplified copy of an enriched keytab, suitable for edition:
    holes are dropped and each record is reduced to a flat entry.
    """
    entries = []
    simplified = {'version': keytab['version'], 'entries': entries}
    for record in keytab['records']:
        if 'entry' not in record:
            continue
        entry = record['entry']
        principal = entry['principal']
        simple_entry = {}
        if 'spn' in entry:
            simple_entry['spn'] = entry['spn']
        simple_entry['principal'] = {
            key: principal[key]
            for key in ('name_type', 'components', 'realm')
            if key in principal
        }
        simple_entry['kvno'] = entry.get('actual_kvno', entry['kvno'])
        kvno_in_tail = 'tail_kvno' in entry
        if kvno_in_tail:
            simple_entry['kvno_in_tail'] = True
        if 'tail' in record:
            # Skip the 32-bit kvno, if any, to only expose what follows it:
            extra_tail = record['tail'][4 if kvno_in_tail else 0:]
            if extra_tail:
                simple_entry['extra_tail'] = extra_tail
        simple_entry.update(
            (key, entry[key]) for key in ('date', 'enctype', 'key') if key in entry
        )
        entries.append(simple_entry)
    return simplified
|
3493271d |
def to_bytes(value):
    """
    Return value as is if it is a bytes object, otherwise the binary data
    denoted by its hexadecimal representation.
    """
    return value if type(value) is bytes else unhexlify(value)
|
82c6a243 |
def prepare_serialization(obj):
    """
    Prepare data for serialization: bytes objects found in obj, at any depth
    of dicts and lists, are replaced with their hexadecimal representation.
    Dicts and lists are modified in place; other values are returned as is.
    """
    kind = type(obj)
    if kind is bytes:
        return hexlify(obj).decode(DEFAULT_ENCODING)
    if kind is dict:
        obj.update({key: prepare_serialization(value) for key, value in obj.items()})
    elif kind is list:
        obj[:] = [prepare_serialization(value) for value in obj]
    return obj
def keytab_data(buf, args):
    """
    Parse the keytab found in buf and process it up to the data layout
    requested through args.data_layout (full layout if it is unknown).
    """
    keytab = parse_keytab(buf, args)
    layout = DATA_LAYOUTS.get(args.data_layout, DATA_LAYOUT_FULL)
    if layout < DATA_LAYOUT_FULL:
        return keytab
    keytab = enrich_keytab(keytab)
    if layout < DATA_LAYOUT_SIMPLE:
        return keytab
    return simplify_keytab(keytab)
def keytab_to_yaml(buf, args):
    """
    Convert the binary keytab found in buf to YAML/JSON, write the result to
    standard output and exit with status 0.
    """
    output_data(keytab_data(buf, args), args, 0)
def output_data(data, args, exit=None):
    """
    Serialize data to standard output, as YAML if args.output_format is
    'yaml', as JSON otherwise; then exit with the given status, if any.
    """
    serialized_data = prepare_serialization(data)
    if args.output_format != 'yaml':
        json.dump(serialized_data, sys.stdout, indent=4)
    else:
        yaml_width = 160
        try:
            yaml.dump(serialized_data, sys.stdout, width=yaml_width, sort_keys=False)
        except TypeError as error:
            if 'sort_keys' not in str(error):
                raise
            # Handle lack of support for sort_keys:
            yaml.dump(serialized_data, sys.stdout, width=yaml_width)
    if exit is not None:
        sys.exit(exit)
|
82c6a243 |
|
6cca1603 |
def len_data(data):
    """
    Return the size of data once packed: a 2-byte length plus the data.
    """
    length_field_size = 2
    return length_field_size + len(data)
def len_str(string, encoding=DEFAULT_ENCODING):
    """
    Return the size of string once encoded and packed (see len_data()).
    """
    encoded = string.encode(encoding)
    return len_data(encoded)
|
acada7f7 |
def pack_data(data):
    """
    Return data prefixed with its length as a big-endian 16-bit unsigned
    integer.
    """
    length_field = struct.pack('>H', len(data))
    return length_field + data
def pack_str(string, encoding=DEFAULT_ENCODING):
    """
    Encode string and return it packed with its length (see pack_data()).
    """
    encoded = string.encode(encoding)
    return pack_data(encoded)
def raw_principal_to_binary(principal, index):
    """
    Serialize a principal to its binary (keytab version 2) representation.

    principal must provide 'realm' and 'components'; 'name_type_raw' defaults
    to KRB5_NT_PRINCIPAL and is coerced using int(). index is only used in
    error messages.

    Raise KeytabComposingError if a mandatory key is missing or if the name
    type is not a valid 32-bit signed integer.
    """
    for key in ('realm', 'components'):
        if key not in principal:
            raise KeytabComposingError('Mandatory key "%s" not found in principal #%d' % (key, index))
    name_type_raw = principal.get('name_type_raw', NAME_TYPES['KRB5_NT_PRINCIPAL'])
    try:
        # Pack the converted value, not the original one, so that anything
        # accepted by int() is also accepted by struct.pack():
        packed_name_type = struct.pack('>i', int(name_type_raw))
    except (TypeError, ValueError, struct.error) as error:
        raise KeytabComposingError('invalid name_type_raw value in principal #%d' % index) from error
    components = principal['components']
    parts = [struct.pack('>H', len(components)), pack_str(principal['realm'])]
    parts.extend(pack_str(component) for component in components)
    parts.append(packed_name_type)
    return b''.join(parts)
def raw_entry_to_binary(entry, index):
    """
    Serialize an entry to its binary (keytab version 2) representation.

    entry must provide 'principal', 'timestamp', 'kvno', 'enctype_raw' and
    'key'. index is only used in error messages.

    Raise KeytabComposingError if a mandatory key is missing, if a numeric
    field does not fit its unsigned binary representation (32 bits for the
    timestamp, 8 for the kvno, 16 for the encryption type) or if the key is
    neither bytes nor valid hexadecimal data of at most 65535 bytes.
    """
    for key in ('principal', 'timestamp', 'kvno', 'enctype_raw', 'key'):
        if key not in entry:
            raise KeytabComposingError('Mandatory key "%s" not found in entry #%d' % (key, index))
    numbers = {}
    for key, frmt in (('timestamp', '>I'), ('kvno', '>B'), ('enctype_raw', '>H')):
        try:
            value = int(entry[key])
            # Packing is the range check: unsigned formats reject negative
            # and oversized values alike.
            struct.pack(frmt, value)
        except (TypeError, ValueError, struct.error) as error:
            raise KeytabComposingError('invalid %s data in entry #%d' % (key, index)) from error
        numbers[key] = value
    data = raw_principal_to_binary(entry['principal'], index)
    try:
        key_data = to_bytes(entry['key'])
        # Numeric fields were validated above, so only the key length can
        # make this call fail:
        header = struct.pack('>IBHH', numbers['timestamp'], numbers['kvno'], numbers['enctype_raw'], len(key_data))
    except (TypeError, ValueError, struct.error) as error:
        raise KeytabComposingError('invalid key data in entry #%d' % index) from error
    return data + header + key_data
def raw_record_to_binary(record, index):
    """
    Serialize a record to binary: a 32-bit signed length followed by the
    entry and the optional tail. index is only used in error messages.

    Raise KeytabComposingError if the entry is missing or invalid, or if the
    tail is neither bytes nor valid hexadecimal data.
    """
    if 'entry' not in record:
        raise KeytabComposingError('missing entry in record #%d' % index)
    data = raw_entry_to_binary(record['entry'], index)
    if 'tail' in record:
        try:
            data += to_bytes(record['tail'])
        except (TypeError, ValueError) as error:
            # binascii.Error, raised for invalid hexadecimal data, is a
            # subclass of ValueError.
            raise KeytabComposingError('invalid tail data in record #%d' % index) from error
    return struct.pack('>i', len(data)) + data
def raw_hole_to_binary(hole, index):
    """
    Serialize a hole to binary: a negative 32-bit signed length followed by
    as many bytes, taken from 'data' if provided, zeroed otherwise.
    index is only used in error messages.

    Raise KeytabComposingError if the length is missing, invalid or zero, or
    if the data is invalid or does not match the length.
    """
    if 'length' not in hole:
        raise KeytabComposingError('missing length in record #%d' % index)
    try:
        length = abs(int(hole['length']))
        header = struct.pack('>i', -length)
    except (TypeError, ValueError, struct.error) as error:
        raise KeytabComposingError('invalid length in record #%d' % index) from error
    # Checked outside of the try block so that this specific message is not
    # replaced with the generic one above:
    if not length:
        raise KeytabComposingError('illegal zero-length hole in record #%d' % index)
    if 'data' not in hole:
        return header + b'\x00' * length
    try:
        hole_data = to_bytes(hole['data'])
    except (TypeError, ValueError) as error:
        raise KeytabComposingError('invalid data in record #%d' % index) from error
    if len(hole_data) != length:
        raise KeytabComposingError('length and data do not match in record #%d' % index)
    return header + hole_data
def raw_keytab_to_binary(indata):
    """
    Serialize a keytab exposing the raw or full data layout to a binary
    version 2 keytab. Records without a type are handled as regular records.

    Raise KeytabComposingError if a record has an unknown type or is invalid.
    """
    chunks = [bytes([KEYTAB_FIRST_BYTE, 2])]
    for index, record in enumerate(indata.get('records', [])):
        record_type = record.get('type', 'record')
        if record_type == 'record':
            chunks.append(raw_record_to_binary(record, index))
        elif record_type == 'hole':
            chunks.append(raw_hole_to_binary(record, index))
        else:
            raise KeytabComposingError('Unknown record type in record #%d' % index)
    return b''.join(chunks)
|
9cd5654c |
def spn_to_principal(spn, name_type='KRB5_NT_PRINCIPAL'):
    """
    Parse a "[comp1/]comp2[:comp3]@realm" string into a principal dict
    holding 'name_type', 'name_type_raw', 'realm' and 'components'.

    Raise KeytabComposingError if spn does not match this syntax.
    """
    principal = {'name_type': name_type, 'name_type_raw': NAME_TYPES[name_type]}
    rem = re.match(r'((?P<comp1>[^/]+)/)?(?P<comp2>[^:]+)(?::(?P<comp3>[^@]+))?@(?P<realm>.+)', spn)
    if rem is None:
        raise KeytabComposingError('Cannot parse SPN %s into principal' % spn)
    principal['realm'] = rem.group('realm')
    # Optional components that did not match are reported as None:
    principal['components'] = [
        component
        for component in rem.group('comp1', 'comp2', 'comp3')
        if component is not None
    ]
    return principal
|
6cca1603 |
def principal_length(principal):
    """
    Return the number of bytes taken by a principal in a record: the 16-bit
    component count, the realm, each component and, if the principal states
    a name type, the 32-bit name type.
    """
    # component count:
    length = 2
    length += len_str(principal.get('realm', ''))
    length += sum(len_str(component) for component in principal.get('components', []))
    # Compare against None rather than testing truthiness: 0 is a legitimate
    # name_type_raw value (KRB5_NT_UNKNOWN) and takes 4 bytes too.
    if principal.get('name_type_raw', principal.get('name_type')) is not None:
        length += 4
    return length
|
9cd5654c |
def simple_principal_to_full(inentry, index, entry):
    """
    Fill entry with the principal and SPN described by inentry, a simple
    layout entry providing either 'principal' or 'spn'. index is only used
    in error messages.

    Return the number of bytes taken by the principal in the record.
    Raise KeytabComposingError if the principal has no usable name type or
    if inentry provides neither 'principal' nor 'spn'.
    """
    if 'principal' in inentry:
        principal = entry['principal'] = inentry['principal']
        if 'name_type_raw' not in principal:
            try:
                principal['name_type_raw'] = NAME_TYPES[principal['name_type']]
            except KeyError:
                message = 'Invalid or unknown name_type specified in entry #%d'
                message += '; use name_type_raw to enforce an arbitrary value'
                # KeytabComposingError accepts a plain message.
                raise KeytabComposingError(message % index) from None
        spn = principal_to_spn(principal)
        if spn:
            entry['spn'] = spn
    elif 'spn' in inentry:
        entry['spn'] = inentry['spn']
        entry['principal'] = spn_to_principal(inentry['spn'])
    else:
        raise KeytabComposingError('Neither principal nor spn found in entry #%d' % index)
    return principal_length(entry['principal'])
|
9cd5654c |
|
bac679a4 |
def simple_kvno_to_full(inentry, index, entry, record):
    """
    Fill entry and the tail of record with the key version number described
    by inentry, then append the extra tail, if any, to the tail of record.
    index is only used in error messages.

    Return the number of bytes taken by the 8-bit kvno in the record.
    Raise KeytabComposingError if the kvno exceeds 255 while kvno_in_tail is
    not set.
    """
    if 'kvno' in inentry:
        kvno = inentry['kvno']
        kvno_too_big = kvno > 255
        entry['actual_kvno'] = kvno
        # A kvno that does not fit 8 bits can only be stored in the tail:
        entry['kvno'] = kvno if not kvno_too_big else 0
        if inentry.get('kvno_in_tail', False):
            entry['tail_kvno'] = kvno
            record['tail'] = struct.pack('>I', kvno)
        elif kvno_too_big:
            message = 'Cannot store kvno > 255 without kvno_in_tail in entry #%d'
            raise KeytabComposingError(message % index)
    extra_tail = to_bytes(inentry.get('extra_tail', b''))
    record['tail'] = record['tail'] + extra_tail
    return 1
|
9cd5654c |
def simple_timestamp_to_full(inentry, index, entry):
    """
    Fill entry with the timestamp described by inentry, which provides either
    'timestamp' or 'date'; the latter is either 'now' or a date formatted
    according to DATE_TIME_FORMAT. index is only used in error messages.

    Return the number of bytes taken by the timestamp in the record.
    Raise KeytabComposingError if the date cannot be parsed.
    """
    global now
    if 'timestamp' in inentry:
        entry['timestamp'] = inentry['timestamp']
    elif 'date' in inentry:
        entry['date'] = inentry['date']
        if inentry['date'] == 'now':
            # Compute the current time only once so that all entries
            # stating 'now' share the same timestamp:
            if now is None:
                now = int(datetime.now(timezone.utc).timestamp())
            entry['timestamp'] = now
        else:
            try:
                parsed_date = datetime.strptime(inentry['date'], DATE_TIME_FORMAT)
                entry['timestamp'] = int(parsed_date.timestamp())
            except (TypeError, ValueError, OverflowError) as error:
                # KeytabComposingError accepts a plain message.
                raise KeytabComposingError('Invalid date specified in entry #%d' % index) from error
    return 4
|
9cd5654c |
def simple_enctype_to_full(inentry, index, entry):
    """
    Fill entry with the encryption type described by inentry, which provides
    either 'enctype_raw' or 'enctype', a key of ENC_TYPES. index is only used
    in error messages.

    Return the number of bytes taken by the encryption type in the record.
    Raise KeytabComposingError if the encryption type is unknown.
    """
    if 'enctype_raw' in inentry:
        entry['enctype_raw'] = inentry['enctype_raw']
    elif 'enctype' in inentry:
        entry['enctype'] = inentry['enctype']
        try:
            entry['enctype_raw'] = ENC_TYPES[inentry['enctype']]
        except (KeyError, TypeError):
            # TypeError covers unhashable values such as lists.
            message = 'Invalid or unknown enctype specified in entry #%d'
            message += '; use enctype_raw to enforce an arbitrary value'
            # KeytabComposingError accepts a plain message.
            raise KeytabComposingError(message % index) from None
    return 2
|
9cd5654c |
def simple_keytab_to_full(indata):
    """
    Convert a keytab exposing the simple data layout ("entries") to the full
    data layout ("records"), computing the length of each record on the way.
    """
    records = []
    data = {'version': indata.get('version', 2), 'records': records}
    for index, inentry in enumerate(indata.get('entries', [])):
        entry = {}
        record = {'type': 'record', 'entry': entry, 'tail': b''}
        # Each helper fills entry/record and returns the number of bytes
        # taken by the part it handles:
        length = sum((
            simple_principal_to_full(inentry, index, entry),
            simple_kvno_to_full(inentry, index, entry, record),
            simple_timestamp_to_full(inentry, index, entry),
            simple_enctype_to_full(inentry, index, entry),
        ))
        if 'key' in inentry:
            entry['key'] = inentry['key']
            entry['key_length'] = len(to_bytes(inentry['key']))
            # 16-bit key length + key:
            length += 2 + entry['key_length']
        length += len(to_bytes(record['tail']))
        record['length'] = length
        records.append(record)
    return data
|
acada7f7 |
def yaml_to_keytab(buf, args):
    """
    Convert the YAML/JSON document found in buf to a binary keytab written to
    standard output. If args.simple_to_full is set, output the full data
    layout and exit instead.
    """
    data = yaml.load(buf, Loader=yaml.SafeLoader)
    # If the provided structure exposes "entries" rather than "records",
    # then it very likely features the "simple" data layout.
    if 'entries' in data:
        data = simple_keytab_to_full(data)
    if args.simple_to_full:
        output_data(data, args, 0)
    sys.stdout.buffer.write(raw_keytab_to_binary(data))
|
82c6a243 |
def parse_args():
    """
    Parse command-line arguments and return the resulting namespace.
    """
    parser = argparse.ArgumentParser(description='Keytab <-> YAML/JSON convertor.')
    add = parser.add_argument
    add('--verbose', '-v', dest='verbose', action='count', default=VERBOSITY,
        help='increase verbosity level')
    add('--data-layout', '-l', dest='data_layout', choices=DATA_LAYOUTS.keys(), default='simple',
        help='data layout (keytab to YAML/JSON only)')
    add('--output-format', '-f', dest='output_format', choices=['json', 'yaml'], default='yaml',
        help='output format (keytab to YAML/JSON only)')
    add('--v1-endianness', '-e', dest='v1_endianness', choices=['native', 'little', 'big'], default='native',
        help='Enforce endianness (keytab v1 to YAML/JSON only)')
    add('--simple-to-full', dest='simple_to_full', action='store_true', default=False,
        help='Convert from simple to full layout (YAML/JSON input only)')
    add('--name-types', dest='list_name_types', action='store_true', default=False,
        help='List all known name types.')
    add('--enc-types', dest='list_enc_types', action='store_true', default=False,
        help='List all known encryption types.')
    add('input', nargs='?', type=argparse.FileType('rb'), default=sys.stdin.buffer,
        help='input file; defaults to standard input')
    return parser.parse_args()
def main():
    """
    Entry point: list known types if requested, otherwise convert the input
    either from keytab to YAML/JSON or the other way around, depending on its
    first byte.
    """
    global VERBOSITY
    args = parse_args()
    VERBOSITY = args.verbose
    # output_data() exits once the requested list is written:
    if args.list_name_types:
        output_data(NAME_TYPES, args, 0)
    if args.list_enc_types:
        output_data(ENC_TYPES, args, 0)
    buf = args.input
    first_byte = buf.peek(1)[0]
    convert = keytab_to_yaml if first_byte == KEYTAB_FIRST_BYTE else yaml_to_keytab
    convert(buf, args)
# Only run the converter when executed as a script, not when imported:
if __name__ == '__main__':
    main()
|