From a23fdf946d1eb3ff46e3db13c903b5c0bab3d06c Mon Sep 17 00:00:00 2001 From: Dan Helfman Date: Thu, 7 May 2020 11:44:04 -0700 Subject: [PATCH] Stream database dumps and restores directly to/from Borg without using any additional filesystem space (#258). --- NEWS | 3 + borgmatic/borg/create.py | 24 ++- borgmatic/borg/extract.py | 17 +- borgmatic/commands/borgmatic.py | 124 ++++++++------- borgmatic/execute.py | 188 +++++++++++++++++++--- borgmatic/hooks/dump.py | 99 +----------- borgmatic/hooks/mysql.py | 99 +++++++----- borgmatic/hooks/postgresql.py | 122 ++++++++------- docs/how-to/backup-your-databases.md | 12 +- tests/integration/test_execute.py | 26 +-- tests/unit/hooks/test_dump.py | 113 -------------- tests/unit/hooks/test_mysql.py | 226 ++++++++++++++------------- tests/unit/hooks/test_postgresql.py | 211 +++++++++++++------------ tests/unit/test_execute.py | 20 ++- 14 files changed, 665 insertions(+), 619 deletions(-) diff --git a/NEWS b/NEWS index 93a809b..bddcae8 100644 --- a/NEWS +++ b/NEWS @@ -1,4 +1,7 @@ 1.5.3.dev0 + * #258: Stream database dumps and restores directly to/from Borg without using any additional + filesystem space. This feature is automatic, and works even on restores from archives made with + previous versions of borgmatic. * #293: Documentation on macOS launchd permissions issues with work-around for Full Disk Access. 1.5.2 diff --git a/borgmatic/borg/create.py b/borgmatic/borg/create.py index 7a3d274..6ccf3cd 100644 --- a/borgmatic/borg/create.py +++ b/borgmatic/borg/create.py @@ -4,7 +4,11 @@ import logging import os import tempfile -from borgmatic.execute import execute_command, execute_command_without_capture +from borgmatic.execute import ( + execute_command, + execute_command_with_processes, + execute_command_without_capture, +) logger = logging.getLogger(__name__) @@ -125,6 +129,9 @@ def borgmatic_source_directories(borgmatic_source_directory): ) +DEFAULT_ARCHIVE_NAME_FORMAT = '{hostname}-{now:%Y-%m-%dT%H:%M:%S.%f}' + + def create_archive( dry_run, repository, @@ -136,10 +143,14 @@ def create_archive( stats=False, json=False, files=False, + stream_processes=None, ): ''' Given vebosity/dry-run flags, a local or remote repository path, a location config dict, and a storage config dict, create a Borg archive and return Borg's JSON output (if any). + + If a sequence of stream processes is given (instances of subprocess.Popen), then execute the + create command while also triggering the given processes to produce output. 
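+
+    For illustration, a caller might wire database dumps into an archive roughly like this (a
+    sketch, with the dump_databases() hook call standing in for whatever produced the processes):
+
+        processes = dump_databases(databases, log_prefix, location_config, dry_run=False)
+        create_archive(
+            dry_run=False,
+            repository=repository,
+            location_config=location_config,
+            storage_config=storage_config,
+            stream_processes=processes,
+        )
+
+    Because those dumps are named pipes, passing stream processes also forces '--read-special' on
+    (see below), so Borg reads each pipe's contents rather than storing the pipe device entry.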
''' sources = _expand_directories( location_config['source_directories'] @@ -157,8 +168,7 @@ def create_archive( umask = storage_config.get('umask', None) lock_wait = storage_config.get('lock_wait', None) files_cache = location_config.get('files_cache') - default_archive_name_format = '{hostname}-{now:%Y-%m-%dT%H:%M:%S.%f}' - archive_name_format = storage_config.get('archive_name_format', default_archive_name_format) + archive_name_format = storage_config.get('archive_name_format', DEFAULT_ARCHIVE_NAME_FORMAT) extra_borg_options = storage_config.get('extra_borg_options', {}).get('create', '') full_command = ( @@ -174,7 +184,7 @@ def create_archive( + (('--noatime',) if location_config.get('atime') is False else ()) + (('--noctime',) if location_config.get('ctime') is False else ()) + (('--nobirthtime',) if location_config.get('birthtime') is False else ()) - + (('--read-special',) if location_config.get('read_special') else ()) + + (('--read-special',) if (location_config.get('read_special') or stream_processes) else ()) + (('--nobsdflags',) if location_config.get('bsd_flags') is False else ()) + (('--files-cache', files_cache) if files_cache else ()) + (('--remote-path', remote_path) if remote_path else ()) @@ -198,6 +208,7 @@ def create_archive( # The progress output isn't compatible with captured and logged output, as progress messes with # the terminal directly. + # FIXME: "--progress" and stream_processes can't be used together. if progress: execute_command_without_capture(full_command, error_on_warnings=False) return @@ -209,4 +220,9 @@ def create_archive( else: output_log_level = logging.INFO + if stream_processes: + return execute_command_with_processes( + full_command, stream_processes, output_log_level, error_on_warnings=False + ) + return execute_command(full_command, output_log_level, error_on_warnings=False) diff --git a/borgmatic/borg/extract.py b/borgmatic/borg/extract.py index 09af537..419069b 100644 --- a/borgmatic/borg/extract.py +++ b/borgmatic/borg/extract.py @@ -1,5 +1,6 @@ import logging import os +import subprocess from borgmatic.execute import execute_command, execute_command_without_capture @@ -63,12 +64,16 @@ def extract_archive( destination_path=None, progress=False, error_on_warnings=True, + extract_to_stdout=False, ): ''' Given a dry-run flag, a local or remote repository path, an archive name, zero or more paths to restore from the archive, location/storage configuration dicts, optional local and remote Borg paths, and an optional destination path to extract to, extract the archive into the current directory. + + If extract to stdout is True, then start the extraction streaming to stdout, and return that + extract process as an instance of subprocess.Popen. 
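+
+    For illustration (a sketch, not a verbatim call), a database restore consumes the stream by
+    handing the still-running extract process to the restoring database hook:
+
+        extract_process = extract_archive(..., extract_to_stdout=True)
+        restore_database_dump(
+            database_config, log_prefix, location_config, dry_run, extract_process
+        )
+
+    where the hook reads the dump data from extract_process.stdout.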
''' umask = storage_config.get('umask', None) lock_wait = storage_config.get('lock_wait', None) @@ -83,6 +88,7 @@ def extract_archive( + (('--debug', '--list', '--show-rc') if logger.isEnabledFor(logging.DEBUG) else ()) + (('--dry-run',) if dry_run else ()) + (('--progress',) if progress else ()) + + (('--stdout',) if extract_to_stdout else ()) + ('::'.join((repository if ':' in repository else os.path.abspath(repository), archive)),) + (tuple(paths) if paths else ()) ) @@ -93,7 +99,16 @@ def extract_archive( execute_command_without_capture( full_command, working_directory=destination_path, error_on_warnings=error_on_warnings ) - return + return None + + if extract_to_stdout: + return execute_command( + full_command, + output_file=subprocess.PIPE, + working_directory=destination_path, + error_on_warnings=error_on_warnings, + run_to_completion=False, + ) # Error on warnings by default, as Borg only gives a warning if the restore paths don't exist in # the archive! diff --git a/borgmatic/commands/borgmatic.py b/borgmatic/commands/borgmatic.py index 5688ae9..bafda3b 100644 --- a/borgmatic/commands/borgmatic.py +++ b/borgmatic/commands/borgmatic.py @@ -83,14 +83,6 @@ def run_configuration(config_filename, config, arguments): 'pre-backup', global_arguments.dry_run, ) - dispatch.call_hooks( - 'dump_databases', - hooks, - config_filename, - dump.DATABASE_HOOK_NAMES, - location, - global_arguments.dry_run, - ) if 'check' in arguments: command.execute_hook( hooks.get('before_check'), @@ -262,6 +254,16 @@ def run_actions( ) if 'create' in arguments: logger.info('{}: Creating archive{}'.format(repository, dry_run_label)) + active_dumps = dispatch.call_hooks( + 'dump_databases', + hooks, + repository, + dump.DATABASE_HOOK_NAMES, + location, + global_arguments.dry_run, + ) + stream_processes = [process for processes in active_dumps.values() for process in processes] + json_output = borg_create.create_archive( global_arguments.dry_run, repository, @@ -273,9 +275,11 @@ def run_actions( stats=arguments['create'].stats, json=arguments['create'].json, files=arguments['create'].files, + stream_processes=stream_processes, ) if json_output: yield json.loads(json_output) + if 'check' in arguments and checks.repository_enabled_for_checks(repository, consistency): logger.info('{}: Running consistency checks'.format(repository)) borg_check.check_archives( @@ -347,57 +351,67 @@ def run_actions( if 'all' in restore_names: restore_names = [] - # Extract dumps for the named databases from the archive. - dump_patterns = dispatch.call_hooks( - 'make_database_dump_patterns', - hooks, - repository, - dump.DATABASE_HOOK_NAMES, - location, - restore_names, + archive_name = borg_list.resolve_archive_name( + repository, arguments['restore'].archive, storage, local_path, remote_path ) + found_names = set() - borg_extract.extract_archive( - global_arguments.dry_run, - repository, - borg_list.resolve_archive_name( - repository, arguments['restore'].archive, storage, local_path, remote_path - ), - dump.convert_glob_patterns_to_borg_patterns( - dump.flatten_dump_patterns(dump_patterns, restore_names) - ), - location, - storage, - local_path=local_path, - remote_path=remote_path, - destination_path='/', - progress=arguments['restore'].progress, - # We don't want glob patterns that don't match to error. 
- error_on_warnings=False, - ) + for hook_name, per_hook_restore_databases in hooks.items(): + if hook_name not in dump.DATABASE_HOOK_NAMES: + continue - # Map the restore names or detected dumps to the corresponding database configurations. - restore_databases = dump.get_per_hook_database_configurations( - hooks, restore_names, dump_patterns - ) + for restore_database in per_hook_restore_databases: + database_name = restore_database['name'] + if restore_names and database_name not in restore_names: + continue + + found_names.add(database_name) + dump_pattern = dispatch.call_hooks( + 'make_database_dump_pattern', + hooks, + repository, + dump.DATABASE_HOOK_NAMES, + location, + database_name, + )[hook_name] + + # Kick off a single database extract to stdout. + extract_process = borg_extract.extract_archive( + dry_run=global_arguments.dry_run, + repository=repository, + archive=archive_name, + paths=dump.convert_glob_patterns_to_borg_patterns([dump_pattern]), + location_config=location, + storage_config=storage, + local_path=local_path, + remote_path=remote_path, + destination_path='/', + progress=arguments['restore'].progress, + extract_to_stdout=True, + ) + + # Run a single database restore, consuming the extract stdout. + dispatch.call_hooks( + 'restore_database_dump', + {hook_name: [restore_database]}, + repository, + dump.DATABASE_HOOK_NAMES, + location, + global_arguments.dry_run, + extract_process, + ) + + if not restore_names and not found_names: + raise ValueError('No databases were found to restore') + + missing_names = sorted(set(restore_names) - found_names) + if missing_names: + raise ValueError( + 'Cannot restore database(s) {} missing from borgmatic\'s configuration'.format( + ', '.join(missing_names) + ) + ) - # Finally, restore the databases and cleanup the dumps. - dispatch.call_hooks( - 'restore_database_dumps', - restore_databases, - repository, - dump.DATABASE_HOOK_NAMES, - location, - global_arguments.dry_run, - ) - dispatch.call_hooks( - 'remove_database_dumps', - restore_databases, - repository, - dump.DATABASE_HOOK_NAMES, - location, - global_arguments.dry_run, - ) if 'list' in arguments: if arguments['list'].repository is None or validate.repositories_match( repository, arguments['list'].repository diff --git a/borgmatic/execute.py b/borgmatic/execute.py index fb76498..d4e9d2e 100644 --- a/borgmatic/execute.py +++ b/borgmatic/execute.py @@ -1,5 +1,7 @@ +import collections import logging import os +import select import subprocess logger = logging.getLogger(__name__) @@ -20,12 +22,20 @@ def exit_code_indicates_error(exit_code, error_on_warnings=True): return bool(exit_code >= BORG_ERROR_EXIT_CODE) -def log_output(command, process, output_buffer, output_log_level, error_on_warnings): +def process_command(process): ''' - Given a command already executed, its process opened by subprocess.Popen(), and the process' - relevant output buffer (stderr or stdout), log its output with the requested log level. - Additionally, raise a CalledProcessException if the process exits with an error (or a warning, - if error on warnings is True). + Given a process as an instance of subprocess.Popen, return the command string that was used to + invoke it. 
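+
+    For example (illustrative), a process created with subprocess.Popen(('borg', 'create'))
+    yields 'borg create', while a process created from a shell command string returns that string
+    unchanged.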
+ ''' + return process.args if isinstance(process.args, str) else ' '.join(process.args) + + +def log_output(process, output_buffer, output_log_level, error_on_warnings): + ''' + Given an executed command's process opened by subprocess.Popen(), and the process' relevant + output buffer (stderr or stdout), log its output with the requested log level. Additionally, + raise a CalledProcessError if the process exits with an error (or a warning, if error on + warnings is True). ''' last_lines = [] @@ -54,7 +64,86 @@ def log_output(command, process, output_buffer, output_log_level, error_on_warni if len(last_lines) == ERROR_OUTPUT_MAX_LINE_COUNT: last_lines.insert(0, '...') - raise subprocess.CalledProcessError(exit_code, ' '.join(command), '\n'.join(last_lines)) + raise subprocess.CalledProcessError( + exit_code, process_command(process), '\n'.join(last_lines) + ) + + +def output_buffer_for_process(process, exclude_stdouts): + ''' + Given an instance of subprocess.Popen and a sequence of stdouts to exclude, return either the + process's stdout or stderr. The idea is that if stdout is excluded for a process, we still have + stderr to log. + ''' + return process.stderr if process.stdout in exclude_stdouts else process.stdout + + +def log_many_outputs(processes, exclude_stdouts, output_log_level, error_on_warnings): + ''' + Given a sequence of subprocess.Popen() instances for multiple processes, log the output for each + process with the requested log level. Additionally, raise a CalledProcessError if a process + exits with an error (or a warning, if error on warnings is True). + + For simplicity, it's assumed that the output buffer for each process is its stdout. But if any + stdouts are given to exclude, then for any matching processes, log from their stderr instead. + ''' + # Map from output buffer to sequence of last lines. + buffer_last_lines = collections.defaultdict(list) + output_buffers = [output_buffer_for_process(process, exclude_stdouts) for process in processes] + + while True: + (ready_buffers, _, _) = select.select(output_buffers, [], []) + + for ready_buffer in ready_buffers: + line = ready_buffer.readline().rstrip().decode() + if not line: + continue + + # Keep the last few lines of output in case the process errors, and we need the output for + # the exception below. + last_lines = buffer_last_lines[ready_buffer] + last_lines.append(line) + if len(last_lines) > ERROR_OUTPUT_MAX_LINE_COUNT: + last_lines.pop(0) + + logger.log(output_log_level, line) + + if all(process.poll() is not None for process in processes): + break + + for process in processes: + remaining_output = ( + output_buffer_for_process(process, exclude_stdouts).read().rstrip().decode() + ) + if remaining_output: # pragma: no cover + logger.log(output_log_level, remaining_output) + + for process in processes: + exit_code = process.poll() + + if exit_code_indicates_error(exit_code, error_on_warnings): + # If an error occurs, include its output in the raised exception so that we don't + # inadvertently hide error output. + output_buffer = output_buffer_for_process(process, exclude_stdouts) + last_lines = buffer_last_lines[output_buffer] + if len(last_lines) == ERROR_OUTPUT_MAX_LINE_COUNT: + last_lines.insert(0, '...') + + raise subprocess.CalledProcessError( + exit_code, process_command(process), '\n'.join(last_lines) + ) + + +def log_command(full_command, input_file, output_file): + ''' + Log the given command (a sequence of command/argument strings), along with its input/output file + paths. 
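+
+    For example (illustrative), a command given an open output file logs roughly as
+    'pg_dump ... > /some/dump/path', using each file object's name attribute; file objects
+    without one (such as pipes) log an empty path.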
+ ''' + logger.debug( + ' '.join(full_command) + + (' < {}'.format(getattr(input_file, 'name', '')) if input_file else '') + + (' > {}'.format(getattr(output_file, 'name', '')) if output_file else '') + ) def execute_command( @@ -66,24 +155,23 @@ def execute_command( extra_environment=None, working_directory=None, error_on_warnings=True, + run_to_completion=True, ): ''' Execute the given command (a sequence of command/argument strings) and log its output at the - given log level. If output log level is None, instead capture and return the output. If an - open output file object is given, then write stdout to the file and only log stderr (but only - if an output log level is set). If an open input file object is given, then read stdin from the - file. If shell is True, execute the command within a shell. If an extra environment dict is - given, then use it to augment the current environment, and pass the result into the command. If - a working directory is given, use that as the present working directory when running the - command. If error on warnings is False, then treat exit code 1 as a warning instead of an error. + given log level. If output log level is None, instead capture and return the output. (Implies + run_to_completion.) If an open output file object is given, then write stdout to the file and + only log stderr (but only if an output log level is set). If an open input file object is given, + then read stdin from the file. If shell is True, execute the command within a shell. If an extra + environment dict is given, then use it to augment the current environment, and pass the result + into the command. If a working directory is given, use that as the present working directory + when running the command. If error on warnings is False, then treat exit code 1 as a warning + instead of an error. If run to completion is False, then return the process for the command + without executing it to completion. Raise subprocesses.CalledProcessError if an error occurs while running the command. ''' - logger.debug( - ' '.join(full_command) - + (' < {}'.format(input_file.name) if input_file else '') - + (' > {}'.format(output_file.name) if output_file else '') - ) + log_command(full_command, input_file, output_file) environment = {**os.environ, **extra_environment} if extra_environment else None if output_log_level is None: @@ -93,7 +181,7 @@ def execute_command( return output.decode() if output is not None else None else: process = subprocess.Popen( - full_command, + ' '.join(full_command) if shell else full_command, stdin=input_file, stdout=output_file or subprocess.PIPE, stderr=subprocess.PIPE if output_file else subprocess.STDOUT, @@ -101,8 +189,10 @@ def execute_command( env=environment, cwd=working_directory, ) + if not run_to_completion: + return process + log_output( - full_command, process, process.stderr if output_file else process.stdout, output_log_level, @@ -126,3 +216,61 @@ def execute_command_without_capture(full_command, working_directory=None, error_ except subprocess.CalledProcessError as error: if exit_code_indicates_error(error.returncode, error_on_warnings): raise + + +def execute_command_with_processes( + full_command, + processes, + output_log_level=logging.INFO, + output_file=None, + input_file=None, + shell=False, + extra_environment=None, + working_directory=None, + error_on_warnings=True, +): + ''' + Execute the given command (a sequence of command/argument strings) and log its output at the + given log level. 
Simultaneously, continue to poll
+    one or more active processes so that they run as well. This is useful, for instance, for
+    processes that are streaming output to a named
+    pipe that the given command is consuming from.
+
+    If an open output file object is given, then write stdout to the file and only log stderr (but
+    only if an output log level is set). If an open input file object is given, then read stdin from
+    the file. If shell is True, execute the command within a shell. If an extra environment dict is
+    given, then use it to augment the current environment, and pass the result into the command. If
+    a working directory is given, use that as the present working directory when running the
+    command. If error on warnings is False, then treat exit code 1 as a warning instead of an
+    error.
+
+    Raise subprocess.CalledProcessError if an error occurs while running the command or in the
+    upstream process.
+    '''
+    log_command(full_command, input_file, output_file)
+    environment = {**os.environ, **extra_environment} if extra_environment else None
+
+    try:
+        command_process = subprocess.Popen(
+            full_command,
+            stdin=input_file,
+            stdout=output_file or subprocess.PIPE,
+            stderr=subprocess.PIPE if output_file else subprocess.STDOUT,
+            shell=shell,
+            env=environment,
+            cwd=working_directory,
+        )
+    except (subprocess.CalledProcessError, OSError):
+        # Something has gone wrong. So vent each process' output buffer to prevent it from hanging.
+        # And then kill the process.
+        for process in processes:
+            if process.poll() is None:
+                process.stdout.read(0)
+            process.kill()
+        raise
+
+    log_many_outputs(
+        tuple(processes) + (command_process,),
+        (input_file, output_file),
+        output_log_level,
+        error_on_warnings,
+    )
diff --git a/borgmatic/hooks/dump.py b/borgmatic/hooks/dump.py
index bd5ea08..24b73e9 100644
--- a/borgmatic/hooks/dump.py
+++ b/borgmatic/hooks/dump.py
@@ -1,4 +1,3 @@
-import glob
 import logging
 import os
 import shutil
@@ -34,24 +33,13 @@ def make_database_dump_filename(dump_path, name, hostname=None):
     return os.path.join(os.path.expanduser(dump_path), hostname or 'localhost', name)


-def flatten_dump_patterns(dump_patterns, names):
+def create_named_pipe_for_dump(dump_path):
     '''
-    Given a dict from a database hook name to glob patterns matching the dumps for the named
-    databases, flatten out all the glob patterns into a single sequence, and return it.
-
-    Raise ValueError if there are no resulting glob patterns, which indicates that databases are not
-    configured in borgmatic's configuration.
+    Create a named pipe at the given dump path.
     '''
-    flattened = [pattern for patterns in dump_patterns.values() for pattern in patterns]
-
-    if not flattened:
-        raise ValueError(
-            'Cannot restore database(s) {} missing from borgmatic\'s configuration'.format(
-                ', '.join(names) or '"all"'
-            )
-        )
-
-    return flattened
+    os.makedirs(os.path.dirname(dump_path), mode=0o700, exist_ok=True)
+    if not os.path.exists(dump_path):
+        os.mkfifo(dump_path, mode=0o600)


 def remove_database_dumps(dump_path, databases, database_type_name, log_prefix, dry_run):
@@ -100,80 +88,3 @@ def convert_glob_patterns_to_borg_patterns(patterns):
     patterns like "sh:etc/*".
     '''
     return ['sh:{}'.format(pattern.lstrip(os.path.sep)) for pattern in patterns]
-
-
-def get_database_names_from_dumps(patterns):
-    '''
-    Given a sequence of database dump patterns, find the corresponding database dumps on disk and
-    return the database names from their filenames.
- ''' - return [os.path.basename(dump_path) for pattern in patterns for dump_path in glob.glob(pattern)] - - -def get_database_configurations(databases, names): - ''' - Given the full database configuration dicts as per the configuration schema, and a sequence of - database names, filter down and yield the configuration for just the named databases. - Additionally, if a database configuration is named "all", project out that configuration for - each named database. - ''' - named_databases = {database['name']: database for database in databases} - - for name in names: - database = named_databases.get(name) - if database: - yield database - continue - - if 'all' in named_databases: - yield {**named_databases['all'], **{'name': name}} - continue - - -def get_per_hook_database_configurations(hooks, names, dump_patterns): - ''' - Given the hooks configuration dict as per the configuration schema, a sequence of database - names to restore, and a dict from database hook name to glob patterns for matching dumps, - filter down the configuration for just the named databases. - - If there are no named databases given, then find the corresponding database dumps on disk and - use the database names from their filenames. Additionally, if a database configuration is named - "all", project out that configuration for each named database. - - Return the results as a dict from database hook name to a sequence of database configuration - dicts for that database type. - - Raise ValueError if one of the database names cannot be matched to a database in borgmatic's - database configuration. - ''' - hook_databases = { - hook_name: list( - get_database_configurations( - hooks.get(hook_name), - names or get_database_names_from_dumps(dump_patterns[hook_name]), - ) - ) - for hook_name in DATABASE_HOOK_NAMES - if hook_name in hooks - } - - if not names or 'all' in names: - if not any(hook_databases.values()): - raise ValueError( - 'Cannot restore database "all", as there are no database dumps in the archive' - ) - - return hook_databases - - found_names = { - database['name'] for databases in hook_databases.values() for database in databases - } - missing_names = sorted(set(names) - found_names) - if missing_names: - raise ValueError( - 'Cannot restore database(s) {} missing from borgmatic\'s configuration'.format( - ', '.join(missing_names) - ) - ) - - return hook_databases diff --git a/borgmatic/hooks/mysql.py b/borgmatic/hooks/mysql.py index c22eb08..82530d1 100644 --- a/borgmatic/hooks/mysql.py +++ b/borgmatic/hooks/mysql.py @@ -1,7 +1,6 @@ import logging -import os -from borgmatic.execute import execute_command +from borgmatic.execute import execute_command, execute_command_with_processes from borgmatic.hooks import dump logger = logging.getLogger(__name__) @@ -55,12 +54,16 @@ def database_names_to_dump(database, extra_environment, log_prefix, dry_run_labe def dump_databases(databases, log_prefix, location_config, dry_run): ''' - Dump the given MySQL/MariaDB databases to disk. The databases are supplied as a sequence of - dicts, one dict describing each database as per the configuration schema. Use the given log + Dump the given MySQL/MariaDB databases to a named pipe. The databases are supplied as a sequence + of dicts, one dict describing each database as per the configuration schema. Use the given log prefix in any log entries. Use the given location configuration dict to construct the - destination path. If this is a dry run, then don't actually dump anything. + destination path. 
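+
+    For a database named 'posts' (hypothetical), the assembled command is roughly 'mysqldump
+    --add-drop-database --databases posts > ~/.borgmatic/mysql_databases/localhost/posts', run
+    through a shell so that the redirection writes into the named pipe at that path.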
+
+    Return a sequence of subprocess.Popen instances for the dump processes ready to spew to a named
+    pipe. But if this is a dry run, then don't actually dump anything and return an empty sequence.
     '''
     dry_run_label = ' (dry run; not actually dumping anything)' if dry_run else ''
+    processes = []

     logger.info('{}: Dumping MySQL databases{}'.format(log_prefix, dry_run_label))

@@ -75,7 +78,8 @@ def dump_databases(databases, log_prefix, location_config, dry_run):
         )

         dump_command = (
-            ('mysqldump', '--add-drop-database')
+            ('mysqldump',)
+            + ('--add-drop-database',)
             + (('--host', database['hostname']) if 'hostname' in database else ())
             + (('--port', str(database['port'])) if 'port' in database else ())
             + (('--protocol', 'tcp') if 'hostname' in database or 'port' in database else ())
@@ -83,6 +87,9 @@ def dump_databases(databases, log_prefix, location_config, dry_run):
             + (tuple(database['options'].split(' ')) if 'options' in database else ())
             + ('--databases',)
             + dump_command_names
+            # Use shell redirection rather than execute_command(output_file=open(...)) to prevent
+            # the open() call on a named pipe from hanging the main borgmatic process.
+            + ('>', dump_filename)
         )

         logger.debug(
@@ -90,13 +97,21 @@ def dump_databases(databases, log_prefix, location_config, dry_run):
                 log_prefix, requested_name, dump_filename, dry_run_label
             )
         )
-        if not dry_run:
-            os.makedirs(os.path.dirname(dump_filename), mode=0o700, exist_ok=True)
+        if dry_run:
+            continue
+
+        dump.create_named_pipe_for_dump(dump_filename)
+
+        processes.append(
             execute_command(
                 dump_command,
-                output_file=open(dump_filename, 'w'),
+                shell=True,
                 extra_environment=extra_environment,
+                run_to_completion=False,
             )
+        )
+
+    return processes


 def remove_database_dumps(databases, log_prefix, location_config, dry_run):  # pragma: no cover
@@ -111,45 +126,47 @@ def remove_database_dumps(databases, log_prefix, location_config, dry_run):  # p
     )


-def make_database_dump_patterns(databases, log_prefix, location_config, names):
+def make_database_dump_pattern(databases, log_prefix, location_config, name=None):
     '''
     Given a sequence of configuration dicts, a prefix to log with, a location configuration dict,
-    and a sequence of database names to match, return the corresponding glob patterns to match the
-    database dumps in an archive. An empty sequence of names indicates that the patterns should
-    match all dumps.
+    and a database name to match, return the corresponding glob pattern to match the database dump
+    in an archive.
     '''
-    return [
-        dump.make_database_dump_filename(make_dump_path(location_config), name, hostname='*')
-        for name in (names or ['*'])
-    ]
+    return dump.make_database_dump_filename(make_dump_path(location_config), name, hostname='*')


-def restore_database_dumps(databases, log_prefix, location_config, dry_run):
+def restore_database_dump(database_config, log_prefix, location_config, dry_run, extract_process):
     '''
-    Restore the given MySQL/MariaDB databases from disk. The databases are supplied as a sequence of
-    dicts, one dict describing each database as per the configuration schema. Use the given log
-    prefix in any log entries. Use the given location configuration dict to construct the
-    destination path. If this is a dry run, then don't actually restore anything.
+    Restore the given MySQL/MariaDB database from an extract stream. The database is supplied as a
+    one-element sequence containing a dict describing the database, as per the configuration schema.
+    Use the given log prefix in any log entries.
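+    The restore itself wires extract_process.stdout into a 'mysql --batch' invocation, along with
+    any host, port, protocol, and user flags derived from the configuration.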
If this is a dry run, then don't actually restore + anything. Trigger the given active extract process (an instance of subprocess.Popen) to produce + output to consume. ''' dry_run_label = ' (dry run; not actually restoring anything)' if dry_run else '' - for database in databases: - dump_filename = dump.make_database_dump_filename( - make_dump_path(location_config), database['name'], database.get('hostname') - ) - restore_command = ( - ('mysql', '--batch') - + (('--host', database['hostname']) if 'hostname' in database else ()) - + (('--port', str(database['port'])) if 'port' in database else ()) - + (('--protocol', 'tcp') if 'hostname' in database or 'port' in database else ()) - + (('--user', database['username']) if 'username' in database else ()) - ) - extra_environment = {'MYSQL_PWD': database['password']} if 'password' in database else None + if len(database_config) != 1: + raise ValueError('The database configuration value is invalid') - logger.debug( - '{}: Restoring MySQL database {}{}'.format(log_prefix, database['name'], dry_run_label) - ) - if not dry_run: - execute_command( - restore_command, input_file=open(dump_filename), extra_environment=extra_environment - ) + database = database_config[0] + restore_command = ( + ('mysql', '--batch') + + (('--host', database['hostname']) if 'hostname' in database else ()) + + (('--port', str(database['port'])) if 'port' in database else ()) + + (('--protocol', 'tcp') if 'hostname' in database or 'port' in database else ()) + + (('--user', database['username']) if 'username' in database else ()) + ) + extra_environment = {'MYSQL_PWD': database['password']} if 'password' in database else None + + logger.debug( + '{}: Restoring MySQL database {}{}'.format(log_prefix, database['name'], dry_run_label) + ) + if dry_run: + return + + execute_command_with_processes( + restore_command, + [extract_process], + input_file=extract_process.stdout, + extra_environment=extra_environment, + ) diff --git a/borgmatic/hooks/postgresql.py b/borgmatic/hooks/postgresql.py index 9fa9633..52037f1 100644 --- a/borgmatic/hooks/postgresql.py +++ b/borgmatic/hooks/postgresql.py @@ -1,7 +1,6 @@ import logging -import os -from borgmatic.execute import execute_command +from borgmatic.execute import execute_command, execute_command_with_processes from borgmatic.hooks import dump logger = logging.getLogger(__name__) @@ -18,12 +17,16 @@ def make_dump_path(location_config): # pragma: no cover def dump_databases(databases, log_prefix, location_config, dry_run): ''' - Dump the given PostgreSQL databases to disk. The databases are supplied as a sequence of dicts, - one dict describing each database as per the configuration schema. Use the given log prefix in - any log entries. Use the given location configuration dict to construct the destination path. If - this is a dry run, then don't actually dump anything. + Dump the given PostgreSQL databases to a named pipe. The databases are supplied as a sequence of + dicts, one dict describing each database as per the configuration schema. Use the given log + prefix in any log entries. Use the given location configuration dict to construct the + destination path. + + Return a sequence of subprocess.Popen instances for the dump processes ready to spew to a named + pipe. But if this is a dry run, then don't actually dump anything and return an empty sequence. 
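+
+    For a database named 'posts' (hypothetical), this creates a named pipe at roughly
+    ~/.borgmatic/postgresql_databases/localhost/posts and starts 'pg_dump --no-password --clean
+    --if-exists --no-sync --file <that pipe> --format custom posts', returning while the dump is
+    still running so that Borg can drain the pipe during the archive create.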
    '''
    dry_run_label = ' (dry run; not actually dumping anything)' if dry_run else ''
+    processes = []

     logger.info('{}: Dumping PostgreSQL databases{}'.format(log_prefix, dry_run_label))

@@ -39,6 +42,7 @@ def dump_databases(databases, log_prefix, location_config, dry_run):
             '--no-password',
             '--clean',
             '--if-exists',
+            '--no-sync',
         )
         + ('--file', dump_filename)
         + (('--host', database['hostname']) if 'hostname' in database else ())
@@ -55,9 +59,16 @@ def dump_databases(databases, log_prefix, location_config, dry_run):
                 log_prefix, name, dump_filename, dry_run_label
             )
         )
-        if not dry_run:
-            os.makedirs(os.path.dirname(dump_filename), mode=0o700, exist_ok=True)
-            execute_command(command, extra_environment=extra_environment)
+        if dry_run:
+            continue
+
+        dump.create_named_pipe_for_dump(dump_filename)
+
+        processes.append(
+            execute_command(command, extra_environment=extra_environment, run_to_completion=False)
+        )
+
+    return processes


 def remove_database_dumps(databases, log_prefix, location_config, dry_run):  # pragma: no cover
@@ -72,60 +83,61 @@ def remove_database_dumps(databases, log_prefix, location_config, dry_run):  # p
     )


-def make_database_dump_patterns(databases, log_prefix, location_config, names):
+def make_database_dump_pattern(databases, log_prefix, location_config, name=None):
     '''
     Given a sequence of configuration dicts, a prefix to log with, a location configuration dict,
-    and a sequence of database names to match, return the corresponding glob patterns to match the
-    database dumps in an archive. An empty sequence of names indicates that the patterns should
-    match all dumps.
+    and a database name to match, return the corresponding glob pattern to match the database dump
+    in an archive.
     '''
-    return [
-        dump.make_database_dump_filename(make_dump_path(location_config), name, hostname='*')
-        for name in (names or ['*'])
-    ]
+    return dump.make_database_dump_filename(make_dump_path(location_config), name, hostname='*')


-def restore_database_dumps(databases, log_prefix, location_config, dry_run):
+def restore_database_dump(database_config, log_prefix, location_config, dry_run, extract_process):
     '''
-    Restore the given PostgreSQL databases from disk. The databases are supplied as a sequence of
-    dicts, one dict describing each database as per the configuration schema. Use the given log
-    prefix in any log entries. Use the given location configuration dict to construct the
-    destination path. If this is a dry run, then don't actually restore anything.
+    Restore the given PostgreSQL database from an extract stream. The database is supplied as a
+    one-element sequence containing a dict describing the database, as per the configuration schema.
+    Use the given log prefix in any log entries. If this is a dry run, then don't actually restore
+    anything. Trigger the given active extract process (an instance of subprocess.Popen) to produce
+    output to consume.
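+
+    Conceptually (a sketch, not the literal implementation), the restore behaves like the pipeline
+    'borg extract --stdout ... | pg_restore --no-password --if-exists --exit-on-error --clean
+    --dbname <name>' (or psql when the configured name is 'all'), followed by a separate psql
+    ANALYZE of the restored database.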
''' dry_run_label = ' (dry run; not actually restoring anything)' if dry_run else '' - for database in databases: - dump_filename = dump.make_database_dump_filename( - make_dump_path(location_config), database['name'], database.get('hostname') - ) - all_databases = bool(database['name'] == 'all') - analyze_command = ( - ('psql', '--no-password', '--quiet') - + (('--host', database['hostname']) if 'hostname' in database else ()) - + (('--port', str(database['port'])) if 'port' in database else ()) - + (('--username', database['username']) if 'username' in database else ()) - + (('--dbname', database['name']) if not all_databases else ()) - + ('--command', 'ANALYZE') - ) - restore_command = ( - ('psql' if all_databases else 'pg_restore', '--no-password') - + ( - ('--if-exists', '--exit-on-error', '--clean', '--dbname', database['name']) - if not all_databases - else () - ) - + (('--host', database['hostname']) if 'hostname' in database else ()) - + (('--port', str(database['port'])) if 'port' in database else ()) - + (('--username', database['username']) if 'username' in database else ()) - + (('-f', dump_filename) if all_databases else (dump_filename,)) - ) - extra_environment = {'PGPASSWORD': database['password']} if 'password' in database else None + if len(database_config) != 1: + raise ValueError('The database configuration value is invalid') - logger.debug( - '{}: Restoring PostgreSQL database {}{}'.format( - log_prefix, database['name'], dry_run_label - ) + database = database_config[0] + all_databases = bool(database['name'] == 'all') + analyze_command = ( + ('psql', '--no-password', '--quiet') + + (('--host', database['hostname']) if 'hostname' in database else ()) + + (('--port', str(database['port'])) if 'port' in database else ()) + + (('--username', database['username']) if 'username' in database else ()) + + (('--dbname', database['name']) if not all_databases else ()) + + ('--command', 'ANALYZE') + ) + restore_command = ( + ('psql' if all_databases else 'pg_restore', '--no-password') + + ( + ('--if-exists', '--exit-on-error', '--clean', '--dbname', database['name']) + if not all_databases + else () ) - if not dry_run: - execute_command(restore_command, extra_environment=extra_environment) - execute_command(analyze_command, extra_environment=extra_environment) + + (('--host', database['hostname']) if 'hostname' in database else ()) + + (('--port', str(database['port'])) if 'port' in database else ()) + + (('--username', database['username']) if 'username' in database else ()) + ) + extra_environment = {'PGPASSWORD': database['password']} if 'password' in database else None + + logger.debug( + '{}: Restoring PostgreSQL database {}{}'.format(log_prefix, database['name'], dry_run_label) + ) + if dry_run: + return + + execute_command_with_processes( + restore_command, + [extract_process], + input_file=extract_process.stdout, + extra_environment=extra_environment, + ) + execute_command(analyze_command, extra_environment=extra_environment) diff --git a/docs/how-to/backup-your-databases.md b/docs/how-to/backup-your-databases.md index 49f2af6..76f2b3e 100644 --- a/docs/how-to/backup-your-databases.md +++ b/docs/how-to/backup-your-databases.md @@ -22,13 +22,13 @@ hooks: - name: posts ``` -Prior to each backup, borgmatic dumps each configured database to a file -and includes it in the backup. After the backup completes, borgmatic removes -the database dump files to recover disk space. 
+As part of each backup, borgmatic streams a database dump for each configured +database directly to Borg, so it's included in the backup without consuming +additional disk space. -borgmatic creates these temporary dump files in `~/.borgmatic` by default. To -customize this path, set the `borgmatic_source_directory` option in the -`location` section of borgmatic's configuration. +To support this, borgmatic creates temporary named pipes in `~/.borgmatic` by +default. To customize this path, set the `borgmatic_source_directory` option +in the `location` section of borgmatic's configuration. Here's a more involved example that connects to remote databases: diff --git a/tests/integration/test_execute.py b/tests/integration/test_execute.py index c4c62c9..1792fb4 100644 --- a/tests/integration/test_execute.py +++ b/tests/integration/test_execute.py @@ -14,20 +14,12 @@ def test_log_output_logs_each_line_separately(): hi_process = subprocess.Popen(['echo', 'hi'], stdout=subprocess.PIPE) module.log_output( - ['echo', 'hi'], - hi_process, - hi_process.stdout, - output_log_level=logging.INFO, - error_on_warnings=False, + hi_process, hi_process.stdout, output_log_level=logging.INFO, error_on_warnings=False ) there_process = subprocess.Popen(['echo', 'there'], stdout=subprocess.PIPE) module.log_output( - ['echo', 'there'], - there_process, - there_process.stdout, - output_log_level=logging.INFO, - error_on_warnings=False, + there_process, there_process.stdout, output_log_level=logging.INFO, error_on_warnings=False ) @@ -39,11 +31,7 @@ def test_log_output_includes_error_output_in_exception(): with pytest.raises(subprocess.CalledProcessError) as error: module.log_output( - ['grep'], - process, - process.stdout, - output_log_level=logging.INFO, - error_on_warnings=False, + process, process.stdout, output_log_level=logging.INFO, error_on_warnings=False ) assert error.value.returncode == 2 @@ -59,11 +47,7 @@ def test_log_output_truncates_long_error_output(): with pytest.raises(subprocess.CalledProcessError) as error: module.log_output( - ['grep'], - process, - process.stdout, - output_log_level=logging.INFO, - error_on_warnings=False, + process, process.stdout, output_log_level=logging.INFO, error_on_warnings=False ) assert error.value.returncode == 2 @@ -76,5 +60,5 @@ def test_log_output_with_no_output_logs_nothing(): process = subprocess.Popen(['true'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT) module.log_output( - ['true'], process, process.stdout, output_log_level=logging.INFO, error_on_warnings=False + process, process.stdout, output_log_level=logging.INFO, error_on_warnings=False ) diff --git a/tests/unit/hooks/test_dump.py b/tests/unit/hooks/test_dump.py index dc6e8dd..9bcabc0 100644 --- a/tests/unit/hooks/test_dump.py +++ b/tests/unit/hooks/test_dump.py @@ -34,29 +34,6 @@ def test_make_database_dump_filename_with_invalid_name_raises(): module.make_database_dump_filename('databases', 'invalid/name') -def test_flatten_dump_patterns_produces_list_of_all_patterns(): - dump_patterns = {'postgresql_databases': ['*/glob', 'glob/*'], 'mysql_databases': ['*/*/*']} - expected_patterns = sorted( - dump_patterns['postgresql_databases'] + dump_patterns['mysql_databases'] - ) - - assert sorted(module.flatten_dump_patterns(dump_patterns, ('bob',))) == expected_patterns - - -def test_flatten_dump_patterns_with_no_patterns_errors(): - dump_patterns = {'postgresql_databases': [], 'mysql_databases': []} - - with pytest.raises(ValueError): - assert module.flatten_dump_patterns(dump_patterns, ('bob',)) - - -def 
test_flatten_dump_patterns_with_no_hooks_errors(): - dump_patterns = {} - - with pytest.raises(ValueError): - assert module.flatten_dump_patterns(dump_patterns, ('bob',)) - - def test_remove_database_dumps_removes_dump_for_each_database(): databases = [{'name': 'foo'}, {'name': 'bar'}] flexmock(module).should_receive('make_database_dump_filename').with_args( @@ -107,93 +84,3 @@ def test_remove_database_dumps_without_databases_does_not_raise(): def test_convert_glob_patterns_to_borg_patterns_removes_leading_slash(): assert module.convert_glob_patterns_to_borg_patterns(('/etc/foo/bar',)) == ['sh:etc/foo/bar'] - - -def test_get_database_names_from_dumps_gets_names_from_filenames_matching_globs(): - flexmock(module.glob).should_receive('glob').and_return( - ('databases/localhost/foo',) - ).and_return(('databases/localhost/bar',)).and_return(()) - - assert module.get_database_names_from_dumps( - ('databases/*/foo', 'databases/*/bar', 'databases/*/baz') - ) == ['foo', 'bar'] - - -def test_get_database_configurations_only_produces_named_databases(): - databases = [ - {'name': 'foo', 'hostname': 'example.org'}, - {'name': 'bar', 'hostname': 'example.com'}, - {'name': 'baz', 'hostname': 'example.org'}, - ] - - assert list(module.get_database_configurations(databases, ('foo', 'baz'))) == [ - {'name': 'foo', 'hostname': 'example.org'}, - {'name': 'baz', 'hostname': 'example.org'}, - ] - - -def test_get_database_configurations_matches_all_database(): - databases = [ - {'name': 'foo', 'hostname': 'example.org'}, - {'name': 'all', 'hostname': 'example.com'}, - ] - - assert list(module.get_database_configurations(databases, ('foo', 'bar', 'baz'))) == [ - {'name': 'foo', 'hostname': 'example.org'}, - {'name': 'bar', 'hostname': 'example.com'}, - {'name': 'baz', 'hostname': 'example.com'}, - ] - - -def test_get_per_hook_database_configurations_partitions_by_hook(): - hooks = {'postgresql_databases': [flexmock()]} - names = ('foo', 'bar') - dump_patterns = flexmock() - expected_config = {'postgresql_databases': [{'name': 'foo'}, {'name': 'bar'}]} - flexmock(module).should_receive('get_database_configurations').with_args( - hooks['postgresql_databases'], names - ).and_return(expected_config['postgresql_databases']) - - config = module.get_per_hook_database_configurations(hooks, names, dump_patterns) - - assert config == expected_config - - -def test_get_per_hook_database_configurations_defaults_to_detected_database_names(): - hooks = {'postgresql_databases': [flexmock()]} - names = () - detected_names = flexmock() - dump_patterns = {'postgresql_databases': [flexmock()]} - expected_config = {'postgresql_databases': [flexmock()]} - flexmock(module).should_receive('get_database_names_from_dumps').and_return(detected_names) - flexmock(module).should_receive('get_database_configurations').with_args( - hooks['postgresql_databases'], detected_names - ).and_return(expected_config['postgresql_databases']) - - config = module.get_per_hook_database_configurations(hooks, names, dump_patterns) - - assert config == expected_config - - -def test_get_per_hook_database_configurations_with_unknown_database_name_raises(): - hooks = {'postgresql_databases': [flexmock()]} - names = ('foo', 'bar') - dump_patterns = flexmock() - flexmock(module).should_receive('get_database_configurations').with_args( - hooks['postgresql_databases'], names - ).and_return([]) - - with pytest.raises(ValueError): - module.get_per_hook_database_configurations(hooks, names, dump_patterns) - - -def 
test_get_per_hook_database_configurations_with_all_and_no_archive_dumps_raises(): - hooks = {'postgresql_databases': [flexmock()]} - names = ('foo', 'all') - dump_patterns = flexmock() - flexmock(module).should_receive('get_database_configurations').with_args( - hooks['postgresql_databases'], names - ).and_return([]) - - with pytest.raises(ValueError): - module.get_per_hook_database_configurations(hooks, names, dump_patterns) diff --git a/tests/unit/hooks/test_mysql.py b/tests/unit/hooks/test_mysql.py index 91c0149..53661fb 100644 --- a/tests/unit/hooks/test_mysql.py +++ b/tests/unit/hooks/test_mysql.py @@ -1,5 +1,4 @@ -import sys - +import pytest from flexmock import flexmock from borgmatic.hooks import mysql as module @@ -36,7 +35,7 @@ def test_database_names_to_dump_queries_mysql_for_database_names(): def test_dump_databases_runs_mysqldump_for_each_database(): databases = [{'name': 'foo'}, {'name': 'bar'}] - output_file = flexmock() + processes = [flexmock(), flexmock()] flexmock(module).should_receive('make_dump_path').and_return('') flexmock(module.dump).should_receive('make_database_dump_filename').and_return( 'databases/localhost/foo' @@ -44,17 +43,24 @@ def test_dump_databases_runs_mysqldump_for_each_database(): flexmock(module).should_receive('database_names_to_dump').and_return(('foo',)).and_return( ('bar',) ) - flexmock(module.os).should_receive('makedirs') - flexmock(sys.modules['builtins']).should_receive('open').and_return(output_file) + flexmock(module.dump).should_receive('create_named_pipe_for_dump') - for name in ('foo', 'bar'): + for name, process in zip(('foo', 'bar'), processes): flexmock(module).should_receive('execute_command').with_args( - ('mysqldump', '--add-drop-database', '--databases', name), - output_file=output_file, + ( + 'mysqldump', + '--add-drop-database', + '--databases', + name, + '>', + 'databases/localhost/{}'.format(name), + ), + shell=True, extra_environment=None, - ).once() + run_to_completion=False, + ).and_return(process).once() - module.dump_databases(databases, 'test.yaml', {}, dry_run=False) + assert module.dump_databases(databases, 'test.yaml', {}, dry_run=False) == processes def test_dump_databases_with_dry_run_skips_mysqldump(): @@ -66,7 +72,7 @@ def test_dump_databases_with_dry_run_skips_mysqldump(): flexmock(module).should_receive('database_names_to_dump').and_return(('foo',)).and_return( ('bar',) ) - flexmock(module.os).should_receive('makedirs').never() + flexmock(module.dump).should_receive('create_named_pipe_for_dump').never() flexmock(module).should_receive('execute_command').never() module.dump_databases(databases, 'test.yaml', {}, dry_run=True) @@ -74,14 +80,13 @@ def test_dump_databases_with_dry_run_skips_mysqldump(): def test_dump_databases_runs_mysqldump_with_hostname_and_port(): databases = [{'name': 'foo', 'hostname': 'database.example.org', 'port': 5433}] - output_file = flexmock() + process = flexmock() flexmock(module).should_receive('make_dump_path').and_return('') flexmock(module.dump).should_receive('make_database_dump_filename').and_return( 'databases/database.example.org/foo' ) flexmock(module).should_receive('database_names_to_dump').and_return(('foo',)) - flexmock(module.os).should_receive('makedirs') - flexmock(sys.modules['builtins']).should_receive('open').and_return(output_file) + flexmock(module.dump).should_receive('create_named_pipe_for_dump') flexmock(module).should_receive('execute_command').with_args( ( @@ -95,128 +100,135 @@ def test_dump_databases_runs_mysqldump_with_hostname_and_port(): 'tcp', 
'--databases', 'foo', + '>', + 'databases/database.example.org/foo', ), - output_file=output_file, + shell=True, extra_environment=None, - ).once() + run_to_completion=False, + ).and_return(process).once() - module.dump_databases(databases, 'test.yaml', {}, dry_run=False) + assert module.dump_databases(databases, 'test.yaml', {}, dry_run=False) == [process] def test_dump_databases_runs_mysqldump_with_username_and_password(): databases = [{'name': 'foo', 'username': 'root', 'password': 'trustsome1'}] - output_file = flexmock() + process = flexmock() flexmock(module).should_receive('make_dump_path').and_return('') flexmock(module.dump).should_receive('make_database_dump_filename').and_return( 'databases/localhost/foo' ) flexmock(module).should_receive('database_names_to_dump').and_return(('foo',)) - flexmock(module.os).should_receive('makedirs') - flexmock(sys.modules['builtins']).should_receive('open').and_return(output_file) + flexmock(module.dump).should_receive('create_named_pipe_for_dump') flexmock(module).should_receive('execute_command').with_args( - ('mysqldump', '--add-drop-database', '--user', 'root', '--databases', 'foo'), - output_file=output_file, + ( + 'mysqldump', + '--add-drop-database', + '--user', + 'root', + '--databases', + 'foo', + '>', + 'databases/localhost/foo', + ), + shell=True, extra_environment={'MYSQL_PWD': 'trustsome1'}, - ).once() + run_to_completion=False, + ).and_return(process).once() - module.dump_databases(databases, 'test.yaml', {}, dry_run=False) + assert module.dump_databases(databases, 'test.yaml', {}, dry_run=False) == [process] def test_dump_databases_runs_mysqldump_with_options(): databases = [{'name': 'foo', 'options': '--stuff=such'}] - output_file = flexmock() + process = flexmock() flexmock(module).should_receive('make_dump_path').and_return('') flexmock(module.dump).should_receive('make_database_dump_filename').and_return( 'databases/localhost/foo' ) flexmock(module).should_receive('database_names_to_dump').and_return(('foo',)) - flexmock(module.os).should_receive('makedirs') - flexmock(sys.modules['builtins']).should_receive('open').and_return(output_file) + flexmock(module.dump).should_receive('create_named_pipe_for_dump') flexmock(module).should_receive('execute_command').with_args( - ('mysqldump', '--add-drop-database', '--stuff=such', '--databases', 'foo'), - output_file=output_file, + ( + 'mysqldump', + '--add-drop-database', + '--stuff=such', + '--databases', + 'foo', + '>', + 'databases/localhost/foo', + ), + shell=True, extra_environment=None, - ).once() + run_to_completion=False, + ).and_return(process).once() - module.dump_databases(databases, 'test.yaml', {}, dry_run=False) + assert module.dump_databases(databases, 'test.yaml', {}, dry_run=False) == [process] def test_dump_databases_runs_mysqldump_for_all_databases(): databases = [{'name': 'all'}] - output_file = flexmock() + process = flexmock() flexmock(module).should_receive('make_dump_path').and_return('') flexmock(module.dump).should_receive('make_database_dump_filename').and_return( 'databases/localhost/all' ) flexmock(module).should_receive('database_names_to_dump').and_return(('foo', 'bar')) - flexmock(module.os).should_receive('makedirs') - flexmock(sys.modules['builtins']).should_receive('open').and_return(output_file) + flexmock(module.dump).should_receive('create_named_pipe_for_dump') flexmock(module).should_receive('execute_command').with_args( - ('mysqldump', '--add-drop-database', '--databases', 'foo', 'bar'), - output_file=output_file, + ( + 'mysqldump', + 
'--add-drop-database', + '--databases', + 'foo', + 'bar', + '>', + 'databases/localhost/all', + ), + shell=True, + extra_environment=None, + run_to_completion=False, + ).and_return(process).once() + + assert module.dump_databases(databases, 'test.yaml', {}, dry_run=False) == [process] + + +def test_restore_database_dump_runs_mysql_to_restore(): + database_config = [{'name': 'foo'}] + extract_process = flexmock(stdout=flexmock()) + + flexmock(module).should_receive('execute_command_with_processes').with_args( + ('mysql', '--batch'), + processes=[extract_process], + input_file=extract_process.stdout, extra_environment=None, ).once() - module.dump_databases(databases, 'test.yaml', {}, dry_run=False) - - -def test_make_database_dump_patterns_converts_names_to_glob_paths(): - flexmock(module).should_receive('make_dump_path').and_return('') - flexmock(module.dump).should_receive('make_database_dump_filename').and_return( - 'databases/*/foo' - ).and_return('databases/*/bar') - - assert module.make_database_dump_patterns(flexmock(), flexmock(), {}, ('foo', 'bar')) == [ - 'databases/*/foo', - 'databases/*/bar', - ] - - -def test_make_database_dump_patterns_treats_empty_names_as_matching_all_databases(): - flexmock(module).should_receive('make_dump_path').and_return('/dump/path') - flexmock(module.dump).should_receive('make_database_dump_filename').with_args( - '/dump/path', '*', '*' - ).and_return('databases/*/*') - - assert module.make_database_dump_patterns(flexmock(), flexmock(), {}, ()) == ['databases/*/*'] - - -def test_restore_database_dumps_restores_each_database(): - databases = [{'name': 'foo'}, {'name': 'bar'}] - flexmock(module).should_receive('make_dump_path').and_return('') - flexmock(module.dump).should_receive('make_database_dump_filename').and_return( - 'databases/localhost/foo' - ).and_return('databases/localhost/bar') - - for name in ('foo', 'bar'): - dump_filename = 'databases/localhost/{}'.format(name) - input_file = flexmock() - flexmock(sys.modules['builtins']).should_receive('open').with_args( - dump_filename - ).and_return(input_file) - flexmock(module).should_receive('execute_command').with_args( - ('mysql', '--batch'), input_file=input_file, extra_environment=None - ).once() - - module.restore_database_dumps(databases, 'test.yaml', {}, dry_run=False) - - -def test_restore_database_dumps_runs_mysql_with_hostname_and_port(): - databases = [{'name': 'foo', 'hostname': 'database.example.org', 'port': 5433}] - flexmock(module).should_receive('make_dump_path').and_return('') - flexmock(module.dump).should_receive('make_database_dump_filename').and_return( - 'databases/localhost/foo' - ) - dump_filename = 'databases/localhost/foo' - input_file = flexmock() - flexmock(sys.modules['builtins']).should_receive('open').with_args(dump_filename).and_return( - input_file + module.restore_database_dump( + database_config, 'test.yaml', {}, dry_run=False, extract_process=extract_process ) - flexmock(module).should_receive('execute_command').with_args( + +def test_restore_database_dump_errors_on_multiple_database_config(): + database_config = [{'name': 'foo'}, {'name': 'bar'}] + + flexmock(module).should_receive('execute_command_with_processes').never() + flexmock(module).should_receive('execute_command').never() + + with pytest.raises(ValueError): + module.restore_database_dump( + database_config, 'test.yaml', {}, dry_run=False, extract_process=flexmock() + ) + + +def test_restore_database_dump_runs_mysql_with_hostname_and_port(): + database_config = [{'name': 'foo', 'hostname': 
'database.example.org', 'port': 5433}] + extract_process = flexmock(stdout=flexmock()) + + flexmock(module).should_receive('execute_command_with_processes').with_args( ( 'mysql', '--batch', @@ -227,29 +239,27 @@ def test_restore_database_dumps_runs_mysql_with_hostname_and_port(): '--protocol', 'tcp', ), - input_file=input_file, + processes=[extract_process], + input_file=extract_process.stdout, extra_environment=None, ).once() - module.restore_database_dumps(databases, 'test.yaml', {}, dry_run=False) - - -def test_restore_database_dumps_runs_mysql_with_username_and_password(): - databases = [{'name': 'foo', 'username': 'root', 'password': 'trustsome1'}] - flexmock(module).should_receive('make_dump_path').and_return('') - flexmock(module.dump).should_receive('make_database_dump_filename').and_return( - 'databases/localhost/foo' - ) - dump_filename = 'databases/localhost/foo' - input_file = flexmock() - flexmock(sys.modules['builtins']).should_receive('open').with_args(dump_filename).and_return( - input_file + module.restore_database_dump( + database_config, 'test.yaml', {}, dry_run=False, extract_process=extract_process ) - flexmock(module).should_receive('execute_command').with_args( + +def test_restore_database_dump_runs_mysql_with_username_and_password(): + database_config = [{'name': 'foo', 'username': 'root', 'password': 'trustsome1'}] + extract_process = flexmock(stdout=flexmock()) + + flexmock(module).should_receive('execute_command_with_processes').with_args( ('mysql', '--batch', '--user', 'root'), - input_file=input_file, + processes=[extract_process], + input_file=extract_process.stdout, extra_environment={'MYSQL_PWD': 'trustsome1'}, ).once() - module.restore_database_dumps(databases, 'test.yaml', {}, dry_run=False) + module.restore_database_dump( + database_config, 'test.yaml', {}, dry_run=False, extract_process=extract_process + ) diff --git a/tests/unit/hooks/test_postgresql.py b/tests/unit/hooks/test_postgresql.py index 5c79c22..1444267 100644 --- a/tests/unit/hooks/test_postgresql.py +++ b/tests/unit/hooks/test_postgresql.py @@ -1,3 +1,4 @@ +import pytest from flexmock import flexmock from borgmatic.hooks import postgresql as module @@ -5,19 +6,21 @@ from borgmatic.hooks import postgresql as module def test_dump_databases_runs_pg_dump_for_each_database(): databases = [{'name': 'foo'}, {'name': 'bar'}] + processes = [flexmock(), flexmock()] flexmock(module).should_receive('make_dump_path').and_return('') flexmock(module.dump).should_receive('make_database_dump_filename').and_return( 'databases/localhost/foo' ).and_return('databases/localhost/bar') - flexmock(module.os).should_receive('makedirs') + flexmock(module.dump).should_receive('create_named_pipe_for_dump') - for name in ('foo', 'bar'): + for name, process in zip(('foo', 'bar'), processes): flexmock(module).should_receive('execute_command').with_args( ( 'pg_dump', '--no-password', '--clean', '--if-exists', + '--no-sync', '--file', 'databases/localhost/{}'.format(name), '--format', @@ -25,9 +28,10 @@ def test_dump_databases_runs_pg_dump_for_each_database(): name, ), extra_environment=None, - ).once() + run_to_completion=False, + ).and_return(process).once() - module.dump_databases(databases, 'test.yaml', {}, dry_run=False) + assert module.dump_databases(databases, 'test.yaml', {}, dry_run=False) == processes def test_dump_databases_with_dry_run_skips_pg_dump(): @@ -36,19 +40,20 @@ def test_dump_databases_with_dry_run_skips_pg_dump(): flexmock(module.dump).should_receive('make_database_dump_filename').and_return( 
'databases/localhost/foo' ).and_return('databases/localhost/bar') - flexmock(module.os).should_receive('makedirs').never() + flexmock(module.dump).should_receive('create_named_pipe_for_dump').never() flexmock(module).should_receive('execute_command').never() - module.dump_databases(databases, 'test.yaml', {}, dry_run=True) + assert module.dump_databases(databases, 'test.yaml', {}, dry_run=True) == [] def test_dump_databases_runs_pg_dump_with_hostname_and_port(): databases = [{'name': 'foo', 'hostname': 'database.example.org', 'port': 5433}] + process = flexmock() flexmock(module).should_receive('make_dump_path').and_return('') flexmock(module.dump).should_receive('make_database_dump_filename').and_return( 'databases/database.example.org/foo' ) - flexmock(module.os).should_receive('makedirs') + flexmock(module.dump).should_receive('create_named_pipe_for_dump') flexmock(module).should_receive('execute_command').with_args( ( @@ -56,6 +61,7 @@ def test_dump_databases_runs_pg_dump_with_hostname_and_port(): '--no-password', '--clean', '--if-exists', + '--no-sync', '--file', 'databases/database.example.org/foo', '--host', @@ -67,18 +73,20 @@ def test_dump_databases_runs_pg_dump_with_hostname_and_port(): 'foo', ), extra_environment=None, - ).once() + run_to_completion=False, + ).and_return(process).once() - module.dump_databases(databases, 'test.yaml', {}, dry_run=False) + assert module.dump_databases(databases, 'test.yaml', {}, dry_run=False) == [process] def test_dump_databases_runs_pg_dump_with_username_and_password(): databases = [{'name': 'foo', 'username': 'postgres', 'password': 'trustsome1'}] + process = flexmock() flexmock(module).should_receive('make_dump_path').and_return('') flexmock(module.dump).should_receive('make_database_dump_filename').and_return( 'databases/localhost/foo' ) - flexmock(module.os).should_receive('makedirs') + flexmock(module.dump).should_receive('create_named_pipe_for_dump') flexmock(module).should_receive('execute_command').with_args( ( @@ -86,6 +94,7 @@ def test_dump_databases_runs_pg_dump_with_username_and_password(): '--no-password', '--clean', '--if-exists', + '--no-sync', '--file', 'databases/localhost/foo', '--username', @@ -95,18 +104,20 @@ def test_dump_databases_runs_pg_dump_with_username_and_password(): 'foo', ), extra_environment={'PGPASSWORD': 'trustsome1'}, - ).once() + run_to_completion=False, + ).and_return(process).once() - module.dump_databases(databases, 'test.yaml', {}, dry_run=False) + assert module.dump_databases(databases, 'test.yaml', {}, dry_run=False) == [process] def test_dump_databases_runs_pg_dump_with_format(): databases = [{'name': 'foo', 'format': 'tar'}] + process = flexmock() flexmock(module).should_receive('make_dump_path').and_return('') flexmock(module.dump).should_receive('make_database_dump_filename').and_return( 'databases/localhost/foo' ) - flexmock(module.os).should_receive('makedirs') + flexmock(module.dump).should_receive('create_named_pipe_for_dump') flexmock(module).should_receive('execute_command').with_args( ( @@ -114,6 +125,7 @@ def test_dump_databases_runs_pg_dump_with_format(): '--no-password', '--clean', '--if-exists', + '--no-sync', '--file', 'databases/localhost/foo', '--format', @@ -121,18 +133,20 @@ def test_dump_databases_runs_pg_dump_with_format(): 'foo', ), extra_environment=None, - ).once() + run_to_completion=False, + ).and_return(process).once() - module.dump_databases(databases, 'test.yaml', {}, dry_run=False) + assert module.dump_databases(databases, 'test.yaml', {}, dry_run=False) == [process] def 
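The username/password cases here assert that credentials travel in the child process environment (PGPASSWORD) rather than on the command line, where they would be visible to any local user via ps. The pattern in isolation (helper name hypothetical):

import os
import subprocess

def run_with_extra_environment(command, extra_environment=None):
    # Overlay the caller's additions onto a copy of the inherited environment;
    # passing env=None leaves the parent environment untouched.
    # e.g. run_with_extra_environment(('pg_dump', 'foo'), {'PGPASSWORD': 'trustsome1'})
    environment = {**os.environ, **extra_environment} if extra_environment else None
    return subprocess.Popen(command, env=environment)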
test_dump_databases_runs_pg_dump_with_options(): databases = [{'name': 'foo', 'options': '--stuff=such'}] + process = flexmock() flexmock(module).should_receive('make_dump_path').and_return('') flexmock(module.dump).should_receive('make_database_dump_filename').and_return( 'databases/localhost/foo' ) - flexmock(module.os).should_receive('makedirs') + flexmock(module.dump).should_receive('create_named_pipe_for_dump') flexmock(module).should_receive('execute_command').with_args( ( @@ -140,6 +154,7 @@ def test_dump_databases_runs_pg_dump_with_options(): '--no-password', '--clean', '--if-exists', + '--no-sync', '--file', 'databases/localhost/foo', '--format', @@ -148,18 +163,20 @@ def test_dump_databases_runs_pg_dump_with_options(): 'foo', ), extra_environment=None, - ).once() + run_to_completion=False, + ).and_return(process).once() - module.dump_databases(databases, 'test.yaml', {}, dry_run=False) + assert module.dump_databases(databases, 'test.yaml', {}, dry_run=False) == [process] def test_dump_databases_runs_pg_dumpall_for_all_databases(): databases = [{'name': 'all'}] + process = flexmock() flexmock(module).should_receive('make_dump_path').and_return('') flexmock(module.dump).should_receive('make_database_dump_filename').and_return( 'databases/localhost/all' ) - flexmock(module.os).should_receive('makedirs') + flexmock(module.dump).should_receive('create_named_pipe_for_dump') flexmock(module).should_receive('execute_command').with_args( ( @@ -167,73 +184,62 @@ def test_dump_databases_runs_pg_dumpall_for_all_databases(): '--no-password', '--clean', '--if-exists', + '--no-sync', '--file', 'databases/localhost/all', ), extra_environment=None, + run_to_completion=False, + ).and_return(process).once() + + assert module.dump_databases(databases, 'test.yaml', {}, dry_run=False) == [process] + + +def test_restore_database_dump_runs_pg_restore(): + database_config = [{'name': 'foo'}] + extract_process = flexmock(stdout=flexmock()) + + flexmock(module).should_receive('execute_command_with_processes').with_args( + ( + 'pg_restore', + '--no-password', + '--if-exists', + '--exit-on-error', + '--clean', + '--dbname', + 'foo', + ), + processes=[extract_process], + input_file=extract_process.stdout, + extra_environment=None, + ).once() + flexmock(module).should_receive('execute_command').with_args( + ('psql', '--no-password', '--quiet', '--dbname', 'foo', '--command', 'ANALYZE'), + extra_environment=None, ).once() - module.dump_databases(databases, 'test.yaml', {}, dry_run=False) - - -def test_make_database_dump_patterns_converts_names_to_glob_paths(): - flexmock(module).should_receive('make_dump_path').and_return('') - flexmock(module.dump).should_receive('make_database_dump_filename').and_return( - 'databases/*/foo' - ).and_return('databases/*/bar') - - assert module.make_database_dump_patterns(flexmock(), flexmock(), {}, ('foo', 'bar')) == [ - 'databases/*/foo', - 'databases/*/bar', - ] - - -def test_make_database_dump_patterns_treats_empty_names_as_matching_all_databases(): - flexmock(module).should_receive('make_dump_path').and_return('/dump/path') - flexmock(module.dump).should_receive('make_database_dump_filename').with_args( - '/dump/path', '*', '*' - ).and_return('databases/*/*') - - assert module.make_database_dump_patterns(flexmock(), flexmock(), {}, ()) == ['databases/*/*'] - - -def test_restore_database_dumps_restores_each_database(): - databases = [{'name': 'foo'}, {'name': 'bar'}] - flexmock(module).should_receive('make_dump_path').and_return('') - 
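Taken together, the restore tests in this and the following hunks specify the full PostgreSQL restore sequence: refuse ambiguous multi-database configs, stream the dump into pg_restore (or psql, for an "all" dump), then ANALYZE. A condensed sketch consistent with those assertions (parameter names are assumptions; the real hook lives in borgmatic/hooks/postgresql.py):

from borgmatic.execute import execute_command, execute_command_with_processes

def restore_database_dump(database_config, log_prefix, location_config, dry_run, extract_process):
    if len(database_config) != 1:
        raise ValueError('The database configuration value is invalid')
    database = database_config[0]
    all_databases = database['name'] == 'all'

    # An "all" dump comes from pg_dumpall as plain SQL, so it goes through
    # psql; single-database dumps are custom-format and need pg_restore.
    restore_command = (
        ('psql', '--no-password')
        if all_databases
        else (
            'pg_restore', '--no-password', '--if-exists', '--exit-on-error',
            '--clean', '--dbname', database['name'],
        )
    )
    analyze_command = (
        ('psql', '--no-password', '--quiet')
        + (() if all_databases else ('--dbname', database['name']))
        + ('--command', 'ANALYZE')
    )

    if dry_run:
        return

    # Stream the archived dump out of Borg and directly into the restore tool.
    execute_command_with_processes(
        restore_command,
        processes=[extract_process],
        input_file=extract_process.stdout,
        extra_environment=None,
    )
    execute_command(analyze_command, extra_environment=None)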
flexmock(module.dump).should_receive('make_database_dump_filename').and_return( - 'databases/localhost/foo' - ).and_return('databases/localhost/bar') - - for name in ('foo', 'bar'): - flexmock(module).should_receive('execute_command').with_args( - ( - 'pg_restore', - '--no-password', - '--if-exists', - '--exit-on-error', - '--clean', - '--dbname', - name, - 'databases/localhost/{}'.format(name), - ), - extra_environment=None, - ).once() - flexmock(module).should_receive('execute_command').with_args( - ('psql', '--no-password', '--quiet', '--dbname', name, '--command', 'ANALYZE'), - extra_environment=None, - ).once() - - module.restore_database_dumps(databases, 'test.yaml', {}, dry_run=False) - - -def test_restore_database_dumps_runs_pg_restore_with_hostname_and_port(): - databases = [{'name': 'foo', 'hostname': 'database.example.org', 'port': 5433}] - flexmock(module).should_receive('make_dump_path').and_return('') - flexmock(module.dump).should_receive('make_database_dump_filename').and_return( - 'databases/localhost/foo' + module.restore_database_dump( + database_config, 'test.yaml', {}, dry_run=False, extract_process=extract_process ) - flexmock(module).should_receive('execute_command').with_args( + +def test_restore_database_dump_errors_on_multiple_database_config(): + database_config = [{'name': 'foo'}, {'name': 'bar'}] + + flexmock(module).should_receive('execute_command_with_processes').never() + flexmock(module).should_receive('execute_command').never() + + with pytest.raises(ValueError): + module.restore_database_dump( + database_config, 'test.yaml', {}, dry_run=False, extract_process=flexmock() + ) + + +def test_restore_database_dump_runs_pg_restore_with_hostname_and_port(): + database_config = [{'name': 'foo', 'hostname': 'database.example.org', 'port': 5433}] + extract_process = flexmock(stdout=flexmock()) + + flexmock(module).should_receive('execute_command_with_processes').with_args( ( 'pg_restore', '--no-password', @@ -246,8 +252,9 @@ def test_restore_database_dumps_runs_pg_restore_with_hostname_and_port(): 'database.example.org', '--port', '5433', - 'databases/localhost/foo', ), + processes=[extract_process], + input_file=extract_process.stdout, extra_environment=None, ).once() flexmock(module).should_receive('execute_command').with_args( @@ -267,17 +274,16 @@ def test_restore_database_dumps_runs_pg_restore_with_hostname_and_port(): extra_environment=None, ).once() - module.restore_database_dumps(databases, 'test.yaml', {}, dry_run=False) - - -def test_restore_database_dumps_runs_pg_restore_with_username_and_password(): - databases = [{'name': 'foo', 'username': 'postgres', 'password': 'trustsome1'}] - flexmock(module).should_receive('make_dump_path').and_return('') - flexmock(module.dump).should_receive('make_database_dump_filename').and_return( - 'databases/localhost/foo' + module.restore_database_dump( + database_config, 'test.yaml', {}, dry_run=False, extract_process=extract_process ) - flexmock(module).should_receive('execute_command').with_args( + +def test_restore_database_dump_runs_pg_restore_with_username_and_password(): + database_config = [{'name': 'foo', 'username': 'postgres', 'password': 'trustsome1'}] + extract_process = flexmock(stdout=flexmock()) + + flexmock(module).should_receive('execute_command_with_processes').with_args( ( 'pg_restore', '--no-password', @@ -288,8 +294,9 @@ def test_restore_database_dumps_runs_pg_restore_with_username_and_password(): 'foo', '--username', 'postgres', - 'databases/localhost/foo', ), + processes=[extract_process], + 
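The tests/unit/test_execute.py hunks below pin down the new non-blocking path: with run_to_completion=False, execute_command hands back the Popen instead of draining it. A condensed sketch of that control flow (logging and error handling omitted; the shell branch joins the command because, with shell=True, subprocess treats only the first sequence element as the command line):

import os
import subprocess

def execute_command(full_command, shell=False, extra_environment=None,
                    working_directory=None, run_to_completion=True):
    environment = {**os.environ, **extra_environment} if extra_environment else None
    process = subprocess.Popen(
        ' '.join(full_command) if shell else full_command,
        stdin=None,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        shell=shell,
        env=environment,
        cwd=working_directory,
    )
    if not run_to_completion:
        # A streaming dump hands this process back to the caller (which later
        # waits on it alongside the Borg create command); cleanup is now the
        # caller's responsibility.
        return process

    # The real implementation logs the child's output line by line here.
    process.wait()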
input_file=extract_process.stdout, extra_environment={'PGPASSWORD': 'trustsome1'}, ).once() flexmock(module).should_receive('execute_command').with_args( @@ -307,21 +314,25 @@ def test_restore_database_dumps_runs_pg_restore_with_username_and_password(): extra_environment={'PGPASSWORD': 'trustsome1'}, ).once() - module.restore_database_dumps(databases, 'test.yaml', {}, dry_run=False) - - -def test_restore_database_dumps_runs_psql_for_all_database_dump(): - databases = [{'name': 'all'}] - flexmock(module).should_receive('make_dump_path').and_return('') - flexmock(module.dump).should_receive('make_database_dump_filename').and_return( - 'databases/localhost/all' + module.restore_database_dump( + database_config, 'test.yaml', {}, dry_run=False, extract_process=extract_process ) - flexmock(module).should_receive('execute_command').with_args( - ('psql', '--no-password', '-f', 'databases/localhost/all'), extra_environment=None + +def test_restore_database_dump_runs_psql_for_all_database_dump(): + database_config = [{'name': 'all'}] + extract_process = flexmock(stdout=flexmock()) + + flexmock(module).should_receive('execute_command_with_processes').with_args( + ('psql', '--no-password'), + processes=[extract_process], + input_file=extract_process.stdout, + extra_environment=None, ).once() flexmock(module).should_receive('execute_command').with_args( ('psql', '--no-password', '--quiet', '--command', 'ANALYZE'), extra_environment=None ).once() - module.restore_database_dumps(databases, 'test.yaml', {}, dry_run=False) + module.restore_database_dump( + database_config, 'test.yaml', {}, dry_run=False, extract_process=extract_process + ) diff --git a/tests/unit/test_execute.py b/tests/unit/test_execute.py index 76f986e..066b4ef 100644 --- a/tests/unit/test_execute.py +++ b/tests/unit/test_execute.py @@ -87,7 +87,7 @@ def test_execute_command_calls_full_command_with_shell(): full_command = ['foo', 'bar'] flexmock(module.os, environ={'a': 'b'}) flexmock(module.subprocess).should_receive('Popen').with_args( - full_command, + ' '.join(full_command), stdin=None, stdout=module.subprocess.PIPE, stderr=module.subprocess.STDOUT, @@ -140,6 +140,24 @@ def test_execute_command_calls_full_command_with_working_directory(): assert output is None +def test_execute_command_without_run_to_completion_returns_process(): + full_command = ['foo', 'bar'] + process = flexmock() + flexmock(module.os, environ={'a': 'b'}) + flexmock(module.subprocess).should_receive('Popen').with_args( + full_command, + stdin=None, + stdout=module.subprocess.PIPE, + stderr=module.subprocess.STDOUT, + shell=False, + env=None, + cwd=None, + ).and_return(process).once() + flexmock(module).should_receive('log_output') + + assert module.execute_command(full_command, run_to_completion=False) == process + + def test_execute_command_captures_output(): full_command = ['foo', 'bar'] expected_output = '[]'
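Finally, for anyone tracing the new API end to end: the hook tests above only ever mock execute_command_with_processes, so as a rough mental model (not the borgmatic/execute.py implementation, which also interleaves output logging and error handling across processes), it has to do something like:

import os
import subprocess

def execute_command_with_processes(full_command, processes, input_file=None,
                                   extra_environment=None):
    # Start the main command with its stdin wired to the supplied pipe; for
    # restores, that's the Borg extract process's stdout.
    environment = {**os.environ, **extra_environment} if extra_environment else None
    command_process = subprocess.Popen(
        full_command,
        stdin=input_file,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        env=environment,
    )
    # communicate() drains stdout to EOF before waiting, avoiding a pipe
    # deadlock if the command is chatty.
    output, _ = command_process.communicate()

    # Then reap the already-running helper processes so their exit codes
    # (e.g. a failed pg_dump) surface instead of being silently dropped.
    for process in processes:
        process.wait()

    return output.decode() if output else None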