#!/usr/bin/env python3
# Copyright © 2023 Xavier G. <xavier.readally@kindwolf.org>
# This work is free. You can redistribute it and/or modify it under the
# terms of the Do What The Fuck You Want To Public License, Version 2,
# as published by Sam Hocevar. See the COPYING file for more details.
import argparse, errno, os, stat, sys
# Excerpt from `apt show python3-fusepy`:
# Due to a name clash with the existing API-incompatible python-fuse package,
# the importable module name for fusepy in Debian is 'fusepy' instead of
# upstream's 'fuse'.
try:
from fusepy import FUSE, Operations, FuseOSError
except ImportError:
from fuse import FUSE, Operations, FuseOSError
# Letter reported for any file type that has no entry in READALLY_FILE_TYPES.
READALLY_FILE_TYPE_DEFAULT = '?'

# File type bits of st_mode -> one-letter code, in the spirit of `find -type`.
# Keep the order: the Python documentation states that the last three
# constants are 0 on platforms lacking these file types, in which case they
# share one key and the last entry wins.
READALLY_FILE_TYPES = {
    # directories
    stat.S_IFDIR: 'd',
    # regular files
    stat.S_IFREG: 'f',
    # symbolic links
    stat.S_IFLNK: 'l',
    # block devices
    stat.S_IFBLK: 'b',
    # character devices
    stat.S_IFCHR: 'c',
    # named pipes / FIFOs
    stat.S_IFIFO: 'p',
    # sockets
    stat.S_IFSOCK: 's',
    # Solaris Doors
    stat.S_IFDOOR: 'D',
    # Solaris event ports
    stat.S_IFPORT: 'P',
    # whiteouts
    stat.S_IFWHT: 'W',
}

# File types hidden unless the "banned-types" mount option says otherwise:
# everything except directories, regular files and symbolic links.
READALLY_BANNED_DEFAULT = f'bcpsDPW{READALLY_FILE_TYPE_DEFAULT}'
def stat_to_filetype(struct_stat):
    """
    Return the one-letter file type code for a stat result.

    The file type bits of struct_stat.st_mode are looked up in
    READALLY_FILE_TYPES; READALLY_FILE_TYPE_DEFAULT is returned when they
    have no entry there.
    """
    type_bits = stat.S_IFMT(struct_stat.st_mode)
    try:
        return READALLY_FILE_TYPES[type_bits]
    except KeyError:
        return READALLY_FILE_TYPE_DEFAULT
def parse_mount_options(option_string):
    """
    Parse a comma-separated mount option string into a dict.

    Each segment is interpreted as follows:
    - "key=value": stored as key -> value (a string); only the first "="
      splits, so the value may itself contain "=".
    - "noNAME": stored as NAME -> False.
    - "NAME": stored as NAME -> True.
    Empty segments (doubled, leading or trailing commas, or an empty string)
    are ignored instead of producing a bogus '' option.

    Return the resulting dict; later segments override earlier ones.
    """
    mount_options = {}
    for option in option_string.split(','):
        if not option:
            # Stray comma: nothing to record.
            continue
        key, separator, value = option.partition('=')
        if separator:
            mount_options[key] = value
        elif option.startswith('no'):
            mount_options[option[2:]] = False
        else:
            mount_options[option] = True
    return mount_options
class HideThis(Exception):
    """Signal that an entry must not be exposed through the mounted view."""
class OutsideOneFileSystem(HideThis):
    """The entry lives on a different device than the original directory."""
class BannedFileType(HideThis):
    """The entry has a file type listed among the banned types."""
def HideInadequateEntries(method):
    """
    Decorator that provides two extra arguments: full_path and full_path_stat.
    full_path is computed based on the "self" and "path" arguments.
    full_path_stat is obtained by stat()-ing full_path through self._stat().
    Raise FuseOSError(ENOENT) if self._stat() raises HideThis, i.e. if the
    resulting struct reflects that this path is:
      - outside the original file system
      - a banned filetype
    Any other exception raised by self._stat() propagates unchanged.

    The decorated method is called as
    method(self, path, full_path, full_path_stat, *other_args, **kwargs)
    whereas callers invoke the wrapper as wrapper(self, path, *other_args,
    **kwargs). The wrapper keeps the name and docstring of the decorated
    method.
    """
    # Function-scope import so this decorator does not depend on the
    # file-level import list.
    import functools

    @functools.wraps(method)
    def wrap(self, path, *other_args, **kwargs):
        full_path = self._full_path(path)
        try:
            full_path_stat = self._stat(full_path)
        except HideThis:
            # HideThis is an internal signal: report a plain "no such entry"
            # without chaining the internal exception to it.
            raise FuseOSError(errno.ENOENT) from None
        return method(self, path, full_path, full_path_stat, *other_args, **kwargs)
    return wrap
class ReadAlly(Operations):
    """
    FUSE operations exposing an existing directory tree for reading.

    Entries located on another file system (when the "one-file-system" option
    is set) or whose file type is banned are reported as nonexistent and are
    omitted from directory listings.
    """

    def __init__(self, root, mount_options):
        """
        root: path of the original directory to expose.
        mount_options: dict of parsed mount options; the keys read here are
        'one-file-system' (truthy to enable) and 'banned-types' (string of
        one-letter file type codes, defaults to READALLY_BANNED_DEFAULT).
        """
        # Resolve the root once: a relative path would stop working as soon
        # as the working directory changes (libfuse changes it when going to
        # the background), and a symlinked root must be served as the
        # directory it points to rather than as a symbolic link.
        self.root = os.path.realpath(root)
        self.options = mount_options
        # Device of the original directory, or None when any device is fine.
        self.ofsdev = os.lstat(self.root).st_dev if self.options.get('one-file-system') else None
        self.banned = self.options.get('banned-types', READALLY_BANNED_DEFAULT)

    def _check_one_file_system(self, struct_stat, raise_exception=True):
        """
        Tell whether struct_stat belongs to another device than the root.

        Always False when "one-file-system" is not enabled. When the entry is
        outside and raise_exception is true, raise OutsideOneFileSystem
        instead of returning True.
        """
        outside = self.ofsdev is not None and struct_stat.st_dev != self.ofsdev
        if outside and raise_exception:
            raise OutsideOneFileSystem()
        return outside

    def _check_banned_file_type(self, struct_stat, raise_exception=True):
        """
        Tell whether the file type of struct_stat is among the banned types.

        When it is and raise_exception is true, raise BannedFileType instead
        of returning True.
        """
        banned = stat_to_filetype(struct_stat) in self.banned
        if banned and raise_exception:
            raise BannedFileType()
        return banned

    def _stat(self, path):
        """
        lstat() path (symbolic links are not followed) and return the result.

        Raise OutsideOneFileSystem or BannedFileType (both HideThis) when the
        entry must be hidden; OSError from os.lstat() propagates unchanged.
        """
        struct_stat = os.lstat(path)
        self._check_one_file_system(struct_stat)
        self._check_banned_file_type(struct_stat)
        return struct_stat

    def _full_path(self, partial):
        """Map a path inside the mounted view to the path under self.root."""
        return os.path.join(self.root, partial.lstrip('/'))

    def _process_dirent(self, full_path, dirent, dirent_stat=None):
        """
        Build the readdir() item for the entry named dirent in full_path.

        dirent_stat may be given when the entry was already stat()-ed;
        otherwise it is obtained through self._stat().
        Return (name, {'st_mode': ..., 'st_ino': ...}, 0), or the bare name
        when the entry could not be stat()-ed (OSError).
        Raise HideThis when the entry must be hidden.
        """
        if dirent_stat is None:
            try:
                dirent_stat = self._stat(os.path.join(full_path, dirent))
            except OSError:
                # Best effort: still list the name, without attributes.
                return dirent
        return dirent, {'st_mode': dirent_stat.st_mode, 'st_ino': dirent_stat.st_ino}, 0

    @HideInadequateEntries
    def statfs(self, path, full_path, full_path_stat):
        """Return the statvfs() fields of the underlying path as a dict."""
        stv = os.statvfs(full_path)
        keys = (
            'f_bavail', 'f_bfree', 'f_blocks', 'f_bsize', 'f_favail',
            'f_ffree', 'f_files', 'f_flag', 'f_frsize', 'f_namemax',
        )
        return {key: getattr(stv, key) for key in keys}

    @HideInadequateEntries
    def getattr(self, path, full_path, full_path_stat, fh=None):
        """Return the lstat() fields of the underlying path as a dict."""
        keys = (
            'st_ino', 'st_atime', 'st_ctime', 'st_gid', 'st_mode', 'st_mtime',
            'st_nlink', 'st_size', 'st_uid', 'st_blksize', 'st_blocks',
        )
        return {key: getattr(full_path_stat, key) for key in keys}

    @HideInadequateEntries
    def readdir(self, path, full_path, full_path_stat, fh):
        """
        Yield the entries of the directory, skipping those that must be
        hidden. Items are produced by _process_dirent().
        Raise FuseOSError(ENOTDIR) if the path is not a directory.
        """
        if not stat.S_ISDIR(full_path_stat.st_mode):
            raise FuseOSError(errno.ENOTDIR)
        yield self._process_dirent(full_path, '.', full_path_stat)
        # The parent may itself be "hidden" (typically the parent of a root
        # that is a mount point, with one-file-system): '..' must still be
        # listed, so fall back to the bare name instead of failing.
        try:
            parent_entry = self._process_dirent(full_path, '..')
        except HideThis:
            parent_entry = '..'
        yield parent_entry
        for dirent in os.listdir(full_path):
            try:
                entry = self._process_dirent(full_path, dirent)
            except HideThis:
                continue
            yield entry

    @HideInadequateEntries
    def readlink(self, path, full_path, full_path_stat):
        """
        Return the target of the symbolic link.

        Relative targets are returned as is. Absolute targets are rewritten
        as a path relative to the directory containing the link, since that
        is the directory a relative symlink target is resolved against.
        """
        pathname = os.readlink(full_path)
        if pathname.startswith('/'):
            # Path name is absolute, sanitize it.
            return os.path.relpath(pathname, os.path.dirname(full_path))
        return pathname

    @HideInadequateEntries
    def open(self, path, full_path, full_path_stat, flags):
        """Open the underlying path and return the OS file descriptor."""
        return os.open(full_path, flags)

    def read(self, path, length, offset, fh):
        """Return up to length bytes read at offset from descriptor fh."""
        # pread() does not touch the file offset, so concurrent reads on the
        # same descriptor cannot interfere with each other the way a
        # separate lseek() followed by read() can.
        return os.pread(fh, length, offset)

    def release(self, path, fh):
        """Close the descriptor returned by open()."""
        return os.close(fh)
def main():
    """
    Command-line entry point: parse arguments and mount the filesystem.

    Exit with status 1 and a message on stderr when the "banned-types"
    option is given without a value or attempts to ban directories.
    """
    # Parse command-line arguments:
    parser = argparse.ArgumentParser(description='Expose a read-only everything-is-readable version of a given directory')
    parser.add_argument('orig_dir', help='original directory')
    parser.add_argument('mountpoint', help='mount point')
    parser.add_argument('-o', dest='options', help=f'mount options, e.g. one-file-system,banned-types={READALLY_BANNED_DEFAULT}')
    parser.add_argument('-f', '--foreground', dest='foreground', help='run in the foreground', action='store_true')
    args = parser.parse_args()

    # Further parse mount options:
    mount_options = parse_mount_options(args.options) if args.options else {}
    banned_types = mount_options.get('banned-types', '')
    if not isinstance(banned_types, str):
        # "banned-types" or "nobanned-types" without "=value" is parsed as a
        # boolean, which cannot be used as a set of file type letters.
        print(f'error: option "banned-types": a value is required, e.g. banned-types={READALLY_BANNED_DEFAULT}', file=sys.stderr)
        sys.exit(1)
    if 'd' in banned_types:
        print('error: option "banned-types": banning directories (d) is not supported', file=sys.stderr)
        sys.exit(1)

    FUSE(
        ReadAlly(args.orig_dir, mount_options),
        args.mountpoint,
        foreground=args.foreground,
        use_ino=True, # reflect actual inodes, typically to allow hardlink detection
        ro=True, # this is a strictly read-only filesystem
        allow_other=True, # this filesystem is meant to be exposed to another user
        default_permissions=False, # tell the kernel this filesystem handles permissions
    )
# Run main() only when this file is executed as a script, not when imported.
if __name__ == '__main__':
    main()