This is an automated email from the ASF dual-hosted git repository. jincheng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new da3eb2e [FLINK-12704][python] Enable the configuration of using blink planner. da3eb2e is described below commit da3eb2e07e7c9a2aeda2c3bef803624349ae709a Author: Wei Zhong <weizhong0...@gmail.com> AuthorDate: Thu Aug 1 17:15:29 2019 +0800 [FLINK-12704][python] Enable the configuration of using blink planner. This closes #9314 --- flink-python/pyflink/table/__init__.py | 4 + flink-python/pyflink/table/environment_settings.py | 199 +++++++++++++++++++++ flink-python/pyflink/table/table_environment.py | 141 ++++++++++++--- .../table/tests/test_environment_settings.py | 133 ++++++++++++++ .../test_environment_settings_completeness.py | 67 +++++++ .../table/tests/test_table_environment_api.py | 92 +++++++++- flink-python/pyflink/testing/test_case_utils.py | 19 ++ 7 files changed, 626 insertions(+), 29 deletions(-) diff --git a/flink-python/pyflink/table/__init__.py b/flink-python/pyflink/table/__init__.py index 48a150e..e69a9b7 100644 --- a/flink-python/pyflink/table/__init__.py +++ b/flink-python/pyflink/table/__init__.py @@ -27,6 +27,8 @@ Important classes of Flink Table API: - :class:`pyflink.table.TableConfig` A config to define the runtime behavior of the Table API. It is necessary when creating :class:`TableEnvironment`. + - :class:`pyflink.table.EnvironmentSettings` + Defines all parameters that initialize a table environment. - :class:`pyflink.table.StreamQueryConfig` and :class:`pyflink.table.BatchQueryConfig` A query config holds parameters to configure the behavior of queries. - :class:`pyflink.table.TableSource` @@ -53,6 +55,7 @@ Important classes of Flink Table API: """ from __future__ import absolute_import +from pyflink.table.environment_settings import EnvironmentSettings from pyflink.table.table import Table, GroupedTable, GroupWindowedTable, OverWindowedTable, \ WindowGroupedTable from pyflink.table.table_config import TableConfig @@ -67,6 +70,7 @@ __all__ = [ 'TableEnvironment', 'StreamTableEnvironment', 'BatchTableEnvironment', + 'EnvironmentSettings', 'Table', 'GroupedTable', 'GroupWindowedTable', diff --git a/flink-python/pyflink/table/environment_settings.py b/flink-python/pyflink/table/environment_settings.py new file mode 100644 index 0000000..d6fda40 --- /dev/null +++ b/flink-python/pyflink/table/environment_settings.py @@ -0,0 +1,199 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +from pyflink.java_gateway import get_gateway + +__all__ = ['EnvironmentSettings'] + + +class EnvironmentSettings(object): + """ + Defines all parameters that initialize a table environment. Those parameters are used only + during instantiation of a :class:`~pyflink.table.TableEnvironment` and cannot be changed + afterwards. + + Example: + :: + + >>> EnvironmentSettings.new_instance() \\ + ... .use_old_planner() \\ + ... .in_streaming_mode() \\ + ... .with_built_in_catalog_name("my_catalog") \\ + ... .with_built_in_database_name("my_database") \\ + ... .build() + """ + + class Builder(object): + """ + A builder for :class:`EnvironmentSettings`. + """ + + def __init__(self): + gateway = get_gateway() + self._j_builder = gateway.jvm.EnvironmentSettings.Builder() + + def use_old_planner(self): + """ + Sets the old Flink planner as the required module. + + This is the default behavior. + + :return: This object. + :rtype: EnvironmentSettings.Builder + """ + self._j_builder = self._j_builder.useOldPlanner() + return self + + def use_blink_planner(self): + """ + Sets the Blink planner as the required module. By default, :func:`use_old_planner` is + enabled. + + :return: This object. + :rtype: EnvironmentSettings.Builder + """ + self._j_builder = self._j_builder.useBlinkPlanner() + return self + + def use_any_planner(self): + """ + Does not set a planner requirement explicitly. + + A planner will be discovered automatically, if there is only one planner available. + + By default, :func:`use_old_planner` is enabled. + + :return: This object. + :rtype: EnvironmentSettings.Builder + """ + self._j_builder = self._j_builder.useAnyPlanner() + return self + + def in_batch_mode(self): + """ + Sets that the components should work in a batch mode. Streaming mode by default. + + :return: This object. + :rtype: EnvironmentSettings.Builder + """ + self._j_builder = self._j_builder.inBatchMode() + return self + + def in_streaming_mode(self): + """ + Sets that the components should work in a streaming mode. Enabled by default. + + :return: This object. + :rtype: EnvironmentSettings.Builder + """ + self._j_builder = self._j_builder.inStreamingMode() + return self + + def with_built_in_catalog_name(self, built_in_catalog_name): + """ + Specifies the name of the initial catalog to be created when instantiating + a :class:`~pyflink.table.TableEnvironment`. This catalog will be used to store all + non-serializable objects such as tables and functions registered via e.g. + :func:`~pyflink.table.TableEnvironment.register_table_sink` or + :func:`~pyflink.table.TableEnvironment.register_java_function`. It will also be the + initial value for the current catalog which can be altered via + :func:`~pyflink.table.TableEnvironment.use_catalog`. + + Default: "default_catalog". + + :param built_in_catalog_name: The specified built-in catalog name. + :type built_in_catalog_name: str + :return: This object. + :rtype: EnvironmentSettings.Builder + """ + self._j_builder = self._j_builder.withBuiltInCatalogName(built_in_catalog_name) + return self + + def with_built_in_database_name(self, built_in_database_name): + """ + Specifies the name of the default database in the initial catalog to be + created when instantiating a :class:`~pyflink.table.TableEnvironment`. The database + will be used to store all non-serializable objects such as tables and functions + registered via e.g. :func:`~pyflink.table.TableEnvironment.register_table_sink` or + :func:`~pyflink.table.TableEnvironment.register_java_function`. It will also be the + initial value for the current database which can be altered via + :func:`~pyflink.table.TableEnvironment.use_database`. + + Default: "default_database". + + :param built_in_database_name: The specified built-in database name. + :type built_in_database_name: str + :return: This object. + :rtype: EnvironmentSettings.Builder + """ + self._j_builder = self._j_builder.withBuiltInDatabaseName(built_in_database_name) + return self + + def build(self): + """ + Returns an immutable instance of EnvironmentSettings. + + :return: an immutable instance of EnvironmentSettings. + :rtype: EnvironmentSettings + """ + return EnvironmentSettings(self._j_builder.build()) + + def __init__(self, j_environment_settings): + self._j_environment_settings = j_environment_settings + + def get_built_in_catalog_name(self): + """ + Gets the specified name of the initial catalog to be created when instantiating a + :class:`~pyflink.table.TableEnvironment`. + + :return: The specified name of the initial catalog to be created. + :rtype: str + """ + return self._j_environment_settings.getBuiltInCatalogName() + + def get_built_in_database_name(self): + """ + Gets the specified name of the default database in the initial catalog to be created when + instantiating a :class:`~pyflink.table.TableEnvironment`. + + :return: The specified name of the default database in the initial catalog to be created. + :rtype: str + """ + return self._j_environment_settings.getBuiltInDatabaseName() + + def is_streaming_mode(self): + """ + Tells if the :class:`~pyflink.table.TableEnvironment` should work in a batch or streaming + mode. + + :return: True if the TableEnvironment should work in a streaming mode, false otherwise. + :rtype: bool + """ + return self._j_environment_settings.isStreamingMode() + + @staticmethod + def new_instance(): + """ + Creates a builder for creating an instance of EnvironmentSettings. + + By default, it does not specify a required planner and will use the one that is available + on the classpath via discovery. + + :return: A builder of EnvironmentSettings. + :rtype: EnvironmentSettings.Builder + """ + return EnvironmentSettings.Builder() diff --git a/flink-python/pyflink/table/table_environment.py b/flink-python/pyflink/table/table_environment.py index 2c23deb..b7ad5b5 100644 --- a/flink-python/pyflink/table/table_environment.py +++ b/flink-python/pyflink/table/table_environment.py @@ -19,6 +19,8 @@ import os import tempfile from abc import ABCMeta, abstractmethod +from py4j.java_gateway import get_java_class + from pyflink.serializers import BatchedSerializer, PickleSerializer from pyflink.table.catalog import Catalog from pyflink.table.table_config import TableConfig @@ -730,32 +732,58 @@ class StreamTableEnvironment(TableEnvironment): self._j_tenv.connect(connector_descriptor._j_connector_descriptor)) @staticmethod - def create(stream_execution_environment, table_config=None): + def create(stream_execution_environment, table_config=None, environment_settings=None): """ - Creates a :class:`TableEnvironment` for a :class:`StreamExecutionEnvironment` + Creates a :class:`TableEnvironment` for a + :class:`~pyflink.datastream.StreamExecutionEnvironment`. Example: :: >>> env = StreamExecutionEnvironment.get_execution_environment() - # create without TableConfig + # create without optional parameters. >>> table_env = StreamTableEnvironment.create(env) # create with TableConfig >>> table_config = TableConfig() >>> table_config.set_null_check(False) >>> table_env = StreamTableEnvironment.create(env, table_config) + # create with EnvrionmentSettings + >>> environment_settings = EnvironmentSettings.new_instance().use_blink_planner() \\ + ... .build() + >>> table_env = StreamTableEnvironment.create( + ... env, environment_settings=environment_settings) + - :param stream_execution_environment: The :class:`StreamExecutionEnvironment` of the - TableEnvironment. + :param stream_execution_environment: The + :class:`~pyflink.datastream.StreamExecutionEnvironment` + of the TableEnvironment. + :type stream_execution_environment: pyflink.datastream.StreamExecutionEnvironment :param table_config: The configuration of the TableEnvironment, optional. + :type table_config: TableConfig + :param environment_settings: The environment settings used to instantiate the + TableEnvironment. It provides the interfaces about planner + selection(flink or blink), optional. + :type environment_settings: pyflink.table.EnvironmentSettings :return: The :class:`StreamTableEnvironment` created from given StreamExecutionEnvironment and configuration. + :rtype: StreamTableEnvironment """ + if table_config is not None and environment_settings is not None: + raise ValueError("The param 'table_config' and " + "'environment_settings' cannot be used at the same time") + gateway = get_gateway() if table_config is not None: j_tenv = gateway.jvm.StreamTableEnvironment.create( stream_execution_environment._j_stream_execution_environment, table_config._j_table_config) + elif environment_settings is not None: + if not environment_settings.is_streaming_mode(): + raise ValueError("The environment settings for StreamTableEnvironment must be " + "set to streaming mode.") + j_tenv = gateway.jvm.StreamTableEnvironment.create( + stream_execution_environment._j_stream_execution_environment, + environment_settings._j_environment_settings) else: j_tenv = gateway.jvm.StreamTableEnvironment.create( stream_execution_environment._j_stream_execution_environment) @@ -770,10 +798,16 @@ class BatchTableEnvironment(TableEnvironment): def _from_file(self, filename, schema): gateway = get_gateway() - jds = gateway.jvm.PythonBridgeUtils.createDataSetFromFile( - self._j_tenv.execEnv(), filename, True) - return Table(gateway.jvm.PythonTableUtils.fromDataSet( - self._j_tenv, jds, _to_java_type(schema))) + blink_t_env_class = get_java_class( + gateway.jvm.org.apache.flink.table.api.internal.TableEnvironmentImpl) + if blink_t_env_class == self._j_tenv.getClass(): + raise NotImplementedError("The operation 'from_elements' in batch mode is currently " + "not supported when using blink planner.") + else: + jds = gateway.jvm.PythonBridgeUtils.createDataSetFromFile( + self._j_tenv.execEnv(), filename, True) + return Table(gateway.jvm.PythonTableUtils.fromDataSet( + self._j_tenv, jds, _to_java_type(schema))) def get_config(self): """ @@ -810,38 +844,89 @@ class BatchTableEnvironment(TableEnvironment): ... .register_table_source("MyTable") :param connector_descriptor: Connector descriptor describing the external system. - :return: A :class:`BatchTableDescriptor` used to build the table source/sink. + :type connector_descriptor: ConnectorDescriptor + :return: A :class:`BatchTableDescriptor` or a :class:`StreamTableDescriptor` + (for blink planner) used to build the table source/sink. + :rtype: BatchTableDescriptor or StreamTableDescriptor """ - # type: (ConnectorDescriptor) -> BatchTableDescriptor - return BatchTableDescriptor( - self._j_tenv.connect(connector_descriptor._j_connector_descriptor)) + gateway = get_gateway() + blink_t_env_class = get_java_class( + gateway.jvm.org.apache.flink.table.api.internal.TableEnvironmentImpl) + if blink_t_env_class == self._j_tenv.getClass(): + return StreamTableDescriptor( + self._j_tenv.connect(connector_descriptor._j_connector_descriptor)) + else: + return BatchTableDescriptor( + self._j_tenv.connect(connector_descriptor._j_connector_descriptor)) @staticmethod - def create(execution_environment, table_config=None): + def create(execution_environment=None, table_config=None, environment_settings=None): """ - Creates a :class:`TableEnvironment` for a batch :class:`ExecutionEnvironment`. + Creates a :class:`BatchTableEnvironment`. Example: :: + # create with ExecutionEnvironment. >>> env = ExecutionEnvironment.get_execution_environment() >>> table_env = BatchTableEnvironment.create(env) + # create with ExecutionEnvironment and TableConfig. >>> table_config = TableConfig() >>> table_config.set_null_check(False) >>> table_env = BatchTableEnvironment.create(env, table_config) - - :param execution_environment: The batch :class:`ExecutionEnvironment` of the - TableEnvironment. + # create with EnvironmentSettings. + >>> environment_settings = EnvironmentSettings.new_instance().in_batch_mode() \\ + ... .use_blink_planner().build() + >>> table_env = BatchTableEnvironment.create(environment_settings=environment_settings) + + :param execution_environment: The batch :class:`pyflink.dataset.ExecutionEnvironment` of + the TableEnvironment. + :type execution_environment: pyflink.dataset.ExecutionEnvironment :param table_config: The configuration of the TableEnvironment, optional. - :return: The :class:`BatchTableEnvironment` created from given ExecutionEnvironment and + :type table_config: TableConfig + :param environment_settings: The environment settings used to instantiate the + TableEnvironment. It provides the interfaces about planner + selection(flink or blink), optional. + :type environment_settings: pyflink.table.EnvironmentSettings + :return: The BatchTableEnvironment created from given ExecutionEnvironment and configuration. - """ + :rtype: BatchTableEnvironment + """ + if execution_environment is None and \ + table_config is None and \ + environment_settings is None: + raise ValueError("No argument found, the param 'execution_environment' " + "or 'environment_settings' is required.") + elif execution_environment is None and \ + table_config is not None and \ + environment_settings is None: + raise ValueError("Only the param 'table_config' is found, " + "the param 'execution_environment' is also required.") + elif execution_environment is not None and \ + environment_settings is not None: + raise ValueError("The param 'execution_environment' and " + "'environment_settings' cannot be used at the same time") + elif table_config is not None and \ + environment_settings is not None: + raise ValueError("The param 'table_config' and " + "'environment_settings' cannot be used at the same time") + gateway = get_gateway() - if table_config is not None: - j_tenv = gateway.jvm.BatchTableEnvironment.create( - execution_environment._j_execution_environment, - table_config._j_table_config) - else: - j_tenv = gateway.jvm.BatchTableEnvironment.create( - execution_environment._j_execution_environment) - return BatchTableEnvironment(j_tenv) + if execution_environment is not None and environment_settings is None: + if table_config is not None: + j_tenv = gateway.jvm.BatchTableEnvironment.create( + execution_environment._j_execution_environment, + table_config._j_table_config) + else: + j_tenv = gateway.jvm.BatchTableEnvironment.create( + execution_environment._j_execution_environment) + return BatchTableEnvironment(j_tenv) + elif environment_settings is not None and \ + execution_environment is None and \ + table_config is None: + if environment_settings.is_streaming_mode(): + raise ValueError("The environment settings for BatchTableEnvironment must be " + "set to batch mode.") + j_tenv = gateway.jvm.TableEnvironment.create( + environment_settings._j_environment_settings) + return BatchTableEnvironment(j_tenv) diff --git a/flink-python/pyflink/table/tests/test_environment_settings.py b/flink-python/pyflink/table/tests/test_environment_settings.py new file mode 100644 index 0000000..e493107 --- /dev/null +++ b/flink-python/pyflink/table/tests/test_environment_settings.py @@ -0,0 +1,133 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +from pyflink.java_gateway import get_gateway + +from pyflink.table import EnvironmentSettings +from pyflink.testing.test_case_utils import PyFlinkTestCase, get_private_field + + +class EnvironmentSettingsTests(PyFlinkTestCase): + + def test_planner_selection(self): + + gateway = get_gateway() + + CLASS_NAME = gateway.jvm.EnvironmentSettings.CLASS_NAME + + builder = EnvironmentSettings.new_instance() + + OLD_PLANNER_FACTORY = get_private_field(builder._j_builder, "OLD_PLANNER_FACTORY") + OLD_EXECUTOR_FACTORY = get_private_field(builder._j_builder, "OLD_EXECUTOR_FACTORY") + BLINK_PLANNER_FACTORY = get_private_field(builder._j_builder, "BLINK_PLANNER_FACTORY") + BLINK_EXECUTOR_FACTORY = get_private_field(builder._j_builder, "BLINK_EXECUTOR_FACTORY") + + # test the default behaviour to make sure it is consistent with the python doc + envrionment_settings = builder.build() + + self.assertEqual( + envrionment_settings._j_environment_settings.toPlannerProperties()[CLASS_NAME], + OLD_PLANNER_FACTORY) + + self.assertEqual( + envrionment_settings._j_environment_settings.toExecutorProperties()[CLASS_NAME], + OLD_EXECUTOR_FACTORY) + + # test use_old_planner + envrionment_settings = builder.use_old_planner().build() + + self.assertEqual( + envrionment_settings._j_environment_settings.toPlannerProperties()[CLASS_NAME], + OLD_PLANNER_FACTORY) + + self.assertEqual( + envrionment_settings._j_environment_settings.toExecutorProperties()[CLASS_NAME], + OLD_EXECUTOR_FACTORY) + + # test use_blink_planner + envrionment_settings = builder.use_blink_planner().build() + + self.assertEqual( + envrionment_settings._j_environment_settings.toPlannerProperties()[CLASS_NAME], + BLINK_PLANNER_FACTORY) + + self.assertEqual( + envrionment_settings._j_environment_settings.toExecutorProperties()[CLASS_NAME], + BLINK_EXECUTOR_FACTORY) + + # test use_any_planner + envrionment_settings = builder.use_any_planner().build() + + self.assertTrue( + CLASS_NAME not in envrionment_settings._j_environment_settings.toPlannerProperties()) + + self.assertTrue( + CLASS_NAME not in envrionment_settings._j_environment_settings.toExecutorProperties()) + + def test_mode_selection(self): + + builder = EnvironmentSettings.new_instance() + + # test the default behaviour to make sure it is consistent with the python doc + envrionment_settings = builder.build() + + self.assertTrue(envrionment_settings.is_streaming_mode()) + + # test in_streaming_mode + envrionment_settings = builder.in_streaming_mode().build() + + self.assertTrue(envrionment_settings.is_streaming_mode()) + + # test in_batch_mode + envrionment_settings = builder.in_batch_mode().build() + + self.assertFalse(envrionment_settings.is_streaming_mode()) + + def test_with_built_in_catalog_name(self): + + gateway = get_gateway() + + DEFAULT_BUILTIN_CATALOG = gateway.jvm.EnvironmentSettings.DEFAULT_BUILTIN_CATALOG + + builder = EnvironmentSettings.new_instance() + + # test the default behaviour to make sure it is consistent with the python doc + envrionment_settings = builder.build() + + self.assertEqual(envrionment_settings.get_built_in_catalog_name(), DEFAULT_BUILTIN_CATALOG) + + envrionment_settings = builder.with_built_in_catalog_name("my_catalog").build() + + self.assertEqual(envrionment_settings.get_built_in_catalog_name(), "my_catalog") + + def test_with_built_in_database_name(self): + + gateway = get_gateway() + + DEFAULT_BUILTIN_DATABASE = gateway.jvm.EnvironmentSettings.DEFAULT_BUILTIN_DATABASE + + builder = EnvironmentSettings.new_instance() + + # test the default behaviour to make sure it is consistent with the python doc + envrionment_settings = builder.build() + + self.assertEqual(envrionment_settings.get_built_in_database_name(), + DEFAULT_BUILTIN_DATABASE) + + envrionment_settings = builder.with_built_in_database_name("my_database").build() + + self.assertEqual(envrionment_settings.get_built_in_database_name(), "my_database") diff --git a/flink-python/pyflink/table/tests/test_environment_settings_completeness.py b/flink-python/pyflink/table/tests/test_environment_settings_completeness.py new file mode 100644 index 0000000..f32e813 --- /dev/null +++ b/flink-python/pyflink/table/tests/test_environment_settings_completeness.py @@ -0,0 +1,67 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +import unittest + +from pyflink.table import EnvironmentSettings +from pyflink.testing.test_case_utils import PythonAPICompletenessTestCase + + +class EnvironmentSettingsCompletenessTests(PythonAPICompletenessTestCase, unittest.TestCase): + """ + Tests whether the Python :class:`EnvironmentSettings` is consistent with + Java `org.apache.flink.table.api.EnvironmentSettings`. + """ + + @classmethod + def python_class(cls): + return EnvironmentSettings + + @classmethod + def java_class(cls): + return "org.apache.flink.table.api.EnvironmentSettings" + + @classmethod + def excluded_methods(cls): + # internal interfaces, no need to expose to users. + return {'toPlannerProperties', 'toExecutorProperties'} + + +class EnvironmentSettingsBuilderCompletenessTests(PythonAPICompletenessTestCase, unittest.TestCase): + """ + Tests whether the Python :class:`EnvironmentSettings.Builder` is consistent with + Java `org.apache.flink.table.api.EnvironmentSettings$Builder`. + """ + + @classmethod + def python_class(cls): + return EnvironmentSettings.Builder + + @classmethod + def java_class(cls): + return "org.apache.flink.table.api.EnvironmentSettings$Builder" + + +if __name__ == '__main__': + import unittest + + try: + import xmlrunner + testRunner = xmlrunner.XMLTestRunner(output='target/test-reports') + except ImportError: + testRunner = None + unittest.main(testRunner=testRunner, verbosity=2) diff --git a/flink-python/pyflink/table/tests/test_table_environment_api.py b/flink-python/pyflink/table/tests/test_table_environment_api.py index 3e2dd6f..8bbd491 100644 --- a/flink-python/pyflink/table/tests/test_table_environment_api.py +++ b/flink-python/pyflink/table/tests/test_table_environment_api.py @@ -21,7 +21,7 @@ from py4j.compat import unicode from pyflink.dataset import ExecutionEnvironment from pyflink.datastream import StreamExecutionEnvironment -from pyflink.table import DataTypes, CsvTableSink, StreamTableEnvironment +from pyflink.table import DataTypes, CsvTableSink, StreamTableEnvironment, EnvironmentSettings from pyflink.table.table_config import TableConfig from pyflink.table.table_environment import BatchTableEnvironment from pyflink.table.types import RowType @@ -197,6 +197,49 @@ class StreamTableEnvironmentTests(PyFlinkStreamTableTestCase): self.assertEqual(readed_table_config.get_max_generated_code_length(), 32000) self.assertEqual(readed_table_config.get_local_timezone(), "Asia/Shanghai") + def test_create_table_environment_with_blink_planner(self): + t_env = StreamTableEnvironment.create( + self.env, + environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build()) + + planner = t_env._j_tenv.getPlanner() + + self.assertEqual( + planner.getClass().getName(), + "org.apache.flink.table.planner.delegation.StreamPlanner") + + def test_table_environment_with_blink_planner(self): + t_env = StreamTableEnvironment.create( + self.env, + environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build()) + + source_path = os.path.join(self.tempdir + '/streaming.csv') + sink_path = os.path.join(self.tempdir + '/result.csv') + field_names = ["a", "b", "c"] + field_types = [DataTypes.INT(), DataTypes.STRING(), DataTypes.STRING()] + data = [(1, 'hi', 'hello'), (2, 'hello', 'hello')] + csv_source = self.prepare_csv_source(source_path, data, field_types, field_names) + + t_env.register_table_source("source", csv_source) + + t_env.register_table_sink( + "sink", + CsvTableSink(field_names, field_types, sink_path)) + source = t_env.scan("source") + + result = source.alias("a, b, c").select("1 + a, b, c") + + result.insert_into("sink") + + t_env.execute("blink_test") + + results = [] + with open(sink_path, 'r') as f: + results.append(f.readline()) + results.append(f.readline()) + + self.assert_equals(results, ['2,hi,hello\n', '3,hello,hello\n']) + class BatchTableEnvironmentTests(PyFlinkBatchTableTestCase): @@ -273,3 +316,50 @@ class BatchTableEnvironmentTests(PyFlinkBatchTableTestCase): self.assertFalse(readed_table_config.get_null_check()) self.assertEqual(readed_table_config.get_max_generated_code_length(), 32000) self.assertEqual(readed_table_config.get_local_timezone(), "Asia/Shanghai") + + def test_create_table_environment_with_blink_planner(self): + t_env = BatchTableEnvironment.create( + environment_settings=EnvironmentSettings.new_instance().in_batch_mode() + .use_blink_planner().build()) + + planner = t_env._j_tenv.getPlanner() + + self.assertEqual( + planner.getClass().getName(), + "org.apache.flink.table.planner.delegation.BatchPlanner") + + def test_table_environment_with_blink_planner(self): + t_env = BatchTableEnvironment.create( + environment_settings=EnvironmentSettings.new_instance().in_batch_mode() + .use_blink_planner().build()) + + source_path = os.path.join(self.tempdir + '/streaming.csv') + sink_path = os.path.join(self.tempdir + '/results') + field_names = ["a", "b", "c"] + field_types = [DataTypes.INT(), DataTypes.STRING(), DataTypes.STRING()] + data = [(1, 'hi', 'hello'), (2, 'hello', 'hello')] + csv_source = self.prepare_csv_source(source_path, data, field_types, field_names) + + t_env.register_table_source("source", csv_source) + + t_env.register_table_sink( + "sink", + CsvTableSink(field_names, field_types, sink_path)) + source = t_env.scan("source") + + result = source.alias("a, b, c").select("1 + a, b, c") + + result.insert_into("sink") + + t_env.execute("blink_test") + + results = [] + for root, dirs, files in os.walk(sink_path): + for sub_file in files: + with open(os.path.join(root, sub_file), 'r') as f: + line = f.readline() + while line is not None and line != '': + results.append(line) + line = f.readline() + + self.assert_equals(results, ['2,hi,hello\n', '3,hello,hello\n']) diff --git a/flink-python/pyflink/testing/test_case_utils.py b/flink-python/pyflink/testing/test_case_utils.py index cda1e65..c1d484e 100644 --- a/flink-python/pyflink/testing/test_case_utils.py +++ b/flink-python/pyflink/testing/test_case_utils.py @@ -25,6 +25,8 @@ import unittest from abc import abstractmethod from py4j.java_gateway import JavaObject +from py4j.protocol import Py4JJavaError + from pyflink.table.sources import CsvTableSource from pyflink.dataset import ExecutionEnvironment @@ -48,6 +50,23 @@ logging.basicConfig(stream=sys.stdout, level=log_level, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s") +def get_private_field(java_obj, field_name): + try: + field = java_obj.getClass().getDeclaredField(field_name) + field.setAccessible(True) + return field.get(java_obj) + except Py4JJavaError: + cls = java_obj.getClass() + while cls.getSuperclass() is not None: + cls = cls.getSuperclass() + try: + field = cls.getDeclaredField(field_name) + field.setAccessible(True) + return field.get(java_obj) + except Py4JJavaError: + pass + + class PyFlinkTestCase(unittest.TestCase): """ Base class for unit tests.