This is an automated email from the ASF dual-hosted git repository. boroknagyz pushed a commit to branch branch-4.4.0 in repository https://gitbox.apache.org/repos/asf/impala.git
commit b342710f574a0b49a123f774137cf5298844fbac Author: Michael Smith <michael.sm...@cloudera.com> AuthorDate: Tue Apr 9 13:19:19 2024 -0700 IMPALA-12963: Return parent PID when children spawned Returns the original PID for a command rather than the PIDs of any child processes that may still be active. Such child processes can linger during graceful shutdown in UBSAN tests. Also updates 'kill' to use the version of 'get_pid' that logs details, to help with debugging. Moves the try block in test_query_log.py to after client2 has been initialized. Removes the 'drop table' of the test table in unique_database, since the test suite already handles cleanup. Change-Id: I214e79507c717340863d27f68f6ea54c169e4090 Reviewed-on: http://gerrit.cloudera.org:8080/21278 Reviewed-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com> --- tests/common/impala_cluster.py | 54 ++++++++++++++++---------- tests/custom_cluster/test_query_log.py | 69 ++++++++++++++++------------------ 2 files changed, 67 insertions(+), 56 deletions(-) diff --git a/tests/common/impala_cluster.py b/tests/common/impala_cluster.py index 51fefa715..1465ab31b 100644 --- a/tests/common/impala_cluster.py +++ b/tests/common/impala_cluster.py @@ -401,7 +401,7 @@ class Process(object): """Gets the PIDs of the process. In some circumstances, a process can run multiple times, e.g. when it forks in the Breakpad crash handler.
Returns an empty list if no PIDs can be determined.""" - pids = self.__get_pids() + pids = [proc['pid'] for proc in self.__get_procs()] if pids: LOG.info("Found PIDs %s for %s" % (", ".join(map(str, pids)), " ".join(self.cmd))) else: @@ -409,37 +409,53 @@ class Process(object): " ".join(self.cmd)) return pids - def __get_pid(self): - pids = self.__get_pids() - assert len(pids) < 2, "Expected single pid but found %s" % ", ".join(map(str, pids)) - return len(pids) == 1 and pids[0] or None + def __procs_str(self, procs): + return "\n".join([str(proc) for proc in procs]) - def __get_pids(self): + def __get_pid(self): + procs = self.__get_procs() + # Return early for containerized environments + if len(procs) == 1: + return procs[0]['pid'] + + result = None + # In some circumstances - notably ubsan tests - child processes can be slow to exit. + # Only return the original process, i.e. one who's parent has a different cmd. + pids = [proc['pid'] for proc in procs] + for process in procs: + if process['ppid'] not in pids: + assert result is None,\ + "Multiple non-child processes:\n%s" % self.__procs_str(procs) + result = process['pid'] + else: + LOG.info("Child process active:\n%s" % self.__procs_str(procs)) + + return result + + def __get_procs(self): + """ + Returns a list of dicts containing {pid, ppid, cmdline} for related processes. + """ if self.container_id is not None: container_info = get_container_info(self.container_id) if container_info["State"]["Status"] != "running": return [] - return [container_info["State"]["Pid"]] + return [{'pid': container_info["State"]["Pid"], 'ppid': 0, 'cmdline': self.cmd}] # In non-containerised case, search for process based on matching command lines. - pids = [] - for pid in psutil.pids(): - try: - process = psutil.Process(pid) - if set(self.cmd) == set(process.cmdline()): - pids.append(pid) - except psutil.NoSuchProcess: - # A process from psutil.pids() no longer exists, continue. 
We don't log this - # error since it can refer to arbitrary processes outside of our testing code. - pass - return pids + procs = [] + for process in psutil.process_iter(['pid', 'ppid', 'cmdline']): + # Use info because it won't throw NoSuchProcess exceptions. + if set(self.cmd) == set(process.info['cmdline']): + procs.append(process.info) + return procs def kill(self, signal=SIGKILL): """ Kills the given processes. """ if self.container_id is None: - pid = self.__get_pid() + pid = self.get_pid() assert pid is not None, "No processes for %s" % self LOG.info('Killing %s with signal %s' % (self, signal)) exec_process("kill -%d %d" % (signal, pid)) diff --git a/tests/custom_cluster/test_query_log.py b/tests/custom_cluster/test_query_log.py index e02993217..dd61e7291 100644 --- a/tests/custom_cluster/test_query_log.py +++ b/tests/custom_cluster/test_query_log.py @@ -56,13 +56,11 @@ class TestQueryLogTableBase(CustomClusterTestSuite): cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('protocol', cls.PROTOCOL_BEESWAX, cls.PROTOCOL_HS2)) - def get_client(self, protocol, query_table_name=""): + def get_client(self, protocol, query_table_name=QUERY_TBL): """Retrieves the default Impala client for the specified protocol. This client is automatically closed after the test completes. Also ensures the completed queries table has been successfully created by checking the logs to verify the create table sql has finished.""" - if query_table_name == "": - query_table_name = self.QUERY_TBL # These tests run very quickly and can actually complete before Impala has finished # creating the completed queries table. Thus, to make these tests more robust, this @@ -704,24 +702,24 @@ class TestQueryLogTableHS2(TestQueryLogTableBase): impalad = self.cluster.get_first_impalad() client = self.get_client(vector.get_value('protocol')) - try: - # Execute sql statements to ensure all get written to the query log table. 
- sql1 = client.execute("select 1") - assert sql1.success + # Execute sql statements to ensure all get written to the query log table. + sql1 = client.execute("select 1") + assert sql1.success - sql2 = client.execute("select 2") - assert sql2.success + sql2 = client.execute("select 2") + assert sql2.success - sql3 = client.execute("select 3") - assert sql3.success + sql3 = client.execute("select 3") + assert sql3.success - impalad.service.wait_for_metric_value("impala-server.completed-queries.queued", 3, - 60) + impalad.service.wait_for_metric_value("impala-server.completed-queries.queued", 3, + 60) - impalad.kill_and_wait_for_exit(SIGRTMIN) + impalad.kill_and_wait_for_exit(SIGRTMIN) - client2 = self.create_client_for_nth_impalad(1, vector.get_value('protocol')) + client2 = self.create_client_for_nth_impalad(1, vector.get_value('protocol')) + try: def assert_func(last_iteration): results = client2.execute("select query_id,sql from {0} where query_id in " "('{1}','{2}','{3}')".format(self.QUERY_TBL, @@ -783,30 +781,27 @@ class TestQueryLogTableAll(TestQueryLogTableBase): tbl_name = "{0}.{1}".format(unique_database, unique_name) client = self.get_client(vector.get_value('protocol')) - try: - # Create the test table. - create_tbl_sql = "create table {0} (id INT, product_name STRING) " \ - "partitioned by (category INT)".format(tbl_name) - create_tbl_results = client.execute(create_tbl_sql) - assert create_tbl_results.success - - insert_sql = "insert into {0} (id,category,product_name) values " \ - "(0,1,'the product')".format(tbl_name) - res = client.execute(insert_sql, fetch_profile_after_close=True) - assert res.success + # Create the test table. + create_tbl_sql = "create table {0} (id INT, product_name STRING) " \ + "partitioned by (category INT)".format(tbl_name) + create_tbl_results = client.execute(create_tbl_sql) + assert create_tbl_results.success - # Include the two queries run by the unique_database fixture setup. 
- self.cluster.get_first_impalad().service.wait_for_metric_value( - "impala-server.completed-queries.written", 4, 60) - - client2 = self.create_client_for_nth_impalad(2, vector.get_value('protocol')) - try: - assert client2 is not None - assert_query(self.QUERY_TBL, client2, "test_query_hist_3", res.runtime_profile) - finally: - client2.close() + insert_sql = "insert into {0} (id,category,product_name) values " \ + "(0,1,'the product')".format(tbl_name) + res = client.execute(insert_sql, fetch_profile_after_close=True) + assert res.success + + # Include the two queries run by the unique_database fixture setup. + self.cluster.get_first_impalad().service.wait_for_metric_value( + "impala-server.completed-queries.written", 4, 60) + + client2 = self.create_client_for_nth_impalad(2, vector.get_value('protocol')) + try: + assert client2 is not None + assert_query(self.QUERY_TBL, client2, "test_query_hist_3", res.runtime_profile) finally: - client.execute("drop table if exists {0}".format(tbl_name)) + client2.close() @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt " "--query_log_write_interval_s=1 "