This is an automated email from the ASF dual-hosted git repository. hxb pushed a commit to branch release-1.13 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.13 by this push: new d755fa9 [FLINK-22927][python] Fix the bug of JobStatus d755fa9 is described below commit d755fa945632c135a6179a837d776d21ec28f63c Author: huangxingbo <hxbks...@gmail.com> AuthorDate: Fri Jun 11 15:55:17 2021 +0800 [FLINK-22927][python] Fix the bug of JobStatus This closes #16146. --- flink-python/pyflink/common/job_client.py | 2 +- flink-python/pyflink/common/job_status.py | 7 ++----- flink-python/pyflink/table/tests/test_sql.py | 8 ++++++++ 3 files changed, 11 insertions(+), 6 deletions(-) diff --git a/flink-python/pyflink/common/job_client.py b/flink-python/pyflink/common/job_client.py index 1ea1e6c..e9ebcfb 100644 --- a/flink-python/pyflink/common/job_client.py +++ b/flink-python/pyflink/common/job_client.py @@ -52,7 +52,7 @@ class JobClient(object): .. versionadded:: 1.11.0 """ - return CompletableFuture(self._j_job_client.getJobStatus(), JobStatus) + return CompletableFuture(self._j_job_client.getJobStatus(), JobStatus._from_j_job_status) def cancel(self) -> CompletableFuture: """ diff --git a/flink-python/pyflink/common/job_status.py b/flink-python/pyflink/common/job_status.py index 71e5022..a64dc01 100644 --- a/flink-python/pyflink/common/job_status.py +++ b/flink-python/pyflink/common/job_status.py @@ -81,9 +81,6 @@ class JobStatus(Enum): SUSPENDED = 8 RECONCILING = 9 - def __init__(self, j_job_status) -> None: - self._j_job_status = j_job_status - def is_globally_terminal_state(self) -> bool: """ Checks whether this state is <i>globally terminal</i>. A globally terminal job @@ -97,7 +94,7 @@ class JobStatus(Enum): .. versionadded:: 1.11.0 """ - return self._j_job_status.isGloballyTerminalState() + return self._to_j_job_status().isGloballyTerminalState() def is_terminal_state(self) -> bool: """ @@ -112,7 +109,7 @@ class JobStatus(Enum): .. versionadded:: 1.11.0 """ - return self._j_job_status.isTerminalState() + return self._to_j_job_status().isTerminalState() @staticmethod def _from_j_job_status(j_job_status) -> 'JobStatus': diff --git a/flink-python/pyflink/table/tests/test_sql.py b/flink-python/pyflink/table/tests/test_sql.py index 5a102ac..7fd965a 100644 --- a/flink-python/pyflink/table/tests/test_sql.py +++ b/flink-python/pyflink/table/tests/test_sql.py @@ -80,6 +80,14 @@ class StreamSqlTests(PyFlinkBlinkStreamTableTestCase): "sinks", source_sink_utils.TestAppendSink(field_names, field_types)) table_result = t_env.execute_sql("insert into sinks select * from tbl") + from pyflink.common.job_status import JobStatus + from py4j.protocol import Py4JJavaError + try: + self.assertTrue(isinstance(table_result.get_job_client().get_job_status().result(), + JobStatus)) + except Py4JJavaError as e: + self.assertIn('MiniCluster is not yet running or has already been shut down.', str(e)) + job_execution_result = table_result.get_job_client().get_job_execution_result().result() self.assertIsNotNone(job_execution_result.get_job_id()) self.assert_equals(table_result.get_table_schema().get_field_names(),