This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 95944e231315f085d5e23717332aa2866caa5d8a Author: Dawid Wysakowicz <dwysakow...@apache.org> AuthorDate: Wed Jun 19 15:00:23 2019 +0200 [FLINK-12798][table] Add a discovery mechanism for switching between Flink/Blink Planner/Executor This closes #8852. --- flink-python/pyflink/table/table_config.py | 36 ---- .../table/tests/test_table_environment_api.py | 12 -- .../client/gateway/local/ExecutionContextTest.java | 3 +- .../client/gateway/local/LocalExecutorITCase.java | 7 +- .../table/api/java/BatchTableEnvironment.java | 5 +- .../table/api/java/StreamTableEnvironment.java | 155 +++++++++----- .../java/internal/StreamTableEnvironmentImpl.java | 66 +++--- .../flink/table/api/EnvironmentSettings.java | 228 +++++++++++++++++++++ .../org/apache/flink/table/api/TableConfig.java | 44 ---- .../apache/flink/table/api/TableEnvironment.java | 22 +- .../table/api/internal/TableEnvironmentImpl.java | 4 +- .../flink/table/delegation/ExecutorFactory.java | 50 +++++ .../internal => delegation}/PlannerFactory.java | 49 ++--- .../flink/table/factories/ComponentFactory.java | 53 +++++ .../table/factories/ComponentFactoryService.java | 80 ++++++++ .../factories/ComponentFactoryServiceTest.java | 68 ++++++ .../factories/utils/OtherTestPlannerFactory.java | 28 +++ .../table/factories/utils/TestPlannerFactory.java | 69 +++++++ .../org.apache.flink.table.factories.TableFactory | 6 +- .../table/api/scala/BatchTableEnvironment.scala | 7 +- .../table/api/scala/StreamTableEnvironment.scala | 132 ++++++++---- .../internal/StreamTableEnvironmentImpl.scala | 90 ++++---- .../flink/table/executor/StreamExecutor.java | 4 +- ...utorFactory.java => StreamExecutorFactory.java} | 49 +++-- .../flink/table/planner/StreamPlannerFactory.java | 70 +++++++ .../org.apache.flink.table.factories.TableFactory | 2 + .../flink/table/api/internal/TableEnvImpl.scala | 4 +- .../api/stream/StreamTableEnvironmentTest.scala | 26 ++- .../apache/flink/table/utils/TableTestBase.scala | 76 +++---- 29 files changed, 1052 insertions(+), 393 deletions(-) diff --git a/flink-python/pyflink/table/table_config.py b/flink-python/pyflink/table/table_config.py index 7eb3513..d6b5864 100644 --- a/flink-python/pyflink/table/table_config.py +++ b/flink-python/pyflink/table/table_config.py @@ -90,39 +90,3 @@ class TableConfig(object): self._j_table_config.setMaxGeneratedCodeLength(max_generated_code_length) else: raise Exception("TableConfig.max_generated_code_length should be a int value!") - - def get_built_in_catalog_name(self): - """ - Gets the specified name of the initial catalog to be created when instantiating - :class:`TableEnvironment`. - """ - return self._j_table_config.getBuiltInCatalogName() - - def set_built_in_catalog_name(self, built_in_catalog_name): - """ - Specifies the name of the initial catalog to be created when instantiating - :class:`TableEnvironment`. This method has no effect if called on the - :func:`~pyflink.table.TableEnvironment.get_config`. - """ - if built_in_catalog_name is not None and isinstance(built_in_catalog_name, str): - self._j_table_config.setBuiltInCatalogName(built_in_catalog_name) - else: - raise Exception("TableConfig.built_in_catalog_name should be a string value!") - - def get_built_in_database_name(self): - """ - Gets the specified name of the default database in the initial catalog to be created when - instantiating :class:`TableEnvironment`. - """ - return self._j_table_config.getBuiltInDatabaseName() - - def set_built_in_database_name(self, built_in_database_name): - """ - Specifies the name of the default database in the initial catalog to be created when - instantiating :class:`TableEnvironment`. This method has no effect if called on the - :func:`~pyflink.table.TableEnvironment.get_config`. - """ - if built_in_database_name is not None and isinstance(built_in_database_name, str): - self._j_table_config.setBuiltInDatabaseName(built_in_database_name) - else: - raise Exception("TableConfig.built_in_database_name should be a string value!") diff --git a/flink-python/pyflink/table/tests/test_table_environment_api.py b/flink-python/pyflink/table/tests/test_table_environment_api.py index a129d22..4eba1bb 100644 --- a/flink-python/pyflink/table/tests/test_table_environment_api.py +++ b/flink-python/pyflink/table/tests/test_table_environment_api.py @@ -170,8 +170,6 @@ class StreamTableEnvironmentTests(PyFlinkStreamTableTestCase): table_config.set_max_generated_code_length(32000) table_config.set_null_check(False) table_config.set_timezone("Asia/Shanghai") - table_config.set_built_in_catalog_name("test_catalog") - table_config.set_built_in_database_name("test_database") env = StreamExecutionEnvironment.get_execution_environment() t_env = StreamTableEnvironment.create(env, table_config) @@ -181,8 +179,6 @@ class StreamTableEnvironmentTests(PyFlinkStreamTableTestCase): self.assertFalse(readed_table_config.get_null_check()) self.assertEqual(readed_table_config.get_max_generated_code_length(), 32000) self.assertEqual(readed_table_config.get_timezone(), "Asia/Shanghai") - self.assertEqual(table_config.get_built_in_catalog_name(), "test_catalog") - self.assertEqual(table_config.get_built_in_database_name(), "test_database") class BatchTableEnvironmentTests(PyFlinkBatchTableTestCase): @@ -208,22 +204,16 @@ class BatchTableEnvironmentTests(PyFlinkBatchTableTestCase): table_config.set_timezone("Asia/Shanghai") table_config.set_max_generated_code_length(64000) table_config.set_null_check(True) - table_config.set_built_in_catalog_name("test_catalog") - table_config.set_built_in_database_name("test_database") self.assertTrue(table_config.get_null_check()) self.assertEqual(table_config.get_max_generated_code_length(), 64000) self.assertEqual(table_config.get_timezone(), "Asia/Shanghai") - self.assertEqual(table_config.get_built_in_catalog_name(), "test_catalog") - self.assertEqual(table_config.get_built_in_database_name(), "test_database") def test_create_table_environment(self): table_config = TableConfig() table_config.set_max_generated_code_length(32000) table_config.set_null_check(False) table_config.set_timezone("Asia/Shanghai") - table_config.set_built_in_catalog_name("test_catalog") - table_config.set_built_in_database_name("test_database") env = ExecutionEnvironment.get_execution_environment() t_env = BatchTableEnvironment.create(env, table_config) @@ -233,5 +223,3 @@ class BatchTableEnvironmentTests(PyFlinkBatchTableTestCase): self.assertFalse(readed_table_config.get_null_check()) self.assertEqual(readed_table_config.get_max_generated_code_length(), 32000) self.assertEqual(readed_table_config.get_timezone(), "Asia/Shanghai") - self.assertEqual(readed_table_config.get_built_in_catalog_name(), "test_catalog") - self.assertEqual(readed_table_config.get_built_in_database_name(), "test_database") diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java index ac60fcc..fb0c80c 100644 --- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java +++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java @@ -23,7 +23,6 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.client.cli.DefaultCLI; import org.apache.flink.configuration.Configuration; -import org.apache.flink.table.api.TableConfig; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.Types; import org.apache.flink.table.api.java.StreamTableEnvironment; @@ -110,7 +109,7 @@ public class ExecutionContextTest { assertEquals( new HashSet<>( Arrays.asList( - TableConfig.getDefault().getBuiltInCatalogName(), + "default_catalog", inmemoryCatalog, hiveCatalog, hiveDefaultVersionCatalog, diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java index 4f92ae7..023d656 100644 --- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java +++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java @@ -31,7 +31,6 @@ import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.configuration.WebOptions; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; -import org.apache.flink.table.api.TableConfig; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.client.config.Environment; import org.apache.flink.table.client.config.entries.ViewEntry; @@ -76,7 +75,6 @@ import static org.junit.Assert.fail; public class LocalExecutorITCase extends TestLogger { private static final String DEFAULTS_ENVIRONMENT_FILE = "test-sql-client-defaults.yaml"; - private static final int NUM_TMS = 2; private static final int NUM_SLOTS_PER_TM = 2; @@ -158,7 +156,7 @@ public class LocalExecutorITCase extends TestLogger { final List<String> actualCatalogs = executor.listCatalogs(session); final List<String> expectedCatalogs = Arrays.asList( - TableConfig.getDefault().getBuiltInCatalogName(), + "default_catalog", "catalog1"); assertEquals(expectedCatalogs, actualCatalogs); } @@ -170,8 +168,7 @@ public class LocalExecutorITCase extends TestLogger { final List<String> actualDatabases = executor.listDatabases(session); - final List<String> expectedDatabases = Arrays.asList( - TableConfig.getDefault().getBuiltInDatabaseName()); + final List<String> expectedDatabases = Arrays.asList("default_database"); assertEquals(expectedDatabases, actualDatabases); } diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/BatchTableEnvironment.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/BatchTableEnvironment.java index 1759b60..16f63af 100644 --- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/BatchTableEnvironment.java +++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/BatchTableEnvironment.java @@ -287,9 +287,10 @@ public interface BatchTableEnvironment extends TableEnvironment { try { Class<?> clazz = Class.forName("org.apache.flink.table.api.java.internal.BatchTableEnvironmentImpl"); Constructor con = clazz.getConstructor(ExecutionEnvironment.class, TableConfig.class, CatalogManager.class); + String defaultCatalog = "default_catalog"; CatalogManager catalogManager = new CatalogManager( - tableConfig.getBuiltInCatalogName(), - new GenericInMemoryCatalog(tableConfig.getBuiltInCatalogName(), tableConfig.getBuiltInDatabaseName()) + defaultCatalog, + new GenericInMemoryCatalog(defaultCatalog, "default_database") ); return (BatchTableEnvironment) con.newInstance(executionEnvironment, tableConfig, catalogManager); } catch (Throwable t) { diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/StreamTableEnvironment.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/StreamTableEnvironment.java index a4f5df2..6859780 100644 --- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/StreamTableEnvironment.java +++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/StreamTableEnvironment.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.StreamQueryConfig; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.TableConfig; @@ -35,24 +36,115 @@ import org.apache.flink.table.functions.TableAggregateFunction; import org.apache.flink.table.functions.TableFunction; /** - * The {@link TableEnvironment} for a Java {@link StreamExecutionEnvironment} that works with - * {@link DataStream}s. + * This table environment is the entry point and central context for creating Table & SQL + * API programs that integrate with the Java-specific {@link DataStream} API. * - * <p>A TableEnvironment can be used to: + * <p>It is unified for bounded and unbounded data processing. + * + * <p>A stream table environment is responsible for: * <ul> - * <li>convert a {@link DataStream} to a {@link Table}</li> - * <li>register a {@link DataStream} in the {@link TableEnvironment}'s catalog</li> - * <li>register a {@link Table} in the {@link TableEnvironment}'s catalog</li> - * <li>scan a registered table to obtain a {@link Table}</li> - * <li>specify a SQL query on registered tables to obtain a {@link Table}</li> - * <li>convert a {@link Table} into a {@link DataStream}</li> - * <li>explain the AST and execution plan of a {@link Table}</li> + * <li>Convert a {@link DataStream} into {@link Table} and vice-versa.</li> + * <li>Connecting to external systems.</li> + * <li>Registering and retrieving {@link Table}s and other meta objects from a catalog.</li> + * <li>Executing SQL statements.</li> + * <li>Offering further configuration options.</li> * </ul> + * + * <p>Note: If you don't intend to use the {@link DataStream} API, {@link TableEnvironment} is meant + * for pure table programs. */ @PublicEvolving public interface StreamTableEnvironment extends TableEnvironment { /** + * Creates a table environment that is the entry point and central context for creating Table & SQL + * API programs that integrate with the Java-specific {@link DataStream} API. + * + * <p>It is unified for bounded and unbounded data processing. + * + * <p>A stream table environment is responsible for: + * <ul> + * <li>Convert a {@link DataStream} into {@link Table} and vice-versa.</li> + * <li>Connecting to external systems.</li> + * <li>Registering and retrieving {@link Table}s and other meta objects from a catalog.</li> + * <li>Executing SQL statements.</li> + * <li>Offering further configuration options.</li> + * </ul> + * + * <p>Note: If you don't intend to use the {@link DataStream} API, {@link TableEnvironment} is meant + * for pure table programs. + * + * @param executionEnvironment The Java {@link StreamExecutionEnvironment} of the {@link TableEnvironment}. + */ + static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment) { + return create( + executionEnvironment, + EnvironmentSettings.newInstance().build()); + } + + /** + * Creates a table environment that is the entry point and central context for creating Table & SQL + * API programs that integrate with the Java-specific {@link DataStream} API. + * + * <p>It is unified for bounded and unbounded data processing. + * + * <p>A stream table environment is responsible for: + * <ul> + * <li>Convert a {@link DataStream} into {@link Table} and vice-versa.</li> + * <li>Connecting to external systems.</li> + * <li>Registering and retrieving {@link Table}s and other meta objects from a catalog.</li> + * <li>Executing SQL statements.</li> + * <li>Offering further configuration options.</li> + * </ul> + * + * <p>Note: If you don't intend to use the {@link DataStream} API, {@link TableEnvironment} is meant + * for pure table programs. + * + * @param executionEnvironment The Java {@link StreamExecutionEnvironment} of the {@link TableEnvironment}. + * @param settings The environment settings used to instantiate the {@link TableEnvironment}. + */ + static StreamTableEnvironment create( + StreamExecutionEnvironment executionEnvironment, + EnvironmentSettings settings) { + return StreamTableEnvironmentImpl.create( + executionEnvironment, + settings, + new TableConfig() + ); + } + + /** + * Creates a table environment that is the entry point and central context for creating Table & SQL + * API programs that integrate with the Java-specific {@link DataStream} API. + * + * <p>It is unified for bounded and unbounded data processing. + * + * <p>A stream table environment is responsible for: + * <ul> + * <li>Convert a {@link DataStream} into {@link Table} and vice-versa.</li> + * <li>Connecting to external systems.</li> + * <li>Registering and retrieving {@link Table}s and other meta objects from a catalog.</li> + * <li>Executing SQL statements.</li> + * <li>Offering further configuration options.</li> + * </ul> + * + * <p>Note: If you don't intend to use the {@link DataStream} API, {@link TableEnvironment} is meant + * for pure table programs. + * + * @param executionEnvironment The Java {@link StreamExecutionEnvironment} of the {@link TableEnvironment}. + * @param tableConfig The configuration of the {@link TableEnvironment}. + * @deprecated Use {@link #create(StreamExecutionEnvironment)} and {@link #getConfig()} + * for manipulating {@link TableConfig}. + */ + @Deprecated + static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment, TableConfig tableConfig) { + return StreamTableEnvironmentImpl.create( + executionEnvironment, + EnvironmentSettings.newInstance().build(), + tableConfig); + } + + /** * Registers a {@link TableFunction} under a unique name in the TableEnvironment's catalog. * Registered functions can be referenced in Table API and SQL queries. * @@ -356,47 +448,4 @@ public interface StreamTableEnvironment extends TableEnvironment { */ @Override StreamTableDescriptor connect(ConnectorDescriptor connectorDescriptor); - - /** - * The {@link TableEnvironment} for a Java {@link StreamExecutionEnvironment} that works with - * {@link DataStream}s. - * - * <p>A TableEnvironment can be used to: - * <ul> - * <li>convert a {@link DataStream} to a {@link Table}</li> - * <li>register a {@link DataStream} in the {@link TableEnvironment}'s catalog</li> - * <li>register a {@link Table} in the {@link TableEnvironment}'s catalog</li> - * <li>scan a registered table to obtain a {@link Table}</li> - * <li>specify a SQL query on registered tables to obtain a {@link Table}</li> - * <li>convert a {@link Table} into a {@link DataStream}</li> - * <li>explain the AST and execution plan of a {@link Table}</li> - * </ul> - * - * @param executionEnvironment The Java {@link StreamExecutionEnvironment} of the TableEnvironment. - */ - static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment) { - return create(executionEnvironment, new TableConfig()); - } - - /** - * The {@link TableEnvironment} for a Java {@link StreamExecutionEnvironment} that works with - * {@link DataStream}s. - * - * <p>A TableEnvironment can be used to: - * <ul> - * <li>convert a {@link DataStream} to a {@link Table}</li> - * <li>register a {@link DataStream} in the {@link TableEnvironment}'s catalog</li> - * <li>register a {@link Table} in the {@link TableEnvironment}'s catalog</li> - * <li>scan a registered table to obtain a {@link Table}</li> - * <li>specify a SQL query on registered tables to obtain a {@link Table}</li> - * <li>convert a {@link Table} into a {@link DataStream}</li> - * <li>explain the AST and execution plan of a {@link Table}</li> - * </ul> - * - * @param executionEnvironment The Java {@link StreamExecutionEnvironment} of the TableEnvironment. - * @param tableConfig The configuration of the TableEnvironment. - */ - static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment, TableConfig tableConfig) { - return StreamTableEnvironmentImpl.create(tableConfig, executionEnvironment); - } } diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java index 0ea02a3..6b37690 100644 --- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java +++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java @@ -19,6 +19,7 @@ package org.apache.flink.table.api.java.internal; import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.dag.Transformation; import org.apache.flink.api.java.tuple.Tuple2; @@ -27,24 +28,27 @@ import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.StreamQueryConfig; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.TableConfig; import org.apache.flink.table.api.TableException; import org.apache.flink.table.api.Types; import org.apache.flink.table.api.ValidationException; -import org.apache.flink.table.api.internal.PlannerFactory; import org.apache.flink.table.api.internal.TableEnvironmentImpl; import org.apache.flink.table.api.java.StreamTableEnvironment; import org.apache.flink.table.catalog.CatalogManager; import org.apache.flink.table.catalog.FunctionCatalog; import org.apache.flink.table.catalog.GenericInMemoryCatalog; import org.apache.flink.table.delegation.Executor; +import org.apache.flink.table.delegation.ExecutorFactory; import org.apache.flink.table.delegation.Planner; +import org.apache.flink.table.delegation.PlannerFactory; import org.apache.flink.table.descriptors.ConnectorDescriptor; import org.apache.flink.table.descriptors.StreamTableDescriptor; import org.apache.flink.table.expressions.Expression; import org.apache.flink.table.expressions.ExpressionParser; +import org.apache.flink.table.factories.ComponentFactoryService; import org.apache.flink.table.functions.AggregateFunction; import org.apache.flink.table.functions.TableAggregateFunction; import org.apache.flink.table.functions.TableFunction; @@ -60,6 +64,7 @@ import org.apache.flink.table.typeutils.FieldInfoUtils; import java.lang.reflect.Method; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Optional; /** @@ -73,7 +78,8 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl imple private final StreamExecutionEnvironment executionEnvironment; - private StreamTableEnvironmentImpl( + @VisibleForTesting + public StreamTableEnvironmentImpl( CatalogManager catalogManager, FunctionCatalog functionCatalog, TableConfig tableConfig, @@ -92,33 +98,21 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl imple * @param tableConfig The configuration of the TableEnvironment. * @param executionEnvironment The {@link StreamExecutionEnvironment} of the TableEnvironment. */ - public static StreamTableEnvironmentImpl create( - TableConfig tableConfig, - StreamExecutionEnvironment executionEnvironment) { - CatalogManager catalogManager = new CatalogManager( - tableConfig.getBuiltInCatalogName(), - new GenericInMemoryCatalog(tableConfig.getBuiltInCatalogName(), tableConfig.getBuiltInDatabaseName())); - return create(catalogManager, tableConfig, executionEnvironment); - } - - /** - * Creates an instance of a {@link StreamTableEnvironment}. It uses the {@link StreamExecutionEnvironment} for - * executing queries. This is also the {@link StreamExecutionEnvironment} that will be used when converting - * from/to {@link DataStream}. - * - * @param catalogManager The {@link CatalogManager} to use for storing and looking up {@link Table}s. - * @param tableConfig The configuration of the TableEnvironment. - * @param executionEnvironment The {@link StreamExecutionEnvironment} of the TableEnvironment. - */ - public static StreamTableEnvironmentImpl create( - CatalogManager catalogManager, - TableConfig tableConfig, - StreamExecutionEnvironment executionEnvironment) { + public static StreamTableEnvironment create( + StreamExecutionEnvironment executionEnvironment, + EnvironmentSettings settings, + TableConfig tableConfig) { FunctionCatalog functionCatalog = new FunctionCatalog( - catalogManager.getCurrentCatalog(), - catalogManager.getCurrentDatabase()); - Executor executor = lookupExecutor(executionEnvironment); - Planner planner = PlannerFactory.lookupPlanner(executor, tableConfig, functionCatalog, catalogManager); + settings.getBuiltInCatalogName(), + settings.getBuiltInDatabaseName()); + CatalogManager catalogManager = new CatalogManager( + settings.getBuiltInCatalogName(), + new GenericInMemoryCatalog(settings.getBuiltInCatalogName(), settings.getBuiltInDatabaseName())); + Map<String, String> plannerProperties = settings.toPlannerProperties(); + Map<String, String> executorProperties = settings.toExecutorProperties(); + Executor executor = lookupExecutor(executorProperties, executionEnvironment); + Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties) + .create(plannerProperties, executor, tableConfig, functionCatalog, catalogManager); return new StreamTableEnvironmentImpl( catalogManager, functionCatalog, @@ -129,12 +123,18 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl imple ); } - private static Executor lookupExecutor(StreamExecutionEnvironment executionEnvironment) { + private static Executor lookupExecutor( + Map<String, String> executorProperties, + StreamExecutionEnvironment executionEnvironment) { try { - Class<?> clazz = Class.forName("org.apache.flink.table.executor.ExecutorFactory"); - Method createMethod = clazz.getMethod("create", StreamExecutionEnvironment.class); - - return (Executor) createMethod.invoke(null, executionEnvironment); + ExecutorFactory executorFactory = ComponentFactoryService.find(ExecutorFactory.class, executorProperties); + Method createMethod = executorFactory.getClass() + .getMethod("create", Map.class, StreamExecutionEnvironment.class); + + return (Executor) createMethod.invoke( + executorFactory, + executorProperties, + executionEnvironment); } catch (Exception e) { throw new TableException( "Could not instantiate the executor. Make sure a planner module is on the classpath", diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java new file mode 100644 index 0000000..37ba179 --- /dev/null +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.delegation.Executor; +import org.apache.flink.table.delegation.Planner; + +import javax.annotation.Nullable; + +import java.util.HashMap; +import java.util.Map; + +/** + * Defines all parameters that initialize a table environment. Those parameters are used only + * during instantiation of a {@link TableEnvironment} and cannot be changed afterwards. + * + * <p>Example: + * <pre>{@code + * EnvironmentSettings.newInstance() + * .useOldPlanner() + * .inStreamingMode() + * .withBuiltInCatalogName("default_catalog") + * .withBuiltInDatabaseName("default_database") + * .build() + * }</pre> + */ +@PublicEvolving +public class EnvironmentSettings { + public static final String BATCH_MODE = "batch-mode"; + public static final String CLASS_NAME = "class-name"; + + /** + * Canonical name of the {@link Planner} class to use. + */ + private final String plannerClass; + + /** + * Canonical name of the {@link Executor} class to use. + */ + private final String executorClass; + + /** + * Specifies the name of the initial catalog to be created when instantiating + * {@link TableEnvironment}. + */ + private final String builtInCatalogName; + + /** + * Specifies the name of the default database in the initial catalog to be created when + * instantiating {@link TableEnvironment}. + */ + private final String builtInDatabaseName; + + /** + * Determines if the table environment should work in a batch ({@code true}) or + * streaming ({@code false}) mode. + */ + private final boolean isBatchMode; + + private EnvironmentSettings( + @Nullable String plannerClass, + @Nullable String executorClass, + String builtInCatalogName, + String builtInDatabaseName, + boolean isBatchMode) { + this.plannerClass = plannerClass; + this.executorClass = executorClass; + this.builtInCatalogName = builtInCatalogName; + this.builtInDatabaseName = builtInDatabaseName; + this.isBatchMode = isBatchMode; + } + + /** + * Creates a builder for creating an instance of {@link EnvironmentSettings}. + * + * <p>By default, it does not specify a required planner and will use the one that is available + * on the classpath via discovery. + */ + public static Builder newInstance() { + return new Builder(); + } + + /** + * Gets the specified name of the initial catalog to be created when instantiating + * a {@link TableEnvironment}. + */ + public String getBuiltInCatalogName() { + return builtInCatalogName; + } + + /** + * Gets the specified name of the default database in the initial catalog to be created when instantiating + * a {@link TableEnvironment}. + */ + public String getBuiltInDatabaseName() { + return builtInDatabaseName; + } + + @Internal + public Map<String, String> toPlannerProperties() { + Map<String, String> properties = new HashMap<>(toCommonProperties()); + if (plannerClass != null) { + properties.put(CLASS_NAME, plannerClass); + } + return properties; + } + + @Internal + public Map<String, String> toExecutorProperties() { + Map<String, String> properties = new HashMap<>(toCommonProperties()); + if (executorClass != null) { + properties.put(CLASS_NAME, executorClass); + } + return properties; + } + + private Map<String, String> toCommonProperties() { + Map<String, String> properties = new HashMap<>(); + properties.put(BATCH_MODE, Boolean.toString(isBatchMode)); + return properties; + } + + /** + * A builder for {@link EnvironmentSettings}. + */ + public static class Builder { + private String plannerClass = null; + private String executorClass = null; + private String builtInCatalogName = "default_catalog"; + private String builtInDatabaseName = "default_database"; + private boolean isBatchMode = false; + + /** + * Sets the old Flink planner as the required module. By default, {@link #useAnyPlanner()} is + * enabled. + */ + public Builder useOldPlanner() { + this.plannerClass = "org.apache.flink.table.planner.StreamPlannerFactory"; + this.executorClass = "org.apache.flink.table.executor.StreamExecutorFactory"; + return this; + } + + /** + * Sets the Blink planner as the required module. By default, {@link #useAnyPlanner()} is + * enabled. + */ + public Builder useBlinkPlanner() { + throw new UnsupportedOperationException("The Blink planner is not supported yet."); + } + + /** + * Does not set a planner requirement explicitly. + * + * <p>A planner will be discovered automatically, if there is only one planner available. + * + * <p>This is the default behavior. + */ + public Builder useAnyPlanner() { + this.plannerClass = null; + this.executorClass = null; + return this; + } + + /** + * Sets that the components should work in a batch mode. Streaming mode by default. + */ + public Builder inBatchMode() { + this.isBatchMode = true; + return this; + } + + /** + * Sets that the components should work in a streaming mode. Enabled by default. + */ + public Builder inStreamingMode() { + this.isBatchMode = false; + return this; + } + + /** + * Specifies the name of the initial catalog to be created when instantiating + * a {@link TableEnvironment}. Default: "default_catalog". + */ + public Builder withBuiltInCatalogName(String builtInCatalogName) { + this.builtInCatalogName = builtInCatalogName; + return this; + } + + /** + * Specifies the name of the default database in the initial catalog to be created when instantiating + * a {@link TableEnvironment}. Default: "default_database". + */ + public Builder withBuiltInDatabaseName(String builtInDatabaseName) { + this.builtInDatabaseName = builtInDatabaseName; + return this; + } + + /** + * Returns an immutable instance of {@link EnvironmentSettings}. + */ + public EnvironmentSettings build() { + return new EnvironmentSettings( + plannerClass, + executorClass, + builtInCatalogName, + builtInDatabaseName, + isBatchMode); + } + } +} diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java index 8e5ba8a..325732f 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java @@ -59,18 +59,6 @@ public class TableConfig { private Integer maxGeneratedCodeLength = 64000; // just an estimate /** - * Specifies the name of the initial catalog to be created when instantiating - * TableEnvironment. - */ - private String builtInCatalogName = "default_catalog"; - - /** - * Specifies the name of the default database in the initial catalog to be created when instantiating - * TableEnvironment. - */ - private String builtInDatabaseName = "default_database"; - - /** * Returns the timezone for date/time/timestamp conversions. */ public TimeZone getTimeZone() { @@ -147,38 +135,6 @@ public class TableConfig { this.maxGeneratedCodeLength = Preconditions.checkNotNull(maxGeneratedCodeLength); } - /** - * Gets the specified name of the initial catalog to be created when instantiating - * a {@link TableEnvironment}. - */ - public String getBuiltInCatalogName() { - return builtInCatalogName; - } - - /** - * Specifies the name of the initial catalog to be created when instantiating - * a {@link TableEnvironment}. This method has no effect if called on the {@link TableEnvironment#getConfig()}. - */ - public void setBuiltInCatalogName(String builtInCatalogName) { - this.builtInCatalogName = builtInCatalogName; - } - - /** - * Gets the specified name of the default database in the initial catalog to be created when instantiating - * a {@link TableEnvironment}. - */ - public String getBuiltInDatabaseName() { - return builtInDatabaseName; - } - - /** - * Specifies the name of the default database in the initial catalog to be created when instantiating - * a {@link TableEnvironment}. This method has no effect if called on the {@link TableEnvironment#getConfig()}. - */ - public void setBuiltInDatabaseName(String builtInDatabaseName) { - this.builtInDatabaseName = builtInDatabaseName; - } - public static TableConfig getDefault() { return new TableConfig(); } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java index b2b50a0..85d20f6 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java @@ -31,21 +31,23 @@ import org.apache.flink.table.sources.TableSource; import java.util.Optional; /** - * The base class for batch and stream TableEnvironments. + * A table environment is the base class, entry point, and central context for creating Table & SQL + * API programs. * - * <p>The TableEnvironment is a central concept of the Table API and SQL integration. It is - * responsible for: + * <p>It is unified both on a language level for all JVM-based languages (i.e. there is no distinction + * between Scala and Java API) and for bounded and unbounded data processing. * + * <p>A table environment is responsible for: * <ul> - * <li>Registering a Table in the internal catalog</li> - * <li>Registering an external catalog</li> - * <li>Executing SQL queries</li> - * <li>Registering a user-defined scalar function. For the user-defined table and aggregate - * function, use the StreamTableEnvironment or BatchTableEnvironment</li> + * <li>Connecting to external systems.</li> + * <li>Registering and retrieving {@link Table}s and other meta objects from a catalog.</li> + * <li>Executing SQL statements.</li> + * <li>Offering further configuration options.</li> * </ul> * - * <p>This environment is unified both on a language level (for all JVM-based languages, i.e. no distinction between - * Scala and Java API) and for bounded and unbounded data processing. + * <p>Note: This environment is meant for pure table programs. If you would like to convert from or to + * other Flink APIs, it might be necessary to use one of the available language-specific table environments + * in the corresponding bridging modules. */ @PublicEvolving public interface TableEnvironment { diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java index 40849aa..727727a 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java @@ -98,8 +98,8 @@ public class TableEnvironmentImpl implements TableEnvironment { this.tableConfig = tableConfig; this.tableConfig.addPlannerConfig(queryConfigProvider); - this.defaultCatalogName = tableConfig.getBuiltInCatalogName(); - this.defaultDatabaseName = tableConfig.getBuiltInDatabaseName(); + this.defaultCatalogName = catalogManager.getCurrentCatalog(); + this.defaultDatabaseName = catalogManager.getCurrentDatabase(); this.functionCatalog = functionCatalog; this.planner = planner; diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/ExecutorFactory.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/ExecutorFactory.java new file mode 100644 index 0000000..7de03c8 --- /dev/null +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/ExecutorFactory.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.delegation; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.factories.ComponentFactory; + +import java.util.Map; + +/** + * Factory that creates {@link Executor}. + * + * <p>This factory is used with Java's Service Provider Interfaces (SPI) for discovering. A factory is + * called with a set of normalized properties that describe the desired configuration. Those properties + * may include execution configurations such as watermark interval, max parallelism etc., table specific + * initialization configuration such as if the queries should be executed in batch mode. + * + * <p><b>Important:</b> The implementations of this interface should also implement method + * <pre>{@code public Executor create(Map<String, String> properties, StreamExecutionEnvironment executionEnvironment);} + * </pre> This method will be used when instantiating a {@link org.apache.flink.table.api.TableEnvironment} from a + * bridging module which enables conversion from/to {@code DataStream} API and requires a pre configured + * {@code StreamTableEnvironment}. + */ +@Internal +public interface ExecutorFactory extends ComponentFactory { + + /** + * Creates a corresponding {@link Executor}. + * + * @param properties Static properties of the {@link Executor}, the same that were used for factory lookup. + * @return instance of a {@link Executor} + */ + Executor create(Map<String, String> properties); +} diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/PlannerFactory.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/PlannerFactory.java similarity index 55% rename from flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/PlannerFactory.java rename to flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/PlannerFactory.java index fa1128e..0df52d4 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/PlannerFactory.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/PlannerFactory.java @@ -16,54 +16,41 @@ * limitations under the License. */ -package org.apache.flink.table.api.internal; +package org.apache.flink.table.delegation; import org.apache.flink.annotation.Internal; import org.apache.flink.table.api.TableConfig; -import org.apache.flink.table.api.TableException; import org.apache.flink.table.catalog.CatalogManager; import org.apache.flink.table.catalog.FunctionCatalog; -import org.apache.flink.table.delegation.Executor; -import org.apache.flink.table.delegation.Planner; +import org.apache.flink.table.factories.ComponentFactory; -import java.lang.reflect.Constructor; +import java.util.Map; /** - * Factory to construct a {@link Planner}. It will look for the planner on the classpath. + * Factory that creates {@link Planner}. + * + * <p>This factory is used with Java's Service Provider Interfaces (SPI) for discovering. A factory is + * called with a set of normalized properties that describe the desired configuration. Those properties + * may include execution configurations such as watermark interval, max parallelism etc., table specific + * initialization configuration such as if the queries should be executed in batch mode. */ @Internal -public final class PlannerFactory { +public interface PlannerFactory extends ComponentFactory { /** - * Looks up {@link Planner} on the class path via reflection. + * Creates a corresponding {@link Planner}. * + * @param properties Static properties of the {@link Planner}, the same that were used for factory lookup. * @param executor The executor required by the planner. * @param tableConfig The configuration of the planner to use. * @param functionCatalog The function catalog to look up user defined functions. * @param catalogManager The catalog manager to look up tables and views. * @return instance of a {@link Planner} */ - public static Planner lookupPlanner( - Executor executor, - TableConfig tableConfig, - FunctionCatalog functionCatalog, - CatalogManager catalogManager) { - try { - Class<?> clazz = Class.forName("org.apache.flink.table.planner.StreamPlanner"); - Constructor con = clazz.getConstructor( - Executor.class, - TableConfig.class, - FunctionCatalog.class, - CatalogManager.class); - - return (Planner) con.newInstance(executor, tableConfig, functionCatalog, catalogManager); - } catch (Exception e) { - throw new TableException( - "Could not instantiate the planner. Make sure the planner module is on the classpath", - e); - } - } - - private PlannerFactory() { - } + Planner create( + Map<String, String> properties, + Executor executor, + TableConfig tableConfig, + FunctionCatalog functionCatalog, + CatalogManager catalogManager); } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/ComponentFactory.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/ComponentFactory.java new file mode 100644 index 0000000..45e87235 --- /dev/null +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/ComponentFactory.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.factories; + +import org.apache.flink.annotation.PublicEvolving; + +import java.util.List; +import java.util.Map; + +/** + * A factory interface for components that enables further disambiguating in case + * there are multiple matching implementations present. + */ +@PublicEvolving +public interface ComponentFactory extends TableFactory { + /** + * Specifies a context of optional parameters that if exist should have the + * given values. This enables further disambiguating if there are multiple + * factories that meet the {@link #requiredContext()} and {@link #supportedProperties()}. + * + * <p><b>NOTE:</b> All the property keys should be included in {@link #supportedProperties()}. + * + * @return optional properties to disambiguate factories + */ + Map<String, String> optionalContext(); + + @Override + Map<String, String> requiredContext(); + + /** + * {@inheritDoc} + * + * <p><b>NOTE:</b> All the property keys from {@link #optionalContext()} should also be included. + */ + @Override + List<String> supportedProperties(); +} diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/ComponentFactoryService.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/ComponentFactoryService.java new file mode 100644 index 0000000..db8e9ba --- /dev/null +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/ComponentFactoryService.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.factories; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.AmbiguousTableFactoryException; +import org.apache.flink.table.api.NoMatchingTableFactoryException; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * Unified class to search for a {@link ComponentFactory} of provided type and properties. It is similar to + * {@link TableFactoryService} but it disambiguates based on {@link ComponentFactory#optionalContext()}. + */ +@Internal +public class ComponentFactoryService { + + /** + * Finds a table factory of the given class and property map. This method enables + * disambiguating multiple matching {@link ComponentFactory}s based on additional + * optional context provided via {@link ComponentFactory#optionalContext()}. + * + * @param factoryClass desired factory class + * @param propertyMap properties that describe the factory configuration + * @param <T> factory class type + * @return the matching factory + */ + public static <T extends ComponentFactory> T find(Class<T> factoryClass, Map<String, String> propertyMap) { + List<T> all = TableFactoryService.findAll(factoryClass, propertyMap); + + List<T> filtered = all.stream().filter(factory -> { + Map<String, String> optionalContext = factory.optionalContext(); + return optionalContext.entrySet().stream().allMatch(entry -> { + String property = propertyMap.get(entry.getKey()); + if (property != null) { + return property.equals(entry.getValue()); + } else { + return true; + } + } + ); + }).collect(Collectors.toList()); + + if (filtered.size() > 1) { + throw new AmbiguousTableFactoryException( + filtered, + factoryClass, + new ArrayList<>(all), + propertyMap + ); + } else if (filtered.isEmpty()) { + throw new NoMatchingTableFactoryException( + "No factory supports the additional filters.", + factoryClass, + new ArrayList<>(all), + propertyMap); + } else { + return filtered.get(0); + } + } +} diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/ComponentFactoryServiceTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/ComponentFactoryServiceTest.java new file mode 100644 index 0000000..13e821d --- /dev/null +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/ComponentFactoryServiceTest.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.factories; + +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.NoMatchingTableFactoryException; +import org.apache.flink.table.delegation.PlannerFactory; +import org.apache.flink.table.factories.utils.TestPlannerFactory; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.util.HashMap; +import java.util.Map; + +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.junit.Assert.assertThat; + +/** + * Tests for {@link ComponentFactoryService}. + */ +public class ComponentFactoryServiceTest { + + @Rule + public ExpectedException thrown = ExpectedException.none(); + + @Test + public void testLookingUpAmbiguousPlanners() { + Map<String, String> properties = new HashMap<>(); + properties.put(EnvironmentSettings.CLASS_NAME, TestPlannerFactory.class.getCanonicalName()); + properties.put(EnvironmentSettings.BATCH_MODE, Boolean.toString(true)); + properties.put(TestPlannerFactory.PLANNER_TYPE_KEY, TestPlannerFactory.PLANNER_TYPE_VALUE); + + PlannerFactory plannerFactory = ComponentFactoryService.find(PlannerFactory.class, properties); + + assertThat(plannerFactory, instanceOf(TestPlannerFactory.class)); + } + + @Test + public void testLookingUpNonExistentClass() { + thrown.expect(NoMatchingTableFactoryException.class); + thrown.expectMessage("Reason: No factory supports the additional filters"); + + Map<String, String> properties = new HashMap<>(); + properties.put(EnvironmentSettings.CLASS_NAME, "NoSuchClass"); + properties.put(EnvironmentSettings.BATCH_MODE, Boolean.toString(true)); + properties.put(TestPlannerFactory.PLANNER_TYPE_KEY, TestPlannerFactory.PLANNER_TYPE_VALUE); + + ComponentFactoryService.find(PlannerFactory.class, properties); + } +} diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/utils/OtherTestPlannerFactory.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/utils/OtherTestPlannerFactory.java new file mode 100644 index 0000000..c4c3804 --- /dev/null +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/utils/OtherTestPlannerFactory.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.factories.utils; + +import org.apache.flink.table.delegation.Planner; +import org.apache.flink.table.factories.ComponentFactoryServiceTest; + +/** + * Test {@link Planner} factory used in {@link ComponentFactoryServiceTest}. + */ +public class OtherTestPlannerFactory extends TestPlannerFactory { +} diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/utils/TestPlannerFactory.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/utils/TestPlannerFactory.java new file mode 100644 index 0000000..1a0f05f --- /dev/null +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/utils/TestPlannerFactory.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.factories.utils; + +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.catalog.CatalogManager; +import org.apache.flink.table.catalog.FunctionCatalog; +import org.apache.flink.table.delegation.Executor; +import org.apache.flink.table.delegation.Planner; +import org.apache.flink.table.delegation.PlannerFactory; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Test {@link Planner} factory used in {@link org.apache.flink.table.factories.ComponentFactoryServiceTest}. + */ +public class TestPlannerFactory implements PlannerFactory { + + public static final String PLANNER_TYPE_KEY = "planner-type"; + public static final String PLANNER_TYPE_VALUE = "test-planner"; + + @Override + public Planner create( + Map<String, String> properties, Executor executor, + TableConfig tableConfig, + FunctionCatalog functionCatalog, + CatalogManager catalogManager) { + return null; + } + + @Override + public Map<String, String> optionalContext() { + HashMap<String, String> map = new HashMap<>(); + map.put(EnvironmentSettings.CLASS_NAME, this.getClass().getCanonicalName()); + return map; + } + + @Override + public Map<String, String> requiredContext() { + Map<String, String> map = new HashMap<>(); + map.put(PLANNER_TYPE_KEY, PLANNER_TYPE_VALUE); + return map; + } + + @Override + public List<String> supportedProperties() { + return Arrays.asList(EnvironmentSettings.CLASS_NAME, EnvironmentSettings.BATCH_MODE); + } +} diff --git a/flink-table/flink-table-planner/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/flink-table/flink-table-api-java/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory similarity index 77% copy from flink-table/flink-table-planner/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory copy to flink-table/flink-table-api-java/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory index ece39eb..e2a6a48 100644 --- a/flink-table/flink-table-planner/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory +++ b/flink-table/flink-table-api-java/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory @@ -13,7 +13,5 @@ # See the License for the specific language governing permissions and # limitations under the License. -org.apache.flink.table.sources.CsvBatchTableSourceFactory -org.apache.flink.table.sources.CsvAppendTableSourceFactory -org.apache.flink.table.sinks.CsvBatchTableSinkFactory -org.apache.flink.table.sinks.CsvAppendTableSinkFactory +org.apache.flink.table.factories.utils.TestPlannerFactory +org.apache.flink.table.factories.utils.OtherTestPlannerFactory diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/BatchTableEnvironment.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/BatchTableEnvironment.scala index 7018af2..fa6b178 100644 --- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/BatchTableEnvironment.scala +++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/BatchTableEnvironment.scala @@ -233,11 +233,12 @@ object BatchTableEnvironment { classOf[ExecutionEnvironment], classOf[TableConfig], classOf[CatalogManager]) + val defaultCatalog = "default_catalog" val catalogManager = new CatalogManager( - tableConfig.getBuiltInCatalogName, + "default_catalog", new GenericInMemoryCatalog( - tableConfig.getBuiltInCatalogName, - tableConfig.getBuiltInDatabaseName) + defaultCatalog, + "default_database") ) const.newInstance(executionEnvironment, tableConfig, catalogManager) .asInstanceOf[BatchTableEnvironment] diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala index 473522e..3baa5fa 100644 --- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala +++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala @@ -17,28 +17,33 @@ */ package org.apache.flink.table.api.scala +import org.apache.flink.annotation.PublicEvolving import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} import org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl import org.apache.flink.table.api.{TableEnvironment, _} -import org.apache.flink.table.catalog.{CatalogManager, GenericInMemoryCatalog} import org.apache.flink.table.descriptors.{ConnectorDescriptor, StreamTableDescriptor} import org.apache.flink.table.expressions.Expression import org.apache.flink.table.functions.{AggregateFunction, TableAggregateFunction, TableFunction} /** - * The [[TableEnvironment]] for a Scala [[StreamExecutionEnvironment]] that works with - * [[DataStream]]s. + * This table environment is the entry point and central context for creating Table & SQL + * API programs that integrate with the Scala-specific [[DataStream]] API. * - * A TableEnvironment can be used to: - * - convert a [[DataStream]] to a [[Table]] - * - register a [[DataStream]] in the [[TableEnvironment]]'s catalog - * - register a [[Table]] in the [[TableEnvironment]]'s catalog - * - scan a registered table to obtain a [[Table]] - * - specify a SQL query on registered tables to obtain a [[Table]] - * - convert a [[Table]] into a [[DataStream]] - * - explain the AST and execution plan of a [[Table]] + * It is unified for bounded and unbounded data processing. + * + * A stream table environment is responsible for: + * + * - Convert a [[DataStream]] into [[Table]] and vice-versa. + * - Connecting to external systems. + * - Registering and retrieving [[Table]]s and other meta objects from a catalog. + * - Executing SQL statements. + * - Offering further configuration options. + * + * Note: If you don't intend to use the [[DataStream]] API, [[TableEnvironment]] is meant for pure + * table programs. */ +@PublicEvolving trait StreamTableEnvironment extends TableEnvironment { /** @@ -245,43 +250,90 @@ trait StreamTableEnvironment extends TableEnvironment { object StreamTableEnvironment { /** - * The [[TableEnvironment]] for a Scala [[StreamExecutionEnvironment]] that works with - * [[DataStream]]s. - * - * A TableEnvironment can be used to: - * - convert a [[DataStream]] to a [[Table]] - * - register a [[DataStream]] in the [[TableEnvironment]]'s catalog - * - register a [[Table]] in the [[TableEnvironment]]'s catalog - * - scan a registered table to obtain a [[Table]] - * - specify a SQL query on registered tables to obtain a [[Table]] - * - convert a [[Table]] into a [[DataStream]] - * - explain the AST and execution plan of a [[Table]] - * - * @param executionEnvironment The Scala [[StreamExecutionEnvironment]] of the TableEnvironment. + * Creates a table environment that is the entry point and central context for creating Table & + * SQL API programs that integrate with the Scala-specific [[DataStream]] API. + * + * It is unified for bounded and unbounded data processing. + * + * A stream table environment is responsible for: + * + * - Convert a [[DataStream]] into [[Table]] and vice-versa. + * - Connecting to external systems. + * - Registering and retrieving [[Table]]s and other meta objects from a catalog. + * - Executing SQL statements. + * - Offering further configuration options. + * + * Note: If you don't intend to use the [[DataStream]] API, [[TableEnvironment]] is meant for + * pure table programs. + * + * @param executionEnvironment The Scala [[StreamExecutionEnvironment]] of the + * [[TableEnvironment]]. */ def create(executionEnvironment: StreamExecutionEnvironment): StreamTableEnvironment = { - create(executionEnvironment, new TableConfig) + create( + executionEnvironment, + EnvironmentSettings.newInstance().build()) } /** - * The [[TableEnvironment]] for a Scala [[StreamExecutionEnvironment]] that works with - * [[DataStream]]s. - * - * A TableEnvironment can be used to: - * - convert a [[DataStream]] to a [[Table]] - * - register a [[DataStream]] in the [[TableEnvironment]]'s catalog - * - register a [[Table]] in the [[TableEnvironment]]'s catalog - * - scan a registered table to obtain a [[Table]] - * - specify a SQL query on registered tables to obtain a [[Table]] - * - convert a [[Table]] into a [[DataStream]] - * - explain the AST and execution plan of a [[Table]] - * - * @param executionEnvironment The Scala [[StreamExecutionEnvironment]] of the TableEnvironment. - * @param tableConfig The configuration of the TableEnvironment. + * Creates a table environment that is the entry point and central context for creating Table & + * SQL API programs that integrate with the Scala-specific [[DataStream]] API. + * + * It is unified for bounded and unbounded data processing. + * + * A stream table environment is responsible for: + * + * - Convert a [[DataStream]] into [[Table]] and vice-versa. + * - Connecting to external systems. + * - Registering and retrieving [[Table]]s and other meta objects from a catalog. + * - Executing SQL statements. + * - Offering further configuration options. + * + * Note: If you don't intend to use the [[DataStream]] API, [[TableEnvironment]] is meant for + * pure table programs. + * + * @param executionEnvironment The Scala [[StreamExecutionEnvironment]] of the + * [[TableEnvironment]]. + * @param settings The environment settings used to instantiate the [[TableEnvironment]]. + */ + def create( + executionEnvironment: StreamExecutionEnvironment, + settings: EnvironmentSettings) + : StreamTableEnvironment = { + StreamTableEnvironmentImpl.create(executionEnvironment, settings, new TableConfig) + } + + /** + * Creates a table environment that is the entry point and central context for creating Table & + * SQL API programs that integrate with the Scala-specific [[DataStream]] API. + * + * It is unified for bounded and unbounded data processing. + * + * A stream table environment is responsible for: + * + * - Convert a [[DataStream]] into [[Table]] and vice-versa. + * - Connecting to external systems. + * - Registering and retrieving [[Table]]s and other meta objects from a catalog. + * - Executing SQL statements. + * - Offering further configuration options. + * + * Note: If you don't intend to use the [[DataStream]] API, [[TableEnvironment]] is meant for + * pure table programs. + * + * @param executionEnvironment The Scala [[StreamExecutionEnvironment]] of the + * [[TableEnvironment]]. + * @param tableConfig The configuration of the [[TableEnvironment]]. + * @deprecated Use [[create(StreamExecutionEnvironment)]] and + * [[StreamTableEnvironment#getConfig()]] for manipulating the [[TableConfig]]. */ + @deprecated def create(executionEnvironment: StreamExecutionEnvironment, tableConfig: TableConfig) : StreamTableEnvironment = { - StreamTableEnvironmentImpl.create(tableConfig, executionEnvironment) + StreamTableEnvironmentImpl + .create( + executionEnvironment, + EnvironmentSettings.newInstance().build(), + tableConfig) } } diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala index 33b304d..96b7403 100644 --- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala +++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala @@ -17,9 +17,6 @@ */ package org.apache.flink.table.api.scala.internal -import java.util -import java.util.{Collections, List => JList} - import org.apache.flink.annotation.Internal import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.dag.Transformation @@ -29,18 +26,22 @@ import org.apache.flink.streaming.api.datastream.{DataStream => JDataStream} import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JStreamExecutionEnvironment} import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} import org.apache.flink.table.api._ -import org.apache.flink.table.api.internal.{PlannerFactory, TableEnvironmentImpl} +import org.apache.flink.table.api.internal.TableEnvironmentImpl import org.apache.flink.table.api.scala.StreamTableEnvironment import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, GenericInMemoryCatalog} -import org.apache.flink.table.delegation.{Executor, Planner} +import org.apache.flink.table.delegation.{Executor, ExecutorFactory, Planner, PlannerFactory} import org.apache.flink.table.descriptors.{ConnectorDescriptor, StreamTableDescriptor} import org.apache.flink.table.expressions.Expression +import org.apache.flink.table.factories.ComponentFactoryService import org.apache.flink.table.functions.{AggregateFunction, TableAggregateFunction, TableFunction, UserFunctionsTypeHelper} import org.apache.flink.table.operations.{OutputConversionModifyOperation, ScalaDataStreamQueryOperation} import org.apache.flink.table.sources.{TableSource, TableSourceValidation} import org.apache.flink.table.types.utils.TypeConversions import org.apache.flink.table.typeutils.FieldInfoUtils +import java.util +import java.util.{Collections, List => JList, Map => JMap} + import _root_.scala.collection.JavaConverters._ /** @@ -234,61 +235,54 @@ class StreamTableEnvironmentImpl ( object StreamTableEnvironmentImpl { - /** - * Creates an instance of a [[StreamTableEnvironment]]. It uses the - * [[StreamExecutionEnvironment]] for executing queries. This is also the - * [[StreamExecutionEnvironment]] that will be used when converting - * from/to [[DataStream]]. - * - * @param tableConfig The configuration of the TableEnvironment. - * @param executionEnvironment The [[StreamExecutionEnvironment]] of the TableEnvironment. - */ def create( - tableConfig: TableConfig, - executionEnvironment: StreamExecutionEnvironment) + executionEnvironment: StreamExecutionEnvironment, + settings: EnvironmentSettings, + tableConfig: TableConfig) : StreamTableEnvironmentImpl = { - val catalogManager = new CatalogManager( - tableConfig.getBuiltInCatalogName, - new GenericInMemoryCatalog( - tableConfig.getBuiltInCatalogName, - tableConfig.getBuiltInDatabaseName) - ) - create(catalogManager, tableConfig, executionEnvironment) - } - - /** - * Creates an instance of a [[StreamTableEnvironment]]. It uses the - * [[StreamExecutionEnvironment]] for executing queries. This is also the - * [[StreamExecutionEnvironment]] that will be used when converting - * from/to [[DataStream]]. - * - * @param catalogManager The [[CatalogManager]] to use for storing and looking up [[Table]]s. - * @param tableConfig The configuration of the TableEnvironment. - * @param executionEnvironment The [[StreamExecutionEnvironment]] of the TableEnvironment. - */ - def create( - catalogManager: CatalogManager, - tableConfig: TableConfig, - executionEnvironment: StreamExecutionEnvironment) - : StreamTableEnvironmentImpl = { - val executor = lookupExecutor(executionEnvironment) + val executorProperties = settings.toExecutorProperties + val plannerProperties = settings.toPlannerProperties + val executor = lookupExecutor(executorProperties, executionEnvironment) val functionCatalog = new FunctionCatalog( - catalogManager.getCurrentCatalog, - catalogManager.getCurrentDatabase) + settings.getBuiltInCatalogName, + settings.getBuiltInDatabaseName) + val catalogManager = new CatalogManager( + settings.getBuiltInCatalogName, + new GenericInMemoryCatalog(settings.getBuiltInCatalogName, settings.getBuiltInDatabaseName)) + val planner = ComponentFactoryService.find(classOf[PlannerFactory], plannerProperties) + .create( + plannerProperties, + executor, + tableConfig, + functionCatalog, + catalogManager) new StreamTableEnvironmentImpl( catalogManager, functionCatalog, tableConfig, executionEnvironment, - PlannerFactory.lookupPlanner(executor, tableConfig, functionCatalog, catalogManager), + planner, executor) } - private def lookupExecutor(executionEnvironment: StreamExecutionEnvironment) = + private def lookupExecutor( + executorProperties: JMap[String, String], + executionEnvironment: StreamExecutionEnvironment) + :Executor = try { - val clazz = Class.forName("org.apache.flink.table.executor.ExecutorFactory") - val createMethod = clazz.getMethod("create", classOf[JStreamExecutionEnvironment]) - createMethod.invoke(null, executionEnvironment.getWrappedStreamExecutionEnvironment) + val executorFactory = ComponentFactoryService + .find(classOf[ExecutorFactory], executorProperties) + val createMethod = executorFactory.getClass + .getMethod( + "create", + classOf[util.Map[String, String]], + classOf[JStreamExecutionEnvironment]) + + createMethod + .invoke( + executorFactory, + executorProperties, + executionEnvironment.getWrappedStreamExecutionEnvironment) .asInstanceOf[Executor] } catch { case e: Exception => diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/executor/StreamExecutor.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/executor/StreamExecutor.java index 5148839..426e09e 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/executor/StreamExecutor.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/executor/StreamExecutor.java @@ -19,6 +19,7 @@ package org.apache.flink.table.executor; import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.dag.Transformation; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -34,7 +35,8 @@ import java.util.List; public class StreamExecutor implements Executor { private final StreamExecutionEnvironment executionEnvironment; - StreamExecutor(StreamExecutionEnvironment executionEnvironment) { + @VisibleForTesting + public StreamExecutor(StreamExecutionEnvironment executionEnvironment) { this.executionEnvironment = executionEnvironment; } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/executor/ExecutorFactory.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/executor/StreamExecutorFactory.java similarity index 51% rename from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/executor/ExecutorFactory.java rename to flink-table/flink-table-planner/src/main/java/org/apache/flink/table/executor/StreamExecutorFactory.java index 5534217..dff5590 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/executor/ExecutorFactory.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/executor/StreamExecutorFactory.java @@ -20,36 +20,59 @@ package org.apache.flink.table.executor; import org.apache.flink.annotation.Internal; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.delegation.Executor; +import org.apache.flink.table.delegation.ExecutorFactory; +import org.apache.flink.table.descriptors.DescriptorProperties; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; /** * Factory to create an implementation of {@link Executor} to use in a * {@link org.apache.flink.table.api.TableEnvironment}. The {@link org.apache.flink.table.api.TableEnvironment} - * should use {@link #create()} method that does not bind to any particular environment, + * should use {@link #create(Map)} method that does not bind to any particular environment, * whereas {@link org.apache.flink.table.api.scala.StreamTableEnvironment} should use - * {@link #create(StreamExecutionEnvironment)} as it is always backed by some {@link StreamExecutionEnvironment} + * {@link #create(Map, StreamExecutionEnvironment)} as it is always backed by + * some {@link StreamExecutionEnvironment} */ @Internal -public class ExecutorFactory { +public class StreamExecutorFactory implements ExecutorFactory { + /** - * Creates a {@link StreamExecutor} that is backed by given {@link StreamExecutionEnvironment}. + * Creates a corresponding {@link StreamExecutor}. * + * @param properties Static properties of the {@link Executor}, the same that were used for factory lookup. * @param executionEnvironment a {@link StreamExecutionEnvironment} to use while executing Table programs. - * @return {@link StreamExecutor} + * @return instance of a {@link Executor} */ - public static Executor create(StreamExecutionEnvironment executionEnvironment) { + public Executor create(Map<String, String> properties, StreamExecutionEnvironment executionEnvironment) { return new StreamExecutor(executionEnvironment); } - /** - * Creates a {@link StreamExecutor} that is backed by a default {@link StreamExecutionEnvironment}. - * - * @return {@link StreamExecutor} - */ - public static Executor create() { + @Override + public Executor create(Map<String, String> properties) { return new StreamExecutor(StreamExecutionEnvironment.getExecutionEnvironment()); } - private ExecutorFactory() { + @Override + public Map<String, String> requiredContext() { + DescriptorProperties properties = new DescriptorProperties(); + properties.putBoolean(EnvironmentSettings.BATCH_MODE, false); + return properties.asMap(); + } + + @Override + public List<String> supportedProperties() { + return Collections.singletonList(EnvironmentSettings.CLASS_NAME); + } + + @Override + public Map<String, String> optionalContext() { + Map<String, String> context = new HashMap<>(); + context.put(EnvironmentSettings.CLASS_NAME, this.getClass().getCanonicalName()); + return context; } } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/StreamPlannerFactory.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/StreamPlannerFactory.java new file mode 100644 index 0000000..4efb850 --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/StreamPlannerFactory.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.catalog.CatalogManager; +import org.apache.flink.table.catalog.FunctionCatalog; +import org.apache.flink.table.delegation.Executor; +import org.apache.flink.table.delegation.Planner; +import org.apache.flink.table.delegation.PlannerFactory; +import org.apache.flink.table.descriptors.DescriptorProperties; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Factory to construct a {@link StreamPlanner}. + */ +@Internal +public final class StreamPlannerFactory implements PlannerFactory { + + @Override + public Planner create( + Map<String, String> properties, + Executor executor, + TableConfig tableConfig, + FunctionCatalog functionCatalog, + CatalogManager catalogManager) { + return new StreamPlanner(executor, tableConfig, functionCatalog, catalogManager); + } + + public Map<String, String> optionalContext() { + Map<String, String> map = new HashMap<>(); + map.put(EnvironmentSettings.CLASS_NAME, this.getClass().getCanonicalName()); + return map; + } + + @Override + public Map<String, String> requiredContext() { + DescriptorProperties properties = new DescriptorProperties(); + + properties.putBoolean(EnvironmentSettings.BATCH_MODE, false); + return properties.asMap(); + } + + @Override + public List<String> supportedProperties() { + return Collections.singletonList(EnvironmentSettings.CLASS_NAME); + } +} diff --git a/flink-table/flink-table-planner/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/flink-table/flink-table-planner/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory index ece39eb..6bd2ea3 100644 --- a/flink-table/flink-table-planner/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory +++ b/flink-table/flink-table-planner/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory @@ -17,3 +17,5 @@ org.apache.flink.table.sources.CsvBatchTableSourceFactory org.apache.flink.table.sources.CsvAppendTableSourceFactory org.apache.flink.table.sinks.CsvBatchTableSinkFactory org.apache.flink.table.sinks.CsvAppendTableSinkFactory +org.apache.flink.table.planner.StreamPlannerFactory +org.apache.flink.table.executor.StreamExecutorFactory diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala index 2b51869..efa29ed 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala @@ -52,8 +52,8 @@ abstract class TableEnvImpl( private val catalogManager: CatalogManager) extends TableEnvironment { - protected val defaultCatalogName: String = config.getBuiltInCatalogName - protected val defaultDatabaseName: String = config.getBuiltInDatabaseName + protected val defaultCatalogName: String = catalogManager.getCurrentCatalog + protected val defaultDatabaseName: String = catalogManager.getCurrentDatabase // Table API/SQL function catalog private[flink] val functionCatalog: FunctionCatalog = new FunctionCatalog( diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala index bdb2913..44dc4ef 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala @@ -18,8 +18,6 @@ package org.apache.flink.table.api.stream -import java.lang.{Integer => JInt, Long => JLong} - import org.apache.flink.api.java.tuple.{Tuple5 => JTuple5} import org.apache.flink.api.java.typeutils.TupleTypeInfo import org.apache.flink.api.scala._ @@ -31,14 +29,19 @@ import org.apache.flink.table.api.java.internal.{StreamTableEnvironmentImpl => J import org.apache.flink.table.api.java.{StreamTableEnvironment => JStreamTableEnv} import org.apache.flink.table.api.scala._ import org.apache.flink.table.api.{TableConfig, Types, ValidationException} -import org.apache.flink.table.catalog.{CatalogManager, GenericInMemoryCatalog} +import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, GenericInMemoryCatalog} +import org.apache.flink.table.executor.StreamExecutor +import org.apache.flink.table.planner.StreamPlanner import org.apache.flink.table.runtime.utils.StreamTestData import org.apache.flink.table.utils.TableTestBase import org.apache.flink.table.utils.TableTestUtil.{binaryNode, streamTableNode, term, unaryNode} import org.apache.flink.types.Row + import org.junit.Test import org.mockito.Mockito.{mock, when} +import java.lang.{Integer => JInt, Long => JLong} + class StreamTableEnvironmentTest extends TableTestBase { @Test @@ -200,12 +203,19 @@ class StreamTableEnvironmentTest extends TableTestBase { val jStreamExecEnv = mock(classOf[JStreamExecEnv]) when(jStreamExecEnv.getStreamTimeCharacteristic).thenReturn(TimeCharacteristic.EventTime) val config = new TableConfig - val jTEnv = JStreamTableEnvironmentImpl.create( - new CatalogManager( - config.getBuiltInCatalogName, - new GenericInMemoryCatalog(config.getBuiltInCatalogName, config.getBuiltInDatabaseName)), + val manager: CatalogManager = new CatalogManager( + "default_catalog", + new GenericInMemoryCatalog("default_catalog", "default_database")) + val executor: StreamExecutor = new StreamExecutor(jStreamExecEnv) + val functionCatalog = new FunctionCatalog(manager.getCurrentCatalog, manager.getCurrentDatabase) + val streamPlanner = new StreamPlanner(executor, config, functionCatalog, manager) + val jTEnv = new JStreamTableEnvironmentImpl( + manager, + functionCatalog, config, - jStreamExecEnv) + jStreamExecEnv, + streamPlanner, + executor) val sType = new TupleTypeInfo(Types.LONG, Types.INT, Types.STRING, Types.INT, Types.LONG) .asInstanceOf[TupleTypeInfo[JTuple5[JLong, JInt, String, JInt, JLong]]] diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala index 2ca6ee3..e72225d 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala @@ -18,8 +18,6 @@ package org.apache.flink.table.utils -import org.apache.calcite.plan.RelOptUtil -import org.apache.calcite.rel.RelNode import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.{LocalEnvironment, DataSet => JDataSet} import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment} @@ -32,12 +30,16 @@ import org.apache.flink.table.api.java.internal.{BatchTableEnvironmentImpl => Ja import org.apache.flink.table.api.scala._ import org.apache.flink.table.api.scala.internal.{BatchTableEnvironmentImpl => ScalaBatchTableEnvironmentImpl, StreamTableEnvironmentImpl => ScalaStreamTableEnvironmentImpl} import org.apache.flink.table.api.{Table, TableConfig, TableSchema} -import org.apache.flink.table.catalog.{CatalogManager, GenericInMemoryCatalog} +import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, GenericInMemoryCatalog} +import org.apache.flink.table.executor.StreamExecutor import org.apache.flink.table.expressions.Expression import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, TableFunction} import org.apache.flink.table.operations.{DataSetQueryOperation, JavaDataStreamQueryOperation, ScalaDataStreamQueryOperation} import org.apache.flink.table.planner.StreamPlanner import org.apache.flink.table.utils.TableTestUtil.createCatalogManager + +import org.apache.calcite.plan.RelOptUtil +import org.apache.calcite.rel.RelNode import org.junit.Assert.assertEquals import org.junit.rules.ExpectedException import org.junit.{ComparisonFailure, Rule} @@ -137,25 +139,11 @@ object TableTestUtil { val ANY_SUBTREE = "%ANY_SUBTREE%" - /** - * Creates a [[CatalogManager]] with a builtin default catalog & database set to values - * specified in the [[TableConfig]]. - */ - def createCatalogManager(config: TableConfig): CatalogManager = { + def createCatalogManager(): CatalogManager = { + val defaultCatalog = "default_catalog" new CatalogManager( - config.getBuiltInCatalogName, - new GenericInMemoryCatalog(config.getBuiltInCatalogName, config.getBuiltInDatabaseName)) - } - - /** - * Sets the configuration of the builtin catalog & databases in [[TableConfig]] - * to the current catalog & database of the given [[CatalogManager]]. This should be used - * to ensure sanity of a [[org.apache.flink.table.api.TableEnvironment]]. - */ - def extractBuiltinPath(config: TableConfig, catalogManager: CatalogManager): TableConfig = { - config.setBuiltInCatalogName(catalogManager.getCurrentCatalog) - config.setBuiltInDatabaseName(catalogManager.getCurrentDatabase) - config + defaultCatalog, + new GenericInMemoryCatalog(defaultCatalog, "default_database")) } private[utils] def toRelNode(expected: Table) = { @@ -239,22 +227,15 @@ case class BatchTableTestUtil( extends TableTestUtil { val javaEnv = new LocalEnvironment() - private def tableConfig = catalogManager match { - case Some(c) => - TableTestUtil.extractBuiltinPath(new TableConfig, c) - case None => - new TableConfig - } - val javaTableEnv = new JavaBatchTableEnvironmentImpl( javaEnv, - tableConfig, - catalogManager.getOrElse(createCatalogManager(new TableConfig))) + new TableConfig, + catalogManager.getOrElse(createCatalogManager())) val env = new ExecutionEnvironment(javaEnv) val tableEnv = new ScalaBatchTableEnvironmentImpl( env, - tableConfig, - catalogManager.getOrElse(createCatalogManager(new TableConfig))) + new TableConfig, + catalogManager.getOrElse(createCatalogManager())) def addTable[T: TypeInformation]( name: String, @@ -344,22 +325,29 @@ case class StreamTableTestUtil( val javaEnv = new LocalStreamEnvironment() javaEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) - private def tableConfig = catalogManager match { - case Some(c) => - TableTestUtil.extractBuiltinPath(new TableConfig, c) - case None => - new TableConfig - } + private val tableConfig = new TableConfig + private val manager: CatalogManager = catalogManager.getOrElse(createCatalogManager()) + private val executor: StreamExecutor = new StreamExecutor(javaEnv) + private val functionCatalog = + new FunctionCatalog(manager.getCurrentCatalog, manager.getCurrentDatabase) + private val streamPlanner = new StreamPlanner(executor, tableConfig, functionCatalog, manager) - val javaTableEnv = JavaStreamTableEnvironmentImpl.create( - catalogManager.getOrElse(createCatalogManager(new TableConfig)), + val javaTableEnv = new JavaStreamTableEnvironmentImpl( + manager, + functionCatalog, tableConfig, - javaEnv) + javaEnv, + streamPlanner, + executor) + val env = new StreamExecutionEnvironment(javaEnv) - val tableEnv = ScalaStreamTableEnvironmentImpl.create( - catalogManager.getOrElse(createCatalogManager(new TableConfig)), + val tableEnv = new ScalaStreamTableEnvironmentImpl( + manager, + functionCatalog, tableConfig, - env) + env, + streamPlanner, + executor) def addTable[T: TypeInformation]( name: String,