# borgmatic/tests/end-to-end/test_database.py

import json
import os
import shutil
import subprocess
import sys
import tempfile
def write_configuration(config_path, repository_path, borgmatic_source_directory):
    '''
    Write out borgmatic configuration into a file at the config path. Set the options so as to work
    for testing. This includes injecting the given repository path, borgmatic source directory for
    storing database dumps, and encryption passphrase.

    The config path itself is substituted in as the single source directory entry. The file at the
    config path is created or overwritten. Returns None.
    '''
    # The template is flush with the left margin on purpose: its text is written to the file
    # verbatim, so the indentation below is the YAML nesting that ends up in the config file.
    config = '''
location:
    source_directories:
        - {}
    repositories:
        - {}
    borgmatic_source_directory: {}
storage:
    encryption_passphrase: "test"
hooks:
    postgresql_databases:
        - name: test
          hostname: postgresql
          username: postgres
          password: test
    mysql_databases:
        - name: test
          hostname: mysql
          username: root
          password: test
'''.format(
        config_path, repository_path, borgmatic_source_directory
    )

    # Use a context manager so the file handle is closed even if the write raises.
    with open(config_path, 'w') as config_file:
        config_file.write(config)
def test_database_dump_and_restore():
    '''
    Run borgmatic end-to-end against a throwaway repository: initialize the repository, create a
    backup archive, list archives to find the one just created, and restore from it. Any borgmatic
    command exiting non-zero fails the test via subprocess.CalledProcessError.

    Each command is passed to subprocess as an explicit argument list rather than a formatted
    string split on spaces, so a temporary directory path containing spaces stays a single
    argument.
    '''
    # Create a Borg repository.
    temporary_directory = tempfile.mkdtemp()
    repository_path = os.path.join(temporary_directory, 'test.borg')
    borgmatic_source_directory = os.path.join(temporary_directory, '.borgmatic')

    original_working_directory = os.getcwd()

    try:
        config_path = os.path.join(temporary_directory, 'test.yaml')
        write_configuration(config_path, repository_path, borgmatic_source_directory)

        subprocess.check_call(
            ['borgmatic', '-v', '2', '--config', config_path, 'init', '--encryption', 'repokey']
        )

        # Run borgmatic to generate a backup archive including a database dump
        subprocess.check_call(['borgmatic', 'create', '--config', config_path, '-v', '2'])

        # Get the created archive name.
        output = subprocess.check_output(
            ['borgmatic', '--config', config_path, 'list', '--json']
        ).decode(sys.stdout.encoding)
        parsed_output = json.loads(output)

        # Exactly one repository entry holding exactly one archive is expected at this point.
        assert len(parsed_output) == 1
        assert len(parsed_output[0]['archives']) == 1
        archive_name = parsed_output[0]['archives'][0]['archive']

        # Restore the database from the archive.
        subprocess.check_call(
            ['borgmatic', '--config', config_path, 'restore', '--archive', archive_name]
        )
    finally:
        # Always return to the starting directory and remove the temporary tree, pass or fail.
        os.chdir(original_working_directory)
        shutil.rmtree(temporary_directory)