This is an automated email from the ASF dual-hosted git repository.

hxb pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.13 by this push:
     new d755fa9  [FLINK-22927][python] Fix the bug of JobStatus
d755fa9 is described below

commit d755fa945632c135a6179a837d776d21ec28f63c
Author: huangxingbo <hxbks...@gmail.com>
AuthorDate: Fri Jun 11 15:55:17 2021 +0800

    [FLINK-22927][python] Fix the bug of JobStatus
    
    This closes #16146.
---
 flink-python/pyflink/common/job_client.py    | 2 +-
 flink-python/pyflink/common/job_status.py    | 7 ++-----
 flink-python/pyflink/table/tests/test_sql.py | 8 ++++++++
 3 files changed, 11 insertions(+), 6 deletions(-)

diff --git a/flink-python/pyflink/common/job_client.py b/flink-python/pyflink/common/job_client.py
index 1ea1e6c..e9ebcfb 100644
--- a/flink-python/pyflink/common/job_client.py
+++ b/flink-python/pyflink/common/job_client.py
@@ -52,7 +52,7 @@ class JobClient(object):
 
         .. versionadded:: 1.11.0
         """
-        return CompletableFuture(self._j_job_client.getJobStatus(), JobStatus)
+        return CompletableFuture(self._j_job_client.getJobStatus(), JobStatus._from_j_job_status)
 
     def cancel(self) -> CompletableFuture:
         """
diff --git a/flink-python/pyflink/common/job_status.py b/flink-python/pyflink/common/job_status.py
index 71e5022..a64dc01 100644
--- a/flink-python/pyflink/common/job_status.py
+++ b/flink-python/pyflink/common/job_status.py
@@ -81,9 +81,6 @@ class JobStatus(Enum):
     SUSPENDED = 8
     RECONCILING = 9
 
-    def __init__(self, j_job_status) -> None:
-        self._j_job_status = j_job_status
-
     def is_globally_terminal_state(self) -> bool:
         """
         Checks whether this state is <i>globally terminal</i>. A globally terminal job
@@ -97,7 +94,7 @@ class JobStatus(Enum):
 
         .. versionadded:: 1.11.0
         """
-        return self._j_job_status.isGloballyTerminalState()
+        return self._to_j_job_status().isGloballyTerminalState()
 
     def is_terminal_state(self) -> bool:
         """
@@ -112,7 +109,7 @@ class JobStatus(Enum):
 
         .. versionadded:: 1.11.0
         """
-        return self._j_job_status.isTerminalState()
+        return self._to_j_job_status().isTerminalState()
 
     @staticmethod
     def _from_j_job_status(j_job_status) -> 'JobStatus':
diff --git a/flink-python/pyflink/table/tests/test_sql.py b/flink-python/pyflink/table/tests/test_sql.py
index 5a102ac..7fd965a 100644
--- a/flink-python/pyflink/table/tests/test_sql.py
+++ b/flink-python/pyflink/table/tests/test_sql.py
@@ -80,6 +80,14 @@ class StreamSqlTests(PyFlinkBlinkStreamTableTestCase):
             "sinks",
             source_sink_utils.TestAppendSink(field_names, field_types))
         table_result = t_env.execute_sql("insert into sinks select * from tbl")
+        from pyflink.common.job_status import JobStatus
+        from py4j.protocol import Py4JJavaError
+        try:
+            self.assertTrue(isinstance(table_result.get_job_client().get_job_status().result(),
+                                       JobStatus))
+        except Py4JJavaError as e:
+            self.assertIn('MiniCluster is not yet running or has already been shut down.', str(e))
+
         job_execution_result = table_result.get_job_client().get_job_execution_result().result()
         self.assertIsNotNone(job_execution_result.get_job_id())
         self.assert_equals(table_result.get_table_schema().get_field_names(),

Reply via email to