kindwolf.org Git repositories readally / master readally
master

Tree @master (Download .tar.gz)

readally @masterraw · history · blame

#!/usr/bin/env python3
# Copyright © 2023 Xavier G. <xavier.readally@kindwolf.org>
# This work is free. You can redistribute it and/or modify it under the
# terms of the Do What The Fuck You Want To Public License, Version 2,
# as published by Sam Hocevar. See the COPYING file for more details.

import argparse, errno, os, stat, sys

# Excerpt from `apt show python3-fusepy`:
#   Due to a name clash with the existing API-incompatible python-fuse package,
#   the importable module name for fusepy in Debian is 'fusepy' instead of
#   upstream's 'fuse'.
try:
	from fusepy import FUSE, Operations, FuseOSError
except ImportError:
	from fuse import FUSE, Operations, FuseOSError


# Letter reported for any st_mode format that READALLY_FILE_TYPES does not list.
READALLY_FILE_TYPE_DEFAULT = '?'

# Maps the file format bits of st_mode to a one-letter type, similar to find -type.
# If two stat constants happen to share a numeric value on a platform, the
# later pair wins, exactly as duplicate keys in a dict literal would.
READALLY_FILE_TYPES = dict((
	(stat.S_IFDIR, 'd'), # directories
	(stat.S_IFREG, 'f'), # regular files
	(stat.S_IFLNK, 'l'), # symbolic links

	(stat.S_IFBLK, 'b'), # block devices
	(stat.S_IFCHR, 'c'), # character devices
	(stat.S_IFIFO, 'p'), # named pipes / FIFOs
	(stat.S_IFSOCK, 's'), # sockets

	(stat.S_IFDOOR, 'D'), # Solaris Doors
	(stat.S_IFPORT, 'P'), # Solaris event ports
	(stat.S_IFWHT, 'W'), # whiteouts
))

# Types hidden unless the "banned-types" mount option says otherwise:
# everything but directories, regular files and symbolic links.
READALLY_BANNED_DEFAULT = ''.join(('bcpsDPW', READALLY_FILE_TYPE_DEFAULT))


def stat_to_filetype(struct_stat):
	"""
	Return the one-letter file type matching struct_stat.st_mode.
	The letter comes from READALLY_FILE_TYPES; formats absent from that table
	yield READALLY_FILE_TYPE_DEFAULT.
	"""
	file_format = stat.S_IFMT(struct_stat.st_mode)
	try:
		return READALLY_FILE_TYPES[file_format]
	except KeyError:
		return READALLY_FILE_TYPE_DEFAULT

def parse_mount_options(option_string):
	"""
	Turn a comma-separated mount option string into a dictionary.
	- "key=value" yields {key: value}; only the first "=" separates, so the
	  value may itself contain "=".
	- "nokey" yields {key: False}.
	- "key" yields {key: True}.
	Empty tokens (e.g. from a trailing or doubled comma) are ignored, and a
	bare "no" is kept as a regular flag since there is nothing to negate.
	"""
	mount_options = {}
	for option in option_string.split(','):
		if not option:
			# "a,,b" or "a," would otherwise register a meaningless '' option.
			continue
		key, separator, value = option.partition('=')
		if separator:
			mount_options[key] = value
		elif len(option) > 2 and option.startswith('no'):
			mount_options[option[2:]] = False
		else:
			mount_options[option] = True
	return mount_options


class HideThis(Exception):
	"""Base class for the reasons an entry must not be exposed."""


class OutsideOneFileSystem(HideThis):
	"""The entry lives on a device other than the one recorded for the root."""


class BannedFileType(HideThis):
	"""The entry's file type is one of the banned types."""


def HideInadequateEntries(method):
	"""
	Decorator for filesystem operations called as method(self, path, ...).

	The wrapped method receives two extra positional arguments right after
	path: full_path, computed by self._full_path(path), and full_path_stat,
	obtained from self._stat(full_path).
	Raise FuseOSError(ENOENT) when self._stat() raises HideThis, i.e. when
	the path is:
	- outside the original file system
	- a banned filetype
	Any other exception raised by self._stat() propagates unchanged.
	"""
	# Imported locally: functools is not among the module-level imports.
	from functools import wraps

	@wraps(method) # keep the operation's __name__ and __doc__ on the wrapper
	def wrap(self, path, *other_args, **kwargs):
		full_path = self._full_path(path)
		try:
			full_path_stat = self._stat(full_path)
		except HideThis:
			raise FuseOSError(errno.ENOENT)
		return method(self, path, full_path, full_path_stat, *other_args, **kwargs)
	return wrap


class ReadAlly(Operations):
	"""
	Read-only FUSE operations mirroring the directory tree found under root.

	Entries are hidden (reported as ENOENT, or skipped in directory listings)
	when their file type is banned or, with the "one-file-system" option,
	when they live on a device other than the root's.
	"""

	def __init__(self, root, mount_options):
		"""
		root: path of the directory to expose.
		mount_options: dictionary of mount options; the "one-file-system" and
		"banned-types" keys are the ones read here.
		"""
		# Anchor a relative root to the current working directory right away:
		# every later lookup joins paths onto self.root, and the working
		# directory is expected to change once the process daemonizes
		# (libfuse is documented to chdir to "/"). Absolute roots are unchanged.
		self.root = os.path.join(os.getcwd(), root)
		self.options = mount_options
		self.ofsdev = os.lstat(self.root).st_dev if self.options.get('one-file-system') else None
		self.banned = self.options.get('banned-types', READALLY_BANNED_DEFAULT)

	def _check_one_file_system(self, struct_stat, raise_exception=True):
		"""
		Tell whether struct_stat belongs to a device other than the root's.
		Always False when the "one-file-system" option is off.
		Raise OutsideOneFileSystem instead of returning True unless
		raise_exception is false.
		"""
		outside = self.ofsdev is not None and struct_stat.st_dev != self.ofsdev
		if outside and raise_exception:
			raise OutsideOneFileSystem()
		return outside

	def _check_banned_file_type(self, struct_stat, raise_exception=True):
		"""
		Tell whether the file type of struct_stat is among the banned types.
		Raise BannedFileType instead of returning True unless raise_exception
		is false.
		"""
		banned = stat_to_filetype(struct_stat) in self.banned
		if banned and raise_exception:
			raise BannedFileType()
		return banned

	def _stat(self, path):
		"""
		lstat() path and return the result.
		Raise a HideThis subclass when the entry must not be exposed; OSError
		from lstat() itself is not caught.
		"""
		struct_stat = os.lstat(path)
		self._check_one_file_system(struct_stat)
		self._check_banned_file_type(struct_stat)
		return struct_stat

	def _full_path(self, partial):
		"""Map a path inside the mounted filesystem to the path under root."""
		return os.path.join(self.root, partial.lstrip('/'))

	def _process_dirent(self, full_path, dirent, dirent_stat=None):
		"""
		Build the readdir() item for the entry named dirent in full_path.
		Return a (name, attrs, offset) tuple carrying st_mode and st_ino, or
		the bare name when the entry cannot be stat()-ed (best effort, e.g.
		it vanished since the directory was listed).
		dirent_stat may be supplied to skip the lstat() and the hiding checks.
		Raise HideThis (a subclass of it) when the entry must be hidden.
		"""
		if dirent_stat is None:
			try:
				dirent_stat = self._stat(os.path.join(full_path, dirent))
			except OSError:
				return dirent
		return dirent, {'st_mode': dirent_stat.st_mode, 'st_ino': dirent_stat.st_ino}, 0


	@HideInadequateEntries
	def statfs(self, path, full_path, full_path_stat):
		"""Return the statvfs() fields of the underlying path as a dictionary."""
		stv = os.statvfs(full_path)
		return {key: getattr(stv, key) for key in ('f_bavail', 'f_bfree',
		  'f_blocks', 'f_bsize', 'f_favail', 'f_ffree', 'f_files', 'f_flag',
		  'f_frsize', 'f_namemax')}

	@HideInadequateEntries
	def getattr(self, path, full_path, full_path_stat, fh=None):
		"""Return the lstat() fields of the underlying path as a dictionary."""
		return {key: getattr(full_path_stat, key) for key in (
			'st_ino', 'st_atime', 'st_ctime', 'st_gid', 'st_mode', 'st_mtime',
			'st_nlink', 'st_size', 'st_uid', 'st_blksize', 'st_blocks')}

	@HideInadequateEntries
	def readdir(self, path, full_path, full_path_stat, fh):
		"""
		Yield one item per visible entry of the directory, "." and ".." first.
		Raise FuseOSError(ENOTDIR) if path is not a directory.
		"""
		if not stat.S_ISDIR(full_path_stat.st_mode):
			raise FuseOSError(errno.ENOTDIR)
		yield self._process_dirent(full_path, '.', full_path_stat)
		# ".." must always be listed, yet the parent of the root may well be
		# hidden material (e.g. on another device with one-file-system): in
		# that case list the name alone rather than failing the whole listing.
		try:
			parent_entry = self._process_dirent(full_path, '..')
		except HideThis:
			parent_entry = '..'
		yield parent_entry
		for dirent in os.listdir(full_path):
			try:
				entry = self._process_dirent(full_path, dirent)
			except HideThis:
				continue
			yield entry

	@HideInadequateEntries
	def readlink(self, path, full_path, full_path_stat):
		"""
		Return the target of the symbolic link.
		Absolute targets are rewritten as relative ones so that links pointing
		inside root keep pointing at the same entry through the mount point.
		"""
		pathname = os.readlink(full_path)
		if pathname.startswith("/"):
			# Path name is absolute, sanitize it. A relative target is resolved
			# from the directory holding the link, so that directory (not
			# root) must be the starting point.
			return os.path.relpath(pathname, os.path.dirname(full_path))
		else:
			return pathname

	@HideInadequateEntries
	def open(self, path, full_path, full_path_stat, flags):
		"""Open the underlying file and return its descriptor as file handle."""
		return os.open(full_path, flags)

	def read(self, path, length, offset, fh):
		"""Return up to length bytes read at offset from the open handle fh."""
		# pread() reads at the given offset without touching the descriptor's
		# file position, so two reads sharing a handle cannot interleave a
		# seek with each other's read (main() does not mount with nothreads).
		return os.pread(fh, length, offset)

	def release(self, path, fh):
		"""Close the descriptor returned by open()."""
		return os.close(fh)

def main():
	"""
	Command-line entry point: parse the arguments, validate the mount options
	and mount the read-only view of orig_dir on mountpoint.
	Exit with status 1, after an error message on stderr, when the
	"banned-types" option is unusable.
	"""
	# Parse command-line arguments:
	parser = argparse.ArgumentParser(description='Expose a read-only everything-is-readable version of a given directory')
	parser.add_argument('orig_dir', help='original directory')
	parser.add_argument('mountpoint', help='mount point')
	parser.add_argument('-o', dest='options', help=f'mount options, e.g. one-file-system,banned-types={READALLY_BANNED_DEFAULT}')
	parser.add_argument('-f', '--foreground', dest='foreground', help='run in the foreground', action='store_true')
	args = parser.parse_args()

	# Further parse mount options:
	mount_options = parse_mount_options(args.options) if args.options else {}
	banned_types = mount_options.get('banned-types', '')
	if not isinstance(banned_types, str):
		# "banned-types" or "nobanned-types" given without "=letters" parses
		# to a boolean, which cannot be searched for type letters.
		print('error: option "banned-types": expected a string of file type letters', file=sys.stderr)
		sys.exit(1)
	if 'd' in banned_types:
		print('error: option "banned-types": banning directories (d) is not supported', file=sys.stderr)
		sys.exit(1)

	FUSE(
		ReadAlly(args.orig_dir, mount_options),
		args.mountpoint,
		foreground=args.foreground,
		use_ino=True,     # reflect actual inodes, typically to allow hardlink detection
		ro=True,          # this is a strictly read-only filesystem
		allow_other=True, # this filesystem is meant to be exposed to another user
		default_permissions=False, # tell the kernel this filesystem handles permissions
	)

# Run main() only when this file is executed as a script, not when imported.
if __name__ == '__main__':
	main()