From 1709f57ff0734d119e89f90f604c4142f801e939 Mon Sep 17 00:00:00 2001 From: Dan Helfman Date: Sun, 25 Jul 2021 22:30:15 -0700 Subject: [PATCH] Fix hang when restoring a PostgreSQL "tar" format database dump (#430). --- NEWS | 1 + borgmatic/execute.py | 20 ++++++++++++++---- tests/integration/test_execute.py | 34 ++++++++++++++++++++++++++++++- 3 files changed, 50 insertions(+), 5 deletions(-) diff --git a/NEWS b/NEWS index 80b8b4b..0dc9126 100644 --- a/NEWS +++ b/NEWS @@ -1,6 +1,7 @@ 1.5.16.dev0 * #379: Suppress console output in sample crontab and systemd service files. * #407: Fix syslog logging on FreeBSD. + * #430: Fix hang when restoring a PostgreSQL "tar" format database dump. * Better error messages! Switch the library used for validating configuration files (from pykwalify to jsonschema). * Link borgmatic Ansible role from installation documentation: diff --git a/borgmatic/execute.py b/borgmatic/execute.py index 95a8f6c..39b2e50 100644 --- a/borgmatic/execute.py +++ b/borgmatic/execute.py @@ -59,11 +59,12 @@ def log_outputs(processes, exclude_stdouts, output_log_level, borg_local_path): ''' # Map from output buffer to sequence of last lines. buffer_last_lines = collections.defaultdict(list) - output_buffers = [ - output_buffer_for_process(process, exclude_stdouts) + process_for_output_buffer = { + output_buffer_for_process(process, exclude_stdouts): process for process in processes if process.stdout or process.stderr - ] + } + output_buffers = list(process_for_output_buffer.keys()) # Log output for each process until they all exit. while True: @@ -71,8 +72,19 @@ def log_outputs(processes, exclude_stdouts, output_log_level, borg_local_path): (ready_buffers, _, _) = select.select(output_buffers, [], []) for ready_buffer in ready_buffers: + ready_process = process_for_output_buffer.get(ready_buffer) + + # The "ready" process has exited, but it might be a pipe destination with other + # processes (pipe sources) waiting to be read from. So as a measure to prevent + # hangs, vent all processes when one exits. + if ready_process and ready_process.poll() is not None: + for other_process in processes: + if other_process.poll() is None: + # Add the process's output to output_buffers to ensure it'll get read. + output_buffers.append(other_process.stdout) + line = ready_buffer.readline().rstrip().decode() - if not line: + if not line or not ready_process: continue # Keep the last few lines of output in case the process errors, and we need the output for diff --git a/tests/integration/test_execute.py b/tests/integration/test_execute.py index 490e3ab..d144f4d 100644 --- a/tests/integration/test_execute.py +++ b/tests/integration/test_execute.py @@ -98,7 +98,7 @@ def test_log_outputs_kills_other_processes_when_one_errors(): process, 2, 'borg' ).and_return(True) other_process = subprocess.Popen( - ['watch', 'true'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT + ['sleep', '2'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT ) flexmock(module).should_receive('exit_code_indicates_error').with_args( other_process, None, 'borg' @@ -123,6 +123,38 @@ def test_log_outputs_kills_other_processes_when_one_errors(): assert error.value.output +def test_log_outputs_vents_other_processes_when_one_exits(): + ''' + Execute a command to generate a longish random string and pipe it into another command that + exits quickly. The test is basically to ensure we don't hang forever waiting for the exited + process to read the pipe, and that the string-generating process eventually gets vented and + exits. + ''' + flexmock(module.logger).should_receive('log') + flexmock(module).should_receive('command_for_process').and_return('grep') + + process = subprocess.Popen( + ['shuf', '-zer', '-n10000', '{A..Z}'], stdout=subprocess.PIPE, stderr=subprocess.PIPE + ) + other_process = subprocess.Popen( + ['true'], stdin=process.stdout, stdout=subprocess.PIPE, stderr=subprocess.STDOUT + ) + flexmock(module).should_receive('output_buffer_for_process').with_args( + process, (process.stdout,) + ).and_return(process.stderr) + flexmock(module).should_receive('output_buffer_for_process').with_args( + other_process, (process.stdout,) + ).and_return(other_process.stdout) + flexmock(process.stdout).should_call('readline').once() + + module.log_outputs( + (process, other_process), + exclude_stdouts=(process.stdout,), + output_log_level=logging.INFO, + borg_local_path='borg', + ) + + def test_log_outputs_truncates_long_error_output(): flexmock(module).ERROR_OUTPUT_MAX_LINE_COUNT = 0 flexmock(module.logger).should_receive('log')