This is an automated email from the ASF dual-hosted git repository. hequn pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push: new 386cea7 [FLINK-17303][python] Return TableResult for Python TableEnvironment 386cea7 is described below commit 386cea7beccf10fd5893ab2d960ec886a8acfdd0 Author: SteNicholas <programg...@163.com> AuthorDate: Wed May 6 10:09:55 2020 +0800 [FLINK-17303][python] Return TableResult for Python TableEnvironment This closes #12246. --- flink-python/pyflink/common/__init__.py | 8 + flink-python/pyflink/common/completable_future.py | 63 ++++++++ flink-python/pyflink/common/execution_mode.py | 2 +- flink-python/pyflink/common/job_client.py | 114 ++++++++++++++ .../pyflink/common/job_execution_result.py | 3 +- .../pyflink/common/{__init__.py => job_id.py} | 35 ++--- flink-python/pyflink/common/job_status.py | 167 +++++++++++++++++++++ flink-python/pyflink/table/__init__.py | 48 +++--- .../{common/__init__.py => table/result_kind.py} | 53 ++++--- flink-python/pyflink/table/table_environment.py | 4 +- flink-python/pyflink/table/table_result.py | 69 +++++++++ flink-python/pyflink/table/tests/test_sql.py | 47 +++++- 12 files changed, 542 insertions(+), 71 deletions(-) diff --git a/flink-python/pyflink/common/__init__.py b/flink-python/pyflink/common/__init__.py index fcea204..6bf412d 100644 --- a/flink-python/pyflink/common/__init__.py +++ b/flink-python/pyflink/common/__init__.py @@ -22,19 +22,27 @@ Important classes used by both Flink Streaming and Batch API: - :class:`ExecutionConfig`: A config to define the behavior of the program execution. """ +from pyflink.common.completable_future import CompletableFuture from pyflink.common.configuration import Configuration from pyflink.common.execution_config import ExecutionConfig from pyflink.common.execution_mode import ExecutionMode from pyflink.common.input_dependency_constraint import InputDependencyConstraint +from pyflink.common.job_client import JobClient from pyflink.common.job_execution_result import JobExecutionResult +from pyflink.common.job_id import JobID +from pyflink.common.job_status import JobStatus from pyflink.common.restart_strategy import RestartStrategies, RestartStrategyConfiguration __all__ = [ + 'CompletableFuture', 'Configuration', 'ExecutionConfig', 'ExecutionMode', 'InputDependencyConstraint', + 'JobClient', 'JobExecutionResult', + 'JobID', + 'JobStatus', 'RestartStrategies', 'RestartStrategyConfiguration', ] diff --git a/flink-python/pyflink/common/completable_future.py b/flink-python/pyflink/common/completable_future.py new file mode 100644 index 0000000..266ea26 --- /dev/null +++ b/flink-python/pyflink/common/completable_future.py @@ -0,0 +1,63 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +from asyncio import Future + +__all__ = ['CompletableFuture'] + + +class CompletableFuture(Future): + """ + A Future that may be explicitly completed (setting its value and status), supporting dependent + functions and actions that trigger upon its completion. + + When two or more threads attempt to set_result, set_exception, or cancel a CompletableFuture, + only one of them succeeds. + """ + + def __init__(self, j_completable_future, py_class=None): + super().__init__() + self._j_completable_future = j_completable_future + self._py_class = py_class + + def cancel(self): + return self._j_completable_future.cancel(True) + + def cancelled(self): + return self._j_completable_future.isCancelled() + + def done(self): + return self._j_completable_future.isDone() + + def result(self): + if self._py_class is None: + return self._j_completable_future.get() + else: + return self._py_class(self._j_completable_future.get()) + + def exception(self): + return self._exception + + def set_result(self, result): + return self._j_completable_future.complete(result) + + def set_exception(self, exception): + self._exception = exception + return self._j_completable_future.completeExceptionally(exception) + + def __str__(self): + return self._j_completable_future.toString() diff --git a/flink-python/pyflink/common/execution_mode.py b/flink-python/pyflink/common/execution_mode.py index 936f9aa..f3d9966 100644 --- a/flink-python/pyflink/common/execution_mode.py +++ b/flink-python/pyflink/common/execution_mode.py @@ -83,7 +83,7 @@ class ExecutionMode(object): elif j_execution_mode == JExecutionMode.BATCH_FORCED: return ExecutionMode.BATCH_FORCED else: - raise Exception("Unsupported java exection mode: %s" % j_execution_mode) + raise Exception("Unsupported java execution mode: %s" % j_execution_mode) @staticmethod def _to_j_execution_mode(execution_mode): diff --git a/flink-python/pyflink/common/job_client.py b/flink-python/pyflink/common/job_client.py new file mode 100644 index 0000000..e4a1f39 --- /dev/null +++ b/flink-python/pyflink/common/job_client.py @@ -0,0 +1,114 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +from pyflink.common.completable_future import CompletableFuture +from pyflink.common.job_execution_result import JobExecutionResult +from pyflink.common.job_id import JobID +from pyflink.common.job_status import JobStatus + +__all__ = ['JobClient'] + + +class JobClient(object): + """ + A client that is scoped to a specific job. + """ + + def __init__(self, j_job_client): + self._j_job_client = j_job_client + + def get_job_id(self): + """ + Returns the JobID that uniquely identifies the job this client is scoped to. + + :return: JobID, or null if the job has been executed on a runtime without JobIDs + or if the execution failed. + """ + return JobID(self._j_job_client.getJobID()) + + def get_job_status(self): + """ + Requests the JobStatus of the associated job. + + :return: A CompletableFuture containing the JobStatus of the associated job. + :rtype: pyflink.common.CompletableFuture + """ + return CompletableFuture(self._j_job_client.getJobStatus(), JobStatus) + + def cancel(self): + """ + Cancels the associated job. + + :return: A CompletableFuture for canceling the associated job. + :rtype: pyflink.common.CompletableFuture + """ + return CompletableFuture(self._j_job_client.cancel()) + + def stop_with_savepoint(self, advance_to_end_of_event_time, savepoint_directory): + """ + Stops the associated job on Flink cluster. + + Stopping works only for streaming programs. Be aware, that the job might continue to run + for a while after sending the stop command, because after sources stopped to emit data all + operators need to finish processing. + + :param advance_to_end_of_event_time: Flag indicating if the source should inject a + MAX_WATERMARK in the pipeline. + :type advance_to_end_of_event_time: bool + :param savepoint_directory: Directory the savepoint should be written to. + :type savepoint_directory: str + :return: A CompletableFuture containing the path where the savepoint is located. + :rtype: pyflink.common.CompletableFuture + """ + return CompletableFuture( + self._j_job_client.stopWithSavepoint(advance_to_end_of_event_time, savepoint_directory), + str) + + def trigger_savepoint(self, savepoint_directory): + """ + Triggers a savepoint for the associated job. The savepoint will be written to the given + savepoint directory. + + :param savepoint_directory: Directory the savepoint should be written to. + :type savepoint_directory: str + :return: A CompletableFuture containing the path where the savepoint is located. + :rtype: pyflink.common.CompletableFuture + """ + return CompletableFuture(self._j_job_client.triggerSavepoint(savepoint_directory), str) + + def get_accumulators(self, class_loader): + """ + Requests the accumulators of the associated job. Accumulators can be requested while it + is running or after it has finished. The class loader is used to deserialize the incoming + accumulator results. + + :param class_loader: Class loader used to deserialize the incoming accumulator results. + :return: A CompletableFuture containing the accumulators of the associated job. + :rtype: pyflink.common.CompletableFuture + """ + return CompletableFuture(self._j_job_client.getAccumulators(class_loader), dict) + + def get_job_execution_result(self, user_class_loader): + """ + Returns the JobExecutionResult result of the job execution of the submitted job. + + :param user_class_loader: Class loader used to deserialize the accumulators of the job. + :return: A CompletableFuture containing the JobExecutionResult result of the job execution. + :rtype: pyflink.common.CompletableFuture + """ + return CompletableFuture(self._j_job_client.getJobExecutionResult(user_class_loader), + JobExecutionResult) diff --git a/flink-python/pyflink/common/job_execution_result.py b/flink-python/pyflink/common/job_execution_result.py index 24926bc..26ea95e 100644 --- a/flink-python/pyflink/common/job_execution_result.py +++ b/flink-python/pyflink/common/job_execution_result.py @@ -15,6 +15,7 @@ # See the License for the specific language governing permissions and # limitations under the License. ################################################################################ +from pyflink.common.job_id import JobID __all__ = ['JobExecutionResult'] @@ -35,7 +36,7 @@ class JobExecutionResult(object): :return: JobID, or null if the job has been executed on a runtime without JobIDs or if the execution failed. """ - return self._j_job_execution_result.getJobID() + return JobID(self._j_job_execution_result.getJobID()) def is_job_execution_result(self): """ diff --git a/flink-python/pyflink/common/__init__.py b/flink-python/pyflink/common/job_id.py similarity index 55% copy from flink-python/pyflink/common/__init__.py copy to flink-python/pyflink/common/job_id.py index fcea204..49c1349 100644 --- a/flink-python/pyflink/common/__init__.py +++ b/flink-python/pyflink/common/job_id.py @@ -15,26 +15,21 @@ # See the License for the specific language governing permissions and # limitations under the License. ################################################################################ +__all__ = ['JobID'] -""" -Important classes used by both Flink Streaming and Batch API: - - :class:`ExecutionConfig`: - A config to define the behavior of the program execution. -""" -from pyflink.common.configuration import Configuration -from pyflink.common.execution_config import ExecutionConfig -from pyflink.common.execution_mode import ExecutionMode -from pyflink.common.input_dependency_constraint import InputDependencyConstraint -from pyflink.common.job_execution_result import JobExecutionResult -from pyflink.common.restart_strategy import RestartStrategies, RestartStrategyConfiguration +class JobID(object): + """ + Unique (at least statistically unique) identifier for a Flink Job. Jobs in Flink correspond + to dataflow graphs. -__all__ = [ - 'Configuration', - 'ExecutionConfig', - 'ExecutionMode', - 'InputDependencyConstraint', - 'JobExecutionResult', - 'RestartStrategies', - 'RestartStrategyConfiguration', -] + Jobs act simultaneously as sessions, because jobs can be created and submitted incrementally + in different parts. Newer fragments of a graph can be attached to existing graphs, thereby + extending the current data flow graphs. + """ + + def __init__(self, j_job_id): + self._j_job_id = j_job_id + + def __str__(self): + return self._j_job_id.toString() diff --git a/flink-python/pyflink/common/job_status.py b/flink-python/pyflink/common/job_status.py new file mode 100644 index 0000000..bbd4b33 --- /dev/null +++ b/flink-python/pyflink/common/job_status.py @@ -0,0 +1,167 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +from pyflink.java_gateway import get_gateway + +__all__ = ['JobStatus'] + + +class JobStatus(object): + """ + Possible states of a job once it has been accepted by the job manager. + + :data:`CREATED`: + + Job is newly created, no task has started to run. + + :data:`RUNNING`: + + Some tasks are scheduled or running, some may be pending, some may be finished. + + :data:`FAILING`: + + The job has failed and is currently waiting for the cleanup to complete. + + :data:`FAILED`: + + The job has failed with a non-recoverable task failure. + + :data:`CANCELLING`: + + Job is being cancelled. + + :data:`CANCELED`: + + Job has been cancelled. + + :data:`FINISHED`: + + All of the job's tasks have successfully finished. + + :data:`RESTARTING`: + + The job is currently undergoing a reset and total restart. + + :data:`SUSPENDED`: + + The job has been suspended which means that it has been stopped but not been removed from a + potential HA job store. + + :data:`RECONCILING`: + + The job is currently reconciling and waits for task execution report to recover state. + """ + + CREATED = 0 + RUNNING = 1 + FAILING = 2 + FAILED = 3 + CANCELLING = 4 + CANCELED = 5 + FINISHED = 6 + RESTARTING = 7 + SUSPENDED = 8 + RECONCILING = 9 + + def __init__(self, j_job_status) -> None: + super().__init__() + self._j_job_status = j_job_status + + def is_globally_terminal_state(self): + """ + Checks whether this state is <i>globally terminal</i>. A globally terminal job + is complete and cannot fail any more and will not be restarted or recovered by another + standby master node. + + When a globally terminal state has been reached, all recovery data for the job is + dropped from the high-availability services. + + :return: ``True`` if this job status is globally terminal, ``False`` otherwise. + """ + return self._j_job_status.isGloballyTerminalState() + + def is_terminal_state(self): + """ + Checks whether this state is locally terminal. Locally terminal refers to the + state of a job's execution graph within an executing JobManager. If the execution graph + is locally terminal, the JobManager will not continue executing or recovering the job. + + The only state that is locally terminal, but not globally terminal is SUSPENDED, + which is typically entered when the executing JobManager looses its leader status. + + :return: ``True`` if this job status is terminal, ``False`` otherwise. + """ + return self._j_job_status.isTerminalState() + + @staticmethod + def _from_j_job_status(j_job_status): + gateway = get_gateway() + JJobStatus = gateway.jvm.org.apache.flink.api.common.JobStatus + if j_job_status == JJobStatus.CREATED: + return JobStatus.CREATED + elif j_job_status == JJobStatus.RUNNING: + return JobStatus.RUNNING + elif j_job_status == JJobStatus.FAILING: + return JobStatus.FAILING + elif j_job_status == JJobStatus.FAILED: + return JobStatus.FAILED + elif j_job_status == JJobStatus.CANCELLING: + return JobStatus.CANCELLING + elif j_job_status == JJobStatus.CANCELED: + return JobStatus.CANCELED + elif j_job_status == JJobStatus.FINISHED: + return JobStatus.FINISHED + elif j_job_status == JJobStatus.RESTARTING: + return JobStatus.RESTARTING + elif j_job_status == JJobStatus.SUSPENDED: + return JobStatus.SUSPENDED + elif j_job_status == JJobStatus.RECONCILING: + return JobStatus.RECONCILING + else: + raise Exception("Unsupported java job status: %s" % j_job_status) + + @staticmethod + def _to_j_job_status(job_status): + gateway = get_gateway() + JJobStatus = gateway.jvm.org.apache.flink.api.common.JobStatus + if job_status == JobStatus.CREATED: + return JJobStatus.CREATED + elif job_status == JobStatus.RUNNING: + return JJobStatus.RUNNING + elif job_status == JobStatus.FAILING: + return JJobStatus.FAILING + elif job_status == JobStatus.FAILED: + return JJobStatus.FAILED + elif job_status == JobStatus.CANCELLING: + return JJobStatus.CANCELLING + elif job_status == JobStatus.CANCELED: + return JJobStatus.CANCELED + elif job_status == JobStatus.FINISHED: + return JJobStatus.FINISHED + elif job_status == JobStatus.RESTARTING: + return JJobStatus.RESTARTING + elif job_status == JobStatus.SUSPENDED: + return JJobStatus.SUSPENDED + elif job_status == JobStatus.RECONCILING: + return JJobStatus.RECONCILING + else: + raise TypeError("Unsupported job status: %s, supported job statuses are: " + "JobStatus.CREATED, JobStatus.RUNNING, " + "JobStatus.FAILING, JobStatus.FAILED, " + "JobStatus.CANCELLING, JobStatus.CANCELED, " + "JobStatus.FINISHED, JobStatus.RESTARTING, " + "JobStatus.SUSPENDED and JobStatus.RECONCILING." % job_status) diff --git a/flink-python/pyflink/table/__init__.py b/flink-python/pyflink/table/__init__.py index 010434c..12bdf73 100644 --- a/flink-python/pyflink/table/__init__.py +++ b/flink-python/pyflink/table/__init__.py @@ -61,43 +61,47 @@ Important classes of Flink Table API: from __future__ import absolute_import from pyflink.table.environment_settings import EnvironmentSettings +from pyflink.table.explain_detail import ExplainDetail +from pyflink.table.result_kind import ResultKind +from pyflink.table.sinks import CsvTableSink, TableSink, WriteMode +from pyflink.table.sources import CsvTableSource, TableSource from pyflink.table.sql_dialect import SqlDialect -from pyflink.table.table import Table, GroupedTable, GroupWindowedTable, OverWindowedTable, \ +from pyflink.table.statement_set import StatementSet +from pyflink.table.table import GroupWindowedTable, GroupedTable, OverWindowedTable, Table, \ WindowGroupedTable from pyflink.table.table_config import TableConfig from pyflink.table.table_environment import (TableEnvironment, StreamTableEnvironment, BatchTableEnvironment) -from pyflink.table.sinks import TableSink, CsvTableSink, WriteMode -from pyflink.table.sources import TableSource, CsvTableSource -from pyflink.table.types import DataTypes, UserDefinedType, Row +from pyflink.table.table_result import TableResult from pyflink.table.table_schema import TableSchema +from pyflink.table.types import DataTypes, UserDefinedType, Row from pyflink.table.udf import FunctionContext, ScalarFunction -from pyflink.table.explain_detail import ExplainDetail -from pyflink.table.statement_set import StatementSet __all__ = [ - 'TableEnvironment', - 'StreamTableEnvironment', 'BatchTableEnvironment', + 'CsvTableSink', + 'CsvTableSource', + 'DataTypes', 'EnvironmentSettings', - 'Table', - 'GroupedTable', + 'ExplainDetail', + 'FunctionContext', 'GroupWindowedTable', + 'GroupedTable', 'OverWindowedTable', - 'WindowGroupedTable', + 'ResultKind', + 'Row', + 'ScalarFunction', + 'SqlDialect', + 'StatementSet', + 'StreamTableEnvironment', + 'Table', 'TableConfig', + 'TableEnvironment', + 'TableResult', + 'TableSchema', 'TableSink', 'TableSource', - 'WriteMode', - 'CsvTableSink', - 'CsvTableSource', - 'DataTypes', 'UserDefinedType', - 'Row', - 'TableSchema', - 'FunctionContext', - 'ScalarFunction', - 'SqlDialect', - 'ExplainDetail', - 'StatementSet' + 'WindowGroupedTable', + 'WriteMode' ] diff --git a/flink-python/pyflink/common/__init__.py b/flink-python/pyflink/table/result_kind.py similarity index 50% copy from flink-python/pyflink/common/__init__.py copy to flink-python/pyflink/table/result_kind.py index fcea204..68c324d 100644 --- a/flink-python/pyflink/common/__init__.py +++ b/flink-python/pyflink/table/result_kind.py @@ -15,26 +15,35 @@ # See the License for the specific language governing permissions and # limitations under the License. ################################################################################ +from pyflink.java_gateway import get_gateway -""" -Important classes used by both Flink Streaming and Batch API: - - - :class:`ExecutionConfig`: - A config to define the behavior of the program execution. -""" -from pyflink.common.configuration import Configuration -from pyflink.common.execution_config import ExecutionConfig -from pyflink.common.execution_mode import ExecutionMode -from pyflink.common.input_dependency_constraint import InputDependencyConstraint -from pyflink.common.job_execution_result import JobExecutionResult -from pyflink.common.restart_strategy import RestartStrategies, RestartStrategyConfiguration - -__all__ = [ - 'Configuration', - 'ExecutionConfig', - 'ExecutionMode', - 'InputDependencyConstraint', - 'JobExecutionResult', - 'RestartStrategies', - 'RestartStrategyConfiguration', -] +__all__ = ['ResultKind'] + + +class ResultKind(object): + """ + ResultKind defines the types of the result. + + :data:`SUCCESS`: + + The statement (e.g. DDL, USE) executes successfully, and the result only contains a simple "OK". + + :data:`SUCCESS_WITH_CONTENT`: + + The statement (e.g. DML, DQL, SHOW) executes successfully, and the result contains important + content. + """ + + SUCCESS = 0 + SUCCESS_WITH_CONTENT = 1 + + @staticmethod + def _from_j_result_kind(j_result_kind): + gateway = get_gateway() + JResultKind = gateway.jvm.org.apache.flink.table.api.ResultKind + if j_result_kind == JResultKind.SUCCESS: + return ResultKind.SUCCESS + elif j_result_kind == JResultKind.SUCCESS_WITH_CONTENT: + return ResultKind.SUCCESS_WITH_CONTENT + else: + raise Exception("Unsupported Java result kind: %s" % j_result_kind) diff --git a/flink-python/pyflink/table/table_environment.py b/flink-python/pyflink/table/table_environment.py index d2ec220..1d2be8f 100644 --- a/flink-python/pyflink/table/table_environment.py +++ b/flink-python/pyflink/table/table_environment.py @@ -34,6 +34,7 @@ from pyflink.table.descriptors import StreamTableDescriptor, BatchTableDescripto from pyflink.java_gateway import get_gateway from pyflink.table import Table +from pyflink.table.table_result import TableResult from pyflink.table.types import _to_java_type, _create_type_verifier, RowType, DataType, \ _infer_schema_from_data, _create_converter, from_arrow_type, RowField, create_arrow_schema from pyflink.util import utils @@ -531,8 +532,7 @@ class TableEnvironment(object): the affected row count for `DML` (-1 means unknown), or a string message ("OK") for other statements. """ - # TODO convert java TableResult to python TableResult once FLINK-17303 is finished - return self._j_tenv.executeSql(stmt) + return TableResult(self._j_tenv.executeSql(stmt)) def create_statement_set(self): """ diff --git a/flink-python/pyflink/table/table_result.py b/flink-python/pyflink/table/table_result.py new file mode 100644 index 0000000..5071424 --- /dev/null +++ b/flink-python/pyflink/table/table_result.py @@ -0,0 +1,69 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +from pyflink.common.job_client import JobClient +from pyflink.table.result_kind import ResultKind +from pyflink.table.table_schema import TableSchema + +__all__ = ['TableResult'] + + +class TableResult(object): + """ + A :class:`~pyflink.table.TableResult` is the representation of the statement execution result. + """ + + def __init__(self, j_table_result): + self._j_table_result = j_table_result + + def get_job_client(self): + """ + For DML and DQL statement, return the JobClient which associates the submitted Flink job. + For other statements (e.g. DDL, DCL) return empty. + + :return: The job client, optional. + :rtype: pyflink.common.JobClient + """ + job_client = self._j_table_result.getJobClient() + if job_client.isPresent(): + return JobClient(job_client.get()) + else: + return None + + def get_table_schema(self): + """ + Get the schema of result. + + :return: The schema of result. + :rtype: pyflink.table.TableSchema + """ + return TableSchema(j_table_schema=self._j_table_result.getTableSchema()) + + def get_result_kind(self): + """ + Return the ResultKind which represents the result type. + + :return: The result kind. + :rtype: pyflink.table.ResultKind + """ + return ResultKind._from_j_result_kind(self._j_table_result.getResultKind()) + + def print(self): + """ + Print the result contents as tableau form to client console. + """ + self._j_table_result.print() diff --git a/flink-python/pyflink/table/tests/test_sql.py b/flink-python/pyflink/table/tests/test_sql.py index 61de9b1..c8c0ee1 100644 --- a/flink-python/pyflink/table/tests/test_sql.py +++ b/flink-python/pyflink/table/tests/test_sql.py @@ -21,10 +21,8 @@ import subprocess import unittest from pyflink.find_flink_home import _find_flink_source_root - from pyflink.java_gateway import get_gateway - -from pyflink.table import DataTypes +from pyflink.table import DataTypes, ResultKind from pyflink.testing import source_sink_utils from pyflink.testing.test_case_utils import PyFlinkStreamTableTestCase, PyFlinkBatchTableTestCase @@ -58,6 +56,49 @@ class StreamSqlTests(SqlTests, PyFlinkStreamTableTestCase): expected = ['2,Hi,Hello', '3,Hello,Hello'] self.assert_equals(actual, expected) + def test_execute_sql(self): + t_env = self.t_env + table_result = t_env.execute_sql("create table tbl" + "(" + " a bigint," + " b int," + " c varchar" + ") with (" + " 'connector' = 'COLLECTION'," + " 'is-bounded' = 'false'" + ")") + self.assertIsNone(table_result.get_job_client()) + self.assert_equals(table_result.get_table_schema().get_field_names(), ["result"]) + self.assertEqual(table_result.get_result_kind(), ResultKind.SUCCESS) + table_result.print() + + table_result = t_env.execute_sql("alter table tbl set ('k1' = 'a', 'k2' = 'b')") + self.assertIsNone(table_result.get_job_client()) + self.assert_equals(table_result.get_table_schema().get_field_names(), ["result"]) + self.assertEqual(table_result.get_result_kind(), ResultKind.SUCCESS) + table_result.print() + + field_names = ["k1", "k2", "c"] + field_types = [DataTypes.BIGINT(), DataTypes.INT(), DataTypes.STRING()] + t_env.register_table_sink( + "sinks", + source_sink_utils.TestAppendSink(field_names, field_types)) + table_result = t_env.execute_sql("insert into sinks select * from tbl") + job_execution_result = table_result.get_job_client().get_job_execution_result( + get_gateway().jvm.Thread.currentThread().getContextClassLoader()).result() + self.assertIsNotNone(job_execution_result.get_job_id()) + self.assertIsNotNone(job_execution_result.get_job_execution_result()) + self.assert_equals(table_result.get_table_schema().get_field_names(), + ["default_catalog.default_database.sinks"]) + self.assertEqual(table_result.get_result_kind(), ResultKind.SUCCESS_WITH_CONTENT) + table_result.print() + + table_result = t_env.execute_sql("drop table tbl") + self.assertIsNone(table_result.get_job_client()) + self.assert_equals(table_result.get_table_schema().get_field_names(), ["result"]) + self.assertEqual(table_result.get_result_kind(), ResultKind.SUCCESS) + table_result.print() + def test_sql_update(self): t_env = self.t_env source = t_env.from_elements([(1, "Hi", "Hello"), (2, "Hello", "Hello")], ["a", "b", "c"])