twalthr commented on a change in pull request #18980: URL: https://github.com/apache/flink/pull/18980#discussion_r825780154
########## File path: flink-python/pyflink/java_gateway.py ########## @@ -132,6 +132,7 @@ def import_flink_view(gateway): """ # Import the classes used by PyFlink java_import(gateway.jvm, "org.apache.flink.table.api.*") + java_import(gateway.jvm, "org.apache.flink.table.api.config.TableConfigOptions") Review comment: nit: we can also have a star import for the entire package. ########## File path: flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/ExecutionContext.java ########## @@ -99,18 +99,20 @@ public StreamTableEnvironment getTableEnvironment() { private StreamTableEnvironment createTableEnvironment() { // checks the value of RUNTIME_MODE Review comment: this comment still valid? ########## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java ########## @@ -60,44 +57,21 @@ private static final EnvironmentSettings DEFAULT_BATCH_MODE_SETTINGS = EnvironmentSettings.newInstance().inBatchMode().build(); - public static final String DEFAULT_BUILTIN_CATALOG = "default_catalog"; - public static final String DEFAULT_BUILTIN_DATABASE = "default_database"; - - /** Factory identifier of the {@link Planner} to use. */ - private final String planner; - - /** Factory identifier of the {@link Executor} to use. */ - private final String executor; - - /** - * Specifies the name of the initial catalog to be created when instantiating {@link - * TableEnvironment}. - */ - private final String builtInCatalogName; - /** - * Specifies the name of the default database in the initial catalog to be created when - * instantiating {@link TableEnvironment}. + * Holds all the configuration generated by the builder, together with any required additional + * configuration. */ - private final String builtInDatabaseName; - - /** - * Determines if the table environment should work in a batch ({@code false}) or streaming - * ({@code true}) mode. - */ - private final boolean isStreamingMode; - - private EnvironmentSettings( - String planner, - @Nullable String executor, - String builtInCatalogName, - String builtInDatabaseName, - boolean isStreamingMode) { - this.planner = planner; - this.executor = executor; - this.builtInCatalogName = builtInCatalogName; - this.builtInDatabaseName = builtInDatabaseName; - this.isStreamingMode = isStreamingMode; + private final Configuration configuration; + + private EnvironmentSettings(Configuration configuration) { + this.configuration = configuration; + if (configuration.get(RUNTIME_MODE) == AUTOMATIC) { Review comment: I'm wondering if this location is too eager to check. Can you check one more time what happens if the runtime mode is not set in environment settings and is AUTOMATIC in `StreamExecutionEnviornment`. What is the behavior of `StreamTableEnviornment.create(env)`? ########## File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala ########## @@ -1526,19 +1526,26 @@ object TestingTableEnvironment { catalogManager: Option[CatalogManager] = None, tableConfig: TableConfig): TestingTableEnvironment = { - tableConfig.addConfiguration(settings.toConfiguration) - // temporary solution until FLINK-15635 is fixed val classLoader = Thread.currentThread.getContextClassLoader + val executorFactory = FactoryUtil.discoverFactory( + classLoader, classOf[ExecutorFactory], ExecutorFactory.DEFAULT_IDENTIFIER) + + val executor = executorFactory.create(settings.getConfiguration) + + tableConfig.setRootConfiguration(executor.getConfiguration) + tableConfig.addConfiguration(settings.getConfiguration) + + val moduleManager = new ModuleManager val catalogMgr = catalogManager match { case Some(c) => c case _ => CatalogManager.newBuilder .classLoader(classLoader) - .config(tableConfig.getConfiguration) + .config(settings.getConfiguration) Review comment: use `tableConfig` instead and remove `getConfiguration`. in the end the code should be close to table env. ########## File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/HarnessTestBase.scala ########## @@ -120,23 +118,6 @@ class HarnessTestBase(mode: StateBackendMode) extends StreamingTestBase { def dropWatermarks(elements: Array[AnyRef]): util.Collection[AnyRef] = { elements.filter(e => !e.isInstanceOf[Watermark]).toList } - - class TestTableConfig extends TableConfig { Review comment: can we make `TableConfig` class final now? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org