This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new e4ae2ef  [FLINK-25229][table] Introduce flink-table-api-bridge-base
e4ae2ef is described below

commit e4ae2ef81e9ecbda10c4dcc5776584b07c2f5e6b
Author: slinkydeveloper <francescogu...@gmail.com>
AuthorDate: Thu Dec 9 14:55:05 2021 +0100

    [FLINK-25229][table] Introduce flink-table-api-bridge-base
    
    This closes #18065.
---
 flink-connectors/flink-connector-hbase-1.4/pom.xml |   6 +
 flink-connectors/flink-connector-hbase-2.2/pom.xml |   6 +
 flink-connectors/flink-connector-hive/pom.xml      |   6 +
 flink-connectors/flink-connector-jdbc/pom.xml      |   6 +
 flink-connectors/flink-connector-kafka/pom.xml     |   6 +
 flink-formats/flink-avro/pom.xml                   |   6 +
 flink-formats/flink-csv/pom.xml                    |   6 +
 flink-formats/flink-json/pom.xml                   |   6 +
 flink-formats/flink-orc/pom.xml                    |   6 +
 flink-formats/flink-parquet/pom.xml                |   6 +
 flink-python/pom.xml                               |   6 +
 flink-table/flink-sql-client/pom.xml               |   6 +
 .../pom.xml                                        |  46 +--
 .../AbstractStreamTableEnvironmentImpl.java        | 329 +++++++++++++++++++++
 .../table/delegation/StreamExecutorFactory.java    |  37 +++
 .../operations/DataStreamQueryOperation.java}      |  25 +-
 .../table/operations/ExternalQueryOperation.java}  |   4 +-
 flink-table/flink-table-api-java-bridge/pom.xml    |   6 +
 .../java/internal/StreamTableEnvironmentImpl.java  | 270 +----------------
 .../operations/JavaDataStreamQueryOperation.java   | 116 --------
 .../table/api/internal/TableEnvironmentImpl.java   |   3 +-
 .../flink/table/delegation/ExecutorFactory.java    |  11 +-
 flink-table/flink-table-api-scala-bridge/pom.xml   |   6 +
 .../operations/ScalaExternalQueryOperation.java    | 121 --------
 .../internal/StreamTableEnvironmentImpl.scala      | 233 ++-------------
 flink-table/flink-table-planner/pom.xml            |  24 +-
 .../planner/delegation/DefaultExecutorFactory.java |   4 +-
 ....java => InternalDataStreamQueryOperation.java} |   8 +-
 .../planner/plan/QueryOperationConverter.java      |  44 +--
 .../flink/table/planner/utils/TableTestBase.scala  |  33 +--
 flink-table/flink-table-uber/pom.xml               |   6 +
 flink-table/pom.xml                                |   1 +
 tools/ci/stage.sh                                  |   2 +
 33 files changed, 549 insertions(+), 852 deletions(-)

diff --git a/flink-connectors/flink-connector-hbase-1.4/pom.xml b/flink-connectors/flink-connector-hbase-1.4/pom.xml
index 99bac72..cd4b56c 100644
--- a/flink-connectors/flink-connector-hbase-1.4/pom.xml
+++ b/flink-connectors/flink-connector-hbase-1.4/pom.xml
@@ -135,6 +135,12 @@ under the License.
 
                <dependency>
                        <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
                        <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
                        <version>${project.version}</version>
                        <type>test-jar</type>
diff --git a/flink-connectors/flink-connector-hbase-2.2/pom.xml b/flink-connectors/flink-connector-hbase-2.2/pom.xml
index ba86049..ed520ce 100644
--- a/flink-connectors/flink-connector-hbase-2.2/pom.xml
+++ b/flink-connectors/flink-connector-hbase-2.2/pom.xml
@@ -260,6 +260,12 @@ under the License.
 
                <dependency>
                        <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
                        <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
                        <version>${project.version}</version>
                        <type>test-jar</type>
diff --git a/flink-connectors/flink-connector-hive/pom.xml b/flink-connectors/flink-connector-hive/pom.xml
index 37f3dea..f807b12 100644
--- a/flink-connectors/flink-connector-hive/pom.xml
+++ b/flink-connectors/flink-connector-hive/pom.xml
@@ -531,6 +531,12 @@ under the License.
 
                <dependency>
                        <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
                        <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
                        <version>${project.version}</version>
                        <type>test-jar</type>
diff --git a/flink-connectors/flink-connector-jdbc/pom.xml b/flink-connectors/flink-connector-jdbc/pom.xml
index 3ccecc6..4edd126 100644
--- a/flink-connectors/flink-connector-jdbc/pom.xml
+++ b/flink-connectors/flink-connector-jdbc/pom.xml
@@ -104,6 +104,12 @@ under the License.
 
                <dependency>
                        <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
                        <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
                        <version>${project.version}</version>
                        <scope>test</scope>
diff --git a/flink-connectors/flink-connector-kafka/pom.xml b/flink-connectors/flink-connector-kafka/pom.xml
index 49b6cbb..4089e09 100644
--- a/flink-connectors/flink-connector-kafka/pom.xml
+++ b/flink-connectors/flink-connector-kafka/pom.xml
@@ -187,6 +187,12 @@ under the License.
 
                <dependency>
                        <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
                        <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
                        <version>${project.version}</version>
                        <type>test-jar</type>
diff --git a/flink-formats/flink-avro/pom.xml b/flink-formats/flink-avro/pom.xml
index a6e51b3..d2e05ed 100644
--- a/flink-formats/flink-avro/pom.xml
+++ b/flink-formats/flink-avro/pom.xml
@@ -140,6 +140,12 @@ under the License.
 
                <dependency>
                        <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
                        <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
                        <version>${project.version}</version>
                        <type>test-jar</type>
diff --git a/flink-formats/flink-csv/pom.xml b/flink-formats/flink-csv/pom.xml
index 244dbae..d2fbd47 100644
--- a/flink-formats/flink-csv/pom.xml
+++ b/flink-formats/flink-csv/pom.xml
@@ -104,6 +104,12 @@ under the License.
 
                <dependency>
                        <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
                        <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
                        <version>${project.version}</version>
                        <type>test-jar</type>
diff --git a/flink-formats/flink-json/pom.xml b/flink-formats/flink-json/pom.xml
index 0e60961..a042949 100644
--- a/flink-formats/flink-json/pom.xml
+++ b/flink-formats/flink-json/pom.xml
@@ -97,6 +97,12 @@ under the License.
                <!-- Json filesystem format factory ITCase test dependency -->
                <dependency>
                        <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
                        <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
                        <version>${project.version}</version>
                        <scope>test</scope>
diff --git a/flink-formats/flink-orc/pom.xml b/flink-formats/flink-orc/pom.xml
index d30a21f..fad8c27 100644
--- a/flink-formats/flink-orc/pom.xml
+++ b/flink-formats/flink-orc/pom.xml
@@ -134,6 +134,12 @@ under the License.
 
                <dependency>
                        <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
                        <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
                        <version>${project.version}</version>
                        <scope>test</scope>
diff --git a/flink-formats/flink-parquet/pom.xml b/flink-formats/flink-parquet/pom.xml
index 35bda88..06fdd81b 100644
--- a/flink-formats/flink-parquet/pom.xml
+++ b/flink-formats/flink-parquet/pom.xml
@@ -174,6 +174,12 @@ under the License.
 
                <dependency>
                        <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
                        <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
                        <version>${project.version}</version>
                        <type>test-jar</type>
diff --git a/flink-python/pom.xml b/flink-python/pom.xml
index 53d0acd..7b68c29 100644
--- a/flink-python/pom.xml
+++ b/flink-python/pom.xml
@@ -70,6 +70,12 @@ under the License.
                </dependency>
                <dependency>
                        <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
                        <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
                        <version>${project.version}</version>
                        <scope>provided</scope>
diff --git a/flink-table/flink-sql-client/pom.xml b/flink-table/flink-sql-client/pom.xml
index 44ec8ad..ba43157 100644
--- a/flink-table/flink-sql-client/pom.xml
+++ b/flink-table/flink-sql-client/pom.xml
@@ -172,6 +172,12 @@ under the License.
 
                <dependency>
                        <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
                        <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
                        <version>${project.version}</version>
                        <type>test-jar</type>
diff --git a/flink-table/flink-table-api-java-bridge/pom.xml b/flink-table/flink-table-api-bridge-base/pom.xml
similarity index 56%
copy from flink-table/flink-table-api-java-bridge/pom.xml
copy to flink-table/flink-table-api-bridge-base/pom.xml
index 1d4d64e..31628f0 100644
--- a/flink-table/flink-table-api-java-bridge/pom.xml
+++ b/flink-table/flink-table-api-bridge-base/pom.xml
@@ -16,7 +16,7 @@ specific language governing permissions and limitations
 under the License.
 -->
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
 
        <modelVersion>4.0.0</modelVersion>
 
@@ -27,11 +27,11 @@ under the License.
                <relativePath>..</relativePath>
        </parent>
 
-       <artifactId>flink-table-api-java-bridge</artifactId>
-       <name>Flink : Table : API Java bridge</name>
+       <artifactId>flink-table-api-bridge-base</artifactId>
+       <name>Flink : Table : API bridge base</name>
        <description>
-               This module contains the Table/SQL API for writing table programs
-               that interact with other Flink APIs using the Java programming language.
+               This module contains the base code for bridging between Table API and DataStream API for
+               both Java and Scala.
        </description>
 
        <packaging>jar</packaging>
@@ -39,11 +39,6 @@ under the License.
        <dependencies>
                <dependency>
                        <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-table-api-java</artifactId>
-                       <version>${project.version}</version>
-               </dependency>
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
                        <artifactId>flink-java</artifactId>
                        <version>${project.version}</version>
                </dependency>
@@ -52,41 +47,10 @@ under the License.
                        <artifactId>flink-streaming-java</artifactId>
                        <version>${project.version}</version>
                </dependency>
-
-               <!-- test dependencies -->
-
                <dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-table-api-java</artifactId>
                        <version>${project.version}</version>
-                       <type>test-jar</type>
-                       <scope>test</scope>
-               </dependency>
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-table-common</artifactId>
-                       <version>${project.version}</version>
-                       <type>test-jar</type>
-                       <scope>test</scope>
-               </dependency>
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-streaming-java</artifactId>
-                       <version>${project.version}</version>
-                       <type>test-jar</type>
-                       <scope>test</scope>
-               </dependency>
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-runtime</artifactId>
-                       <version>${project.version}</version>
-                       <type>test-jar</type>
-                       <scope>test</scope>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-test-utils-junit</artifactId>
                </dependency>
        </dependencies>
 </project>
diff --git a/flink-table/flink-table-api-bridge-base/src/main/java/org/apache/flink/table/api/bridge/internal/AbstractStreamTableEnvironmentImpl.java b/flink-table/flink-table-api-bridge-base/src/main/java/org/apache/flink/table/api/bridge/internal/AbstractStreamTableEnvironmentImpl.java
new file mode 100644
index 0000000..79b4885
--- /dev/null
+++ b/flink-table/flink-table-api-bridge-base/src/main/java/org/apache/flink/table/api/bridge/internal/AbstractStreamTableEnvironmentImpl.java
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.bridge.internal;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.Types;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.catalog.FunctionCatalog;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.SchemaResolver;
+import org.apache.flink.table.catalog.SchemaTranslator;
+import org.apache.flink.table.catalog.UnresolvedIdentifier;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.delegation.Executor;
+import org.apache.flink.table.delegation.ExecutorFactory;
+import org.apache.flink.table.delegation.Planner;
+import org.apache.flink.table.delegation.StreamExecutorFactory;
+import org.apache.flink.table.expressions.ApiExpressionUtils;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.module.ModuleManager;
+import org.apache.flink.table.operations.DataStreamQueryOperation;
+import org.apache.flink.table.operations.ExternalModifyOperation;
+import org.apache.flink.table.operations.ExternalQueryOperation;
+import org.apache.flink.table.operations.ModifyOperation;
+import org.apache.flink.table.operations.QueryOperation;
+import org.apache.flink.table.operations.utils.OperationTreeBuilder;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.utils.TypeConversions;
+import org.apache.flink.table.typeutils.FieldInfoUtils;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/** Abstract class to implement a {@code StreamTableEnvironment}. */
+@Internal
+public abstract class AbstractStreamTableEnvironmentImpl extends TableEnvironmentImpl {
+
+    protected final StreamExecutionEnvironment executionEnvironment;
+
+    public AbstractStreamTableEnvironmentImpl(
+            CatalogManager catalogManager,
+            ModuleManager moduleManager,
+            TableConfig tableConfig,
+            Executor executor,
+            FunctionCatalog functionCatalog,
+            Planner planner,
+            boolean isStreamingMode,
+            ClassLoader userClassLoader,
+            StreamExecutionEnvironment executionEnvironment) {
+        super(
+                catalogManager,
+                moduleManager,
+                tableConfig,
+                executor,
+                functionCatalog,
+                planner,
+                isStreamingMode,
+                userClassLoader);
+        this.executionEnvironment = executionEnvironment;
+    }
+
+    public static Executor lookupExecutor(
+            ClassLoader classLoader,
+            String executorIdentifier,
+            StreamExecutionEnvironment executionEnvironment) {
+        final ExecutorFactory executorFactory;
+        try {
+            executorFactory =
+                    FactoryUtil.discoverFactory(
+                            classLoader, ExecutorFactory.class, executorIdentifier);
+        } catch (Exception e) {
+            throw new TableException(
+                    "Could not instantiate the executor. Make sure a planner module is on the classpath",
+                    e);
+        }
+        if (executorFactory instanceof StreamExecutorFactory) {
+            return ((StreamExecutorFactory) executorFactory).create(executionEnvironment);
+        } else {
+            throw new TableException(
+                    "The resolved ExecutorFactory '"
+                            + executorFactory.getClass()
+                            + "' doesn't implement StreamExecutorFactory.");
+        }
+    }
+
+    protected <T> Table fromStreamInternal(
+            DataStream<T> dataStream,
+            @Nullable Schema schema,
+            @Nullable String viewPath,
+            ChangelogMode changelogMode) {
+        Preconditions.checkNotNull(dataStream, "Data stream must not be null.");
+        Preconditions.checkNotNull(changelogMode, "Changelog mode must not be null.");
+
+        if (dataStream.getExecutionEnvironment() != executionEnvironment) {
+            throw new ValidationException(
+                    "The DataStream's StreamExecutionEnvironment must be identical to the one that "
+                            + "has been passed to the StreamTableEnvironment during instantiation.");
+        }
+
+        final CatalogManager catalogManager = getCatalogManager();
+        final SchemaResolver schemaResolver = catalogManager.getSchemaResolver();
+        final OperationTreeBuilder operationTreeBuilder = getOperationTreeBuilder();
+
+        final UnresolvedIdentifier unresolvedIdentifier;
+        if (viewPath != null) {
+            unresolvedIdentifier = getParser().parseIdentifier(viewPath);
+        } else {
+            unresolvedIdentifier =
+                    UnresolvedIdentifier.of("Unregistered_DataStream_Source_" + dataStream.getId());
+        }
+        final ObjectIdentifier objectIdentifier =
+                catalogManager.qualifyIdentifier(unresolvedIdentifier);
+
+        final SchemaTranslator.ConsumingResult schemaTranslationResult =
+                SchemaTranslator.createConsumingResult(
+                        catalogManager.getDataTypeFactory(), dataStream.getType(), schema);
+
+        final ResolvedSchema resolvedSchema =
+                schemaTranslationResult.getSchema().resolve(schemaResolver);
+
+        final QueryOperation scanOperation =
+                new ExternalQueryOperation<>(
+                        objectIdentifier,
+                        dataStream,
+                        schemaTranslationResult.getPhysicalDataType(),
+                        schemaTranslationResult.isTopLevelRecord(),
+                        changelogMode,
+                        resolvedSchema);
+
+        final List<String> projections = schemaTranslationResult.getProjections();
+        if (projections == null) {
+            return createTable(scanOperation);
+        }
+
+        final QueryOperation projectOperation =
+                operationTreeBuilder.project(
+                        projections.stream()
+                                .map(ApiExpressionUtils::unresolvedRef)
+                                .collect(Collectors.toList()),
+                        scanOperation);
+
+        return createTable(projectOperation);
+    }
+
+    protected <T> DataStream<T> toStreamInternal(
+            Table table,
+            SchemaTranslator.ProducingResult schemaTranslationResult,
+            @Nullable ChangelogMode changelogMode) {
+        final CatalogManager catalogManager = getCatalogManager();
+        final SchemaResolver schemaResolver = catalogManager.getSchemaResolver();
+        final OperationTreeBuilder operationTreeBuilder = getOperationTreeBuilder();
+
+        final QueryOperation projectOperation =
+                schemaTranslationResult
+                        .getProjections()
+                        .map(
+                                projections ->
+                                        operationTreeBuilder.project(
+                                                projections.stream()
+                                                        .map(ApiExpressionUtils::unresolvedRef)
+                                                        .collect(Collectors.toList()),
+                                                table.getQueryOperation()))
+                        .orElseGet(table::getQueryOperation);
+
+        final ResolvedSchema resolvedSchema =
+                schemaResolver.resolve(schemaTranslationResult.getSchema());
+
+        final UnresolvedIdentifier unresolvedIdentifier =
+                UnresolvedIdentifier.of(
+                        "Unregistered_DataStream_Sink_" + ExternalModifyOperation.getUniqueId());
+        final ObjectIdentifier objectIdentifier =
+                catalogManager.qualifyIdentifier(unresolvedIdentifier);
+
+        final ExternalModifyOperation modifyOperation =
+                new ExternalModifyOperation(
+                        objectIdentifier,
+                        projectOperation,
+                        resolvedSchema,
+                        changelogMode,
+                        schemaTranslationResult
+                                .getPhysicalDataType()
+                                .orElseGet(resolvedSchema::toPhysicalRowDataType));
+
+        return toStreamInternal(table, modifyOperation);
+    }
+
+    protected <T> DataStream<T> toStreamInternal(Table table, ModifyOperation modifyOperation) {
+        final List<Transformation<?>> transformations =
+                planner.translate(Collections.singletonList(modifyOperation));
+
+        final Transformation<T> transformation = getTransformation(table, transformations);
+        executionEnvironment.addOperator(transformation);
+
+        // reconfigure whenever planner transformations are added
+        executionEnvironment.configure(tableConfig.getConfiguration());
+
+        return new DataStream<>(executionEnvironment, transformation);
+    }
+
+    /**
+     * This is a temporary workaround for Python API. Python API should not use
+     * StreamExecutionEnvironment at all.
+     */
+    @Internal
+    public StreamExecutionEnvironment execEnv() {
+        return executionEnvironment;
+    }
+
+    protected <T> TypeInformation<T> extractTypeInformation(Table table, Class<T> clazz) {
+        try {
+            return TypeExtractor.createTypeInfo(clazz);
+        } catch (Exception ex) {
+            throw new ValidationException(
+                    String.format(
+                            "Could not convert query: %s to a DataStream of class %s",
+                            table.getQueryOperation().asSummaryString(), clazz.getSimpleName()),
+                    ex);
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private <T> Transformation<T> getTransformation(
+            Table table, List<Transformation<?>> transformations) {
+        if (transformations.size() != 1) {
+            throw new TableException(
+                    String.format(
+                            "Expected a single transformation for query: %s\n Got: %s",
+                            table.getQueryOperation().asSummaryString(), transformations));
+        }
+
+        return (Transformation<T>) transformations.get(0);
+    }
+
+    protected <T> DataType wrapWithChangeFlag(TypeInformation<T> outputType) {
+        TupleTypeInfo tupleTypeInfo =
+                new TupleTypeInfo<Tuple2<Boolean, T>>(Types.BOOLEAN(), outputType);
+        return TypeConversions.fromLegacyInfoToDataType(tupleTypeInfo);
+    }
+
+    protected <T> DataStreamQueryOperation<T> asQueryOperation(
+            DataStream<T> dataStream, Optional<List<Expression>> fields) {
+        TypeInformation<T> streamType = dataStream.getType();
+
+        // get field names and types for all non-replaced fields
+        FieldInfoUtils.TypeInfoSchema typeInfoSchema =
+                fields.map(
+                                f -> {
+                                    FieldInfoUtils.TypeInfoSchema fieldsInfo =
+                                            FieldInfoUtils.getFieldsInfo(
+                                                    streamType, f.toArray(new Expression[0]));
+
+                                    // check if event-time is enabled
+                                    validateTimeCharacteristic(fieldsInfo.isRowtimeDefined());
+                                    return fieldsInfo;
+                                })
+                        .orElseGet(() -> FieldInfoUtils.getFieldsInfo(streamType));
+
+        return new DataStreamQueryOperation<>(
+                dataStream, typeInfoSchema.getIndices(), typeInfoSchema.toResolvedSchema());
+    }
+
+    protected void validateTimeCharacteristic(boolean isRowtimeDefined) {
+        if (isRowtimeDefined
+                && executionEnvironment.getStreamTimeCharacteristic()
+                        != TimeCharacteristic.EventTime) {
+            throw new ValidationException(
+                    String.format(
+                            "A rowtime attribute requires an EventTime time characteristic in stream environment. But is: %s",
+                            executionEnvironment.getStreamTimeCharacteristic()));
+        }
+    }
+
+    @Override
+    protected QueryOperation qualifyQueryOperation(
+            ObjectIdentifier identifier, QueryOperation queryOperation) {
+        if (queryOperation instanceof DataStreamQueryOperation) {
+            DataStreamQueryOperation<?> operation = (DataStreamQueryOperation) queryOperation;
+            return new DataStreamQueryOperation<>(
+                    identifier,
+                    operation.getDataStream(),
+                    operation.getFieldIndices(),
+                    operation.getResolvedSchema());
+        } else {
+            return queryOperation;
+        }
+    }
+
+    public void attachAsDataStream(List<ModifyOperation> modifyOperations) {
+        final List<Transformation<?>> transformations = translate(modifyOperations);
+        transformations.forEach(executionEnvironment::addOperator);
+    }
+}
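
For orientation, the base class above centralizes the executor lookup that the Java and Scala bridges previously duplicated. A minimal sketch of a call site, assuming the executor identifier "default" and the thread context classloader (both illustrative; the real call sites in the bridge modules' create(...) methods are not shown in this diff):

    // Hedged sketch: resolving an Executor through the shared helper.
    ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    Executor executor =
            AbstractStreamTableEnvironmentImpl.lookupExecutor(classLoader, "default", env);
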
diff --git a/flink-table/flink-table-api-bridge-base/src/main/java/org/apache/flink/table/delegation/StreamExecutorFactory.java b/flink-table/flink-table-api-bridge-base/src/main/java/org/apache/flink/table/delegation/StreamExecutorFactory.java
new file mode 100644
index 0000000..ab44871
--- /dev/null
+++ b/flink-table/flink-table-api-bridge-base/src/main/java/org/apache/flink/table/delegation/StreamExecutorFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.delegation;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.TableEnvironment;
+
+/** Sub interface of {@link ExecutorFactory} to support {@link DataStream} API. */
+@Internal
+public interface StreamExecutorFactory extends ExecutorFactory {
+
+    /**
+     * Creates a corresponding {@link Executor}.
+     *
+     * <p>This method will be used when instantiating a {@link TableEnvironment} from one of the
+     * bridging modules which enables conversion from/to {@link DataStream} API.
+     */
+    Executor create(StreamExecutionEnvironment streamExecutionEnvironment);
+}
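
For context, a planner-provided factory implements this interface so that the instanceof check in lookupExecutor above succeeds without reflection. A minimal sketch of an implementer, assuming the inherited Factory methods keep their usual signatures; the class MyStreamExecutor and the identifier are hypothetical:

    import java.util.Collections;
    import java.util.Set;

    import org.apache.flink.configuration.ConfigOption;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.delegation.Executor;
    import org.apache.flink.table.delegation.StreamExecutorFactory;

    public class MyExecutorFactory implements StreamExecutorFactory {

        @Override
        public Executor create(StreamExecutionEnvironment streamExecutionEnvironment) {
            // MyStreamExecutor is a hypothetical Executor implementation.
            return new MyStreamExecutor(streamExecutionEnvironment);
        }

        @Override
        public String factoryIdentifier() {
            return "my-executor"; // illustrative identifier
        }

        @Override
        public Set<ConfigOption<?>> requiredOptions() {
            return Collections.emptySet();
        }

        @Override
        public Set<ConfigOption<?>> optionalOptions() {
            return Collections.emptySet();
        }
    }
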
diff --git a/flink-table/flink-table-api-scala-bridge/src/main/java/org/apache/flink/table/operations/ScalaDataStreamQueryOperation.java b/flink-table/flink-table-api-bridge-base/src/main/java/org/apache/flink/table/operations/DataStreamQueryOperation.java
similarity index 86%
rename from flink-table/flink-table-api-scala-bridge/src/main/java/org/apache/flink/table/operations/ScalaDataStreamQueryOperation.java
rename to flink-table/flink-table-api-bridge-base/src/main/java/org/apache/flink/table/operations/DataStreamQueryOperation.java
index 4c375e2..ae0ea36 100644
--- a/flink-table/flink-table-api-scala-bridge/src/main/java/org/apache/flink/table/operations/ScalaDataStreamQueryOperation.java
+++ b/flink-table/flink-table-api-bridge-base/src/main/java/org/apache/flink/table/operations/DataStreamQueryOperation.java
@@ -37,17 +37,18 @@ import java.util.Optional;
  *
  * <p>This operation may expose only part, or change the order of the fields available in a {@link
  * org.apache.flink.api.common.typeutils.CompositeType} of the underlying {@link DataStream}. The
- * {@link ScalaDataStreamQueryOperation#getFieldIndices()} describes the mapping between fields of
- * the {@link TableSchema} to the {@link org.apache.flink.api.common.typeutils.CompositeType}.
+ * {@link DataStreamQueryOperation#getFieldIndices()} describes the mapping between fields of the
+ * {@link TableSchema} to the {@link org.apache.flink.api.common.typeutils.CompositeType}.
  */
 @Internal
-public class ScalaDataStreamQueryOperation<E> implements QueryOperation {
+@Deprecated
+public class DataStreamQueryOperation<E> implements QueryOperation {
 
     /**
     * The table identifier registered under the environment. The identifier might be null when the
-     * it is from {@code StreamTableEnvironment#fromDataStream(DataStream)}. But the identifier
-     * should be not null if is from {@code StreamTableEnvironment#createTemporaryView(String,
-     * DataStream)} with a registered name.
+     * it is from {@code StreamTableEnvironment#fromDataStream(DataStream, Expression...)}. But the
+     * identifier should be not null if is from {@code
+     * StreamTableEnvironment#createTemporaryView(String, DataStream)} with a registered name.
      */
     @Nullable private final ObjectIdentifier identifier;
 
@@ -55,12 +56,12 @@ public class ScalaDataStreamQueryOperation<E> implements QueryOperation {
     private final int[] fieldIndices;
     private final ResolvedSchema resolvedSchema;
 
-    public ScalaDataStreamQueryOperation(
+    public DataStreamQueryOperation(
             DataStream<E> dataStream, int[] fieldIndices, ResolvedSchema resolvedSchema) {
         this(null, dataStream, fieldIndices, resolvedSchema);
     }
 
-    public ScalaDataStreamQueryOperation(
+    public DataStreamQueryOperation(
             ObjectIdentifier identifier,
             DataStream<E> dataStream,
             int[] fieldIndices,
@@ -75,6 +76,10 @@ public class ScalaDataStreamQueryOperation<E> implements QueryOperation {
         return dataStream;
     }
 
+    public Optional<ObjectIdentifier> getIdentifier() {
+        return Optional.ofNullable(identifier);
+    }
+
     public int[] getFieldIndices() {
         return fieldIndices;
     }
@@ -84,10 +89,6 @@ public class ScalaDataStreamQueryOperation<E> implements QueryOperation {
         return resolvedSchema;
     }
 
-    public Optional<ObjectIdentifier> getIdentifier() {
-        return Optional.ofNullable(identifier);
-    }
-
     @Override
     public String asSummaryString() {
         Map<String, Object> args = new LinkedHashMap<>();
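
As a small illustration of the getFieldIndices() contract documented above (types chosen arbitrarily), table column i is backed by field fieldIndices[i] of the stream's CompositeType:

    // Underlying stream type: Tuple3<Long, String, Integer>, i.e. fields f0, f1, f2.
    // A table exposing only (f2, f0), in that order, would carry:
    int[] fieldIndices = new int[] {2, 0};
    // Table column 0 then reads stream field f2; table column 1 reads f0.
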
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/operations/JavaExternalQueryOperation.java b/flink-table/flink-table-api-bridge-base/src/main/java/org/apache/flink/table/operations/ExternalQueryOperation.java
similarity index 96%
rename from flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/operations/JavaExternalQueryOperation.java
rename to flink-table/flink-table-api-bridge-base/src/main/java/org/apache/flink/table/operations/ExternalQueryOperation.java
index cd8b3db..d7e96d1 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/operations/JavaExternalQueryOperation.java
+++ b/flink-table/flink-table-api-bridge-base/src/main/java/org/apache/flink/table/operations/ExternalQueryOperation.java
@@ -41,7 +41,7 @@ import java.util.Map;
  * @param <E> External type of data stream
  */
 @Internal
-public final class JavaExternalQueryOperation<E> implements QueryOperation {
+public final class ExternalQueryOperation<E> implements QueryOperation {
 
     private final ObjectIdentifier identifier;
 
@@ -55,7 +55,7 @@ public final class JavaExternalQueryOperation<E> implements QueryOperation {
 
     private final ResolvedSchema resolvedSchema;
 
-    public JavaExternalQueryOperation(
+    public ExternalQueryOperation(
             ObjectIdentifier identifier,
             DataStream<E> dataStream,
             DataType physicalDataType,
diff --git a/flink-table/flink-table-api-java-bridge/pom.xml b/flink-table/flink-table-api-java-bridge/pom.xml
index 1d4d64e..c2246fe 100644
--- a/flink-table/flink-table-api-java-bridge/pom.xml
+++ b/flink-table/flink-table-api-java-bridge/pom.xml
@@ -44,6 +44,12 @@ under the License.
                </dependency>
                <dependency>
                        <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-table-api-bridge-base</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
                        <artifactId>flink-java</artifactId>
                        <version>${project.version}</version>
                </dependency>
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java
index b0957f6..c6c7d43 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java
@@ -20,11 +20,7 @@ package org.apache.flink.table.api.bridge.java.internal;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.DataTypes;
@@ -32,58 +28,36 @@ import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableConfig;
-import org.apache.flink.table.api.TableException;
-import org.apache.flink.table.api.Types;
-import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl;
 import org.apache.flink.table.api.bridge.java.StreamStatementSet;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
-import org.apache.flink.table.api.internal.TableEnvironmentImpl;
 import org.apache.flink.table.catalog.CatalogManager;
 import org.apache.flink.table.catalog.FunctionCatalog;
 import org.apache.flink.table.catalog.GenericInMemoryCatalog;
-import org.apache.flink.table.catalog.ObjectIdentifier;
-import org.apache.flink.table.catalog.ResolvedSchema;
-import org.apache.flink.table.catalog.SchemaResolver;
 import org.apache.flink.table.catalog.SchemaTranslator;
-import org.apache.flink.table.catalog.UnresolvedIdentifier;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.delegation.Executor;
-import org.apache.flink.table.delegation.ExecutorFactory;
 import org.apache.flink.table.delegation.ExpressionParser;
 import org.apache.flink.table.delegation.Planner;
-import org.apache.flink.table.expressions.ApiExpressionUtils;
 import org.apache.flink.table.expressions.Expression;
-import org.apache.flink.table.factories.FactoryUtil;
 import org.apache.flink.table.factories.PlannerFactoryUtil;
 import org.apache.flink.table.functions.AggregateFunction;
 import org.apache.flink.table.functions.TableAggregateFunction;
 import org.apache.flink.table.functions.TableFunction;
 import org.apache.flink.table.functions.UserDefinedFunctionHelper;
 import org.apache.flink.table.module.ModuleManager;
-import org.apache.flink.table.operations.ExternalModifyOperation;
-import org.apache.flink.table.operations.JavaDataStreamQueryOperation;
-import org.apache.flink.table.operations.JavaExternalQueryOperation;
-import org.apache.flink.table.operations.ModifyOperation;
 import org.apache.flink.table.operations.OutputConversionModifyOperation;
-import org.apache.flink.table.operations.QueryOperation;
-import org.apache.flink.table.operations.utils.OperationTreeBuilder;
 import org.apache.flink.table.sources.TableSource;
 import org.apache.flink.table.sources.TableSourceValidation;
 import org.apache.flink.table.types.AbstractDataType;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.utils.TypeConversions;
-import org.apache.flink.table.typeutils.FieldInfoUtils;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
 
-import javax.annotation.Nullable;
-
-import java.lang.reflect.Method;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
-import java.util.stream.Collectors;
 
 /**
  * The implementation for a Java {@link StreamTableEnvironment}. This enables conversions from/to
@@ -92,11 +66,9 @@ import java.util.stream.Collectors;
  * <p>It binds to a given {@link StreamExecutionEnvironment}.
  */
 @Internal
-public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl
+public final class StreamTableEnvironmentImpl extends AbstractStreamTableEnvironmentImpl
         implements StreamTableEnvironment {
 
-    private final StreamExecutionEnvironment executionEnvironment;
-
     public StreamTableEnvironmentImpl(
             CatalogManager catalogManager,
             ModuleManager moduleManager,
@@ -115,8 +87,8 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl
                 functionCatalog,
                 planner,
                 isStreamingMode,
-                userClassLoader);
-        this.executionEnvironment = executionEnvironment;
+                userClassLoader,
+                executionEnvironment);
     }
 
     public static StreamTableEnvironment create(
@@ -168,27 +140,6 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl
                 classLoader);
     }
 
-    private static Executor lookupExecutor(
-            ClassLoader classLoader,
-            String executorIdentifier,
-            StreamExecutionEnvironment executionEnvironment) {
-        try {
-            final ExecutorFactory executorFactory =
-                    FactoryUtil.discoverFactory(
-                            classLoader, ExecutorFactory.class, executorIdentifier);
-            final Method createMethod =
-                    executorFactory
-                            .getClass()
-                            .getMethod("create", StreamExecutionEnvironment.class);
-
-            return (Executor) createMethod.invoke(executorFactory, executionEnvironment);
-        } catch (Exception e) {
-            throw new TableException(
-                    "Could not instantiate the executor. Make sure a planner module is on the classpath",
-                    e);
-        }
-    }
-
     @Override
     public <T> void registerFunction(String name, TableFunction<T> tableFunction) {
         TypeInformation<T> typeInfo =
@@ -263,65 +214,6 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl
                 path, fromStreamInternal(dataStream, schema, path, ChangelogMode.insertOnly()));
     }
 
-    private <T> Table fromStreamInternal(
-            DataStream<T> dataStream,
-            @Nullable Schema schema,
-            @Nullable String viewPath,
-            ChangelogMode changelogMode) {
-        Preconditions.checkNotNull(dataStream, "Data stream must not be null.");
-        Preconditions.checkNotNull(changelogMode, "Changelog mode must not be null.");
-
-        if (dataStream.getExecutionEnvironment() != executionEnvironment) {
-            throw new ValidationException(
-                    "The DataStream's StreamExecutionEnvironment must be identical to the one that "
-                            + "has been passed to the StreamTableEnvironment during instantiation.");
-        }
-
-        final CatalogManager catalogManager = getCatalogManager();
-        final SchemaResolver schemaResolver = catalogManager.getSchemaResolver();
-        final OperationTreeBuilder operationTreeBuilder = getOperationTreeBuilder();
-
-        final UnresolvedIdentifier unresolvedIdentifier;
-        if (viewPath != null) {
-            unresolvedIdentifier = getParser().parseIdentifier(viewPath);
-        } else {
-            unresolvedIdentifier =
-                    UnresolvedIdentifier.of("Unregistered_DataStream_Source_" + dataStream.getId());
-        }
-        final ObjectIdentifier objectIdentifier =
-                catalogManager.qualifyIdentifier(unresolvedIdentifier);
-
-        final SchemaTranslator.ConsumingResult schemaTranslationResult =
-                SchemaTranslator.createConsumingResult(
-                        catalogManager.getDataTypeFactory(), dataStream.getType(), schema);
-
-        final ResolvedSchema resolvedSchema =
-                schemaTranslationResult.getSchema().resolve(schemaResolver);
-
-        final QueryOperation scanOperation =
-                new JavaExternalQueryOperation<>(
-                        objectIdentifier,
-                        dataStream,
-                        schemaTranslationResult.getPhysicalDataType(),
-                        schemaTranslationResult.isTopLevelRecord(),
-                        changelogMode,
-                        resolvedSchema);
-
-        final List<String> projections = schemaTranslationResult.getProjections();
-        if (projections == null) {
-            return createTable(scanOperation);
-        }
-
-        final QueryOperation projectOperation =
-                operationTreeBuilder.project(
-                        projections.stream()
-                                .map(ApiExpressionUtils::unresolvedRef)
-                                .collect(Collectors.toList()),
-                        scanOperation);
-
-        return createTable(projectOperation);
-    }
-
     @Override
     public DataStream<Row> toDataStream(Table table) {
         Preconditions.checkNotNull(table, "Table must not be null.");
@@ -391,71 +283,11 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl
         return toStreamInternal(table, schemaTranslationResult, changelogMode);
     }
 
-    private <T> DataStream<T> toStreamInternal(
-            Table table,
-            SchemaTranslator.ProducingResult schemaTranslationResult,
-            @Nullable ChangelogMode changelogMode) {
-        final CatalogManager catalogManager = getCatalogManager();
-        final SchemaResolver schemaResolver = catalogManager.getSchemaResolver();
-        final OperationTreeBuilder operationTreeBuilder = getOperationTreeBuilder();
-
-        final QueryOperation projectOperation =
-                schemaTranslationResult
-                        .getProjections()
-                        .map(
-                                projections ->
-                                        operationTreeBuilder.project(
-                                                projections.stream()
-                                                        .map(ApiExpressionUtils::unresolvedRef)
-                                                        .collect(Collectors.toList()),
-                                                table.getQueryOperation()))
-                        .orElseGet(table::getQueryOperation);
-
-        final ResolvedSchema resolvedSchema =
-                schemaResolver.resolve(schemaTranslationResult.getSchema());
-
-        final UnresolvedIdentifier unresolvedIdentifier =
-                UnresolvedIdentifier.of(
-                        "Unregistered_DataStream_Sink_" + ExternalModifyOperation.getUniqueId());
-        final ObjectIdentifier objectIdentifier =
-                catalogManager.qualifyIdentifier(unresolvedIdentifier);
-
-        final ExternalModifyOperation modifyOperation =
-                new ExternalModifyOperation(
-                        objectIdentifier,
-                        projectOperation,
-                        resolvedSchema,
-                        changelogMode,
-                        schemaTranslationResult
-                                .getPhysicalDataType()
-                                .orElseGet(resolvedSchema::toPhysicalRowDataType));
-
-        return toStreamInternal(table, modifyOperation);
-    }
-
-    private <T> DataStream<T> toStreamInternal(Table table, ModifyOperation modifyOperation) {
-        final List<Transformation<?>> transformations =
-                planner.translate(Collections.singletonList(modifyOperation));
-
-        final Transformation<T> transformation = getTransformation(table, transformations);
-        executionEnvironment.addOperator(transformation);
-
-        // reconfigure whenever planner transformations are added
-        executionEnvironment.configure(tableConfig.getConfiguration());
-
-        return new DataStream<>(executionEnvironment, transformation);
-    }
-
     @Override
     public StreamStatementSet createStatementSet() {
         return new StreamStatementSetImpl(this);
     }
 
-    void attachAsDataStream(List<ModifyOperation> modifyOperations) {
-        final List<Transformation<?>> transformations = translate(modifyOperations);
-        transformations.forEach(executionEnvironment::addOperator);
-    }
-
     @Override
     public <T> Table fromDataStream(DataStream<T> dataStream, String fields) {
         List<Expression> expressions = ExpressionParser.INSTANCE.parseExpressionList(fields);
@@ -464,10 +296,7 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl
 
     @Override
     public <T> Table fromDataStream(DataStream<T> dataStream, Expression... fields) {
-        JavaDataStreamQueryOperation<T> queryOperation =
-                asQueryOperation(dataStream, Optional.of(Arrays.asList(fields)));
-
-        return createTable(queryOperation);
+        return createTable(asQueryOperation(dataStream, Optional.of(Arrays.asList(fields))));
     }
 
     @Override
@@ -492,22 +321,6 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl
     }
 
     @Override
-    protected QueryOperation qualifyQueryOperation(
-            ObjectIdentifier identifier, QueryOperation queryOperation) {
-        if (queryOperation instanceof JavaDataStreamQueryOperation) {
-            JavaDataStreamQueryOperation<?> operation =
-                    (JavaDataStreamQueryOperation) queryOperation;
-            return new JavaDataStreamQueryOperation<>(
-                    identifier,
-                    operation.getDataStream(),
-                    operation.getFieldIndices(),
-                    operation.getResolvedSchema());
-        } else {
-            return queryOperation;
-        }
-    }
-
-    @Override
     public <T> DataStream<T> toAppendStream(Table table, Class<T> clazz) {
         TypeInformation<T> typeInfo = extractTypeInformation(table, clazz);
         return toAppendStream(table, typeInfo);
@@ -540,82 +353,9 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl
         return toStreamInternal(table, modifyOperation);
     }
 
-    /**
-     * This is a temporary workaround for Python API. Python API should not use
-     * StreamExecutionEnvironment at all.
-     */
-    @Internal
-    public StreamExecutionEnvironment execEnv() {
-        return executionEnvironment;
-    }
-
     @Override
     protected void validateTableSource(TableSource<?> tableSource) {
         super.validateTableSource(tableSource);
         validateTimeCharacteristic(TableSourceValidation.hasRowtimeAttribute(tableSource));
     }
-
-    private <T> TypeInformation<T> extractTypeInformation(Table table, Class<T> clazz) {
-        try {
-            return TypeExtractor.createTypeInfo(clazz);
-        } catch (Exception ex) {
-            throw new ValidationException(
-                    String.format(
-                            "Could not convert query: %s to a DataStream of class %s",
-                            table.getQueryOperation().asSummaryString(), clazz.getSimpleName()),
-                    ex);
-        }
-    }
-
-    @SuppressWarnings("unchecked")
-    private <T> Transformation<T> getTransformation(
-            Table table, List<Transformation<?>> transformations) {
-        if (transformations.size() != 1) {
-            throw new TableException(
-                    String.format(
-                            "Expected a single transformation for query: %s\n Got: %s",
-                            table.getQueryOperation().asSummaryString(), transformations));
-        }
-
-        return (Transformation<T>) transformations.get(0);
-    }
-
-    private <T> DataType wrapWithChangeFlag(TypeInformation<T> outputType) {
-        TupleTypeInfo tupleTypeInfo =
-                new TupleTypeInfo<Tuple2<Boolean, T>>(Types.BOOLEAN(), outputType);
-        return TypeConversions.fromLegacyInfoToDataType(tupleTypeInfo);
-    }
-
-    private <T> JavaDataStreamQueryOperation<T> asQueryOperation(
-            DataStream<T> dataStream, Optional<List<Expression>> fields) {
-        TypeInformation<T> streamType = dataStream.getType();
-
-        // get field names and types for all non-replaced fields
-        FieldInfoUtils.TypeInfoSchema typeInfoSchema =
-                fields.map(
-                                f -> {
-                                    FieldInfoUtils.TypeInfoSchema fieldsInfo =
-                                            FieldInfoUtils.getFieldsInfo(
-                                                    streamType, f.toArray(new Expression[0]));
-
-                                    // check if event-time is enabled
-                                    validateTimeCharacteristic(fieldsInfo.isRowtimeDefined());
-                                    return fieldsInfo;
-                                })
-                        .orElseGet(() -> FieldInfoUtils.getFieldsInfo(streamType));
-
-        return new JavaDataStreamQueryOperation<>(
-                dataStream, typeInfoSchema.getIndices(), typeInfoSchema.toResolvedSchema());
-    }
-
-    private void validateTimeCharacteristic(boolean isRowtimeDefined) {
-        if (isRowtimeDefined
-                && executionEnvironment.getStreamTimeCharacteristic()
-                        != TimeCharacteristic.EventTime) {
-            throw new ValidationException(
-                    String.format(
-                            "A rowtime attribute requires an EventTime time 
characteristic in stream environment. But is: %s",
-                            
executionEnvironment.getStreamTimeCharacteristic()));
-        }
-    }
 }
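
    The helpers removed above do not disappear: per the file listing at the top of
    this commit, they move into the new AbstractStreamTableEnvironmentImpl in
    flink-table-api-bridge-base so that the Java and Scala bridges share a single
    implementation. A minimal sketch of the rowtime check both bridges now inherit,
    reconstructed from the deleted lines above (the class and field names here are
    illustrative, not the actual ones):

        import org.apache.flink.streaming.api.TimeCharacteristic;
        import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
        import org.apache.flink.table.api.ValidationException;

        // Illustrative stand-in for the shared base class; only the check itself
        // is taken from the lines removed above.
        final class RowtimeValidation {

            private final StreamExecutionEnvironment executionEnvironment;

            RowtimeValidation(StreamExecutionEnvironment executionEnvironment) {
                this.executionEnvironment = executionEnvironment;
            }

            // A rowtime attribute is only meaningful under event-time semantics,
            // so stream-to-table conversion fails fast for any other setting.
            void validateTimeCharacteristic(boolean isRowtimeDefined) {
                if (isRowtimeDefined
                        && executionEnvironment.getStreamTimeCharacteristic()
                                != TimeCharacteristic.EventTime) {
                    throw new ValidationException(
                            String.format(
                                    "A rowtime attribute requires an EventTime time characteristic "
                                            + "in stream environment. But is: %s",
                                    executionEnvironment.getStreamTimeCharacteristic()));
                }
            }
        }
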
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/operations/JavaDataStreamQueryOperation.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/operations/JavaDataStreamQueryOperation.java
deleted file mode 100644
index a2fe8dd..0000000
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/operations/JavaDataStreamQueryOperation.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.operations;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
-import org.apache.flink.table.catalog.ObjectIdentifier;
-import org.apache.flink.table.catalog.ResolvedSchema;
-import org.apache.flink.table.expressions.Expression;
-
-import javax.annotation.Nullable;
-
-import java.util.Collections;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-
-/**
- * Describes a relational operation that reads from a {@link DataStream}.
- *
- * <p>This operation may expose only part, or change the order of the fields available in a {@link
- * org.apache.flink.api.common.typeutils.CompositeType} of the underlying {@link DataStream}. The
- * {@link JavaDataStreamQueryOperation#getFieldIndices()} describes the mapping between fields of
- * the {@link TableSchema} to the {@link org.apache.flink.api.common.typeutils.CompositeType}.
- */
-@Internal
-public class JavaDataStreamQueryOperation<E> implements QueryOperation {
-
-    /**
-     * The table identifier registered under the environment. The identifier might be null when the
-     * it is from {@link StreamTableEnvironment#fromDataStream(DataStream, Expression...)}. But the
-     * identifier should be not null if is from {@link
-     * StreamTableEnvironment#createTemporaryView(String, DataStream)} with a registered name.
-     */
-    @Nullable private final ObjectIdentifier identifier;
-
-    private final DataStream<E> dataStream;
-    private final int[] fieldIndices;
-    private final ResolvedSchema resolvedSchema;
-
-    public JavaDataStreamQueryOperation(
-            DataStream<E> dataStream, int[] fieldIndices, ResolvedSchema resolvedSchema) {
-        this(null, dataStream, fieldIndices, resolvedSchema);
-    }
-
-    public JavaDataStreamQueryOperation(
-            ObjectIdentifier identifier,
-            DataStream<E> dataStream,
-            int[] fieldIndices,
-            ResolvedSchema resolvedSchema) {
-        this.identifier = identifier;
-        this.dataStream = dataStream;
-        this.resolvedSchema = resolvedSchema;
-        this.fieldIndices = fieldIndices;
-    }
-
-    public DataStream<E> getDataStream() {
-        return dataStream;
-    }
-
-    public Optional<ObjectIdentifier> getIdentifier() {
-        return Optional.ofNullable(identifier);
-    }
-
-    public int[] getFieldIndices() {
-        return fieldIndices;
-    }
-
-    @Override
-    public ResolvedSchema getResolvedSchema() {
-        return resolvedSchema;
-    }
-
-    @Override
-    public String asSummaryString() {
-        Map<String, Object> args = new LinkedHashMap<>();
-        if (identifier != null) {
-            args.put("id", identifier.asSummaryString());
-        } else {
-            args.put("id", dataStream.getId());
-        }
-        args.put("fields", resolvedSchema.getColumnNames());
-
-        return OperationUtils.formatWithChildren(
-                "DataStream", args, getChildren(), Operation::asSummaryString);
-    }
-
-    @Override
-    public List<QueryOperation> getChildren() {
-        return Collections.emptyList();
-    }
-
-    @Override
-    public <T> T accept(QueryOperationVisitor<T> visitor) {
-        return visitor.visit(this);
-    }
-}
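
    The Java and Scala variants of this operation are unified: the file listing
    shows a single DataStreamQueryOperation (and ExternalQueryOperation) moving
    into flink-table-api-bridge-base, and QueryOperationConverter below switches
    to exactly those classes. A hedged sketch of constructing the unified
    operation, assuming it keeps the constructor shape of the class deleted above:

        import org.apache.flink.streaming.api.datastream.DataStream;
        import org.apache.flink.table.catalog.ResolvedSchema;
        import org.apache.flink.table.operations.DataStreamQueryOperation;
        import org.apache.flink.table.operations.QueryOperation;

        final class DataStreamScanSketch {

            // Assumption: the unified operation mirrors the deleted constructor
            // JavaDataStreamQueryOperation(dataStream, fieldIndices, resolvedSchema).
            static <T> QueryOperation asQueryOperation(
                    DataStream<T> dataStream, int[] fieldIndices, ResolvedSchema schema) {
                return new DataStreamQueryOperation<>(dataStream, fieldIndices, schema);
            }
        }
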
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 4c5a8a8..e163cd7 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -1844,7 +1844,8 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
         }
     }
 
-    protected TableImpl createTable(QueryOperation tableOperation) {
+    @VisibleForTesting
+    public TableImpl createTable(QueryOperation tableOperation) {
         return TableImpl.createTable(
                 this,
                 tableOperation,
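
    Making createTable public (guarded by @VisibleForTesting) is what lets
    TableTestBase further down drop its reflection-based helper. A minimal sketch
    of the new call site, assuming the environment is backed by
    TableEnvironmentImpl as in the planner tests:

        import org.apache.flink.table.api.Table;
        import org.apache.flink.table.api.TableEnvironment;
        import org.apache.flink.table.api.internal.TableEnvironmentImpl;
        import org.apache.flink.table.operations.QueryOperation;

        final class CreateTableSketch {

            // Direct call replaces getDeclaredMethod("createTable", ...) plus
            // setAccessible(true), as previously done in TableTestBase.
            static Table createTable(TableEnvironment tEnv, QueryOperation operation) {
                return ((TableEnvironmentImpl) tEnv).createTable(operation);
            }
        }
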
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/ExecutorFactory.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/ExecutorFactory.java
index 585b930..f1ce1963 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/ExecutorFactory.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/ExecutorFactory.java
@@ -20,7 +20,6 @@ package org.apache.flink.table.delegation;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.factories.Factory;
 
 /**
@@ -32,14 +31,8 @@ import org.apache.flink.table.factories.Factory;
  * <p>Usually, there should only be one executor factory in the class path. However, advanced users
  * can implement a custom one for hooking into the submission process.
  *
- * <p><b>Important:</b> Implementations of this interface should also implement the method
- *
- * <pre>
- *   public Executor create(Configuration, StreamExecutionEnvironment);
- * </pre>
- *
- * <p>This method will be used when instantiating a {@link TableEnvironment} from one of the
- * bridging modules which enables conversion from/to {@code DataStream} API.
+ * <p><b>Important:</b> In order to support DataStream APIs, implementations of this interface must
+ * also implement {@code StreamExecutorFactory} from the {@code flink-table-api-bridge-base} module.
  */
 @Internal
 public interface ExecutorFactory extends Factory {
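
    The contract that the old Javadoc could only spell out as a method signature
    to be found reflectively is now a real interface. A sketch of a custom factory,
    assuming StreamExecutorFactory declares the create(StreamExecutionEnvironment)
    variant that DefaultExecutorFactory overrides later in this diff; MyExecutor is
    a hypothetical Executor implementation:

        import org.apache.flink.configuration.ConfigOption;
        import org.apache.flink.configuration.Configuration;
        import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
        import org.apache.flink.table.delegation.Executor;
        import org.apache.flink.table.delegation.StreamExecutorFactory;

        import java.util.Collections;
        import java.util.Set;

        public final class MyExecutorFactory implements StreamExecutorFactory {

            @Override
            public Executor create(Configuration configuration) {
                // No bridging environment available: derive one from the configuration.
                return create(StreamExecutionEnvironment.getExecutionEnvironment(configuration));
            }

            @Override
            public Executor create(StreamExecutionEnvironment executionEnvironment) {
                return new MyExecutor(executionEnvironment); // hypothetical Executor impl
            }

            @Override
            public String factoryIdentifier() {
                return "my-executor";
            }

            @Override
            public Set<ConfigOption<?>> requiredOptions() {
                return Collections.emptySet();
            }

            @Override
            public Set<ConfigOption<?>> optionalOptions() {
                return Collections.emptySet();
            }
        }
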
diff --git a/flink-table/flink-table-api-scala-bridge/pom.xml b/flink-table/flink-table-api-scala-bridge/pom.xml
index 72c9900..9dd3d22 100644
--- a/flink-table/flink-table-api-scala-bridge/pom.xml
+++ b/flink-table/flink-table-api-scala-bridge/pom.xml
@@ -44,6 +44,12 @@ under the License.
                </dependency>
                <dependency>
                        <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-table-api-bridge-base</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
                        <artifactId>flink-scala_${scala.binary.version}</artifactId>
                        <version>${project.version}</version>
                </dependency>
diff --git a/flink-table/flink-table-api-scala-bridge/src/main/java/org/apache/flink/table/operations/ScalaExternalQueryOperation.java b/flink-table/flink-table-api-scala-bridge/src/main/java/org/apache/flink/table/operations/ScalaExternalQueryOperation.java
deleted file mode 100644
index 15628e4..0000000
--- a/flink-table/flink-table-api-scala-bridge/src/main/java/org/apache/flink/table/operations/ScalaExternalQueryOperation.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.operations;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.table.catalog.ObjectIdentifier;
-import org.apache.flink.table.catalog.ResolvedSchema;
-import org.apache.flink.table.connector.ChangelogMode;
-import org.apache.flink.table.types.DataType;
-
-import java.util.Collections;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Describes a relational operation that reads from a {@link DataStream}.
- *
- * <p>It contains all information necessary to perform a stream-to-table conversion.
- *
- * <p>This class needs to be kept in sync with {@code JavaExternalQueryOperation} in the Java
- * bridging module.
- *
- * @param <E> External type of data stream
- */
-@Internal
-public final class ScalaExternalQueryOperation<E> implements QueryOperation {
-
-    private final ObjectIdentifier identifier;
-
-    private final DataStream<E> dataStream;
-
-    private final DataType physicalDataType;
-
-    private final boolean isTopLevelRecord;
-
-    private final ChangelogMode changelogMode;
-
-    private final ResolvedSchema resolvedSchema;
-
-    public ScalaExternalQueryOperation(
-            ObjectIdentifier identifier,
-            DataStream<E> dataStream,
-            DataType physicalDataType,
-            boolean isTopLevelRecord,
-            ChangelogMode changelogMode,
-            ResolvedSchema resolvedSchema) {
-        this.identifier = identifier;
-        this.dataStream = dataStream;
-        this.physicalDataType = physicalDataType;
-        this.isTopLevelRecord = isTopLevelRecord;
-        this.changelogMode = changelogMode;
-        this.resolvedSchema = resolvedSchema;
-    }
-
-    public ObjectIdentifier getIdentifier() {
-        return identifier;
-    }
-
-    public DataStream<E> getDataStream() {
-        return dataStream;
-    }
-
-    public DataType getPhysicalDataType() {
-        return physicalDataType;
-    }
-
-    public boolean isTopLevelRecord() {
-        return isTopLevelRecord;
-    }
-
-    public ChangelogMode getChangelogMode() {
-        return changelogMode;
-    }
-
-    @Override
-    public String asSummaryString() {
-        Map<String, Object> args = new LinkedHashMap<>();
-        args.put("identifier", identifier);
-        args.put("stream", dataStream.getId());
-        args.put("type", physicalDataType);
-        args.put("isTopLevelRecord", isTopLevelRecord);
-        args.put("changelogMode", changelogMode);
-        args.put("fields", resolvedSchema.getColumnNames());
-
-        return OperationUtils.formatWithChildren(
-                "DataStreamInput", args, getChildren(), 
Operation::asSummaryString);
-    }
-
-    @Override
-    public List<QueryOperation> getChildren() {
-        return Collections.emptyList();
-    }
-
-    @Override
-    public ResolvedSchema getResolvedSchema() {
-        return resolvedSchema;
-    }
-
-    @Override
-    public <T> T accept(QueryOperationVisitor<T> visitor) {
-        return visitor.visit(this);
-    }
-}
diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala
index 8fd5e28..8bbc63c 100644
--- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala
+++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala
@@ -17,36 +17,29 @@
  */
 package org.apache.flink.table.api.bridge.scala.internal
 
-import java.util
-import java.util.{Collections, List => JList}
-import javax.annotation.Nullable
 import org.apache.flink.annotation.Internal
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.dag.Transformation
 import org.apache.flink.api.scala._
 import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.streaming.api.datastream.{DataStream => JDataStream}
-import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JStreamExecutionEnvironment}
 import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
 import org.apache.flink.table.api._
-import org.apache.flink.table.api.bridge.scala.StreamStatementSet
-import org.apache.flink.table.api.internal.TableEnvironmentImpl
-import org.apache.flink.table.catalog.SchemaTranslator.ProducingResult
+import org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl
+import org.apache.flink.table.api.bridge.scala.{StreamStatementSet, StreamTableEnvironment}
 import org.apache.flink.table.catalog._
 import org.apache.flink.table.connector.ChangelogMode
-import org.apache.flink.table.delegation.{Executor, ExecutorFactory, Planner}
-import org.apache.flink.table.expressions.{ApiExpressionUtils, Expression}
-import org.apache.flink.table.factories.{FactoryUtil, PlannerFactoryUtil}
+import org.apache.flink.table.delegation.{Executor, Planner}
+import org.apache.flink.table.expressions.Expression
+import org.apache.flink.table.factories.PlannerFactoryUtil
 import org.apache.flink.table.functions.{AggregateFunction, TableAggregateFunction, TableFunction, UserDefinedFunctionHelper}
 import org.apache.flink.table.module.ModuleManager
 import org.apache.flink.table.operations._
 import org.apache.flink.table.sources.{TableSource, TableSourceValidation}
 import org.apache.flink.table.types.AbstractDataType
 import org.apache.flink.table.types.utils.TypeConversions
-import org.apache.flink.table.typeutils.FieldInfoUtils
 import org.apache.flink.types.Row
 import org.apache.flink.util.Preconditions
 
+import java.util.Optional
 import scala.collection.JavaConverters._
 
 /**
@@ -64,7 +57,7 @@ class StreamTableEnvironmentImpl (
     executor: Executor,
     isStreaming: Boolean,
     userClassLoader: ClassLoader)
-  extends TableEnvironmentImpl(
+  extends AbstractStreamTableEnvironmentImpl(
     catalogManager,
     moduleManager,
     config,
@@ -72,8 +65,9 @@ class StreamTableEnvironmentImpl (
     functionCatalog,
     planner,
     isStreaming,
-    userClassLoader)
-  with org.apache.flink.table.api.bridge.scala.StreamTableEnvironment {
+    userClassLoader,
+    scalaExecutionEnvironment.getWrappedStreamExecutionEnvironment)
+  with StreamTableEnvironment {
 
   override def fromDataStream[T](dataStream: DataStream[T]): Table = {
     Preconditions.checkNotNull(dataStream, "Data stream must not be null.")
@@ -127,62 +121,6 @@ class StreamTableEnvironmentImpl (
      fromStreamInternal(dataStream.javaStream, schema, path, ChangelogMode.insertOnly()))
   }
 
-  private def fromStreamInternal[T](
-      dataStream: JDataStream[T],
-      @Nullable schema: Schema,
-      @Nullable viewPath: String,
-      changelogMode: ChangelogMode): Table = {
-    Preconditions.checkNotNull(changelogMode, "Changelog mode must not be null.")
-
-    if (dataStream.getExecutionEnvironment ne scalaExecutionEnvironment.getJavaEnv) {
-      throw new ValidationException(
-        "The DataStream's StreamExecutionEnvironment must be identical to the one that " +
-          "has been passed to the StreamTableEnvironment during instantiation.")
-    }
-
-    val catalogManager = getCatalogManager
-    val schemaResolver = catalogManager.getSchemaResolver
-    val operationTreeBuilder = getOperationTreeBuilder
-
-    val unresolvedIdentifier = if (viewPath != null) {
-      getParser.parseIdentifier(viewPath)
-    } else {
-      UnresolvedIdentifier.of("Unregistered_DataStream_Source_" + 
dataStream.getId)
-    }
-    val objectIdentifier = 
catalogManager.qualifyIdentifier(unresolvedIdentifier)
-
-    val schemaTranslationResult =
-      SchemaTranslator.createConsumingResult(
-        catalogManager.getDataTypeFactory, dataStream.getType, schema)
-
-    val resolvedSchema = schemaTranslationResult.getSchema.resolve(schemaResolver)
-
-    val scanOperation =
-      new ScalaExternalQueryOperation(
-        objectIdentifier,
-        dataStream,
-        schemaTranslationResult.getPhysicalDataType,
-        schemaTranslationResult.isTopLevelRecord,
-        changelogMode,
-        resolvedSchema)
-
-    val projections = schemaTranslationResult.getProjections
-    if (projections == null) {
-      return createTable(scanOperation)
-    }
-
-    val projectOperation =
-      operationTreeBuilder.project(
-        util.Arrays.asList(
-          projections
-            .asScala
-            .map(ApiExpressionUtils.unresolvedRef)
-            .toArray),
-        scanOperation)
-
-    createTable(projectOperation)
-  }
-
   override def toDataStream(table: Table): DataStream[Row] = {
     Preconditions.checkNotNull(table, "Table must not be null.")
     // include all columns of the query (incl. metadata and computed columns)
@@ -210,7 +148,7 @@ class StreamTableEnvironmentImpl (
       table.getResolvedSchema,
       targetDataType)
 
-    toStreamInternal(table, schemaTranslationResult, ChangelogMode.insertOnly())
+    new DataStream[T](toStreamInternal(table, schemaTranslationResult, ChangelogMode.insertOnly()))
   }
 
   override def toChangelogStream(table: Table): DataStream[Row] = {
@@ -220,7 +158,7 @@ class StreamTableEnvironmentImpl (
       table.getResolvedSchema,
       null)
 
-    toStreamInternal(table, schemaTranslationResult, null)
+    new DataStream[Row](toStreamInternal(table, schemaTranslationResult, null))
   }
 
   override def toChangelogStream(table: Table, targetSchema: Schema): DataStream[Row] = {
@@ -231,7 +169,7 @@ class StreamTableEnvironmentImpl (
       table.getResolvedSchema,
       targetSchema)
 
-    toStreamInternal(table, schemaTranslationResult, null)
+    new DataStream[Row](toStreamInternal(table, schemaTranslationResult, null))
   }
 
   override def toChangelogStream(
@@ -247,75 +185,15 @@ class StreamTableEnvironmentImpl (
       table.getResolvedSchema,
       targetSchema)
 
-    toStreamInternal(table, schemaTranslationResult, changelogMode)
-  }
-
-  private def toStreamInternal[T](
-      table: Table,
-      schemaTranslationResult: ProducingResult,
-      @Nullable changelogMode: ChangelogMode): DataStream[T] = {
-    val catalogManager = getCatalogManager
-    val schemaResolver = catalogManager.getSchemaResolver
-    val operationTreeBuilder = getOperationTreeBuilder
-
-    val optionalProjections = schemaTranslationResult.getProjections
-    val projectOperation = if (optionalProjections.isPresent) {
-      val projections = optionalProjections.get
-      operationTreeBuilder.project(
-        projections.asScala
-          .map(ApiExpressionUtils.unresolvedRef)
-          .map(_.asInstanceOf[Expression])
-          .asJava,
-        table.getQueryOperation)
-    } else {
-      table.getQueryOperation
-    }
-
-    val resolvedSchema = schemaResolver.resolve(schemaTranslationResult.getSchema)
-
-    val unresolvedIdentifier =
-      UnresolvedIdentifier.of("Unregistered_DataStream_Sink_" + ExternalModifyOperation.getUniqueId)
-    val objectIdentifier = catalogManager.qualifyIdentifier(unresolvedIdentifier)
-
-    val modifyOperation = new ExternalModifyOperation(
-      objectIdentifier,
-      projectOperation,
-      resolvedSchema,
-      changelogMode,
-      schemaTranslationResult.getPhysicalDataType
-        .orElse(resolvedSchema.toPhysicalRowDataType))
-
-    toStreamInternal(table, modifyOperation)
-  }
-
-  private def toStreamInternal[T](
-      table: Table,
-      modifyOperation: ModifyOperation)
-    : DataStream[T] = {
-    val javaExecutionEnvironment = scalaExecutionEnvironment.getJavaEnv
-
-    val transformations = planner.translate(Collections.singletonList(modifyOperation))
-    val streamTransformation: Transformation[T] = getTransformation(table, transformations)
-    javaExecutionEnvironment.addOperator(streamTransformation)
-
-    // reconfigure whenever planner transformations are added
-    javaExecutionEnvironment.configure(tableConfig.getConfiguration)
-
-    new DataStream[T](new JDataStream[T](javaExecutionEnvironment, streamTransformation))
+    new DataStream[Row](toStreamInternal(table, schemaTranslationResult, changelogMode))
   }
 
   override def createStatementSet(): StreamStatementSet = {
     new StreamStatementSetImpl(this)
   }
 
-  private[internal] def attachAsDataStream(modifyOperations: JList[ModifyOperation]) {
-    val javaEnv = scalaExecutionEnvironment.getJavaEnv
-    val transformations = translate(modifyOperations).asScala
-    transformations.foreach(javaEnv.addOperator)
-  }
-
   override def fromDataStream[T](dataStream: DataStream[T], fields: Expression*): Table = {
-    val queryOperation = asQueryOperation(dataStream, Some(fields.toList.asJava))
+    val queryOperation = asQueryOperation(dataStream.javaStream, Optional.of(fields.toList.asJava))
     createTable(queryOperation)
   }
 
@@ -338,7 +216,7 @@ class StreamTableEnvironmentImpl (
       table.getQueryOperation,
       TypeConversions.fromLegacyInfoToDataType(returnType),
       OutputConversionModifyOperation.UpdateMode.APPEND)
-    toStreamInternal[T](table, modifyOperation)
+    new DataStream[T](toStreamInternal[T](table, modifyOperation))
   }
 
   override def toRetractStream[T: TypeInformation](table: Table): DataStream[(Boolean, T)] = {
@@ -348,7 +226,7 @@ class StreamTableEnvironmentImpl (
       table.getQueryOperation,
       TypeConversions.fromLegacyInfoToDataType(returnType),
       OutputConversionModifyOperation.UpdateMode.RETRACT)
-    toStreamInternal(table, modifyOperation)
+    new DataStream[(Boolean, T)](toStreamInternal(table, modifyOperation))
   }
 
   override def registerFunction[T: TypeInformation](name: String, tf: TableFunction[T]): Unit = {
@@ -404,57 +282,6 @@ class StreamTableEnvironmentImpl (
     }
   }
 
-  private def getTransformation[T](
-      table: Table,
-      transformations: util.List[Transformation[_]])
-    : Transformation[T] = {
-    if (transformations.size != 1) {
-      throw new TableException(String
-        .format(
-          "Expected a single transformation for query: %s\n Got: %s",
-          table.getQueryOperation.asSummaryString,
-          transformations))
-    }
-    transformations.get(0).asInstanceOf[Transformation[T]]
-  }
-
-  private def asQueryOperation[T](
-      dataStream: DataStream[T],
-      fields: Option[util.List[Expression]]) = {
-    val streamType = dataStream.javaStream.getType
-    // get field names and types for all non-replaced fields
-    val typeInfoSchema = fields.map((f: JList[Expression]) => {
-      val fieldsInfo = FieldInfoUtils.getFieldsInfo(streamType, f.toArray(new Array[Expression](0)))
-      // check if event-time is enabled
-      if (fieldsInfo.isRowtimeDefined &&
-        (scalaExecutionEnvironment.getStreamTimeCharacteristic ne TimeCharacteristic.EventTime)) {
-        throw new ValidationException(String.format(
-          "A rowtime attribute requires an EventTime time characteristic in stream " +
-            "environment. But is: %s",
-          scalaExecutionEnvironment.getStreamTimeCharacteristic))
-      }
-      fieldsInfo
-    }).getOrElse(FieldInfoUtils.getFieldsInfo(streamType))
-    new ScalaDataStreamQueryOperation[T](
-      dataStream.javaStream,
-      typeInfoSchema.getIndices,
-      typeInfoSchema.toResolvedSchema)
-  }
-
-  override protected def qualifyQueryOperation(
-    identifier: ObjectIdentifier,
-    queryOperation: QueryOperation): QueryOperation = queryOperation match {
-    case qo: ScalaDataStreamQueryOperation[Any] =>
-      new ScalaDataStreamQueryOperation[Any](
-        identifier,
-        qo.getDataStream,
-        qo.getFieldIndices,
-        qo.getResolvedSchema
-      )
-    case _ =>
-      queryOperation
-  }
-
   override def createTemporaryView[T](
       path: String,
       dataStream: DataStream[T],
@@ -489,7 +316,8 @@ object StreamTableEnvironmentImpl {
 
     val functionCatalog = new FunctionCatalog(tableConfig, catalogManager, moduleManager)
 
-    val executor = lookupExecutor(classLoader, settings.getExecutor, executionEnvironment)
+    val executor = AbstractStreamTableEnvironmentImpl.lookupExecutor(
+      classLoader, settings.getExecutor, executionEnvironment.getWrappedStreamExecutionEnvironment)
 
     val planner = PlannerFactoryUtil.createPlanner(settings.getPlanner, executor, tableConfig,
       moduleManager, catalogManager, functionCatalog)
@@ -506,27 +334,4 @@ object StreamTableEnvironmentImpl {
       classLoader
     )
   }
-
-  private def lookupExecutor(
-      classLoader: ClassLoader,
-      executorIdentifier: String,
-      executionEnvironment: StreamExecutionEnvironment)
-    :Executor =
-    try {
-      val executorFactory =
-        FactoryUtil.discoverFactory(classLoader, classOf[ExecutorFactory], executorIdentifier)
-      val createMethod = executorFactory
-        .getClass
-        .getMethod("create", classOf[JStreamExecutionEnvironment])
-
-      createMethod
-        .invoke(
-          executorFactory,
-          executionEnvironment.getWrappedStreamExecutionEnvironment)
-        .asInstanceOf[Executor]
-    } catch {
-      case e: Exception =>
-        throw new TableException("Could not instantiate the executor. Make 
sure a planner module " +
-          "is on the classpath", e)
-    }
 }
diff --git a/flink-table/flink-table-planner/pom.xml b/flink-table/flink-table-planner/pom.xml
index c971579..8dd4dc9 100644
--- a/flink-table/flink-table-planner/pom.xml
+++ b/flink-table/flink-table-planner/pom.xml
@@ -71,7 +71,7 @@ under the License.
 
                <dependency>
                        <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-table-api-scala_${scala.binary.version}</artifactId>
+                       <artifactId>flink-table-api-bridge-base</artifactId>
                        <version>${project.version}</version>
                </dependency>
 
@@ -83,12 +83,6 @@ under the License.
 
                <dependency>
                        <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
-                       <version>${project.version}</version>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
                        <artifactId>flink-sql-parser</artifactId>
                        <version>${project.version}</version>
                        <exclusions>
@@ -121,14 +115,12 @@ under the License.
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-scala_${scala.binary.version}</artifactId>
                        <version>${project.version}</version>
-                       <scope>provided</scope>
                </dependency>
 
                <dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
                        <version>${project.version}</version>
-                       <scope>provided</scope>
                </dependency>
 
                <dependency>
@@ -242,6 +234,20 @@ under the License.
                        <scope>test</scope>
                </dependency>
 
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-table-api-scala_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
                <!-- SuccessException used in TestValuesTableFactory -->
                <dependency>
                        <groupId>org.apache.flink</groupId>
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/DefaultExecutorFactory.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/DefaultExecutorFactory.java
index 0ea9938..8a51dc7 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/DefaultExecutorFactory.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/DefaultExecutorFactory.java
@@ -25,6 +25,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.delegation.Executor;
 import org.apache.flink.table.delegation.ExecutorFactory;
+import org.apache.flink.table.delegation.StreamExecutorFactory;
 
 import java.util.Collections;
 import java.util.Set;
@@ -40,13 +41,14 @@ import java.util.Set;
  * #create(StreamExecutionEnvironment)}.
  */
 @Internal
-public final class DefaultExecutorFactory implements ExecutorFactory {
+public final class DefaultExecutorFactory implements StreamExecutorFactory {
 
     @Override
     public Executor create(Configuration configuration) {
         return create(StreamExecutionEnvironment.getExecutionEnvironment(configuration));
     }
 
+    @Override
     public Executor create(StreamExecutionEnvironment executionEnvironment) {
         return new DefaultExecutor(executionEnvironment);
     }
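
    With StreamExecutorFactory available, the reflective lookup deleted from the
    Scala bridge above can become a plain type check. A sketch of what the shared
    AbstractStreamTableEnvironmentImpl.lookupExecutor plausibly does, using only
    APIs visible in this diff (the actual implementation lives in
    flink-table-api-bridge-base):

        import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
        import org.apache.flink.table.api.TableException;
        import org.apache.flink.table.delegation.Executor;
        import org.apache.flink.table.delegation.ExecutorFactory;
        import org.apache.flink.table.delegation.StreamExecutorFactory;
        import org.apache.flink.table.factories.FactoryUtil;

        final class ExecutorLookupSketch {

            static Executor lookupExecutor(
                    ClassLoader classLoader,
                    String executorIdentifier,
                    StreamExecutionEnvironment executionEnvironment) {
                final ExecutorFactory executorFactory;
                try {
                    // Same discovery mechanism as the deleted Scala code, minus reflection.
                    executorFactory =
                            FactoryUtil.discoverFactory(
                                    classLoader, ExecutorFactory.class, executorIdentifier);
                } catch (Exception e) {
                    throw new TableException(
                            "Could not instantiate the executor. Make sure a planner module "
                                    + "is on the classpath",
                            e);
                }
                if (executorFactory instanceof StreamExecutorFactory) {
                    return ((StreamExecutorFactory) executorFactory).create(executionEnvironment);
                }
                throw new TableException(
                        "The discovered executor factory does not support the DataStream API bridges.");
            }
        }
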
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/DataStreamQueryOperation.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/InternalDataStreamQueryOperation.java
similarity index 92%
rename from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/DataStreamQueryOperation.java
rename to flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/InternalDataStreamQueryOperation.java
index f35024a..1f58c51 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/DataStreamQueryOperation.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/InternalDataStreamQueryOperation.java
@@ -41,11 +41,11 @@ import java.util.Map;
  *
  * <p>This operation may expose only part, or change the order of the fields available in a {@link
  * org.apache.flink.api.common.typeutils.CompositeType} of the underlying {@link DataStream}. The
- * {@link DataStreamQueryOperation#getFieldIndices()} describes the mapping between fields of the
- * {@link TableSchema} to the {@link org.apache.flink.api.common.typeutils.CompositeType}.
+ * {@link InternalDataStreamQueryOperation#getFieldIndices()} describes the mapping between fields
+ * of the {@link TableSchema} to the {@link org.apache.flink.api.common.typeutils.CompositeType}.
  */
 @Internal
-public class DataStreamQueryOperation<E> implements QueryOperation {
+public class InternalDataStreamQueryOperation<E> implements QueryOperation {
 
     private final ObjectIdentifier identifier;
     private final DataStream<E> dataStream;
@@ -55,7 +55,7 @@ public class DataStreamQueryOperation<E> implements QueryOperation {
     private final boolean[] fieldNullables;
     private final FlinkStatistic statistic;
 
-    public DataStreamQueryOperation(
+    public InternalDataStreamQueryOperation(
             ObjectIdentifier identifier,
             DataStream<E> dataStream,
             int[] fieldIndices,
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/QueryOperationConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/QueryOperationConverter.java
index ed9d3ca..d1a28d8 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/QueryOperationConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/QueryOperationConverter.java
@@ -42,17 +42,15 @@ import org.apache.flink.table.functions.TableFunctionDefinition;
 import org.apache.flink.table.operations.AggregateQueryOperation;
 import org.apache.flink.table.operations.CalculatedQueryOperation;
 import org.apache.flink.table.operations.CatalogQueryOperation;
+import org.apache.flink.table.operations.DataStreamQueryOperation;
 import org.apache.flink.table.operations.DistinctQueryOperation;
+import org.apache.flink.table.operations.ExternalQueryOperation;
 import org.apache.flink.table.operations.FilterQueryOperation;
-import org.apache.flink.table.operations.JavaDataStreamQueryOperation;
-import org.apache.flink.table.operations.JavaExternalQueryOperation;
 import org.apache.flink.table.operations.JoinQueryOperation;
 import org.apache.flink.table.operations.JoinQueryOperation.JoinType;
 import org.apache.flink.table.operations.ProjectQueryOperation;
 import org.apache.flink.table.operations.QueryOperation;
 import org.apache.flink.table.operations.QueryOperationVisitor;
-import org.apache.flink.table.operations.ScalaDataStreamQueryOperation;
-import org.apache.flink.table.operations.ScalaExternalQueryOperation;
 import org.apache.flink.table.operations.SetQueryOperation;
 import org.apache.flink.table.operations.SortQueryOperation;
 import org.apache.flink.table.operations.TableSourceQueryOperation;
@@ -75,7 +73,7 @@ import org.apache.flink.table.planner.expressions.SqlAggFunctionVisitor;
 import org.apache.flink.table.planner.expressions.converter.ExpressionConverter;
 import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction;
 import org.apache.flink.table.planner.functions.utils.TableSqlFunction;
-import org.apache.flink.table.planner.operations.DataStreamQueryOperation;
+import org.apache.flink.table.planner.operations.InternalDataStreamQueryOperation;
 import org.apache.flink.table.planner.operations.PlannerQueryOperation;
 import org.apache.flink.table.planner.operations.RichTableSourceQueryOperation;
 import org.apache.flink.table.planner.plan.logical.LogicalWindow;
@@ -431,21 +429,11 @@ public class QueryOperationConverter extends QueryOperationDefaultVisitor<RelNod
         public RelNode visit(QueryOperation other) {
             if (other instanceof PlannerQueryOperation) {
                 return ((PlannerQueryOperation) other).getCalciteTree();
-            } else if (other instanceof DataStreamQueryOperation) {
-                return convertToDataStreamScan((DataStreamQueryOperation<?>) other);
-            } else if (other instanceof JavaExternalQueryOperation) {
-                final JavaExternalQueryOperation<?> externalQueryOperation =
-                        (JavaExternalQueryOperation<?>) other;
-                return convertToExternalScan(
-                        externalQueryOperation.getIdentifier(),
-                        externalQueryOperation.getDataStream(),
-                        externalQueryOperation.getPhysicalDataType(),
-                        externalQueryOperation.isTopLevelRecord(),
-                        externalQueryOperation.getChangelogMode(),
-                        externalQueryOperation.getResolvedSchema());
-            } else if (other instanceof ScalaExternalQueryOperation) {
-                final ScalaExternalQueryOperation<?> externalQueryOperation =
-                        (ScalaExternalQueryOperation<?>) other;
+            } else if (other instanceof InternalDataStreamQueryOperation) {
+                return convertToDataStreamScan((InternalDataStreamQueryOperation<?>) other);
+            } else if (other instanceof ExternalQueryOperation) {
+                final ExternalQueryOperation<?> externalQueryOperation =
+                        (ExternalQueryOperation<?>) other;
                 return convertToExternalScan(
                         externalQueryOperation.getIdentifier(),
                         externalQueryOperation.getDataStream(),
@@ -455,17 +443,9 @@ public class QueryOperationConverter extends QueryOperationDefaultVisitor<RelNod
                         externalQueryOperation.getResolvedSchema());
             }
             // legacy
-            else if (other instanceof JavaDataStreamQueryOperation) {
-                JavaDataStreamQueryOperation<?> dataStreamQueryOperation =
-                        (JavaDataStreamQueryOperation<?>) other;
-                return convertToDataStreamScan(
-                        dataStreamQueryOperation.getDataStream(),
-                        dataStreamQueryOperation.getFieldIndices(),
-                        dataStreamQueryOperation.getResolvedSchema(),
-                        dataStreamQueryOperation.getIdentifier());
-            } else if (other instanceof ScalaDataStreamQueryOperation) {
-                ScalaDataStreamQueryOperation dataStreamQueryOperation =
-                        (ScalaDataStreamQueryOperation<?>) other;
+            else if (other instanceof DataStreamQueryOperation) {
+                DataStreamQueryOperation<?> dataStreamQueryOperation =
+                        (DataStreamQueryOperation<?>) other;
                 return convertToDataStreamScan(
                         dataStreamQueryOperation.getDataStream(),
                         dataStreamQueryOperation.getFieldIndices(),
@@ -551,7 +531,7 @@ public class QueryOperationConverter extends QueryOperationDefaultVisitor<RelNod
                     changelogMode);
         }
 
-        private RelNode convertToDataStreamScan(DataStreamQueryOperation<?> operation) {
+        private RelNode convertToDataStreamScan(InternalDataStreamQueryOperation<?> operation) {
             List<String> names;
             ObjectIdentifier identifier = operation.getIdentifier();
             if (identifier != null) {
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
index ef39ea7..8fc6c4d 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
@@ -17,19 +17,22 @@
  */
 package org.apache.flink.table.planner.utils
 
+import org.apache.calcite.avatica.util.TimeUnit
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.sql.parser.SqlParserPos
+import org.apache.calcite.sql.{SqlExplainLevel, SqlIntervalQualifier}
 import org.apache.flink.api.common.BatchShuffleMode
 import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
 import org.apache.flink.api.java.typeutils.{PojoTypeInfo, RowTypeInfo, TupleTypeInfo}
 import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
 import org.apache.flink.configuration.ExecutionOptions
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.environment.{LocalStreamEnvironment, StreamExecutionEnvironment}
 import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment => ScalaStreamExecEnv}
 import org.apache.flink.streaming.api.{TimeCharacteristic, environment}
 import org.apache.flink.table.api._
-import org.apache.flink.table.api.bridge.java.internal.{StreamTableEnvironmentImpl => JavaStreamTableEnvImpl}
 import org.apache.flink.table.api.bridge.java.{StreamTableEnvironment => JavaStreamTableEnv}
-import org.apache.flink.table.api.bridge.scala.internal.{StreamTableEnvironmentImpl => ScalaStreamTableEnvImpl}
 import org.apache.flink.table.api.bridge.scala.{StreamTableEnvironment => ScalaStreamTableEnv}
 import org.apache.flink.table.api.config.ExecutionConfigOptions
 import org.apache.flink.table.api.internal.{TableEnvironmentImpl, TableEnvironmentInternal, TableImpl}
@@ -47,7 +50,7 @@ import org.apache.flink.table.operations.{CatalogSinkModifyOperation, ModifyOper
 import org.apache.flink.table.planner.calcite.CalciteConfig
 import org.apache.flink.table.planner.delegation.PlannerBase
 import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable
-import org.apache.flink.table.planner.operations.{DataStreamQueryOperation, PlannerQueryOperation, RichTableSourceQueryOperation}
+import org.apache.flink.table.planner.operations.{InternalDataStreamQueryOperation, PlannerQueryOperation, RichTableSourceQueryOperation}
 import org.apache.flink.table.planner.plan.nodes.calcite.LogicalWatermarkAssigner
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase
 import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodePlanDumper
@@ -64,13 +67,6 @@ import org.apache.flink.table.types.logical.LogicalType
 import org.apache.flink.table.types.utils.TypeConversions
 import org.apache.flink.table.typeutils.FieldInfoUtils
 import org.apache.flink.types.Row
-
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
-
-import org.apache.calcite.avatica.util.TimeUnit
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.sql.parser.SqlParserPos
-import org.apache.calcite.sql.{SqlExplainLevel, SqlIntervalQualifier}
 import org.junit.Assert.{assertEquals, assertTrue, fail}
 import org.junit.Rule
 import org.junit.rules.{ExpectedException, TemporaryFolder, TestName}
@@ -80,7 +76,6 @@ import _root_.java.util
 import java.io.{File, IOException}
 import java.nio.file.{Files, Paths}
 import java.time.Duration
-
 import _root_.scala.collection.JavaConversions._
 import _root_.scala.io.Source
 
@@ -1624,7 +1619,7 @@ object TableTestUtil {
     }).getOrElse(FieldInfoUtils.getFieldsInfo(streamType))
 
     val fieldCnt = typeInfoSchema.getFieldTypes.length
-    val dataStreamQueryOperation = new DataStreamQueryOperation(
+    val dataStreamQueryOperation = new InternalDataStreamQueryOperation(
       ObjectIdentifier.of(tEnv.getCurrentCatalog, tEnv.getCurrentDatabase, name),
       dataStream,
       typeInfoSchema.getIndices,
@@ -1632,22 +1627,10 @@ object TableTestUtil {
       fieldNullables.getOrElse(Array.fill(fieldCnt)(true)),
       statistic.getOrElse(FlinkStatistic.UNKNOWN)
     )
-    val table = createTable(tEnv, dataStreamQueryOperation)
+    val table = tEnv.asInstanceOf[TableEnvironmentImpl].createTable(dataStreamQueryOperation)
     tEnv.registerTable(name, table)
   }
 
-  def createTable(tEnv: TableEnvironment, queryOperation: QueryOperation): Table = {
-    val createTableMethod = tEnv match {
-      case _: ScalaStreamTableEnvImpl | _: JavaStreamTableEnvImpl =>
-        tEnv.getClass.getSuperclass.getDeclaredMethod("createTable", classOf[QueryOperation])
-      case t: TableEnvironmentImpl =>
-        t.getClass.getDeclaredMethod("createTable", classOf[QueryOperation])
-      case _ => throw new TableException(s"Unsupported class: ${tEnv.getClass.getCanonicalName}")
-    }
-    createTableMethod.setAccessible(true)
-    createTableMethod.invoke(tEnv, queryOperation).asInstanceOf[Table]
-  }
-
   def readFromResource(path: String): String = {
     val basePath = getClass.getResource("/").getFile
     val fullPath = if (path.startsWith("/")) {
diff --git a/flink-table/flink-table-uber/pom.xml b/flink-table/flink-table-uber/pom.xml
index c51f8d2..6b80c11 100644
--- a/flink-table/flink-table-uber/pom.xml
+++ b/flink-table/flink-table-uber/pom.xml
@@ -65,6 +65,11 @@ under the License.
                </dependency>
                <dependency>
                        <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-table-api-bridge-base</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
                        <artifactId>flink-table-api-java-bridge</artifactId>
                        <version>${project.version}</version>
                </dependency>
@@ -110,6 +115,7 @@ under the License.
                                                                        <include>org.apache.flink:flink-table-common</include>
                                                                        <include>org.apache.flink:flink-table-api-java</include>
                                                                        <include>org.apache.flink:flink-table-api-scala_${scala.binary.version}</include>
+                                                                       <include>org.apache.flink:flink-table-api-bridge-base</include>
                                                                        <include>org.apache.flink:flink-table-api-java-bridge</include>
                                                                        <include>org.apache.flink:flink-table-api-scala-bridge_${scala.binary.version}</include>
                                                                        <include>org.apache.flink:flink-table-planner_${scala.binary.version}</include>
diff --git a/flink-table/pom.xml b/flink-table/pom.xml
index f2f29be..63f6371 100644
--- a/flink-table/pom.xml
+++ b/flink-table/pom.xml
@@ -36,6 +36,7 @@ under the License.
                <module>flink-table-common</module>
                <module>flink-table-api-java</module>
                <module>flink-table-api-scala</module>
+               <module>flink-table-api-bridge-base</module>
                <module>flink-table-api-java-bridge</module>
                <module>flink-table-api-scala-bridge</module>
                <module>flink-table-planner</module>
diff --git a/tools/ci/stage.sh b/tools/ci/stage.sh
index e6a6b76..215a822 100755
--- a/tools/ci/stage.sh
+++ b/tools/ci/stage.sh
@@ -61,6 +61,7 @@ flink-table/flink-sql-parser-hive,\
 flink-table/flink-table-common,\
 flink-table/flink-table-api-java,\
 flink-table/flink-table-api-scala,\
+flink-table/flink-table-api-bridge-base,\
 flink-table/flink-table-api-java-bridge,\
 flink-table/flink-table-api-scala-bridge,\
 flink-table/flink-sql-client,\
@@ -94,6 +95,7 @@ flink-connectors/flink-connector-hbase-2.2,\
 flink-connectors/flink-hcatalog,\
 flink-connectors/flink-hadoop-compatibility,\
 flink-connectors,\
+flink-connectors/flink-connector-files,\
 flink-connectors/flink-connector-jdbc,\
 flink-connectors/flink-connector-cassandra,\
 flink-connectors/flink-connector-elasticsearch6,\
