#!/usr/bin/env python3 # Copyright © 2020 Xavier G. <xavier.combinedfs@kindwolf.org> # This work is free. You can redistribute it and/or modify it under the # terms of the Do What The Fuck You Want To Public License, Version 2, # as published by Sam Hocevar. See the COPYING file for more details. import os import re import sys import stat import yaml import errno import argparse import threading # Excerpt from `apt show python3-fusepy`: # Due to a name clash with the existing API-incompatible python-fuse package, # the importable module name for fusepy in Debian is 'fusepy' instead of # upstream's 'fuse'. try: from fusepy import FUSE, FuseOSError, Operations except ImportError: from fuse import FUSE, FuseOSError, Operations DEFAULT_ROOT = '/etc/letsencrypt/live' DEFAULT_CERT_FILTER = False DEFAULT_WHITELIST = True DEFAULT_CERT_PATTERN = '.' DEFAULT_SEPARATOR = '/' DEFAULT_UID = 0 DEFAULT_GID = 0 DEFAULT_DIR_MODE = 0o555 DEFAULT_REG_MODE = 0o444 DEFAULT_KEY_MODE = 0o400 DEFAULT_SENSITIVE_PATTERN = '/privkey.pem$' RELOAD_PATH = '/reload' RELOAD_MSG_OK = b'reload ok\n' RELOAD_MSG_FAIL = b'reload fail\n' RELOAD_FILESIZE = max(len(RELOAD_MSG_OK), len(RELOAD_MSG_FAIL)) TIME_PROPS = ('st_atime', 'st_ctime', 'st_mtime') def read_mode_setting(obj, key, default): try: return int(obj[key], 8) except (KeyError, ValueError): return default class CombinedFSConfiguration(object): def __init__(self, conf_path): self.read_conf(conf_path) def read_conf(self, conf_path): with open(conf_path) as conf_file: conf = yaml.load(conf_file.read()) self.apply_conf(conf) self.path = conf_path def apply_conf(self, conf): self.root = conf.get('letsencrypt_live', DEFAULT_ROOT) self.filter = conf.get('cert_filter', DEFAULT_CERT_FILTER) self.whitelist = conf.get('cert_whitelist', DEFAULT_WHITELIST) self.pattern = conf.get('cert_pattern', DEFAULT_CERT_PATTERN) self.separator = conf.get('separator', DEFAULT_SEPARATOR) self.files = conf.get('files', {}) self.uid = int(conf.get('uid', DEFAULT_UID)) self.gid = int(conf.get('gid', DEFAULT_GID)) self.same_uid_as = conf.get('same-uid-as', None) self.same_gid_as = conf.get('same-gid-as', None) self.dir_mode = read_mode_setting(conf, 'dir_mode', DEFAULT_DIR_MODE) self.reg_mode = read_mode_setting(conf, 'reg_mode', DEFAULT_REG_MODE) self.key_mode = read_mode_setting(conf, 'key_mode', DEFAULT_KEY_MODE) self.sensitive_pattern = conf.get('sensitive_pattern', DEFAULT_SENSITIVE_PATTERN) # Compile regexes: self.sensitive_pattern_re = re.compile(self.sensitive_pattern) if self.filter: self.pattern_re = re.compile(self.pattern) # Helpers: def filter_cert(self, cert): if not self.filter: return True return bool(self.pattern_re.match(cert)) == self.whitelist def analyse_path(self, path): """ Return a tuple of three values reflecting what the given path points to. Raise a FuseOSError with ENOENT if the path does not match anything. The three values are: cert: the target certificate; None only for the root directory; filename: the requested filename; None only for: - the root directory; - the cert-specific directory when '/' is used as separator; file_spec: the specification for the requested filename. """ # Initial values: cert = None filename = None file_spec = None # Root directory: if path == '/': return cert, filename, file_spec # cert and flilename: path_components = path[1:].split('/') if self.separator == '/': if len(path_components) > 2: raise FuseOSError(errno.ENOENT) cert = path_components[0] if len(path_components) == 2: filename = path_components[1] else: if len(path_components) != 1: raise FuseOSError(errno.ENOENT) components = path_components[0].split(self.separator) if len(components) != 2: raise FuseOSError(errno.ENOENT) cert = components[0] filename = components[1] # Ensure cert is not filtered out: if not self.filter_cert(cert): raise FuseOSError(errno.ENOENT) # file_spec: if filename is not None: try: file_spec = self.files[filename] except KeyError: raise FuseOSError(errno.ENOENT) return cert, filename, file_spec def expand_path(self, cert, path): expanded_path = path.replace('${cert}', cert) if not expanded_path.startswith('/'): expanded_path = os.path.join(self.root, cert, expanded_path) return expanded_path def expand_paths(self, cert, paths): expanded_paths = [] for path in paths: method = self.expand_paths if type(path) is list else self.expand_path expanded_paths.append(method(cert, path)) return expanded_paths def get_paths(self, cert, file_spec): return self.expand_paths(cert, file_spec.get('content', [])) def is_sensitive_file(self, filepath): return self.sensitive_pattern_re.search(filepath) class CombinedFS(Operations): def __init__(self, conf_path): # Configuration: self.configuration = CombinedFSConfiguration(conf_path) # File descriptor management: self.filedesc_lock = threading.Lock() self.filedesc_index = 0 self.filedesc = {} # Helpers: def attributes(self, full_path): st = os.lstat(full_path) return dict((key, getattr(st, key)) for key in ('st_atime', 'st_ctime', 'st_gid', 'st_mode', 'st_mtime', 'st_nlink', 'st_size', 'st_uid')) def certificates(self, conf): for dentry in os.listdir(conf.root): if conf.filter_cert(dentry): fullpath = os.path.join(conf.root, dentry) if os.path.isdir(fullpath): yield dentry def get_conf(self): return self.configuration # uid/gid-related helpers; in the end, the xid (uid/gid) precedence is: # - filespec/same-xid-as # - filespec/xid # - conf/same-xid-as # - conf/xid # - DEFAULT_XID def get_uid_gid(self, conf, filespec=None): """ Just-get-it-done wrapper around get_{uid,gid}_{global,for_filespec}. """ stats = {} if filespec is None: return self.get_uid_global(conf, stats), self.get_gid_global(conf, stats) return self.get_uid_for_filespec(conf, filespec, stats), self.get_gid_for_filespec(conf, filespec, stats) def get_uid_for_filespec(self, conf, filespec, stats): """ File-specific uid selection mechanism: attempt to use file-specific same-uid-as, falling back on file-specific uid, falling back on global uid selection mechanism. """ uid = self.get_stat_attr(filespec.get('same-uid-as', None), 'st_uid', filespec.get('uid', None), stats) if uid is None: uid = self.get_uid_global(conf, stats) return uid def get_gid_for_filespec(self, conf, filespec, stats): """ File-specific gid selection mechanism: attempt to use file-specific same-gid-as, falling back on file-specific gid, falling back on global gid selection mechanism. """ gid = self.get_stat_attr(filespec.get('same-gid-as', None), 'st_gid', filespec.get('gid', None), stats) if gid is None: gid = self.get_gid_global(conf, stats) return gid def get_uid_global(self, conf, stats): """ Global uid selection mechanism: attempt to use same-uid-as, falling back on uid. """ return self.get_stat_attr(conf.same_uid_as, 'st_uid', conf.uid, stats) def get_gid_global(self, conf, stats): """ Global gid selection mechanism: attempt to use same-gid-as, falling back on gid. """ return self.get_stat_attr(conf.same_gid_as, 'st_gid', conf.gid, stats) def get_stat_attr(self, path, attr, default, stats): """ Stat path and return the request attribute, or the default value if something goes wrong. """ if path is None: return default try: return getattr(self.get_stat(path, stats), attr) except: return default def get_stat(self, path, stats): """ Simple wrapper around os.stat() that uses a dict to implement some basic caching (for the sake of uid/gid consistency, not actually for performance). Return either None or a stat structure. Should throw no exceptions as long as stats is provided. """ stat = stats.get(path) if stat is None: try: stats[path] = stat = os.stat(path) except: pass return stat # End of uid/gid-related helpers def iterate_paths(self, func, paths): for filepath in paths: try: if type(filepath) is list: # Array of file paths: look for the first existing path: for index, subpath in enumerate(filepath): try: func(subpath) # Still there? The file must exist, exit the loop: break except OSError as ose: if ose.errno == errno.ENOENT and index < len(filepath) - 1: # The file does not exist, try the next one, if any: continue else: # Reached the last file path or encountered another error: raise else: # Presumably a regular file path func(filepath) except OSError as ose: raise FuseOSError(ose.errno) def register_fd(self, file_descriptor): with self.filedesc_lock: self.filedesc_index += 1 new_fd_index = self.filedesc_index self.filedesc[new_fd_index] = file_descriptor return new_fd_index def reload(self, conf=None): try: if conf is None: conf = self.get_conf() new_conf = CombinedFSConfiguration(conf.path) # Assuming CPython, this should result in a single STORE_ATTR opcode. # Since this class features no __setattr__ implementation, the # resulting execution should be atomic. self.configuration = new_conf return new_conf except: return None def handle_reload_getattr(self, conf, fh): uid, gid = self.get_uid_gid(conf) return { 'st_nlink': 1, 'st_uid': uid, 'st_gid': gid, 'st_size': RELOAD_FILESIZE, 'st_mode': stat.S_IFREG | conf.key_mode, } def handle_reload_open(self, conf, flags): new_conf = self.reload(conf) new_fd = { 'conf': conf if new_conf is None else new_conf, 'data': RELOAD_MSG_FAIL if new_conf is None else RELOAD_MSG_OK, } return self.register_fd(new_fd) # Filesystem methods def access(self, path, mode): """ libfuse documentation states: This will be called for the access() system call. If the 'default_permissions' mount option is given, this method is not called. Since this program enforces default_permissions, this method will never be called, which makes it dead simple to implement. """ raise FuseOSError(errno.ENOTSUP) def getattr(self, path, fh=None): conf = self.get_conf() if path == RELOAD_PATH: return self.handle_reload_getattr(conf, fh) cert, filename, file_spec = conf.analyse_path(path) if filename is None: # Directory full_path = os.path.join(conf.root, path.lstrip('/')) try: dir_attrs = self.attributes(full_path) except OSError as ose: if ose.errno == errno.ENOENT and path == '/': # Non-existent "live" directory, most likely a misconf, # fake it to preserve access to RELOAD_PATH: dir_attrs = {'st_nlink': 2, 'st_size': 4096} for prop in TIME_PROPS: dir_attrs[prop] = 0 else: raise uid, gid = self.get_uid_gid(conf) dir_attrs['st_uid'] = uid dir_attrs['st_gid'] = gid dir_attrs['st_mode'] = stat.S_IFDIR | conf.dir_mode return dir_attrs uid, gid = self.get_uid_gid(conf, file_spec) attrs = { 'st_nlink': 1, 'st_uid': uid, 'st_gid': gid, 'st_size': 0, } def_mode = conf.reg_mode paths = conf.get_paths(cert, file_spec) if not paths: # Virtual empty file: root_stats = os.stat(conf.root) for prop in TIME_PROPS: attrs[prop] = getattr(root_stats, prop) attrs['st_mode'] = stat.S_IFREG | read_mode_setting(file_spec, 'mode', def_mode) return attrs # One array to hold the actual, successive filepaths, one dict to hold # the latest stat() result for each file: filepaths = [] stats = {} def stat_file(path): stats[path] = os.stat(path) filepaths.append(path) self.iterate_paths(stat_file, paths) for filepath in filepaths: stat_obj = stats[filepath] # Pick the highest/latest value for access/change/modification times: for prop in TIME_PROPS: prop_val = getattr(stat_obj, prop) if prop_val > attrs.get(prop, 0): attrs[prop] = prop_val # Add up sizes: attrs['st_size'] += stat_obj.st_size # Lower permissions if necessary: if conf.is_sensitive_file(filepath): def_mode = conf.key_mode attrs['st_mode'] = stat.S_IFREG | read_mode_setting(file_spec, 'mode', def_mode) return attrs def readdir(self, path, fh): conf = self.get_conf() cert, filename, _ = conf.analyse_path(path) # Deal only with directories: if filename: raise FuseOSError(errno.ENOTDIR) # Extra attributes, just what it takes to support dirent->d_type: dir_attrs = {'st_mode': stat.S_IFDIR } reg_attrs = {'st_mode': stat.S_IFREG } # Yield common directory entries: yield '.', dir_attrs, 0 yield '..', dir_attrs, 0 if not cert: # Top-level directory flat_mode = conf.separator != '/' for cert in self.certificates(conf): if flat_mode: for filename in conf.files: yield cert + conf.separator + filename, reg_attrs, 0 else: yield cert, dir_attrs, 0 else: # Second-level directory for filename in conf.files: yield filename, reg_attrs, 0 def open(self, path, flags): conf = self.get_conf() if path == RELOAD_PATH: return self.handle_reload_open(conf, flags) cert, filename, file_spec = conf.analyse_path(path) if not cert or not filename: raise FuseOSError(errno.ENOENT) # Being a read-only filesystem spares us the need to check most flags. new_fd = { 'conf': conf, 'cert': cert, 'filename': filename, 'file_spec': file_spec, } return self.register_fd(new_fd) def read(self, path, length, offset, fh): filedesc = self.filedesc.get(fh) # Use the same configuration as open() when it created the file descriptor: conf = filedesc['conf'] if filedesc is None: raise FuseOSError(errno.EBADF) data = filedesc.get('data') if data is None: paths = conf.get_paths(filedesc['cert'], filedesc['file_spec']) data = {'data': bytes() } def concatenate(path): data['data'] += open(path, 'rb').read() self.iterate_paths(concatenate, paths) filedesc['data'] = data = data['data'] read_chunk = data[offset:offset + length] return read_chunk def release(self, path, fh): with self.filedesc_lock: del self.filedesc[fh] # Integers in Python have arbitrary precision, i.e. they are unbounded # and thus exempt from overflows as long as they are manipulated in # pure Python. def statfs(self, path): stv = os.statvfs(self.get_conf().root) return dict((key, getattr(stv, key)) for key in ('f_bavail', 'f_bfree', 'f_blocks', 'f_bsize', 'f_favail', 'f_ffree', 'f_files', 'f_flag', 'f_frsize', 'f_namemax')) def readlink(self, path): # We never expose any symlink, therefore it should be safe to always # return EINVAL: raise FuseOSError(errno.EINVAL) def main(conf_path, mountpoint, foreground): FUSE(CombinedFS(conf_path), mountpoint, foreground=foreground, ro=True, default_permissions=True, allow_other=True) if __name__ == '__main__': parser = argparse.ArgumentParser(description='Expose a transformed, version of Let\'s Encrypt / Certbot\'s "live" directory') parser.add_argument('conf_path', help='CombinedFS configuration file') parser.add_argument('mountpoint', help='mount point') parser.add_argument('-o', dest='options', help='mount options (ignored, only there for compatibility purposes)') parser.add_argument('-f', '--foreground', dest='foreground', help='run in the foreground', action='store_true') args = parser.parse_args() main(args.conf_path, args.mountpoint, args.foreground)