twalthr commented on a change in pull request #18980:
URL: https://github.com/apache/flink/pull/18980#discussion_r825780154



##########
File path: flink-python/pyflink/java_gateway.py
##########
@@ -132,6 +132,7 @@ def import_flink_view(gateway):
     """
     # Import the classes used by PyFlink
     java_import(gateway.jvm, "org.apache.flink.table.api.*")
+    java_import(gateway.jvm, "org.apache.flink.table.api.config.TableConfigOptions")

Review comment:
       nit: we can also have a star import for the entire package.

##########
File path: flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/ExecutionContext.java
##########
@@ -99,18 +99,20 @@ public StreamTableEnvironment getTableEnvironment() {
 
     private StreamTableEnvironment createTableEnvironment() {
         // checks the value of RUNTIME_MODE

Review comment:
       Is this comment still valid?

##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java
##########
@@ -60,44 +57,21 @@
     private static final EnvironmentSettings DEFAULT_BATCH_MODE_SETTINGS =
             EnvironmentSettings.newInstance().inBatchMode().build();
 
-    public static final String DEFAULT_BUILTIN_CATALOG = "default_catalog";
-    public static final String DEFAULT_BUILTIN_DATABASE = "default_database";
-
-    /** Factory identifier of the {@link Planner} to use. */
-    private final String planner;
-
-    /** Factory identifier of the {@link Executor} to use. */
-    private final String executor;
-
-    /**
-     * Specifies the name of the initial catalog to be created when instantiating {@link
-     * TableEnvironment}.
-     */
-    private final String builtInCatalogName;
-
     /**
-     * Specifies the name of the default database in the initial catalog to be created when
-     * instantiating {@link TableEnvironment}.
+     * Holds all the configuration generated by the builder, together with any required additional
+     * configuration.
      */
-    private final String builtInDatabaseName;
-
-    /**
-     * Determines if the table environment should work in a batch ({@code false}) or streaming
-     * ({@code true}) mode.
-     */
-    private final boolean isStreamingMode;
-
-    private EnvironmentSettings(
-            String planner,
-            @Nullable String executor,
-            String builtInCatalogName,
-            String builtInDatabaseName,
-            boolean isStreamingMode) {
-        this.planner = planner;
-        this.executor = executor;
-        this.builtInCatalogName = builtInCatalogName;
-        this.builtInDatabaseName = builtInDatabaseName;
-        this.isStreamingMode = isStreamingMode;
+    private final Configuration configuration;
+
+    private EnvironmentSettings(Configuration configuration) {
+        this.configuration = configuration;
+        if (configuration.get(RUNTIME_MODE) == AUTOMATIC) {

Review comment:
       I'm wondering if this location is too eager to check. Can you check one more time what happens if the runtime mode is not set in the environment settings but is AUTOMATIC in `StreamExecutionEnvironment`? What is the behavior of `StreamTableEnvironment.create(env)`?
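
       To make the question concrete, here is a minimal sketch of the scenario I mean (assuming the usual DataStream/Table bridge APIs; the wrapper class and the trailing query are just for illustration):

```java
// Sketch: RUNTIME_MODE is not set via EnvironmentSettings, but the
// StreamExecutionEnvironment runs in AUTOMATIC mode.
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class AutomaticModeCheck {
    public static void main(String[] args) {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // AUTOMATIC is the default anyway; set it explicitly for clarity.
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // No runtime mode is configured through EnvironmentSettings here. The
        // question is whether this call already fails in the EnvironmentSettings
        // constructor, or whether the mode is still resolved from the DataStream
        // environment as before.
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        tEnv.executeSql("SELECT 1").print();
    }
}
```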

##########
File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
##########
@@ -1526,19 +1526,26 @@ object TestingTableEnvironment {
       catalogManager: Option[CatalogManager] = None,
       tableConfig: TableConfig): TestingTableEnvironment = {
 
-    tableConfig.addConfiguration(settings.toConfiguration)
-
     // temporary solution until FLINK-15635 is fixed
     val classLoader = Thread.currentThread.getContextClassLoader
 
+    val executorFactory = FactoryUtil.discoverFactory(
+      classLoader, classOf[ExecutorFactory], ExecutorFactory.DEFAULT_IDENTIFIER)
+
+    val executor = executorFactory.create(settings.getConfiguration)
+
+    tableConfig.setRootConfiguration(executor.getConfiguration)
+    tableConfig.addConfiguration(settings.getConfiguration)
+
+
     val moduleManager = new ModuleManager
 
     val catalogMgr = catalogManager match {
       case Some(c) => c
       case _ =>
         CatalogManager.newBuilder
           .classLoader(classLoader)
-          .config(tableConfig.getConfiguration)
+          .config(settings.getConfiguration)

Review comment:
       Use `tableConfig` instead and remove `getConfiguration`; in the end, the code should be close to the one in the table environment.
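
       For reference, a rough Java sketch of the wiring the test should end up mirroring (not the actual `TableEnvironmentImpl` code; the helper name, the literal catalog/database names, and passing `tableConfig` directly to the builder are assumptions based on this suggestion):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.delegation.Executor;
import org.apache.flink.table.delegation.ExecutorFactory;
import org.apache.flink.table.factories.FactoryUtil;

public class TableEnvWiringSketch {

    // Hypothetical helper mirroring how the table environment wires the
    // executor, the TableConfig, and the CatalogManager together.
    static CatalogManager createCatalogManager(
            EnvironmentSettings settings, TableConfig tableConfig, ClassLoader classLoader) {

        // Discover the executor and make its configuration the root of the TableConfig.
        ExecutorFactory executorFactory =
                FactoryUtil.discoverFactory(
                        classLoader, ExecutorFactory.class, ExecutorFactory.DEFAULT_IDENTIFIER);
        Executor executor = executorFactory.create(settings.getConfiguration());

        tableConfig.setRootConfiguration(executor.getConfiguration());
        tableConfig.addConfiguration(settings.getConfiguration());

        return CatalogManager.newBuilder()
                .classLoader(classLoader)
                // Suggested change: hand over the TableConfig itself rather than
                // calling getConfiguration() on it (assumes TableConfig is accepted
                // as a ReadableConfig here).
                .config(tableConfig)
                .defaultCatalog(
                        "default_catalog",
                        new GenericInMemoryCatalog("default_catalog", "default_database"))
                .build();
    }
}
```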

##########
File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/HarnessTestBase.scala
##########
@@ -120,23 +118,6 @@ class HarnessTestBase(mode: StateBackendMode) extends StreamingTestBase {
   def dropWatermarks(elements: Array[AnyRef]): util.Collection[AnyRef] = {
     elements.filter(e => !e.isInstanceOf[Watermark]).toList
   }
-
-  class TestTableConfig extends TableConfig {

Review comment:
       Can we make the `TableConfig` class final now?



