This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 95944e231315f085d5e23717332aa2866caa5d8a
Author: Dawid Wysakowicz <dwysakow...@apache.org>
AuthorDate: Wed Jun 19 15:00:23 2019 +0200

    [FLINK-12798][table] Add a discovery mechanism for switching between Flink/Blink Planner/Executor
    
    This closes #8852.
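    
    A minimal usage sketch of the new discovery mechanism, assembled from the
    API added below (a sketch, not part of the commit; note that
    useBlinkPlanner() still throws UnsupportedOperationException in this commit):
    
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
    
        // Pin the old Flink planner explicitly; the matching PlannerFactory and
        // ExecutorFactory are discovered via ComponentFactoryService (Java SPI).
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
            .useOldPlanner()
            .inStreamingMode()
            .withBuiltInCatalogName("default_catalog")
            .withBuiltInDatabaseName("default_database")
            .build();
    
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);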
---
 flink-python/pyflink/table/table_config.py         |  36 ----
 .../table/tests/test_table_environment_api.py      |  12 --
 .../client/gateway/local/ExecutionContextTest.java |   3 +-
 .../client/gateway/local/LocalExecutorITCase.java  |   7 +-
 .../table/api/java/BatchTableEnvironment.java      |   5 +-
 .../table/api/java/StreamTableEnvironment.java     | 155 +++++++++-----
 .../java/internal/StreamTableEnvironmentImpl.java  |  66 +++---
 .../flink/table/api/EnvironmentSettings.java       | 228 +++++++++++++++++++++
 .../org/apache/flink/table/api/TableConfig.java    |  44 ----
 .../apache/flink/table/api/TableEnvironment.java   |  22 +-
 .../table/api/internal/TableEnvironmentImpl.java   |   4 +-
 .../flink/table/delegation/ExecutorFactory.java    |  50 +++++
 .../internal => delegation}/PlannerFactory.java    |  49 ++---
 .../flink/table/factories/ComponentFactory.java    |  53 +++++
 .../table/factories/ComponentFactoryService.java   |  80 ++++++++
 .../factories/ComponentFactoryServiceTest.java     |  68 ++++++
 .../factories/utils/OtherTestPlannerFactory.java   |  28 +++
 .../table/factories/utils/TestPlannerFactory.java  |  69 +++++++
 .../org.apache.flink.table.factories.TableFactory  |   6 +-
 .../table/api/scala/BatchTableEnvironment.scala    |   7 +-
 .../table/api/scala/StreamTableEnvironment.scala   | 132 ++++++++----
 .../internal/StreamTableEnvironmentImpl.scala      |  90 ++++----
 .../flink/table/executor/StreamExecutor.java       |   4 +-
 ...utorFactory.java => StreamExecutorFactory.java} |  49 +++--
 .../flink/table/planner/StreamPlannerFactory.java  |  70 +++++++
 .../org.apache.flink.table.factories.TableFactory  |   2 +
 .../flink/table/api/internal/TableEnvImpl.scala    |   4 +-
 .../api/stream/StreamTableEnvironmentTest.scala    |  26 ++-
 .../apache/flink/table/utils/TableTestBase.scala   |  76 +++----
 29 files changed, 1052 insertions(+), 393 deletions(-)

diff --git a/flink-python/pyflink/table/table_config.py b/flink-python/pyflink/table/table_config.py
index 7eb3513..d6b5864 100644
--- a/flink-python/pyflink/table/table_config.py
+++ b/flink-python/pyflink/table/table_config.py
@@ -90,39 +90,3 @@ class TableConfig(object):
             self._j_table_config.setMaxGeneratedCodeLength(max_generated_code_length)
         else:
             raise Exception("TableConfig.max_generated_code_length should be a int value!")
-
-    def get_built_in_catalog_name(self):
-        """
-        Gets the specified name of the initial catalog to be created when instantiating
-        :class:`TableEnvironment`.
-        """
-        return self._j_table_config.getBuiltInCatalogName()
-
-    def set_built_in_catalog_name(self, built_in_catalog_name):
-        """
-        Specifies the name of the initial catalog to be created when instantiating
-        :class:`TableEnvironment`. This method has no effect if called on the
-        :func:`~pyflink.table.TableEnvironment.get_config`.
-        """
-        if built_in_catalog_name is not None and isinstance(built_in_catalog_name, str):
-            self._j_table_config.setBuiltInCatalogName(built_in_catalog_name)
-        else:
-            raise Exception("TableConfig.built_in_catalog_name should be a string value!")
-
-    def get_built_in_database_name(self):
-        """
-        Gets the specified name of the default database in the initial catalog to be created when
-        instantiating :class:`TableEnvironment`.
-        """
-        return self._j_table_config.getBuiltInDatabaseName()
-
-    def set_built_in_database_name(self, built_in_database_name):
-        """
-        Specifies the name of the default database in the initial catalog to be created when
-        instantiating :class:`TableEnvironment`. This method has no effect if called on the
-        :func:`~pyflink.table.TableEnvironment.get_config`.
-        """
-        if built_in_database_name is not None and isinstance(built_in_database_name, str):
-            self._j_table_config.setBuiltInDatabaseName(built_in_database_name)
-        else:
-            raise Exception("TableConfig.built_in_database_name should be a string value!")
diff --git a/flink-python/pyflink/table/tests/test_table_environment_api.py b/flink-python/pyflink/table/tests/test_table_environment_api.py
index a129d22..4eba1bb 100644
--- a/flink-python/pyflink/table/tests/test_table_environment_api.py
+++ b/flink-python/pyflink/table/tests/test_table_environment_api.py
@@ -170,8 +170,6 @@ class StreamTableEnvironmentTests(PyFlinkStreamTableTestCase):
         table_config.set_max_generated_code_length(32000)
         table_config.set_null_check(False)
         table_config.set_timezone("Asia/Shanghai")
-        table_config.set_built_in_catalog_name("test_catalog")
-        table_config.set_built_in_database_name("test_database")
 
         env = StreamExecutionEnvironment.get_execution_environment()
         t_env = StreamTableEnvironment.create(env, table_config)
@@ -181,8 +179,6 @@ class StreamTableEnvironmentTests(PyFlinkStreamTableTestCase):
         self.assertFalse(readed_table_config.get_null_check())
         self.assertEqual(readed_table_config.get_max_generated_code_length(), 32000)
         self.assertEqual(readed_table_config.get_timezone(), "Asia/Shanghai")
-        self.assertEqual(table_config.get_built_in_catalog_name(), "test_catalog")
-        self.assertEqual(table_config.get_built_in_database_name(), "test_database")
 
 
 class BatchTableEnvironmentTests(PyFlinkBatchTableTestCase):
@@ -208,22 +204,16 @@ class BatchTableEnvironmentTests(PyFlinkBatchTableTestCase):
         table_config.set_timezone("Asia/Shanghai")
         table_config.set_max_generated_code_length(64000)
         table_config.set_null_check(True)
-        table_config.set_built_in_catalog_name("test_catalog")
-        table_config.set_built_in_database_name("test_database")
 
         self.assertTrue(table_config.get_null_check())
         self.assertEqual(table_config.get_max_generated_code_length(), 64000)
         self.assertEqual(table_config.get_timezone(), "Asia/Shanghai")
-        self.assertEqual(table_config.get_built_in_catalog_name(), "test_catalog")
-        self.assertEqual(table_config.get_built_in_database_name(), "test_database")
 
     def test_create_table_environment(self):
         table_config = TableConfig()
         table_config.set_max_generated_code_length(32000)
         table_config.set_null_check(False)
         table_config.set_timezone("Asia/Shanghai")
-        table_config.set_built_in_catalog_name("test_catalog")
-        table_config.set_built_in_database_name("test_database")
 
         env = ExecutionEnvironment.get_execution_environment()
         t_env = BatchTableEnvironment.create(env, table_config)
@@ -233,5 +223,3 @@ class BatchTableEnvironmentTests(PyFlinkBatchTableTestCase):
         self.assertFalse(readed_table_config.get_null_check())
         self.assertEqual(readed_table_config.get_max_generated_code_length(), 32000)
         self.assertEqual(readed_table_config.get_timezone(), "Asia/Shanghai")
-        self.assertEqual(readed_table_config.get_built_in_catalog_name(), "test_catalog")
-        self.assertEqual(readed_table_config.get_built_in_database_name(), "test_database")
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java
index ac60fcc..fb0c80c 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.client.cli.DefaultCLI;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.Types;
 import org.apache.flink.table.api.java.StreamTableEnvironment;
@@ -110,7 +109,7 @@ public class ExecutionContextTest {
                assertEquals(
                        new HashSet<>(
                                Arrays.asList(
-                                       TableConfig.getDefault().getBuiltInCatalogName(),
+                                       "default_catalog",
                                        inmemoryCatalog,
                                        hiveCatalog,
                                        hiveDefaultVersionCatalog,
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
index 4f92ae7..023d656 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
@@ -31,7 +31,6 @@ import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
-import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.client.config.Environment;
 import org.apache.flink.table.client.config.entries.ViewEntry;
@@ -76,7 +75,6 @@ import static org.junit.Assert.fail;
 public class LocalExecutorITCase extends TestLogger {
 
        private static final String DEFAULTS_ENVIRONMENT_FILE = "test-sql-client-defaults.yaml";
-
        private static final int NUM_TMS = 2;
        private static final int NUM_SLOTS_PER_TM = 2;
 
@@ -158,7 +156,7 @@ public class LocalExecutorITCase extends TestLogger {
                final List<String> actualCatalogs = executor.listCatalogs(session);
 
                final List<String> expectedCatalogs = Arrays.asList(
-                       TableConfig.getDefault().getBuiltInCatalogName(),
+                       "default_catalog",
                        "catalog1");
                assertEquals(expectedCatalogs, actualCatalogs);
        }
@@ -170,8 +168,7 @@ public class LocalExecutorITCase extends TestLogger {
 
                final List<String> actualDatabases = executor.listDatabases(session);
 
-               final List<String> expectedDatabases = Arrays.asList(
-                       TableConfig.getDefault().getBuiltInDatabaseName());
+               final List<String> expectedDatabases = 
Arrays.asList("default_database");
                assertEquals(expectedDatabases, actualDatabases);
        }
 
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/BatchTableEnvironment.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/BatchTableEnvironment.java
index 1759b60..16f63af 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/BatchTableEnvironment.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/BatchTableEnvironment.java
@@ -287,9 +287,10 @@ public interface BatchTableEnvironment extends TableEnvironment {
                try {
                        Class<?> clazz = Class.forName("org.apache.flink.table.api.java.internal.BatchTableEnvironmentImpl");
                        Constructor con = clazz.getConstructor(ExecutionEnvironment.class, TableConfig.class, CatalogManager.class);
+                       String defaultCatalog = "default_catalog";
                        CatalogManager catalogManager = new CatalogManager(
-                               tableConfig.getBuiltInCatalogName(),
-                               new GenericInMemoryCatalog(tableConfig.getBuiltInCatalogName(), tableConfig.getBuiltInDatabaseName())
+                               defaultCatalog,
+                               new GenericInMemoryCatalog(defaultCatalog, "default_database")
                        );
                        return (BatchTableEnvironment) con.newInstance(executionEnvironment, tableConfig, catalogManager);
                } catch (Throwable t) {
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/StreamTableEnvironment.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/StreamTableEnvironment.java
index a4f5df2..6859780 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/StreamTableEnvironment.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/StreamTableEnvironment.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.StreamQueryConfig;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableConfig;
@@ -35,24 +36,115 @@ import org.apache.flink.table.functions.TableAggregateFunction;
 import org.apache.flink.table.functions.TableFunction;
 
 /**
- * The {@link TableEnvironment} for a Java {@link StreamExecutionEnvironment} that works with
- * {@link DataStream}s.
+ * This table environment is the entry point and central context for creating Table & SQL
+ * API programs that integrate with the Java-specific {@link DataStream} API.
  *
- * <p>A TableEnvironment can be used to:
+ * <p>It is unified for bounded and unbounded data processing.
+ *
+ * <p>A stream table environment is responsible for:
  * <ul>
- *     <li>convert a {@link DataStream} to a {@link Table}</li>
- *     <li>register a {@link DataStream} in the {@link TableEnvironment}'s catalog</li>
- *     <li>register a {@link Table} in the {@link TableEnvironment}'s catalog</li>
- *     <li>scan a registered table to obtain a {@link Table}</li>
- *     <li>specify a SQL query on registered tables to obtain a {@link Table}</li>
- *     <li>convert a {@link Table} into a {@link DataStream}</li>
- *     <li>explain the AST and execution plan of a {@link Table}</li>
+ *     <li>Convert a {@link DataStream} into {@link Table} and vice-versa.</li>
+ *     <li>Connecting to external systems.</li>
+ *     <li>Registering and retrieving {@link Table}s and other meta objects from a catalog.</li>
+ *     <li>Executing SQL statements.</li>
+ *     <li>Offering further configuration options.</li>
  * </ul>
+ *
+ * <p>Note: If you don't intend to use the {@link DataStream} API, {@link TableEnvironment} is meant
+ * for pure table programs.
  */
 @PublicEvolving
 public interface StreamTableEnvironment extends TableEnvironment {
 
        /**
+        * Creates a table environment that is the entry point and central context for creating Table & SQL
+        * API programs that integrate with the Java-specific {@link DataStream} API.
+        *
+        * <p>It is unified for bounded and unbounded data processing.
+        *
+        * <p>A stream table environment is responsible for:
+        * <ul>
+        *     <li>Convert a {@link DataStream} into {@link Table} and vice-versa.</li>
+        *     <li>Connecting to external systems.</li>
+        *     <li>Registering and retrieving {@link Table}s and other meta objects from a catalog.</li>
+        *     <li>Executing SQL statements.</li>
+        *     <li>Offering further configuration options.</li>
+        * </ul>
+        *
+        * <p>Note: If you don't intend to use the {@link DataStream} API, {@link TableEnvironment} is meant
+        * for pure table programs.
+        *
+        * @param executionEnvironment The Java {@link StreamExecutionEnvironment} of the {@link TableEnvironment}.
+        */
+       static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment) {
+               return create(
+                       executionEnvironment,
+                       EnvironmentSettings.newInstance().build());
+       }
+
+       /**
+        * Creates a table environment that is the entry point and central context for creating Table & SQL
+        * API programs that integrate with the Java-specific {@link DataStream} API.
+        *
+        * <p>It is unified for bounded and unbounded data processing.
+        *
+        * <p>A stream table environment is responsible for:
+        * <ul>
+        *     <li>Convert a {@link DataStream} into {@link Table} and vice-versa.</li>
+        *     <li>Connecting to external systems.</li>
+        *     <li>Registering and retrieving {@link Table}s and other meta objects from a catalog.</li>
+        *     <li>Executing SQL statements.</li>
+        *     <li>Offering further configuration options.</li>
+        * </ul>
+        *
+        * <p>Note: If you don't intend to use the {@link DataStream} API, {@link TableEnvironment} is meant
+        * for pure table programs.
+        *
+        * @param executionEnvironment The Java {@link StreamExecutionEnvironment} of the {@link TableEnvironment}.
+        * @param settings The environment settings used to instantiate the {@link TableEnvironment}.
+        */
+       static StreamTableEnvironment create(
+                       StreamExecutionEnvironment executionEnvironment,
+                       EnvironmentSettings settings) {
+               return StreamTableEnvironmentImpl.create(
+                       executionEnvironment,
+                       settings,
+                       new TableConfig()
+               );
+       }
+
+       /**
+        * Creates a table environment that is the entry point and central context for creating Table & SQL
+        * API programs that integrate with the Java-specific {@link DataStream} API.
+        *
+        * <p>It is unified for bounded and unbounded data processing.
+        *
+        * <p>A stream table environment is responsible for:
+        * <ul>
+        *     <li>Convert a {@link DataStream} into {@link Table} and vice-versa.</li>
+        *     <li>Connecting to external systems.</li>
+        *     <li>Registering and retrieving {@link Table}s and other meta objects from a catalog.</li>
+        *     <li>Executing SQL statements.</li>
+        *     <li>Offering further configuration options.</li>
+        * </ul>
+        *
+        * <p>Note: If you don't intend to use the {@link DataStream} API, {@link TableEnvironment} is meant
+        * for pure table programs.
+        *
+        * @param executionEnvironment The Java {@link StreamExecutionEnvironment} of the {@link TableEnvironment}.
+        * @param tableConfig The configuration of the {@link TableEnvironment}.
+        * @deprecated Use {@link #create(StreamExecutionEnvironment)} and {@link #getConfig()}
+        * for manipulating {@link TableConfig}.
+        */
+       @Deprecated
+       static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment, TableConfig tableConfig) {
+               return StreamTableEnvironmentImpl.create(
+                       executionEnvironment,
+                       EnvironmentSettings.newInstance().build(),
+                       tableConfig);
+       }
+
+       /**
         * Registers a {@link TableFunction} under a unique name in the TableEnvironment's catalog.
         * Registered functions can be referenced in Table API and SQL queries.
         *
@@ -356,47 +448,4 @@ public interface StreamTableEnvironment extends TableEnvironment {
         */
        @Override
        StreamTableDescriptor connect(ConnectorDescriptor connectorDescriptor);
-
-       /**
-        * The {@link TableEnvironment} for a Java {@link StreamExecutionEnvironment} that works with
-        * {@link DataStream}s.
-        *
-        * <p>A TableEnvironment can be used to:
-        * <ul>
-        *     <li>convert a {@link DataStream} to a {@link Table}</li>
-        *     <li>register a {@link DataStream} in the {@link TableEnvironment}'s catalog</li>
-        *     <li>register a {@link Table} in the {@link TableEnvironment}'s catalog</li>
-        *     <li>scan a registered table to obtain a {@link Table}</li>
-        *     <li>specify a SQL query on registered tables to obtain a {@link Table}</li>
-        *     <li>convert a {@link Table} into a {@link DataStream}</li>
-        *     <li>explain the AST and execution plan of a {@link Table}</li>
-        * </ul>
-        *
-        * @param executionEnvironment The Java {@link StreamExecutionEnvironment} of the TableEnvironment.
-        */
-       static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment) {
-               return create(executionEnvironment, new TableConfig());
-       }
-
-       /**
-        * The {@link TableEnvironment} for a Java {@link StreamExecutionEnvironment} that works with
-        * {@link DataStream}s.
-        *
-        * <p>A TableEnvironment can be used to:
-        * <ul>
-        *     <li>convert a {@link DataStream} to a {@link Table}</li>
-        *     <li>register a {@link DataStream} in the {@link TableEnvironment}'s catalog</li>
-        *     <li>register a {@link Table} in the {@link TableEnvironment}'s catalog</li>
-        *     <li>scan a registered table to obtain a {@link Table}</li>
-        *     <li>specify a SQL query on registered tables to obtain a {@link Table}</li>
-        *     <li>convert a {@link Table} into a {@link DataStream}</li>
-        *     <li>explain the AST and execution plan of a {@link Table}</li>
-        * </ul>
-        *
-        * @param executionEnvironment The Java {@link StreamExecutionEnvironment} of the TableEnvironment.
-        * @param tableConfig The configuration of the TableEnvironment.
-        */
-       static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment, TableConfig tableConfig) {
-               return StreamTableEnvironmentImpl.create(tableConfig, executionEnvironment);
-       }
 }
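
A sketch of the net effect on user code, assembled from the signatures in this
file (variable names are illustrative):

    // Before (now deprecated): the configuration object is passed at creation time.
    StreamTableEnvironment oldStyle = StreamTableEnvironment.create(env, new TableConfig());

    // After: EnvironmentSettings drives planner/executor discovery; the
    // TableConfig is reached through getConfig() instead.
    StreamTableEnvironment newStyle = StreamTableEnvironment.create(
        env, EnvironmentSettings.newInstance().build());
    newStyle.getConfig().setMaxGeneratedCodeLength(32000);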
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java
index 0ea02a3..6b37690 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.api.java.internal;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -27,24 +28,27 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.StreamQueryConfig;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.Types;
 import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.api.internal.PlannerFactory;
 import org.apache.flink.table.api.internal.TableEnvironmentImpl;
 import org.apache.flink.table.api.java.StreamTableEnvironment;
 import org.apache.flink.table.catalog.CatalogManager;
 import org.apache.flink.table.catalog.FunctionCatalog;
 import org.apache.flink.table.catalog.GenericInMemoryCatalog;
 import org.apache.flink.table.delegation.Executor;
+import org.apache.flink.table.delegation.ExecutorFactory;
 import org.apache.flink.table.delegation.Planner;
+import org.apache.flink.table.delegation.PlannerFactory;
 import org.apache.flink.table.descriptors.ConnectorDescriptor;
 import org.apache.flink.table.descriptors.StreamTableDescriptor;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.expressions.ExpressionParser;
+import org.apache.flink.table.factories.ComponentFactoryService;
 import org.apache.flink.table.functions.AggregateFunction;
 import org.apache.flink.table.functions.TableAggregateFunction;
 import org.apache.flink.table.functions.TableFunction;
@@ -60,6 +64,7 @@ import org.apache.flink.table.typeutils.FieldInfoUtils;
 import java.lang.reflect.Method;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 
 /**
@@ -73,7 +78,8 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl imple
 
        private final StreamExecutionEnvironment executionEnvironment;
 
-       private StreamTableEnvironmentImpl(
+       @VisibleForTesting
+       public StreamTableEnvironmentImpl(
                        CatalogManager catalogManager,
                        FunctionCatalog functionCatalog,
                        TableConfig tableConfig,
@@ -92,33 +98,21 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl imple
         * @param tableConfig The configuration of the TableEnvironment.
         * @param executionEnvironment The {@link StreamExecutionEnvironment} of the TableEnvironment.
         */
-       public static StreamTableEnvironmentImpl create(
-                       TableConfig tableConfig,
-                       StreamExecutionEnvironment executionEnvironment) {
-               CatalogManager catalogManager = new CatalogManager(
-                       tableConfig.getBuiltInCatalogName(),
-                       new GenericInMemoryCatalog(tableConfig.getBuiltInCatalogName(), tableConfig.getBuiltInDatabaseName()));
-               return create(catalogManager, tableConfig, executionEnvironment);
-       }
-
-       /**
-        * Creates an instance of a {@link StreamTableEnvironment}. It uses the {@link StreamExecutionEnvironment} for
-        * executing queries. This is also the {@link StreamExecutionEnvironment} that will be used when converting
-        * from/to {@link DataStream}.
-        *
-        * @param catalogManager The {@link CatalogManager} to use for storing and looking up {@link Table}s.
-        * @param tableConfig The configuration of the TableEnvironment.
-        * @param executionEnvironment The {@link StreamExecutionEnvironment} of the TableEnvironment.
-        */
-       public static StreamTableEnvironmentImpl create(
-                       CatalogManager catalogManager,
-                       TableConfig tableConfig,
-                       StreamExecutionEnvironment executionEnvironment) {
+       public static StreamTableEnvironment create(
+                       StreamExecutionEnvironment executionEnvironment,
+                       EnvironmentSettings settings,
+                       TableConfig tableConfig) {
                FunctionCatalog functionCatalog = new FunctionCatalog(
-                       catalogManager.getCurrentCatalog(),
-                       catalogManager.getCurrentDatabase());
-               Executor executor = lookupExecutor(executionEnvironment);
-               Planner planner = PlannerFactory.lookupPlanner(executor, tableConfig, functionCatalog, catalogManager);
+                       settings.getBuiltInCatalogName(),
+                       settings.getBuiltInDatabaseName());
+               CatalogManager catalogManager = new CatalogManager(
+                       settings.getBuiltInCatalogName(),
+                       new GenericInMemoryCatalog(settings.getBuiltInCatalogName(), settings.getBuiltInDatabaseName()));
+               Map<String, String> plannerProperties = settings.toPlannerProperties();
+               Map<String, String> executorProperties = settings.toExecutorProperties();
+               Executor executor = lookupExecutor(executorProperties, executionEnvironment);
+               Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
+                       .create(plannerProperties, executor, tableConfig, functionCatalog, catalogManager);
                return new StreamTableEnvironmentImpl(
                        catalogManager,
                        functionCatalog,
@@ -129,12 +123,18 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl imple
                );
        }
 
-       private static Executor lookupExecutor(StreamExecutionEnvironment executionEnvironment) {
+       private static Executor lookupExecutor(
+                       Map<String, String> executorProperties,
+                       StreamExecutionEnvironment executionEnvironment) {
                try {
-                       Class<?> clazz = Class.forName("org.apache.flink.table.executor.ExecutorFactory");
-                       Method createMethod = clazz.getMethod("create", StreamExecutionEnvironment.class);
-
-                       return (Executor) createMethod.invoke(null, executionEnvironment);
+                       ExecutorFactory executorFactory = ComponentFactoryService.find(ExecutorFactory.class, executorProperties);
+                       Method createMethod = executorFactory.getClass()
+                               .getMethod("create", Map.class, StreamExecutionEnvironment.class);
+
+                       return (Executor) createMethod.invoke(
+                               executorFactory,
+                               executorProperties,
+                               executionEnvironment);
                } catch (Exception e) {
                        throw new TableException(
                                "Could not instantiate the executor. Make sure 
a planner module is on the classpath",
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java
new file mode 100644
index 0000000..37ba179
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.delegation.Executor;
+import org.apache.flink.table.delegation.Planner;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Defines all parameters that initialize a table environment. Those parameters are used only
+ * during instantiation of a {@link TableEnvironment} and cannot be changed afterwards.
+ *
+ * <p>Example:
+ * <pre>{@code
+ *    EnvironmentSettings.newInstance()
+ *      .useOldPlanner()
+ *      .inStreamingMode()
+ *      .withBuiltInCatalogName("default_catalog")
+ *      .withBuiltInDatabaseName("default_database")
+ *      .build()
+ * }</pre>
+ */
+@PublicEvolving
+public class EnvironmentSettings {
+       public static final String BATCH_MODE = "batch-mode";
+       public static final String CLASS_NAME = "class-name";
+
+       /**
+        * Canonical name of the {@link Planner} class to use.
+        */
+       private final String plannerClass;
+
+       /**
+        * Canonical name of the {@link Executor} class to use.
+        */
+       private final String executorClass;
+
+       /**
+        * Specifies the name of the initial catalog to be created when instantiating
+        * {@link TableEnvironment}.
+        */
+       private final String builtInCatalogName;
+
+       /**
+        * Specifies the name of the default database in the initial catalog to be created when
+        * instantiating {@link TableEnvironment}.
+        */
+       private final String builtInDatabaseName;
+
+       /**
+        * Determines if the table environment should work in a batch ({@code true}) or
+        * streaming ({@code false}) mode.
+        */
+       private final boolean isBatchMode;
+
+       private EnvironmentSettings(
+                       @Nullable String plannerClass,
+                       @Nullable String executorClass,
+                       String builtInCatalogName,
+                       String builtInDatabaseName,
+                       boolean isBatchMode) {
+               this.plannerClass = plannerClass;
+               this.executorClass = executorClass;
+               this.builtInCatalogName = builtInCatalogName;
+               this.builtInDatabaseName = builtInDatabaseName;
+               this.isBatchMode = isBatchMode;
+       }
+
+       /**
+        * Creates a builder for creating an instance of {@link EnvironmentSettings}.
+        *
+        * <p>By default, it does not specify a required planner and will use the one that is available
+        * on the classpath via discovery.
+        */
+       public static Builder newInstance() {
+               return new Builder();
+       }
+
+       /**
+        * Gets the specified name of the initial catalog to be created when instantiating
+        * a {@link TableEnvironment}.
+        */
+       public String getBuiltInCatalogName() {
+               return builtInCatalogName;
+       }
+
+       /**
+        * Gets the specified name of the default database in the initial catalog to be created when instantiating
+        * a {@link TableEnvironment}.
+        */
+       public String getBuiltInDatabaseName() {
+               return builtInDatabaseName;
+       }
+
+       @Internal
+       public Map<String, String> toPlannerProperties() {
+               Map<String, String> properties = new HashMap<>(toCommonProperties());
+               if (plannerClass != null) {
+                       properties.put(CLASS_NAME, plannerClass);
+               }
+               return properties;
+       }
+
+       @Internal
+       public Map<String, String> toExecutorProperties() {
+               Map<String, String> properties = new HashMap<>(toCommonProperties());
+               if (executorClass != null) {
+                       properties.put(CLASS_NAME, executorClass);
+               }
+               return properties;
+       }
+
+       private Map<String, String> toCommonProperties() {
+               Map<String, String> properties = new HashMap<>();
+               properties.put(BATCH_MODE, Boolean.toString(isBatchMode));
+               return properties;
+       }
+
+       /**
+        * A builder for {@link EnvironmentSettings}.
+        */
+       public static class Builder {
+               private String plannerClass = null;
+               private String executorClass = null;
+               private String builtInCatalogName = "default_catalog";
+               private String builtInDatabaseName = "default_database";
+               private boolean isBatchMode = false;
+
+               /**
+                * Sets the old Flink planner as the required module. By default, {@link #useAnyPlanner()} is
+                * enabled.
+                */
+               public Builder useOldPlanner() {
+                       this.plannerClass = "org.apache.flink.table.planner.StreamPlannerFactory";
+                       this.executorClass = "org.apache.flink.table.executor.StreamExecutorFactory";
+                       return this;
+               }
+
+               /**
+                * Sets the Blink planner as the required module. By default, {@link #useAnyPlanner()} is
+                * enabled.
+                */
+               public Builder useBlinkPlanner() {
+                       throw new UnsupportedOperationException("The Blink planner is not supported yet.");
+               }
+
+               /**
+                * Does not set a planner requirement explicitly.
+                *
+                * <p>A planner will be discovered automatically, if there is only one planner available.
+                *
+                * <p>This is the default behavior.
+                */
+               public Builder useAnyPlanner() {
+                       this.plannerClass = null;
+                       this.executorClass = null;
+                       return this;
+               }
+
+               /**
+                * Sets that the components should work in a batch mode. Streaming mode by default.
+                */
+               public Builder inBatchMode() {
+                       this.isBatchMode = true;
+                       return this;
+               }
+
+               /**
+                * Sets that the components should work in a streaming mode. Enabled by default.
+                */
+               public Builder inStreamingMode() {
+                       this.isBatchMode = false;
+                       return this;
+               }
+
+               /**
+                * Specifies the name of the initial catalog to be created when instantiating
+                * a {@link TableEnvironment}. Default: "default_catalog".
+                */
+               public Builder withBuiltInCatalogName(String builtInCatalogName) {
+                       this.builtInCatalogName = builtInCatalogName;
+                       return this;
+               }
+
+               /**
+                * Specifies the name of the default database in the initial catalog to be created when instantiating
+                * a {@link TableEnvironment}. Default: "default_database".
+                */
+               public Builder withBuiltInDatabaseName(String builtInDatabaseName) {
+                       this.builtInDatabaseName = builtInDatabaseName;
+                       return this;
+               }
+
+               /**
+                * Returns an immutable instance of {@link EnvironmentSettings}.
+                */
+               public EnvironmentSettings build() {
+                       return new EnvironmentSettings(
+                               plannerClass,
+                               executorClass,
+                               builtInCatalogName,
+                               builtInDatabaseName,
+                               isBatchMode);
+               }
+       }
+}
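
A sketch of the property maps the class above produces for factory lookup
(both methods are @Internal; keys and values are taken from this file):

    EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .useOldPlanner()
        .inStreamingMode()
        .build();

    // {batch-mode=false, class-name=org.apache.flink.table.planner.StreamPlannerFactory}
    Map<String, String> plannerProps = settings.toPlannerProperties();

    // {batch-mode=false, class-name=org.apache.flink.table.executor.StreamExecutorFactory}
    Map<String, String> executorProps = settings.toExecutorProperties();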
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java
index 8e5ba8a..325732f 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java
@@ -59,18 +59,6 @@ public class TableConfig {
        private Integer maxGeneratedCodeLength = 64000; // just an estimate
 
        /**
-        * Specifies the name of the initial catalog to be created when instantiating
-        * TableEnvironment.
-        */
-       private String builtInCatalogName = "default_catalog";
-
-       /**
-        * Specifies the name of the default database in the initial catalog to be created when instantiating
-        * TableEnvironment.
-        */
-       private String builtInDatabaseName = "default_database";
-
-       /**
         * Returns the timezone for date/time/timestamp conversions.
         */
        public TimeZone getTimeZone() {
@@ -147,38 +135,6 @@ public class TableConfig {
                this.maxGeneratedCodeLength = Preconditions.checkNotNull(maxGeneratedCodeLength);
        }
 
-       /**
-        * Gets the specified name of the initial catalog to be created when instantiating
-        * a {@link TableEnvironment}.
-        */
-       public String getBuiltInCatalogName() {
-               return builtInCatalogName;
-       }
-
-       /**
-        * Specifies the name of the initial catalog to be created when instantiating
-        * a {@link TableEnvironment}. This method has no effect if called on the {@link TableEnvironment#getConfig()}.
-        */
-       public void setBuiltInCatalogName(String builtInCatalogName) {
-               this.builtInCatalogName = builtInCatalogName;
-       }
-
-       /**
-        * Gets the specified name of the default database in the initial catalog to be created when instantiating
-        * a {@link TableEnvironment}.
-        */
-       public String getBuiltInDatabaseName() {
-               return builtInDatabaseName;
-       }
-
-       /**
-        * Specifies the name of the default database in the initial catalog to be created when instantiating
-        * a {@link TableEnvironment}. This method has no effect if called on the {@link TableEnvironment#getConfig()}.
-        */
-       public void setBuiltInDatabaseName(String builtInDatabaseName) {
-               this.builtInDatabaseName = builtInDatabaseName;
-       }
-
        public static TableConfig getDefault() {
                return new TableConfig();
        }
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
index b2b50a0..85d20f6 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
@@ -31,21 +31,23 @@ import org.apache.flink.table.sources.TableSource;
 import java.util.Optional;
 
 /**
- * The base class for batch and stream TableEnvironments.
+ * A table environment is the base class, entry point, and central context for creating Table & SQL
+ * API programs.
  *
- * <p>The TableEnvironment is a central concept of the Table API and SQL integration. It is
- * responsible for:
+ * <p>It is unified both on a language level for all JVM-based languages (i.e. there is no distinction
+ * between Scala and Java API) and for bounded and unbounded data processing.
  *
+ * <p>A table environment is responsible for:
  * <ul>
- *     <li>Registering a Table in the internal catalog</li>
- *     <li>Registering an external catalog</li>
- *     <li>Executing SQL queries</li>
- *     <li>Registering a user-defined scalar function. For the user-defined table and aggregate
- *     function, use the StreamTableEnvironment or BatchTableEnvironment</li>
+ *     <li>Connecting to external systems.</li>
+ *     <li>Registering and retrieving {@link Table}s and other meta objects from a catalog.</li>
+ *     <li>Executing SQL statements.</li>
+ *     <li>Offering further configuration options.</li>
  * </ul>
  *
- * <p>This environment is unified both on a language level (for all JVM-based languages, i.e. no distinction between
- * Scala and Java API) and for bounded and unbounded data processing.
+ * <p>Note: This environment is meant for pure table programs. If you would like to convert from or to
+ * other Flink APIs, it might be necessary to use one of the available language-specific table environments
+ * in the corresponding bridging modules.
  */
 @PublicEvolving
 public interface TableEnvironment {
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 40849aa..727727a 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -98,8 +98,8 @@ public class TableEnvironmentImpl implements TableEnvironment {
 
                this.tableConfig = tableConfig;
                this.tableConfig.addPlannerConfig(queryConfigProvider);
-               this.defaultCatalogName = tableConfig.getBuiltInCatalogName();
-               this.defaultDatabaseName = tableConfig.getBuiltInDatabaseName();
+               this.defaultCatalogName = catalogManager.getCurrentCatalog();
+               this.defaultDatabaseName = catalogManager.getCurrentDatabase();
 
                this.functionCatalog = functionCatalog;
                this.planner = planner;
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/ExecutorFactory.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/ExecutorFactory.java
new file mode 100644
index 0000000..7de03c8
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/ExecutorFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.delegation;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.factories.ComponentFactory;
+
+import java.util.Map;
+
+/**
+ * Factory that creates {@link Executor}.
+ *
+ * <p>This factory is used with Java's Service Provider Interfaces (SPI) for discovering. A factory is
+ * called with a set of normalized properties that describe the desired configuration. Those properties
+ * may include execution configurations such as watermark interval, max parallelism etc., table specific
+ * initialization configuration such as if the queries should be executed in batch mode.
+ *
+ * <p><b>Important:</b> The implementations of this interface should also implement method
+ * <pre>{@code public Executor create(Map<String, String> properties, StreamExecutionEnvironment executionEnvironment);}
+ * </pre> This method will be used when instantiating a {@link org.apache.flink.table.api.TableEnvironment} from a
+ * bridging module which enables conversion from/to {@code DataStream} API and requires a pre configured
+ * {@code StreamTableEnvironment}.
+ */
+@Internal
+public interface ExecutorFactory extends ComponentFactory {
+
+       /**
+        * Creates a corresponding {@link Executor}.
+        *
+        * @param properties Static properties of the {@link Executor}, the same that were used for factory lookup.
+        * @return instance of a {@link Executor}
+        */
+       Executor create(Map<String, String> properties);
+}
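
A hypothetical implementation sketch of the contract above, including the extra
bridging method that StreamTableEnvironmentImpl looks up reflectively; the class
name and method bodies are illustrative, not the actual StreamExecutorFactory
from this commit:

    public class MyExecutorFactory implements ExecutorFactory {

        @Override
        public Executor create(Map<String, String> properties) {
            return create(properties, StreamExecutionEnvironment.getExecutionEnvironment());
        }

        // Bridging method described in the javadoc above; found via
        // getMethod("create", Map.class, StreamExecutionEnvironment.class).
        public Executor create(Map<String, String> properties, StreamExecutionEnvironment env) {
            return new StreamExecutor(env); // assumes an Executor implementation wrapping env
        }

        @Override
        public Map<String, String> requiredContext() {
            // Hypothetical: match only when batch-mode=false is requested.
            return Collections.singletonMap(EnvironmentSettings.BATCH_MODE, "false");
        }

        @Override
        public Map<String, String> optionalContext() {
            // Disambiguates this factory when EnvironmentSettings pins a class name.
            return Collections.singletonMap(EnvironmentSettings.CLASS_NAME, MyExecutorFactory.class.getCanonicalName());
        }

        @Override
        public List<String> supportedProperties() {
            // Must include all keys from optionalContext(), per ComponentFactory.
            return Arrays.asList(EnvironmentSettings.BATCH_MODE, EnvironmentSettings.CLASS_NAME);
        }
    }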
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/PlannerFactory.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/PlannerFactory.java
similarity index 55%
rename from flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/PlannerFactory.java
rename to flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/PlannerFactory.java
index fa1128e..0df52d4 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/PlannerFactory.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/PlannerFactory.java
@@ -16,54 +16,41 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.api.internal;
+package org.apache.flink.table.delegation;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.TableConfig;
-import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.catalog.CatalogManager;
 import org.apache.flink.table.catalog.FunctionCatalog;
-import org.apache.flink.table.delegation.Executor;
-import org.apache.flink.table.delegation.Planner;
+import org.apache.flink.table.factories.ComponentFactory;
 
-import java.lang.reflect.Constructor;
+import java.util.Map;
 
 /**
- * Factory to construct a {@link Planner}. It will look for the planner on the classpath.
+ * Factory that creates {@link Planner}.
+ *
+ * <p>This factory is used with Java's Service Provider Interfaces (SPI) for discovering. A factory is
+ * called with a set of normalized properties that describe the desired configuration. Those properties
+ * may include execution configurations such as watermark interval, max parallelism etc., table specific
+ * initialization configuration such as if the queries should be executed in batch mode.
  */
 @Internal
-public final class PlannerFactory {
+public interface PlannerFactory extends ComponentFactory {
 
        /**
-        * Looks up {@link Planner} on the class path via reflection.
+        * Creates a corresponding {@link Planner}.
         *
+        * @param properties Static properties of the {@link Planner}, the same that were used for factory lookup.
         * @param executor The executor required by the planner.
         * @param tableConfig The configuration of the planner to use.
         * @param functionCatalog The function catalog to look up user defined functions.
         * @param catalogManager The catalog manager to look up tables and views.
         * @return instance of a {@link Planner}
         */
-       public static Planner lookupPlanner(
-                       Executor executor,
-                       TableConfig tableConfig,
-                       FunctionCatalog functionCatalog,
-                       CatalogManager catalogManager) {
-               try {
-                       Class<?> clazz = Class.forName("org.apache.flink.table.planner.StreamPlanner");
-                       Constructor con = clazz.getConstructor(
-                               Executor.class,
-                               TableConfig.class,
-                               FunctionCatalog.class,
-                               CatalogManager.class);
-
-                       return (Planner) con.newInstance(executor, tableConfig, functionCatalog, catalogManager);
-               } catch (Exception e) {
-                       throw new TableException(
-                               "Could not instantiate the planner. Make sure the planner module is on the classpath",
-                               e);
-               }
-       }
-
-       private PlannerFactory() {
-       }
+       Planner create(
+               Map<String, String> properties,
+               Executor executor,
+               TableConfig tableConfig,
+               FunctionCatalog functionCatalog,
+               CatalogManager catalogManager);
 }
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/ComponentFactory.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/ComponentFactory.java
new file mode 100644
index 0000000..45e87235
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/ComponentFactory.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A factory interface for components that enables further disambiguating in case
+ * there are multiple matching implementations present.
+ */
+@PublicEvolving
+public interface ComponentFactory extends TableFactory {
+       /**
+        * Specifies a context of optional parameters that if exist should have the
+        * given values. This enables further disambiguating if there are multiple
+        * factories that meet the {@link #requiredContext()} and {@link #supportedProperties()}.
+        *
+        * <p><b>NOTE:</b> All the property keys should be included in {@link #supportedProperties()}.
+        *
+        * @return optional properties to disambiguate factories
+        */
+       Map<String, String> optionalContext();
+
+       @Override
+       Map<String, String> requiredContext();
+
+       /**
+        * {@inheritDoc}
+        *
+        * <p><b>NOTE:</b> All the property keys from {@link #optionalContext()} should also be included.
+        */
+       @Override
+       List<String> supportedProperties();
+}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/ComponentFactoryService.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/ComponentFactoryService.java
new file mode 100644
index 0000000..db8e9ba
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/ComponentFactoryService.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.AmbiguousTableFactoryException;
+import org.apache.flink.table.api.NoMatchingTableFactoryException;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Unified class to search for a {@link ComponentFactory} of the provided type and properties.
+ * It is similar to {@link TableFactoryService} but it disambiguates based on
+ * {@link ComponentFactory#optionalContext()}.
+ */
+@Internal
+public class ComponentFactoryService {
+
+       /**
+        * Finds a table factory of the given class and property map. This method enables
+        * disambiguating multiple matching {@link ComponentFactory}s based on additional
+        * optional context provided via {@link ComponentFactory#optionalContext()}.
+        *
+        * @param factoryClass desired factory class
+        * @param propertyMap properties that describe the factory configuration
+        * @param <T> factory class type
+        * @return the matching factory
+        */
+       public static <T extends ComponentFactory> T find(Class<T> factoryClass, Map<String, String> propertyMap) {
+               List<T> all = TableFactoryService.findAll(factoryClass, propertyMap);
+
+               List<T> filtered = all.stream().filter(factory -> {
+                       Map<String, String> optionalContext = factory.optionalContext();
+                       return optionalContext.entrySet().stream().allMatch(entry -> {
+                                       String property = propertyMap.get(entry.getKey());
+                                       if (property != null) {
+                                               return property.equals(entry.getValue());
+                                       } else {
+                                               return true;
+                                       }
+                               }
+                       );
+               }).collect(Collectors.toList());
+
+               if (filtered.size() > 1) {
+                       throw new AmbiguousTableFactoryException(
+                               filtered,
+                               factoryClass,
+                               new ArrayList<>(all),
+                               propertyMap
+                       );
+               } else if (filtered.isEmpty()) {
+                       throw new NoMatchingTableFactoryException(
+                               "No factory supports the additional filters.",
+                               factoryClass,
+                               new ArrayList<>(all),
+                               propertyMap);
+               } else {
+                       return filtered.get(0);
+               }
+       }
+}
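
As a usage sketch, a lookup boils down to building a property map and
delegating to find(); this mirrors the test that follows, and the constants
are the ones introduced later in this commit:

    Map<String, String> properties = new HashMap<>();
    // Required context of the desired factory.
    properties.put(TestPlannerFactory.PLANNER_TYPE_KEY, TestPlannerFactory.PLANNER_TYPE_VALUE);
    // Optional context: pins the exact implementation if several factories match.
    properties.put(EnvironmentSettings.CLASS_NAME, TestPlannerFactory.class.getCanonicalName());

    PlannerFactory plannerFactory = ComponentFactoryService.find(PlannerFactory.class, properties);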
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/ComponentFactoryServiceTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/ComponentFactoryServiceTest.java
new file mode 100644
index 0000000..13e821d
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/ComponentFactoryServiceTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.NoMatchingTableFactoryException;
+import org.apache.flink.table.delegation.PlannerFactory;
+import org.apache.flink.table.factories.utils.TestPlannerFactory;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link ComponentFactoryService}.
+ */
+public class ComponentFactoryServiceTest {
+
+       @Rule
+       public ExpectedException thrown = ExpectedException.none();
+
+       @Test
+       public void testLookingUpAmbiguousPlanners() {
+               Map<String, String> properties = new HashMap<>();
+               properties.put(EnvironmentSettings.CLASS_NAME, TestPlannerFactory.class.getCanonicalName());
+               properties.put(EnvironmentSettings.BATCH_MODE, Boolean.toString(true));
+               properties.put(TestPlannerFactory.PLANNER_TYPE_KEY, TestPlannerFactory.PLANNER_TYPE_VALUE);
+
+               PlannerFactory plannerFactory = ComponentFactoryService.find(PlannerFactory.class, properties);
+
+               assertThat(plannerFactory, instanceOf(TestPlannerFactory.class));
+       }
+
+       @Test
+       public void testLookingUpNonExistentClass() {
+               thrown.expect(NoMatchingTableFactoryException.class);
+               thrown.expectMessage("Reason: No factory supports the additional filters");
+
+               Map<String, String> properties = new HashMap<>();
+               properties.put(EnvironmentSettings.CLASS_NAME, "NoSuchClass");
+               properties.put(EnvironmentSettings.BATCH_MODE, Boolean.toString(true));
+               properties.put(TestPlannerFactory.PLANNER_TYPE_KEY, TestPlannerFactory.PLANNER_TYPE_VALUE);
+
+               ComponentFactoryService.find(PlannerFactory.class, properties);
+       }
+}
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/utils/OtherTestPlannerFactory.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/utils/OtherTestPlannerFactory.java
new file mode 100644
index 0000000..c4c3804
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/utils/OtherTestPlannerFactory.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories.utils;
+
+import org.apache.flink.table.delegation.Planner;
+import org.apache.flink.table.factories.ComponentFactoryServiceTest;
+
+/**
+ * Test {@link Planner} factory used in {@link ComponentFactoryServiceTest}.
+ */
+public class OtherTestPlannerFactory extends TestPlannerFactory {
+}
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/utils/TestPlannerFactory.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/utils/TestPlannerFactory.java
new file mode 100644
index 0000000..1a0f05f
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/utils/TestPlannerFactory.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories.utils;
+
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.catalog.FunctionCatalog;
+import org.apache.flink.table.delegation.Executor;
+import org.apache.flink.table.delegation.Planner;
+import org.apache.flink.table.delegation.PlannerFactory;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Test {@link Planner} factory used in {@link org.apache.flink.table.factories.ComponentFactoryServiceTest}.
+ */
+public class TestPlannerFactory implements PlannerFactory {
+
+       public static final String PLANNER_TYPE_KEY = "planner-type";
+       public static final String PLANNER_TYPE_VALUE = "test-planner";
+
+       @Override
+       public Planner create(
+               Map<String, String> properties,
+               Executor executor,
+               TableConfig tableConfig,
+               FunctionCatalog functionCatalog,
+               CatalogManager catalogManager) {
+               return null;
+       }
+
+       @Override
+       public Map<String, String> optionalContext() {
+               HashMap<String, String> map = new HashMap<>();
+               map.put(EnvironmentSettings.CLASS_NAME, this.getClass().getCanonicalName());
+               return map;
+       }
+
+       @Override
+       public Map<String, String> requiredContext() {
+               Map<String, String> map = new HashMap<>();
+               map.put(PLANNER_TYPE_KEY, PLANNER_TYPE_VALUE);
+               return map;
+       }
+
+       @Override
+       public List<String> supportedProperties() {
+               return Arrays.asList(EnvironmentSettings.CLASS_NAME, EnvironmentSettings.BATCH_MODE);
+       }
+}
diff --git a/flink-table/flink-table-planner/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/flink-table/flink-table-api-java/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
similarity index 77%
copy from flink-table/flink-table-planner/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
copy to flink-table/flink-table-api-java/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
index ece39eb..e2a6a48 100644
--- a/flink-table/flink-table-planner/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
+++ b/flink-table/flink-table-api-java/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
@@ -13,7 +13,5 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-org.apache.flink.table.sources.CsvBatchTableSourceFactory
-org.apache.flink.table.sources.CsvAppendTableSourceFactory
-org.apache.flink.table.sinks.CsvBatchTableSinkFactory
-org.apache.flink.table.sinks.CsvAppendTableSinkFactory
+org.apache.flink.table.factories.utils.TestPlannerFactory
+org.apache.flink.table.factories.utils.OtherTestPlannerFactory
diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/BatchTableEnvironment.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/BatchTableEnvironment.scala
index 7018af2..fa6b178 100644
--- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/BatchTableEnvironment.scala
+++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/BatchTableEnvironment.scala
@@ -233,11 +233,12 @@ object BatchTableEnvironment {
           classOf[ExecutionEnvironment],
           classOf[TableConfig],
           classOf[CatalogManager])
+      val defaultCatalog = "default_catalog"
       val catalogManager = new CatalogManager(
-        tableConfig.getBuiltInCatalogName,
+        "default_catalog",
         new GenericInMemoryCatalog(
-          tableConfig.getBuiltInCatalogName,
-          tableConfig.getBuiltInDatabaseName)
+          defaultCatalog,
+          "default_database")
       )
       const.newInstance(executionEnvironment, tableConfig, catalogManager)
         .asInstanceOf[BatchTableEnvironment]
diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala
index 473522e..3baa5fa 100644
--- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala
+++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala
@@ -17,28 +17,33 @@
  */
 package org.apache.flink.table.api.scala
 
+import org.apache.flink.annotation.PublicEvolving
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
 import org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl
 import org.apache.flink.table.api.{TableEnvironment, _}
-import org.apache.flink.table.catalog.{CatalogManager, GenericInMemoryCatalog}
 import org.apache.flink.table.descriptors.{ConnectorDescriptor, StreamTableDescriptor}
 import org.apache.flink.table.expressions.Expression
 import org.apache.flink.table.functions.{AggregateFunction, TableAggregateFunction, TableFunction}
 
 /**
-  * The [[TableEnvironment]] for a Scala [[StreamExecutionEnvironment]] that works with
-  * [[DataStream]]s.
+  * This table environment is the entry point and central context for creating Table & SQL
+  * API programs that integrate with the Scala-specific [[DataStream]] API.
   *
-  * A TableEnvironment can be used to:
-  * - convert a [[DataStream]] to a [[Table]]
-  * - register a [[DataStream]] in the [[TableEnvironment]]'s catalog
-  * - register a [[Table]] in the [[TableEnvironment]]'s catalog
-  * - scan a registered table to obtain a [[Table]]
-  * - specify a SQL query on registered tables to obtain a [[Table]]
-  * - convert a [[Table]] into a [[DataStream]]
-  * - explain the AST and execution plan of a [[Table]]
+  * It is unified for bounded and unbounded data processing.
+  *
+  * A stream table environment is responsible for:
+  *
+  * - Converting a [[DataStream]] into a [[Table]] and vice-versa.
+  * - Connecting to external systems.
+  * - Registering and retrieving [[Table]]s and other meta objects from a catalog.
+  * - Executing SQL statements.
+  * - Offering further configuration options.
+  *
+  * Note: If you don't intend to use the [[DataStream]] API, [[TableEnvironment]] is meant for pure
+  * table programs.
   */
+@PublicEvolving
 trait StreamTableEnvironment extends TableEnvironment {
 
   /**
@@ -245,43 +250,90 @@ trait StreamTableEnvironment extends TableEnvironment {
 object StreamTableEnvironment {
 
   /**
-    * The [[TableEnvironment]] for a Scala [[StreamExecutionEnvironment]] that works with
-    * [[DataStream]]s.
-    *
-    * A TableEnvironment can be used to:
-    * - convert a [[DataStream]] to a [[Table]]
-    * - register a [[DataStream]] in the [[TableEnvironment]]'s catalog
-    * - register a [[Table]] in the [[TableEnvironment]]'s catalog
-    * - scan a registered table to obtain a [[Table]]
-    * - specify a SQL query on registered tables to obtain a [[Table]]
-    * - convert a [[Table]] into a [[DataStream]]
-    * - explain the AST and execution plan of a [[Table]]
-    *
-    * @param executionEnvironment The Scala [[StreamExecutionEnvironment]] of the TableEnvironment.
+    * Creates a table environment that is the entry point and central context for creating Table &
+    * SQL API programs that integrate with the Scala-specific [[DataStream]] API.
+    *
+    * It is unified for bounded and unbounded data processing.
+    *
+    * A stream table environment is responsible for:
+    *
+    * - Converting a [[DataStream]] into a [[Table]] and vice-versa.
+    * - Connecting to external systems.
+    * - Registering and retrieving [[Table]]s and other meta objects from a catalog.
+    * - Executing SQL statements.
+    * - Offering further configuration options.
+    *
+    * Note: If you don't intend to use the [[DataStream]] API, [[TableEnvironment]] is meant for
+    * pure table programs.
+    *
+    * @param executionEnvironment The Scala [[StreamExecutionEnvironment]] of the
+    *                             [[TableEnvironment]].
     */
  def create(executionEnvironment: StreamExecutionEnvironment): StreamTableEnvironment = {
-    create(executionEnvironment, new TableConfig)
+    create(
+      executionEnvironment,
+      EnvironmentSettings.newInstance().build())
   }
 
   /**
-    * The [[TableEnvironment]] for a Scala [[StreamExecutionEnvironment]] that works with
-    * [[DataStream]]s.
-    *
-    * A TableEnvironment can be used to:
-    * - convert a [[DataStream]] to a [[Table]]
-    * - register a [[DataStream]] in the [[TableEnvironment]]'s catalog
-    * - register a [[Table]] in the [[TableEnvironment]]'s catalog
-    * - scan a registered table to obtain a [[Table]]
-    * - specify a SQL query on registered tables to obtain a [[Table]]
-    * - convert a [[Table]] into a [[DataStream]]
-    * - explain the AST and execution plan of a [[Table]]
-    *
-    * @param executionEnvironment The Scala [[StreamExecutionEnvironment]] of the TableEnvironment.
-    * @param tableConfig The configuration of the TableEnvironment.
+    * Creates a table environment that is the entry point and central context for creating Table &
+    * SQL API programs that integrate with the Scala-specific [[DataStream]] API.
+    *
+    * It is unified for bounded and unbounded data processing.
+    *
+    * A stream table environment is responsible for:
+    *
+    * - Converting a [[DataStream]] into a [[Table]] and vice-versa.
+    * - Connecting to external systems.
+    * - Registering and retrieving [[Table]]s and other meta objects from a catalog.
+    * - Executing SQL statements.
+    * - Offering further configuration options.
+    *
+    * Note: If you don't intend to use the [[DataStream]] API, [[TableEnvironment]] is meant for
+    * pure table programs.
+    *
+    * @param executionEnvironment The Scala [[StreamExecutionEnvironment]] of the
+    *                             [[TableEnvironment]].
+    * @param settings The environment settings used to instantiate the [[TableEnvironment]].
+    */
+  def create(
+      executionEnvironment: StreamExecutionEnvironment,
+      settings: EnvironmentSettings)
+    : StreamTableEnvironment = {
+    StreamTableEnvironmentImpl.create(executionEnvironment, settings, new TableConfig)
+  }
+
+  /**
+    * Creates a table environment that is the entry point and central context for creating Table &
+    * SQL API programs that integrate with the Scala-specific [[DataStream]] API.
+    *
+    * It is unified for bounded and unbounded data processing.
+    *
+    * A stream table environment is responsible for:
+    *
+    * - Converting a [[DataStream]] into a [[Table]] and vice-versa.
+    * - Connecting to external systems.
+    * - Registering and retrieving [[Table]]s and other meta objects from a catalog.
+    * - Executing SQL statements.
+    * - Offering further configuration options.
+    *
+    * Note: If you don't intend to use the [[DataStream]] API, [[TableEnvironment]] is meant for
+    * pure table programs.
+    *
+    * @param executionEnvironment The Scala [[StreamExecutionEnvironment]] of the
+    *                             [[TableEnvironment]].
+    * @param tableConfig The configuration of the [[TableEnvironment]].
+    * @deprecated Use [[create(StreamExecutionEnvironment)]] and
+    *             [[StreamTableEnvironment#getConfig()]] for manipulating the [[TableConfig]].
     */
+  @deprecated
+  def create(executionEnvironment: StreamExecutionEnvironment, tableConfig: TableConfig)
     : StreamTableEnvironment = {
 
-    StreamTableEnvironmentImpl.create(tableConfig, executionEnvironment)
+    StreamTableEnvironmentImpl
+      .create(
+        executionEnvironment,
+        EnvironmentSettings.newInstance().build(),
+        tableConfig)
   }
 }
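
From user code, the new entry point reads as follows (a sketch using the Java
bridge API, assuming it exposes the same create(env, settings) overload as the
Scala variant above):

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    EnvironmentSettings settings = EnvironmentSettings.newInstance().build();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);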
diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala
index 33b304d..96b7403 100644
--- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala
+++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala
@@ -17,9 +17,6 @@
  */
 package org.apache.flink.table.api.scala.internal
 
-import java.util
-import java.util.{Collections, List => JList}
-
 import org.apache.flink.annotation.Internal
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.dag.Transformation
@@ -29,18 +26,22 @@ import org.apache.flink.streaming.api.datastream.{DataStream => JDataStream}
 import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JStreamExecutionEnvironment}
 import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
 import org.apache.flink.table.api._
-import org.apache.flink.table.api.internal.{PlannerFactory, TableEnvironmentImpl}
+import org.apache.flink.table.api.internal.TableEnvironmentImpl
 import org.apache.flink.table.api.scala.StreamTableEnvironment
 import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, GenericInMemoryCatalog}
-import org.apache.flink.table.delegation.{Executor, Planner}
+import org.apache.flink.table.delegation.{Executor, ExecutorFactory, Planner, PlannerFactory}
 import org.apache.flink.table.descriptors.{ConnectorDescriptor, StreamTableDescriptor}
 import org.apache.flink.table.expressions.Expression
+import org.apache.flink.table.factories.ComponentFactoryService
 import org.apache.flink.table.functions.{AggregateFunction, TableAggregateFunction, TableFunction, UserFunctionsTypeHelper}
 import org.apache.flink.table.operations.{OutputConversionModifyOperation, ScalaDataStreamQueryOperation}
 import org.apache.flink.table.sources.{TableSource, TableSourceValidation}
 import org.apache.flink.table.types.utils.TypeConversions
 import org.apache.flink.table.typeutils.FieldInfoUtils
 
+import java.util
+import java.util.{Collections, List => JList, Map => JMap}
+
 import _root_.scala.collection.JavaConverters._
 
 /**
@@ -234,61 +235,54 @@ class StreamTableEnvironmentImpl (
 
 object StreamTableEnvironmentImpl {
 
-  /**
-    * Creates an instance of a [[StreamTableEnvironment]]. It uses the
-    * [[StreamExecutionEnvironment]] for executing queries. This is also the
-    * [[StreamExecutionEnvironment]] that will be used when converting
-    * from/to [[DataStream]].
-    *
-    * @param tableConfig The configuration of the TableEnvironment.
-    * @param executionEnvironment The [[StreamExecutionEnvironment]] of the TableEnvironment.
-    */
   def create(
-      tableConfig: TableConfig,
-      executionEnvironment: StreamExecutionEnvironment)
+      executionEnvironment: StreamExecutionEnvironment,
+      settings: EnvironmentSettings,
+      tableConfig: TableConfig)
     : StreamTableEnvironmentImpl = {
-    val catalogManager = new CatalogManager(
-      tableConfig.getBuiltInCatalogName,
-      new GenericInMemoryCatalog(
-        tableConfig.getBuiltInCatalogName,
-        tableConfig.getBuiltInDatabaseName)
-    )
-    create(catalogManager, tableConfig, executionEnvironment)
-  }
-
-  /**
-    * Creates an instance of a [[StreamTableEnvironment]]. It uses the
-    * [[StreamExecutionEnvironment]] for executing queries. This is also the
-    * [[StreamExecutionEnvironment]] that will be used when converting
-    * from/to [[DataStream]].
-    *
-    * @param catalogManager The [[CatalogManager]] to use for storing and looking up [[Table]]s.
-    * @param tableConfig The configuration of the TableEnvironment.
-    * @param executionEnvironment The [[StreamExecutionEnvironment]] of the TableEnvironment.
-    */
-  def create(
-      catalogManager: CatalogManager,
-      tableConfig: TableConfig,
-      executionEnvironment: StreamExecutionEnvironment)
-    : StreamTableEnvironmentImpl = {
-    val executor = lookupExecutor(executionEnvironment)
+    val executorProperties = settings.toExecutorProperties
+    val plannerProperties = settings.toPlannerProperties
+    val executor = lookupExecutor(executorProperties, executionEnvironment)
     val functionCatalog = new FunctionCatalog(
-      catalogManager.getCurrentCatalog,
-      catalogManager.getCurrentDatabase)
+      settings.getBuiltInCatalogName,
+      settings.getBuiltInDatabaseName)
+    val catalogManager = new CatalogManager(
+      settings.getBuiltInCatalogName,
+      new GenericInMemoryCatalog(settings.getBuiltInCatalogName, settings.getBuiltInDatabaseName))
+    val planner = ComponentFactoryService.find(classOf[PlannerFactory], plannerProperties)
+      .create(
+        plannerProperties,
+        executor,
+        tableConfig,
+        functionCatalog,
+        catalogManager)
     new StreamTableEnvironmentImpl(
       catalogManager,
       functionCatalog,
       tableConfig,
       executionEnvironment,
-      PlannerFactory.lookupPlanner(executor, tableConfig, functionCatalog, catalogManager),
+      planner,
       executor)
   }
 
-  private def lookupExecutor(executionEnvironment: StreamExecutionEnvironment) =
+  private def lookupExecutor(
+      executorProperties: JMap[String, String],
+      executionEnvironment: StreamExecutionEnvironment)
+    : Executor =
     try {
-      val clazz = Class.forName("org.apache.flink.table.executor.ExecutorFactory")
-      val createMethod = clazz.getMethod("create", classOf[JStreamExecutionEnvironment])
-      createMethod.invoke(null, executionEnvironment.getWrappedStreamExecutionEnvironment)
+      val executorFactory = ComponentFactoryService
+        .find(classOf[ExecutorFactory], executorProperties)
+      val createMethod = executorFactory.getClass
+        .getMethod(
+          "create",
+          classOf[util.Map[String, String]],
+          classOf[JStreamExecutionEnvironment])
+
+      createMethod
+        .invoke(
+          executorFactory,
+          executorProperties,
+          executionEnvironment.getWrappedStreamExecutionEnvironment)
         .asInstanceOf[Executor]
     } catch {
       case e: Exception =>
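
The reflective call above resolves at runtime to the equivalent of the direct
invocation below (a sketch; reflection is used because the two-argument create
is not part of the ExecutorFactory interface and lives in the planner module):

    Executor executor = new StreamExecutorFactory().create(
            executorProperties,
            executionEnvironment.getWrappedStreamExecutionEnvironment());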
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/executor/StreamExecutor.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/executor/StreamExecutor.java
index 5148839..426e09e 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/executor/StreamExecutor.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/executor/StreamExecutor.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.executor;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -34,7 +35,8 @@ import java.util.List;
 public class StreamExecutor implements Executor {
        private final StreamExecutionEnvironment executionEnvironment;
 
-       StreamExecutor(StreamExecutionEnvironment executionEnvironment) {
+       @VisibleForTesting
+       public StreamExecutor(StreamExecutionEnvironment executionEnvironment) {
                this.executionEnvironment = executionEnvironment;
        }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/executor/ExecutorFactory.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/executor/StreamExecutorFactory.java
similarity index 51%
rename from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/executor/ExecutorFactory.java
rename to flink-table/flink-table-planner/src/main/java/org/apache/flink/table/executor/StreamExecutorFactory.java
index 5534217..dff5590 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/executor/ExecutorFactory.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/executor/StreamExecutorFactory.java
@@ -20,36 +20,59 @@ package org.apache.flink.table.executor;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.delegation.Executor;
+import org.apache.flink.table.delegation.ExecutorFactory;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 /**
  * Factory to create an implementation of {@link Executor} to use in a
  * {@link org.apache.flink.table.api.TableEnvironment}. The {@link org.apache.flink.table.api.TableEnvironment}
- * should use {@link #create()} method that does not bind to any particular environment,
+ * should use the {@link #create(Map)} method that does not bind to any particular environment,
  * whereas {@link org.apache.flink.table.api.scala.StreamTableEnvironment} should use
- * {@link #create(StreamExecutionEnvironment)} as it is always backed by some {@link StreamExecutionEnvironment}
+ * {@link #create(Map, StreamExecutionEnvironment)} as it is always backed by
+ * some {@link StreamExecutionEnvironment}.
  */
 @Internal
-public class ExecutorFactory {
+public class StreamExecutorFactory implements ExecutorFactory {
+
        /**
-        * Creates a {@link StreamExecutor} that is backed by given {@link StreamExecutionEnvironment}.
+        * Creates a corresponding {@link StreamExecutor}.
         *
+        * @param properties Static properties of the {@link Executor}, the same properties that were used for the factory lookup.
         * @param executionEnvironment a {@link StreamExecutionEnvironment} to use while executing Table programs.
-        * @return {@link StreamExecutor}
+        * @return an instance of an {@link Executor}
         */
-       public static Executor create(StreamExecutionEnvironment executionEnvironment) {
+       public Executor create(Map<String, String> properties, StreamExecutionEnvironment executionEnvironment) {
                return new StreamExecutor(executionEnvironment);
        }
 
-       /**
-        * Creates a {@link StreamExecutor} that is backed by a default {@link StreamExecutionEnvironment}.
-        *
-        * @return {@link StreamExecutor}
-        */
-       public static Executor create() {
+       @Override
+       public Executor create(Map<String, String> properties) {
+               return new StreamExecutor(StreamExecutionEnvironment.getExecutionEnvironment());
        }
 
-       private ExecutorFactory() {
+       @Override
+       public Map<String, String> requiredContext() {
+               DescriptorProperties properties = new DescriptorProperties();
+               properties.putBoolean(EnvironmentSettings.BATCH_MODE, false);
+               return properties.asMap();
+       }
+
+       @Override
+       public List<String> supportedProperties() {
+               return Collections.singletonList(EnvironmentSettings.CLASS_NAME);
+       }
+
+       @Override
+       public Map<String, String> optionalContext() {
+               Map<String, String> context = new HashMap<>();
+               context.put(EnvironmentSettings.CLASS_NAME, this.getClass().getCanonicalName());
+               return context;
        }
 }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/StreamPlannerFactory.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/StreamPlannerFactory.java
new file mode 100644
index 0000000..4efb850
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/StreamPlannerFactory.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.catalog.FunctionCatalog;
+import org.apache.flink.table.delegation.Executor;
+import org.apache.flink.table.delegation.Planner;
+import org.apache.flink.table.delegation.PlannerFactory;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Factory to construct a {@link StreamPlanner}.
+ */
+@Internal
+public final class StreamPlannerFactory implements PlannerFactory {
+
+       @Override
+       public Planner create(
+               Map<String, String> properties,
+               Executor executor,
+               TableConfig tableConfig,
+               FunctionCatalog functionCatalog,
+               CatalogManager catalogManager) {
+               return new StreamPlanner(executor, tableConfig, functionCatalog, catalogManager);
+       }
+
+       @Override
+       public Map<String, String> optionalContext() {
+               Map<String, String> map = new HashMap<>();
+               map.put(EnvironmentSettings.CLASS_NAME, this.getClass().getCanonicalName());
+               return map;
+       }
+
+       @Override
+       public Map<String, String> requiredContext() {
+               DescriptorProperties properties = new DescriptorProperties();
+
+               properties.putBoolean(EnvironmentSettings.BATCH_MODE, false);
+               return properties.asMap();
+       }
+
+       @Override
+       public List<String> supportedProperties() {
+               return Collections.singletonList(EnvironmentSettings.CLASS_NAME);
+       }
+}
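
Putting the two context methods together, the properties that select this
factory would look roughly like the map below (a sketch of the matching
inputs, not the exact output of EnvironmentSettings):

    Map<String, String> plannerProperties = new HashMap<>();
    // Satisfies requiredContext(): this factory only serves streaming mode.
    plannerProperties.put(EnvironmentSettings.BATCH_MODE, "false");
    // Satisfies optionalContext(): disambiguates if another planner factory also matches.
    plannerProperties.put(EnvironmentSettings.CLASS_NAME, StreamPlannerFactory.class.getCanonicalName());

    PlannerFactory plannerFactory = ComponentFactoryService.find(PlannerFactory.class, plannerProperties);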
diff --git a/flink-table/flink-table-planner/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/flink-table/flink-table-planner/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
index ece39eb..6bd2ea3 100644
--- a/flink-table/flink-table-planner/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
+++ b/flink-table/flink-table-planner/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
@@ -17,3 +17,5 @@ org.apache.flink.table.sources.CsvBatchTableSourceFactory
 org.apache.flink.table.sources.CsvAppendTableSourceFactory
 org.apache.flink.table.sinks.CsvBatchTableSinkFactory
 org.apache.flink.table.sinks.CsvAppendTableSinkFactory
+org.apache.flink.table.planner.StreamPlannerFactory
+org.apache.flink.table.executor.StreamExecutorFactory
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
index 2b51869..efa29ed 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
@@ -52,8 +52,8 @@ abstract class TableEnvImpl(
     private val catalogManager: CatalogManager)
   extends TableEnvironment {
 
-  protected val defaultCatalogName: String = config.getBuiltInCatalogName
-  protected val defaultDatabaseName: String = config.getBuiltInDatabaseName
+  protected val defaultCatalogName: String = catalogManager.getCurrentCatalog
+  protected val defaultDatabaseName: String = catalogManager.getCurrentDatabase
 
   // Table API/SQL function catalog
   private[flink] val functionCatalog: FunctionCatalog = new FunctionCatalog(
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
index bdb2913..44dc4ef 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
@@ -18,8 +18,6 @@
 
 package org.apache.flink.table.api.stream
 
-import java.lang.{Integer => JInt, Long => JLong}
-
 import org.apache.flink.api.java.tuple.{Tuple5 => JTuple5}
 import org.apache.flink.api.java.typeutils.TupleTypeInfo
 import org.apache.flink.api.scala._
@@ -31,14 +29,19 @@ import org.apache.flink.table.api.java.internal.{StreamTableEnvironmentImpl => J
 import org.apache.flink.table.api.java.{StreamTableEnvironment => JStreamTableEnv}
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.api.{TableConfig, Types, ValidationException}
-import org.apache.flink.table.catalog.{CatalogManager, GenericInMemoryCatalog}
+import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, GenericInMemoryCatalog}
+import org.apache.flink.table.executor.StreamExecutor
+import org.apache.flink.table.planner.StreamPlanner
 import org.apache.flink.table.runtime.utils.StreamTestData
 import org.apache.flink.table.utils.TableTestBase
 import org.apache.flink.table.utils.TableTestUtil.{binaryNode, streamTableNode, term, unaryNode}
 import org.apache.flink.types.Row
+
 import org.junit.Test
 import org.mockito.Mockito.{mock, when}
 
+import java.lang.{Integer => JInt, Long => JLong}
+
 class StreamTableEnvironmentTest extends TableTestBase {
 
   @Test
@@ -200,12 +203,19 @@ class StreamTableEnvironmentTest extends TableTestBase {
     val jStreamExecEnv = mock(classOf[JStreamExecEnv])
     when(jStreamExecEnv.getStreamTimeCharacteristic).thenReturn(TimeCharacteristic.EventTime)
     val config = new TableConfig
-    val jTEnv = JStreamTableEnvironmentImpl.create(
-      new CatalogManager(
-        config.getBuiltInCatalogName,
-        new GenericInMemoryCatalog(config.getBuiltInCatalogName, config.getBuiltInDatabaseName)),
+    val manager: CatalogManager = new CatalogManager(
+      "default_catalog",
+      new GenericInMemoryCatalog("default_catalog", "default_database"))
+    val executor: StreamExecutor = new StreamExecutor(jStreamExecEnv)
+    val functionCatalog = new FunctionCatalog(manager.getCurrentCatalog, manager.getCurrentDatabase)
+    val streamPlanner = new StreamPlanner(executor, config, functionCatalog, manager)
+    val jTEnv = new JStreamTableEnvironmentImpl(
+      manager,
+      functionCatalog,
       config,
-      jStreamExecEnv)
+      jStreamExecEnv,
+      streamPlanner,
+      executor)
 
     val sType = new TupleTypeInfo(Types.LONG, Types.INT, Types.STRING, Types.INT, Types.LONG)
       .asInstanceOf[TupleTypeInfo[JTuple5[JLong, JInt, String, JInt, JLong]]]
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
index 2ca6ee3..e72225d 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
@@ -18,8 +18,6 @@
 
 package org.apache.flink.table.utils
 
-import org.apache.calcite.plan.RelOptUtil
-import org.apache.calcite.rel.RelNode
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.{LocalEnvironment, DataSet => JDataSet}
 import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
@@ -32,12 +30,16 @@ import org.apache.flink.table.api.java.internal.{BatchTableEnvironmentImpl => Ja
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.api.scala.internal.{BatchTableEnvironmentImpl => ScalaBatchTableEnvironmentImpl, StreamTableEnvironmentImpl => ScalaStreamTableEnvironmentImpl}
 import org.apache.flink.table.api.{Table, TableConfig, TableSchema}
-import org.apache.flink.table.catalog.{CatalogManager, GenericInMemoryCatalog}
+import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, GenericInMemoryCatalog}
+import org.apache.flink.table.executor.StreamExecutor
 import org.apache.flink.table.expressions.Expression
 import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, TableFunction}
 import org.apache.flink.table.operations.{DataSetQueryOperation, JavaDataStreamQueryOperation, ScalaDataStreamQueryOperation}
 import org.apache.flink.table.planner.StreamPlanner
 import org.apache.flink.table.utils.TableTestUtil.createCatalogManager
+
+import org.apache.calcite.plan.RelOptUtil
+import org.apache.calcite.rel.RelNode
 import org.junit.Assert.assertEquals
 import org.junit.rules.ExpectedException
 import org.junit.{ComparisonFailure, Rule}
@@ -137,25 +139,11 @@ object TableTestUtil {
 
   val ANY_SUBTREE = "%ANY_SUBTREE%"
 
-  /**
-    * Creates a [[CatalogManager]] with a builtin default catalog & database set to values
-    * specified in the [[TableConfig]].
-    */
-  def createCatalogManager(config: TableConfig): CatalogManager = {
+  def createCatalogManager(): CatalogManager = {
+    val defaultCatalog = "default_catalog"
     new CatalogManager(
-      config.getBuiltInCatalogName,
-      new GenericInMemoryCatalog(config.getBuiltInCatalogName, config.getBuiltInDatabaseName))
-  }
-
-  /**
-    * Sets the configuration of the builtin catalog & databases in [[TableConfig]]
-    * to the current catalog & database of the given [[CatalogManager]]. This should be used
-    * to ensure sanity of a [[org.apache.flink.table.api.TableEnvironment]].
-    */
-  def extractBuiltinPath(config: TableConfig, catalogManager: CatalogManager): TableConfig = {
-    config.setBuiltInCatalogName(catalogManager.getCurrentCatalog)
-    config.setBuiltInDatabaseName(catalogManager.getCurrentDatabase)
-    config
+      defaultCatalog,
+      new GenericInMemoryCatalog(defaultCatalog, "default_database"))
   }
 
   private[utils] def toRelNode(expected: Table) = {
@@ -239,22 +227,15 @@ case class BatchTableTestUtil(
   extends TableTestUtil {
   val javaEnv = new LocalEnvironment()
 
-  private def tableConfig = catalogManager match {
-    case Some(c) =>
-      TableTestUtil.extractBuiltinPath(new TableConfig, c)
-    case None =>
-      new TableConfig
-  }
-
   val javaTableEnv = new JavaBatchTableEnvironmentImpl(
     javaEnv,
-    tableConfig,
-    catalogManager.getOrElse(createCatalogManager(new TableConfig)))
+    new TableConfig,
+    catalogManager.getOrElse(createCatalogManager()))
   val env = new ExecutionEnvironment(javaEnv)
   val tableEnv = new ScalaBatchTableEnvironmentImpl(
     env,
-    tableConfig,
-    catalogManager.getOrElse(createCatalogManager(new TableConfig)))
+    new TableConfig,
+    catalogManager.getOrElse(createCatalogManager()))
 
   def addTable[T: TypeInformation](
       name: String,
@@ -344,22 +325,29 @@ case class StreamTableTestUtil(
   val javaEnv = new LocalStreamEnvironment()
   javaEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
 
-  private def tableConfig = catalogManager match {
-    case Some(c) =>
-      TableTestUtil.extractBuiltinPath(new TableConfig, c)
-    case None =>
-      new TableConfig
-  }
+  private val tableConfig = new TableConfig
+  private val manager: CatalogManager = catalogManager.getOrElse(createCatalogManager())
+  private val executor: StreamExecutor = new StreamExecutor(javaEnv)
+  private val functionCatalog =
+    new FunctionCatalog(manager.getCurrentCatalog, manager.getCurrentDatabase)
+  private val streamPlanner = new StreamPlanner(executor, tableConfig, functionCatalog, manager)
 
-  val javaTableEnv = JavaStreamTableEnvironmentImpl.create(
-    catalogManager.getOrElse(createCatalogManager(new TableConfig)),
+  val javaTableEnv = new JavaStreamTableEnvironmentImpl(
+    manager,
+    functionCatalog,
     tableConfig,
-    javaEnv)
+    javaEnv,
+    streamPlanner,
+    executor)
+
   val env = new StreamExecutionEnvironment(javaEnv)
-  val tableEnv = ScalaStreamTableEnvironmentImpl.create(
-    catalogManager.getOrElse(createCatalogManager(new TableConfig)),
+  val tableEnv = new ScalaStreamTableEnvironmentImpl(
+    manager,
+    functionCatalog,
     tableConfig,
-    env)
+    env,
+    streamPlanner,
+    executor)
 
   def addTable[T: TypeInformation](
       name: String,
