AMBARI-6808 - Ambari Agent DataCleaner should delete log files when a max size in MB is reached (Alejandro Fernandez via jonathanhurley)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/30f8a87a Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/30f8a87a Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/30f8a87a Branch: refs/heads/branch-alerts-dev Commit: 30f8a87a657993f2b0632289142fd513e7948e75 Parents: 1c5ceb2 Author: Jonathan Hurley <jhur...@hortonworks.com> Authored: Tue Aug 19 14:21:16 2014 -0400 Committer: Jonathan Hurley <jhur...@hortonworks.com> Committed: Tue Aug 19 14:21:16 2014 -0400 ---------------------------------------------------------------------- ambari-agent/conf/unix/ambari-agent.ini | 1 + .../main/python/ambari_agent/AmbariConfig.py | 1 + .../src/main/python/ambari_agent/DataCleaner.py | 60 +++++++++++++++++--- .../test/python/ambari_agent/TestDataCleaner.py | 39 ++++++++----- 4 files changed, 79 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/30f8a87a/ambari-agent/conf/unix/ambari-agent.ini ---------------------------------------------------------------------- diff --git a/ambari-agent/conf/unix/ambari-agent.ini b/ambari-agent/conf/unix/ambari-agent.ini index 9bbef26..162041a 100644 --- a/ambari-agent/conf/unix/ambari-agent.ini +++ b/ambari-agent/conf/unix/ambari-agent.ini @@ -24,6 +24,7 @@ tmp_dir=/var/lib/ambari-agent/data/tmp loglevel=INFO data_cleanup_interval=86400 data_cleanup_max_age=2592000 +data_cleanup_max_size_MB = 100 ping_port=8670 cache_dir=/var/lib/ambari-agent/cache tolerate_download_failures=true http://git-wip-us.apache.org/repos/asf/ambari/blob/30f8a87a/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py b/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py index 4330eb3..4453161 100644 --- a/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py +++ b/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py @@ -35,6 +35,7 @@ prefix=/tmp/ambari-agent tmp_dir=/tmp/ambari-agent/tmp data_cleanup_interval=86400 data_cleanup_max_age=2592000 +data_cleanup_max_size_MB = 100 ping_port=8670 cache_dir=/var/lib/ambari-agent/cache http://git-wip-us.apache.org/repos/asf/ambari/blob/30f8a87a/ambari-agent/src/main/python/ambari_agent/DataCleaner.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/DataCleaner.py b/ambari-agent/src/main/python/ambari_agent/DataCleaner.py index e42caac..0102eef 100644 --- a/ambari-agent/src/main/python/ambari_agent/DataCleaner.py +++ b/ambari-agent/src/main/python/ambari_agent/DataCleaner.py @@ -28,23 +28,34 @@ import logging logger = logging.getLogger() class DataCleaner(threading.Thread): - FILE_NAME_PATTERN = 'errors-\d+.txt|output-\d+.txt|site-\d+.pp' + FILE_NAME_PATTERN = 'errors-\d+.txt|output-\d+.txt|site-\d+.pp|structured-out-\d+.json|command-\d+.json' def __init__(self, config): threading.Thread.__init__(self) self.daemon = True logger.info('Data cleanup thread started') self.config = config - self.file_max_age = int(config.get('agent','data_cleanup_max_age')) - if self.file_max_age < 86400: + + self.file_max_age = config.get('agent','data_cleanup_max_age') + self.file_max_age = int(self.file_max_age) if self.file_max_age else None + if self.file_max_age is None or self.file_max_age < 86400: # keep for at least 24h logger.warn('The minimum value allowed for data_cleanup_max_age is 1 ' 'day. Setting data_cleanup_max_age to 86400.') self.file_max_age = 86400 - self.cleanup_interval = int(config.get('agent','data_cleanup_interval')) - if self.cleanup_interval < 3600: + + self.cleanup_interval = config.get('agent','data_cleanup_interval') + self.cleanup_interval = int(self.cleanup_interval) if self.cleanup_interval else None + if self.cleanup_interval is None or self.cleanup_interval < 3600: # wait at least 1 hour between runs logger.warn('The minimum value allowed for data_cleanup_interval is 1 ' 'hour. Setting data_cleanup_interval to 3600.') - self.file_max_age = 3600 + self.cleanup_interval = 3600 + + self.cleanup_max_size_MB = config.get('agent', 'data_cleanup_max_size_MB') + self.cleanup_max_size_MB = int(self.cleanup_max_size_MB) if self.cleanup_max_size_MB else None + if self.cleanup_max_size_MB is None or self.cleanup_max_size_MB > 10000: # no more than 10 GBs + logger.warn('The maximum value allowed for cleanup_max_size_MB is 10000 MB (10 GB). ' + 'Setting cleanup_max_size_MB to 10000.') + self.cleanup_max_size_MB = 10000 self.data_dir = config.get('agent','prefix') self.compiled_pattern = re.compile(self.FILE_NAME_PATTERN) @@ -54,17 +65,52 @@ class DataCleaner(threading.Thread): logger.info('Data cleanup thread killed.') def cleanup(self): + logger.debug("Cleaning up inside directory " + self.data_dir) + now = time.time() + total_size_bytes = 0 + file_path_to_timestamp = {} + file_path_to_size = {} + for root, dirs, files in os.walk(self.data_dir): for f in files: file_path = os.path.join(root, f) if self.compiled_pattern.match(f): try: - if time.time() - os.path.getmtime(file_path) > self.file_max_age: + file_age = now - os.path.getmtime(file_path) + if file_age > self.file_max_age: os.remove(os.path.join(file_path)) logger.debug('Removed file: ' + file_path) + else: + # Since file wasn't deleted in first pass, consider it for the second one with oldest files first + file_size = os.path.getsize(file_path) + total_size_bytes += file_size + file_path_to_timestamp[file_path] = file_age + file_path_to_size[file_path] = file_size except Exception: logger.error('Error when removing file: ' + file_path) + target_size_bytes = self.cleanup_max_size_MB * 1000000 + if len(file_path_to_timestamp) and total_size_bytes > target_size_bytes: + logger.info("DataCleaner values need to be more aggressive. Current size in bytes for all log files is %d, " + "and will try to clean to reach %d bytes." % (total_size_bytes, target_size_bytes)) + # Prune oldest files first + count = 0 + file_path_oldest_first_list = sorted(file_path_to_timestamp, key=file_path_to_timestamp.get, reverse=True) + for file_path in file_path_oldest_first_list: + try: + os.remove(os.path.join(file_path)) + total_size_bytes -= file_path_to_size[file_path] + count += 1 + if total_size_bytes <= target_size_bytes: + # Finally reached below the cap + break + except Exception: + pass + else: + # Did not reach below cap. + logger.warn("DataCleaner deleted an additional %d files, currently log files occupy %d bytes." % + (count, total_size_bytes)) + pass def run(self): while not self.stopped: http://git-wip-us.apache.org/repos/asf/ambari/blob/30f8a87a/ambari-agent/src/test/python/ambari_agent/TestDataCleaner.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/TestDataCleaner.py b/ambari-agent/src/test/python/ambari_agent/TestDataCleaner.py index d385697..b64dc44 100644 --- a/ambari-agent/src/test/python/ambari_agent/TestDataCleaner.py +++ b/ambari-agent/src/test/python/ambari_agent/TestDataCleaner.py @@ -28,14 +28,15 @@ class TestDataCleaner(unittest.TestCase): def setUp(self): self.test_dir = [('/test_path', [], - ['errors-12.txt','output-12.txt','site-12.pp','site-13.pp','site-15.pp','version'])] + ['errors-12.txt', 'output-12.txt', 'site-12.pp', 'site-13.pp', 'site-15.pp', + 'structured-out-13.json', 'command-13.json', 'version'])] self.config = MagicMock() - self.config.get.side_effect = [2592000,3600 + 1,"/test_path"] + self.config.get.side_effect = [2592000, (3600 + 1), 10000, "/test_path"] DataCleaner.logger = MagicMock() def test_init_success(self): config = MagicMock() - config.get.return_value = 2592000 + config.get.side_effect = [2592000, (3600 + 1), 10000, "/test_path"] DataCleaner.logger.reset_mock() cleaner = DataCleaner.DataCleaner(config) self.assertFalse(DataCleaner.logger.warn.called) @@ -43,45 +44,53 @@ class TestDataCleaner(unittest.TestCase): def test_init_warn(self): config = MagicMock() - config.get.return_value = 10 + config.get.side_effect = [1, (3600 - 1), (10000 + 1), "/test_path"] DataCleaner.logger.reset_mock() cleaner = DataCleaner.DataCleaner(config) self.assertTrue(DataCleaner.logger.warn.called) - self.assertTrue(cleaner.file_max_age == 3600) + self.assertTrue(cleaner.file_max_age == 86400) + self.assertTrue(cleaner.cleanup_interval == 3600) + self.assertTrue(cleaner.cleanup_max_size_MB == 10000) @patch('os.walk') @patch('time.time') @patch('os.path.getmtime') @patch('os.remove') - def test_cleanup_success(self,remMock,mtimeMock,timeMock,walkMock): + @patch('os.path.getsize') + def test_cleanup_success(self, sizeMock, remMock, mtimeMock, timeMock, walkMock): self.config.reset_mock() DataCleaner.logger.reset_mock() walkMock.return_value = iter(self.test_dir) timeMock.return_value = 2592000 + 2 - mtimeMock.side_effect = [1,1,1,2,1,1] + mtimeMock.side_effect = [1, 1, 1, 2, 1, 1, 1, 1] + sizeMock.return_value = 100 cleaner = DataCleaner.DataCleaner(self.config) cleaner.cleanup() - self.assertTrue(len(remMock.call_args_list) == 4) - remMock.assert_any_call('/test_path/errors-12.txt'); - remMock.assert_any_call('/test_path/output-12.txt'); - remMock.assert_any_call('/test_path/site-12.pp'); - remMock.assert_any_call('/test_path/site-15.pp'); + self.assertTrue(len(remMock.call_args_list) == 6) + remMock.assert_any_call('/test_path/errors-12.txt') + remMock.assert_any_call('/test_path/output-12.txt') + remMock.assert_any_call('/test_path/site-12.pp') + remMock.assert_any_call('/test_path/site-15.pp') + remMock.assert_any_call('/test_path/structured-out-13.json') + remMock.assert_any_call('/test_path/command-13.json') pass @patch('os.walk') @patch('time.time') @patch('os.path.getmtime') @patch('os.remove') - def test_cleanup_remove_error(self,remMock,mtimeMock,timeMock,walkMock): + @patch('os.path.getsize') + def test_cleanup_remove_error(self, sizeMock, remMock, mtimeMock, timeMock, walkMock): self.config.reset_mock() DataCleaner.logger.reset_mock() walkMock.return_value = iter(self.test_dir) timeMock.return_value = 2592000 + 2 - mtimeMock.side_effect = [1,1,1,2,1,1] + mtimeMock.side_effect = [1, 1, 1, 2, 1, 1, 1, 1] + sizeMock.return_value = 100 def side_effect(arg): if arg == '/test_path/site-15.pp': @@ -92,7 +101,7 @@ class TestDataCleaner(unittest.TestCase): cleaner = DataCleaner.DataCleaner(self.config) cleaner.cleanup() - self.assertTrue(len(remMock.call_args_list) == 4) + self.assertTrue(len(remMock.call_args_list) == 6) self.assertTrue(DataCleaner.logger.error.call_count == 1) pass