METRON-1491: The indexing topology restart logic is wrong (cstella via mmiklavc) closes apache/metron#964
Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/5ed9631a
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/5ed9631a
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/5ed9631a

Branch: refs/heads/feature/METRON-1211-extensions-parsers-gradual
Commit: 5ed9631a2936ec60d0ea6557ca4396cffdadc688
Parents: 3083b47
Author: cstella <ceste...@gmail.com>
Authored: Tue Mar 20 16:08:02 2018 -0600
Committer: Michael Miklavcic <michael.miklav...@gmail.com>
Committed: Tue Mar 20 16:08:02 2018 -0600

----------------------------------------------------------------------
 .../package/scripts/indexing_commands.py | 43 ++++++++++++++++----
 1 file changed, 35 insertions(+), 8 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/metron/blob/5ed9631a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_commands.py
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_commands.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_commands.py
index 4c862f0..fd78119 100755
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_commands.py
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_commands.py
@@ -181,7 +181,7 @@ class IndexingCommands:
     def start_batch_indexing_topology(self, env):
         Logger.info('Starting ' + self.__batch_indexing_topology)
 
-        if not self.is_topology_active(env):
+        if not self.is_batch_topology_active(env):
             if self.__params.security_enabled:
                 metron_security.kinit(self.__params.kinit_path_local,
                                       self.__params.metron_keytab_path,
@@ -200,7 +200,7 @@ class IndexingCommands:
     def start_random_access_indexing_topology(self, env):
         Logger.info('Starting ' + self.__random_access_indexing_topology)
 
-        if not self.is_topology_active(env):
+        if not self.is_random_access_topology_active(env):
             if self.__params.security_enabled:
                 metron_security.kinit(self.__params.kinit_path_local,
                                       self.__params.metron_keytab_path,
@@ -263,21 +263,48 @@ class IndexingCommands:
 
     def restart_indexing_topology(self, env):
         Logger.info('Restarting the indexing topologies')
-        self.stop_indexing_topology(env)
+        self.restart_batch_indexing_topology(env)
+        self.restart_random_access_indexing_topology(env)
+
+    def restart_batch_indexing_topology(self, env):
+        Logger.info('Restarting the batch indexing topology')
+        self.stop_batch_indexing_topology(env)
+
+        # Wait for old topology to be cleaned up by Storm, before starting again.
+        retries = 0
+        topology_active = self.is_batch_topology_active(env)
+        while topology_active and retries < 3:
+            Logger.info('Existing batch topology still active. Will wait and retry')
+            time.sleep(10)
+            retries += 1
+            topology_active = self.is_batch_topology_active(env)
+
+        if not topology_active:
+            Logger.info('Waiting for storm kill to complete')
+            time.sleep(30)
+            self.start_batch_indexing_topology(env)
+            Logger.info('Done restarting the batch indexing topology')
+        else:
+            Logger.warning('Retries exhausted. Existing topology not cleaned up. Aborting topology start.')
+
+    def restart_random_access_indexing_topology(self, env):
+        Logger.info('Restarting the random access indexing topology')
+        self.stop_random_access_indexing_topology(env)
 
         # Wait for old topology to be cleaned up by Storm, before starting again.
         retries = 0
-        topology_active = self.is_topology_active(env)
-        while self.is_topology_active(env) and retries < 3:
-            Logger.info('Existing topology still active. Will wait and retry')
+        topology_active = self.is_random_access_topology_active(env)
+        while topology_active and retries < 3:
+            Logger.info('Existing random access topology still active. Will wait and retry')
             time.sleep(10)
             retries += 1
+            topology_active = self.is_random_access_topology_active(env)
 
         if not topology_active:
             Logger.info('Waiting for storm kill to complete')
             time.sleep(30)
-            self.start_indexing_topology(env)
-            Logger.info('Done restarting the indexing topologies')
+            self.start_random_access_indexing_topology(env)
+            Logger.info('Done restarting the random access indexing topology')
         else:
             Logger.warning('Retries exhausted. Existing topology not cleaned up. Aborting topology start.')
 