This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5feff4a46f7c4c9d395347e42b9d68b8fe00c8a5
Author: Marios Trivyzas <mat...@gmail.com>
AuthorDate: Tue Mar 1 14:57:13 2022 +0200

    [FLINK-26421][table] Remove planner & executor string identifiers
    
    Remove the planner & executor string identifiers from `EnvironmentSettings`
    and use the default identifiers, which are the only ones used anyway.
---
 .../flink/connectors/hive/HiveDialectITCase.java   |  8 +++--
 .../table/catalog/hive/HiveCatalogITCase.java      | 13 +++++---
 .../client/gateway/context/ExecutionContext.java   | 14 +++-----
 .../AbstractStreamTableEnvironmentImpl.java        |  6 ++--
 .../java/internal/StreamTableEnvironmentImpl.java  | 10 ++----
 .../flink/table/api/EnvironmentSettings.java       | 37 ++--------------------
 .../table/api/internal/TableEnvironmentImpl.java   |  9 ++----
 .../flink/table/factories/PlannerFactoryUtil.java  |  3 +-
 .../internal/StreamTableEnvironmentImpl.scala      |  6 ++--
 .../flink/table/planner/utils/TableTestBase.scala  | 11 ++++---
 10 files changed, 37 insertions(+), 80 deletions(-)

diff --git 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
index 30537c2..24c6904 100644
--- 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
+++ 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
@@ -25,6 +25,7 @@ import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.config.TableConfigOptions;
 import org.apache.flink.table.api.internal.TableEnvironmentInternal;
 import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.CatalogPartitionSpec;
@@ -75,8 +76,6 @@ import java.util.Arrays;
 import java.util.LinkedHashMap;
 import java.util.List;
 
-import static 
org.apache.flink.table.api.EnvironmentSettings.DEFAULT_BUILTIN_CATALOG;
-import static 
org.apache.flink.table.api.EnvironmentSettings.DEFAULT_BUILTIN_DATABASE;
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
@@ -90,6 +89,11 @@ import static org.junit.Assert.fail;
 /** Test Hive syntax when Hive dialect is used. */
 public class HiveDialectITCase {
 
+    private static final String DEFAULT_BUILTIN_CATALOG =
+            TableConfigOptions.TABLE_CATALOG_NAME.defaultValue();
+    private static final String DEFAULT_BUILTIN_DATABASE =
+            TableConfigOptions.TABLE_DATABASE_NAME.defaultValue();
+
     private TableEnvironment tableEnv;
     private HiveCatalog hiveCatalog;
     private String warehouse;
diff --git 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java
 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java
index fe06b2b..ad84ede 100644
--- 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java
+++ 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java
@@ -27,6 +27,7 @@ import org.apache.flink.table.api.SqlDialect;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.config.TableConfigOptions;
 import org.apache.flink.table.api.constraints.UniqueConstraint;
 import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.CatalogBaseTable;
@@ -86,6 +87,11 @@ import static org.assertj.core.api.Assertions.assertThat;
  */
 public class HiveCatalogITCase {
 
+    private static final String DEFAULT_BUILTIN_CATALOG =
+            TableConfigOptions.TABLE_CATALOG_NAME.defaultValue();
+    private static final String DEFAULT_BUILTIN_DATABASE =
+            TableConfigOptions.TABLE_DATABASE_NAME.defaultValue();
+
     @Rule public TemporaryFolder tempFolder = new TemporaryFolder();
 
     private static HiveCatalog hiveCatalog;
@@ -472,15 +478,14 @@ public class HiveCatalogITCase {
         tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
         tableEnv.useCatalog(hiveCatalog.getName());
         tableEnv.executeSql("create table generic_table (x int) with 
('connector'='COLLECTION')");
-        tableEnv.useCatalog(EnvironmentSettings.DEFAULT_BUILTIN_CATALOG);
+        tableEnv.useCatalog(DEFAULT_BUILTIN_CATALOG);
         tableEnv.executeSql(
                 String.format(
                         "create table copy like `%s`.`default`.generic_table",
                         hiveCatalog.getName()));
-        Catalog builtInCat = 
tableEnv.getCatalog(EnvironmentSettings.DEFAULT_BUILTIN_CATALOG).get();
+        Catalog builtInCat = 
tableEnv.getCatalog(DEFAULT_BUILTIN_CATALOG).get();
         CatalogBaseTable catalogTable =
-                builtInCat.getTable(
-                        new 
ObjectPath(EnvironmentSettings.DEFAULT_BUILTIN_DATABASE, "copy"));
+                builtInCat.getTable(new ObjectPath(DEFAULT_BUILTIN_DATABASE, 
"copy"));
         assertThat(catalogTable.getOptions()).hasSize(1);
         assertThat(catalogTable.getOptions())
                 .containsEntry(FactoryUtil.CONNECTOR.key(), "COLLECTION");
diff --git 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/ExecutionContext.java
 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/ExecutionContext.java
index 135cf8f..ae1a829 100644
--- 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/ExecutionContext.java
+++ 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/ExecutionContext.java
@@ -106,7 +106,7 @@ public class ExecutionContext {
 
         StreamExecutionEnvironment streamExecEnv = 
createStreamExecutionEnvironment();
 
-        final Executor executor = lookupExecutor(settings.getExecutor(), 
streamExecEnv);
+        final Executor executor = lookupExecutor(streamExecEnv);
         return createStreamTableEnvironment(
                 streamExecEnv,
                 settings,
@@ -130,12 +130,7 @@ public class ExecutionContext {
 
         final Planner planner =
                 PlannerFactoryUtil.createPlanner(
-                        settings.getPlanner(),
-                        executor,
-                        tableConfig,
-                        moduleManager,
-                        catalogManager,
-                        functionCatalog);
+                        executor, tableConfig, moduleManager, catalogManager, 
functionCatalog);
 
         return new StreamTableEnvironmentImpl(
                 catalogManager,
@@ -149,12 +144,11 @@ public class ExecutionContext {
                 userClassLoader);
     }
 
-    private Executor lookupExecutor(
-            String executorIdentifier, StreamExecutionEnvironment 
executionEnvironment) {
+    private Executor lookupExecutor(StreamExecutionEnvironment 
executionEnvironment) {
         try {
             final ExecutorFactory executorFactory =
                     FactoryUtil.discoverFactory(
-                            classLoader, ExecutorFactory.class, 
executorIdentifier);
+                            classLoader, ExecutorFactory.class, 
ExecutorFactory.DEFAULT_IDENTIFIER);
             final Method createMethod =
                     executorFactory
                             .getClass()
diff --git 
a/flink-table/flink-table-api-bridge-base/src/main/java/org/apache/flink/table/api/bridge/internal/AbstractStreamTableEnvironmentImpl.java
 
b/flink-table/flink-table-api-bridge-base/src/main/java/org/apache/flink/table/api/bridge/internal/AbstractStreamTableEnvironmentImpl.java
index c9fb822..c8df950 100644
--- 
a/flink-table/flink-table-api-bridge-base/src/main/java/org/apache/flink/table/api/bridge/internal/AbstractStreamTableEnvironmentImpl.java
+++ 
b/flink-table/flink-table-api-bridge-base/src/main/java/org/apache/flink/table/api/bridge/internal/AbstractStreamTableEnvironmentImpl.java
@@ -98,14 +98,12 @@ public abstract class AbstractStreamTableEnvironmentImpl 
extends TableEnvironmen
     }
 
     public static Executor lookupExecutor(
-            ClassLoader classLoader,
-            String executorIdentifier,
-            StreamExecutionEnvironment executionEnvironment) {
+            ClassLoader classLoader, StreamExecutionEnvironment 
executionEnvironment) {
         final ExecutorFactory executorFactory;
         try {
             executorFactory =
                     FactoryUtil.discoverFactory(
-                            classLoader, ExecutorFactory.class, 
executorIdentifier);
+                            classLoader, ExecutorFactory.class, 
ExecutorFactory.DEFAULT_IDENTIFIER);
         } catch (Exception e) {
             throw new TableException(
                     "Could not instantiate the executor. Make sure a planner 
module is on the classpath",
diff --git 
a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java
 
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java
index c6c7d43..5d66cfa 100644
--- 
a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java
+++ 
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java
@@ -116,17 +116,11 @@ public final class StreamTableEnvironmentImpl extends 
AbstractStreamTableEnviron
         final FunctionCatalog functionCatalog =
                 new FunctionCatalog(tableConfig, catalogManager, 
moduleManager);
 
-        final Executor executor =
-                lookupExecutor(classLoader, settings.getExecutor(), 
executionEnvironment);
+        final Executor executor = lookupExecutor(classLoader, 
executionEnvironment);
 
         final Planner planner =
                 PlannerFactoryUtil.createPlanner(
-                        settings.getPlanner(),
-                        executor,
-                        tableConfig,
-                        moduleManager,
-                        catalogManager,
-                        functionCatalog);
+                        executor, tableConfig, moduleManager, catalogManager, 
functionCatalog);
 
         return new StreamTableEnvironmentImpl(
                 catalogManager,
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java
index cc9a32b..d8a2464 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java
@@ -18,18 +18,11 @@
 
 package org.apache.flink.table.api;
 
-import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.table.delegation.Executor;
-import org.apache.flink.table.delegation.ExecutorFactory;
-import org.apache.flink.table.delegation.Planner;
-import org.apache.flink.table.delegation.PlannerFactory;
 import org.apache.flink.table.functions.UserDefinedFunction;
 
-import javax.annotation.Nullable;
-
 import static org.apache.flink.api.common.RuntimeExecutionMode.BATCH;
 import static org.apache.flink.api.common.RuntimeExecutionMode.STREAMING;
 import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
@@ -63,12 +56,6 @@ public class EnvironmentSettings {
     public static final String DEFAULT_BUILTIN_CATALOG = "default_catalog";
     public static final String DEFAULT_BUILTIN_DATABASE = "default_database";
 
-    /** Factory identifier of the {@link Planner} to use. */
-    private final String planner;
-
-    /** Factory identifier of the {@link Executor} to use. */
-    private final String executor;
-
     /**
      * Specifies the name of the initial catalog to be created when 
instantiating {@link
      * TableEnvironment}.
@@ -88,13 +75,7 @@ public class EnvironmentSettings {
     private final boolean isStreamingMode;
 
     private EnvironmentSettings(
-            String planner,
-            @Nullable String executor,
-            String builtInCatalogName,
-            String builtInDatabaseName,
-            boolean isStreamingMode) {
-        this.planner = planner;
-        this.executor = executor;
+            String builtInCatalogName, String builtInDatabaseName, boolean 
isStreamingMode) {
         this.builtInCatalogName = builtInCatalogName;
         this.builtInDatabaseName = builtInDatabaseName;
         this.isStreamingMode = isStreamingMode;
@@ -180,23 +161,9 @@ public class EnvironmentSettings {
         return isStreamingMode;
     }
 
-    /** Returns the identifier of the {@link Planner} to be used. */
-    @Internal
-    public String getPlanner() {
-        return planner;
-    }
-
-    /** Returns the {@link Executor} that should submit and execute table 
programs. */
-    @Internal
-    public String getExecutor() {
-        return executor;
-    }
-
     /** A builder for {@link EnvironmentSettings}. */
     @PublicEvolving
     public static class Builder {
-        private final String planner = PlannerFactory.DEFAULT_IDENTIFIER;
-        private final String executor = ExecutorFactory.DEFAULT_IDENTIFIER;
 
         private String builtInCatalogName = DEFAULT_BUILTIN_CATALOG;
         private String builtInDatabaseName = DEFAULT_BUILTIN_DATABASE;
@@ -255,7 +222,7 @@ public class EnvironmentSettings {
         /** Returns an immutable instance of {@link EnvironmentSettings}. */
         public EnvironmentSettings build() {
             return new EnvironmentSettings(
-                    planner, executor, builtInCatalogName, 
builtInDatabaseName, isStreamingMode);
+                    builtInCatalogName, builtInDatabaseName, isStreamingMode);
         }
     }
 }
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index ea17a9d..8d6a5a9 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -299,17 +299,12 @@ public class TableEnvironmentImpl implements 
TableEnvironmentInternal {
 
         final ExecutorFactory executorFactory =
                 FactoryUtil.discoverFactory(
-                        classLoader, ExecutorFactory.class, 
settings.getExecutor());
+                        classLoader, ExecutorFactory.class, 
ExecutorFactory.DEFAULT_IDENTIFIER);
         final Executor executor = executorFactory.create(configuration);
 
         final Planner planner =
                 PlannerFactoryUtil.createPlanner(
-                        settings.getPlanner(),
-                        executor,
-                        tableConfig,
-                        moduleManager,
-                        catalogManager,
-                        functionCatalog);
+                        executor, tableConfig, moduleManager, catalogManager, 
functionCatalog);
 
         return new TableEnvironmentImpl(
                 catalogManager,
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/PlannerFactoryUtil.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/PlannerFactoryUtil.java
index ef64f32..674c916 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/PlannerFactoryUtil.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/PlannerFactoryUtil.java
@@ -35,7 +35,6 @@ public class PlannerFactoryUtil {
 
     /** Discovers a planner factory and creates a planner instance. */
     public static Planner createPlanner(
-            String plannerIdentifier,
             Executor executor,
             TableConfig tableConfig,
             ModuleManager moduleManager,
@@ -45,7 +44,7 @@ public class PlannerFactoryUtil {
                 FactoryUtil.discoverFactory(
                         Thread.currentThread().getContextClassLoader(),
                         PlannerFactory.class,
-                        plannerIdentifier);
+                        PlannerFactory.DEFAULT_IDENTIFIER);
 
         final Context context =
                 new DefaultPlannerContext(
diff --git 
a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala
 
b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala
index f56ba48..a968e49 100644
--- 
a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala
+++ 
b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala
@@ -317,10 +317,10 @@ object StreamTableEnvironmentImpl {
     val functionCatalog = new FunctionCatalog(tableConfig, catalogManager, 
moduleManager)
 
     val executor = AbstractStreamTableEnvironmentImpl.lookupExecutor(
-      classLoader, settings.getExecutor, 
executionEnvironment.getWrappedStreamExecutionEnvironment)
+      classLoader, executionEnvironment.getWrappedStreamExecutionEnvironment)
 
-    val planner = PlannerFactoryUtil.createPlanner(settings.getPlanner, 
executor, tableConfig,
-      moduleManager, catalogManager, functionCatalog)
+    val planner = PlannerFactoryUtil.createPlanner(
+      executor, tableConfig, moduleManager, catalogManager, functionCatalog)
 
     new StreamTableEnvironmentImpl(
       catalogManager,
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
index 6cb12cc..0d088d6 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
@@ -26,13 +26,13 @@ import org.apache.flink.api.common.typeinfo.{AtomicType, 
TypeInformation}
 import org.apache.flink.api.java.typeutils.{PojoTypeInfo, RowTypeInfo, 
TupleTypeInfo}
 import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
 import org.apache.flink.configuration.ExecutionOptions
-
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.{JsonNode, 
ObjectMapper}
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.environment.{LocalStreamEnvironment, 
StreamExecutionEnvironment}
 import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment => 
ScalaStreamExecEnv}
 import org.apache.flink.streaming.api.{TimeCharacteristic, environment}
 import org.apache.flink.table.api._
+import 
org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl
 import org.apache.flink.table.api.bridge.java.{StreamTableEnvironment => 
JavaStreamTableEnv}
 import org.apache.flink.table.api.bridge.scala.{StreamTableEnvironment => 
ScalaStreamTableEnv}
 import org.apache.flink.table.api.config.ExecutionConfigOptions
@@ -1549,12 +1549,13 @@ object TestingTableEnvironment {
 
     val functionCatalog = new FunctionCatalog(tableConfig, catalogMgr, 
moduleManager)
 
-    val executorFactory =
-      FactoryUtil.discoverFactory(classLoader, classOf[ExecutorFactory], 
settings.getExecutor)
+    val executorFactory = FactoryUtil.discoverFactory(
+      classLoader, classOf[ExecutorFactory], 
ExecutorFactory.DEFAULT_IDENTIFIER)
+
     val executor = executorFactory.create(tableConfig.getConfiguration)
 
-    val planner = PlannerFactoryUtil.createPlanner(settings.getPlanner, 
executor, tableConfig,
-      moduleManager, catalogMgr, functionCatalog).asInstanceOf[PlannerBase]
+    val planner = PlannerFactoryUtil.createPlanner(
+      executor, tableConfig, moduleManager, catalogMgr, 
functionCatalog).asInstanceOf[PlannerBase]
 
     new TestingTableEnvironment(
       catalogMgr,

Reply via email to