This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e46999388d844939bbc9e66254f53765722dd358
Author: Timo Walther <twal...@apache.org>
AuthorDate: Fri Jun 4 09:48:43 2021 +0200

    [FLINK-22877][table] Remove BatchTableEnvironment and related classes
---
 flink-python/pyflink/table/descriptors.py          |  13 -
 .../api/bridge/java/BatchTableEnvironment.java     | 521 ---------------------
 .../table/operations/DataSetQueryOperation.java    |  84 ----
 .../table/descriptors/BatchTableDescriptor.java    |  42 --
 .../api/bridge/scala/BatchTableEnvironment.scala   | 429 -----------------
 .../api/bridge/scala/DataSetConversions.scala      |  65 ---
 .../table/api/bridge/scala/TableConversions.scala  |  24 +-
 .../flink/table/api/bridge/scala/package.scala     |  15 +-
 8 files changed, 2 insertions(+), 1191 deletions(-)
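
Migration note: the deprecation Javadoc quoted below points to the unified
TableEnvironment as the replacement entry point. A minimal sketch, assuming the
EnvironmentSettings.inBatchMode() builder available since Flink 1.13 (not part
of this commit):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public final class UnifiedBatchExample {
        public static void main(String[] args) {
            // Replaces BatchTableEnvironment.create(env): one TableEnvironment
            // covers both batch and streaming; here it runs in batch mode.
            TableEnvironment tEnv =
                    TableEnvironment.create(EnvironmentSettings.inBatchMode());
            tEnv.executeSql("SHOW TABLES").print();
        }
    }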

diff --git a/flink-python/pyflink/table/descriptors.py b/flink-python/pyflink/table/descriptors.py
index 22c1ccb..dd647af 100644
--- a/flink-python/pyflink/table/descriptors.py
+++ b/flink-python/pyflink/table/descriptors.py
@@ -38,7 +38,6 @@ __all__ = [
     'Json',
     'ConnectTableDescriptor',
     'StreamTableDescriptor',
-    'BatchTableDescriptor',
     'CustomConnectorDescriptor',
     'CustomFormatDescriptor'
 ]
@@ -1714,15 +1713,3 @@ class StreamTableDescriptor(ConnectTableDescriptor):
         """
         self._j_stream_table_descriptor = self._j_stream_table_descriptor.inUpsertMode()
         return self
-
-
-class BatchTableDescriptor(ConnectTableDescriptor):
-    """
-    Descriptor for specifying a table source and/or sink in a batch environment.
-
-    .. seealso:: parent class: :class:`ConnectTableDescriptor`
-    """
-
-    def __init__(self, j_batch_table_descriptor):
-        self._j_batch_table_descriptor = j_batch_table_descriptor
-        super(BatchTableDescriptor, self).__init__(self._j_batch_table_descriptor)
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/BatchTableEnvironment.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/BatchTableEnvironment.java
deleted file mode 100644
index 04dff4a..0000000
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/BatchTableEnvironment.java
+++ /dev/null
@@ -1,521 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.bridge.java;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.RuntimeExecutionMode;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.api.Table;
-import org.apache.flink.table.api.TableConfig;
-import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.TableException;
-import org.apache.flink.table.catalog.CatalogManager;
-import org.apache.flink.table.catalog.GenericInMemoryCatalog;
-import org.apache.flink.table.descriptors.BatchTableDescriptor;
-import org.apache.flink.table.descriptors.ConnectorDescriptor;
-import org.apache.flink.table.expressions.Expression;
-import org.apache.flink.table.functions.AggregateFunction;
-import org.apache.flink.table.functions.TableFunction;
-import org.apache.flink.table.module.ModuleManager;
-
-import java.lang.reflect.Constructor;
-
-import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
-
-/**
- * The {@link TableEnvironment} for a Java batch {@link ExecutionEnvironment} that works with {@link
- * DataSet}s.
- *
- * <p>A TableEnvironment can be used to:
- *
- * <ul>
- *   <li>convert a {@link DataSet} to a {@link Table}
- *   <li>register a {@link DataSet} in the {@link TableEnvironment}'s catalog
- *   <li>register a {@link Table} in the {@link TableEnvironment}'s catalog
- *   <li>scan a registered table to obtain a {@link Table}
- *   <li>specify a SQL query on registered tables to obtain a {@link Table}
- *   <li>convert a {@link Table} into a {@link DataSet}
- *   <li>explain the AST and execution plan of a {@link Table}
- * </ul>
- *
- * @deprecated {@link BatchTableEnvironment} will be dropped in Flink 1.14 because it only supports
- *     the old planner. Use the unified {@link TableEnvironment} instead, which supports both batch
- *     and streaming. More advanced operations previously covered by the DataSet API can now use the
- *     DataStream API in BATCH execution mode.
- */
-@Deprecated
-@PublicEvolving
-public interface BatchTableEnvironment extends TableEnvironment {
-
-    /**
-     * Registers a {@link TableFunction} under a unique name in the TableEnvironment's catalog.
-     * Registered functions can be referenced in Table API and SQL queries.
-     *
-     * @param name The name under which the function is registered.
-     * @param tableFunction The TableFunction to register.
-     * @param <T> The type of the output row.
-     */
-    <T> void registerFunction(String name, TableFunction<T> tableFunction);
-
-    /**
-     * Registers an {@link AggregateFunction} under a unique name in the TableEnvironment's catalog.
-     * Registered functions can be referenced in Table API and SQL queries.
-     *
-     * @param name The name under which the function is registered.
-     * @param aggregateFunction The AggregateFunction to register.
-     * @param <T> The type of the output value.
-     * @param <ACC> The type of aggregate accumulator.
-     */
-    <T, ACC> void registerFunction(String name, AggregateFunction<T, ACC> aggregateFunction);
-
-    /**
-     * Converts the given {@link DataSet} into a {@link Table}.
-     *
-     * <p>The field names of the {@link Table} are automatically derived from the type of the {@link
-     * DataSet}.
-     *
-     * @param dataSet The {@link DataSet} to be converted.
-     * @param <T> The type of the {@link DataSet}.
-     * @return The converted {@link Table}.
-     */
-    <T> Table fromDataSet(DataSet<T> dataSet);
-
-    /**
-     * Converts the given {@link DataSet} into a {@link Table} with specified field names.
-     *
-     * <p>There are two modes for mapping original fields to the fields of the {@link Table}:
-     *
-     * <p>1. Reference input fields by name: All fields in the schema definition are referenced by
-     * name (and possibly renamed using an alias (as)). In this mode, fields can be reordered and
-     * projected out. This mode can be used for any input type, including POJOs.
-     *
-     * <p>Example:
-     *
-     * <pre>{@code
-     * DataSet<Tuple2<String, Long>> set = ...
-     * // use the original 'f0' field and give a better name to the 'f1' field
-     * Table table = tableEnv.fromDataSet(set, "f0, f1 as name");
-     * }</pre>
-     *
-     * <p>2. Reference input fields by position: In this mode, fields are simply renamed. This mode
-     * can only be used if the input type has a defined field order (tuple, case class, Row) and
-     * none of the {@code fields} references a field of the input type.
-     *
-     * <p>Example:
-     *
-     * <pre>{@code
-     * DataSet<Tuple2<String, Long>> set = ...
-     * // renames the original fields as 'a' and 'b'
-     * Table table = tableEnv.fromDataSet(set, "a, b");
-     * }</pre>
-     *
-     * @param dataSet The {@link DataSet} to be converted.
-     * @param fields The fields expressions to map original fields of the DataSet to the fields of
-     *     the {@link Table}.
-     * @param <T> The type of the {@link DataSet}.
-     * @return The converted {@link Table}.
-     * @deprecated use {@link #fromDataSet(DataSet, Expression...)}
-     */
-    @Deprecated
-    <T> Table fromDataSet(DataSet<T> dataSet, String fields);
-
-    /**
-     * Converts the given {@link DataSet} into a {@link Table} with specified field names.
-     *
-     * <p>There are two modes for mapping original fields to the fields of the {@link Table}:
-     *
-     * <p>1. Reference input fields by name: All fields in the schema definition are referenced by
-     * name (and possibly renamed using an alias (as)). In this mode, fields can be reordered and
-     * projected out. This mode can be used for any input type, including POJOs.
-     *
-     * <p>Example:
-     *
-     * <pre>{@code
-     * DataSet<Tuple2<String, Long>> set = ...
-     * Table table = tableEnv.fromDataSet(
-     *    set,
-     *    $("f1"), // reorder and use the original field
-     *    $("f0").as("name") // reorder and give the original field a better name
-     * );
-     * }</pre>
-     *
-     * <p>2. Reference input fields by position: In this mode, fields are simply renamed. This mode
-     * can only be used if the input type has a defined field order (tuple, case class, Row) and
-     * none of the {@code fields} references a field of the input type.
-     *
-     * <p>Example:
-     *
-     * <pre>{@code
-     * DataSet<Tuple2<String, Long>> set = ...
-     * Table table = tableEnv.fromDataSet(
-     *    set,
-     *    $("a"), // renames the first field to 'a'
-     *    $("b") // renames the second field to 'b'
-     * );
-     * }</pre>
-     *
-     * @param dataSet The {@link DataSet} to be converted.
-     * @param fields The fields expressions to map original fields of the DataSet to the fields of
-     *     the {@link Table}.
-     * @param <T> The type of the {@link DataSet}.
-     * @return The converted {@link Table}.
-     */
-    <T> Table fromDataSet(DataSet<T> dataSet, Expression... fields);
-
-    /**
-     * Creates a view from the given {@link DataSet}. Registered views can be referenced in SQL
-     * queries.
-     *
-     * <p>The field names of the {@link Table} are automatically derived from the type of the {@link
-     * DataSet}.
-     *
-     * <p>The view is registered in the namespace of the current catalog and database. To register
-     * the view in a different catalog use {@link #createTemporaryView(String, DataSet)}.
-     *
-     * <p>Temporary objects can shadow permanent ones. If a permanent object in a given path exists,
-     * it will be inaccessible in the current session. To make the permanent object available again
-     * you can drop the corresponding temporary object.
-     *
-     * @param name The name under which the {@link DataSet} is registered in the catalog.
-     * @param dataSet The {@link DataSet} to register.
-     * @param <T> The type of the {@link DataSet} to register.
-     * @deprecated use {@link #createTemporaryView(String, DataSet)}
-     */
-    @Deprecated
-    <T> void registerDataSet(String name, DataSet<T> dataSet);
-
-    /**
-     * Creates a view from the given {@link DataSet} in a given path. Registered views can be
-     * referenced in SQL queries.
-     *
-     * <p>The field names of the {@link Table} are automatically derived from the type of the {@link
-     * DataSet}.
-     *
-     * <p>Temporary objects can shadow permanent ones. If a permanent object in a given path exists,
-     * it will be inaccessible in the current session. To make the permanent object available again
-     * you can drop the corresponding temporary object.
-     *
-     * @param path The path under which the view is created. See also the {@link TableEnvironment}
-     *     class description for the format of the path.
-     * @param dataSet The {@link DataSet} out of which to create the view.
-     * @param <T> The type of the {@link DataSet}.
-     */
-    <T> void createTemporaryView(String path, DataSet<T> dataSet);
-
-    /**
-     * Creates a view from the given {@link DataSet} in a given path with specified field names.
-     * Registered views can be referenced in SQL queries.
-     *
-     * <p>There are two modes for mapping original fields to the fields of the View:
-     *
-     * <p>1. Reference input fields by name: All fields in the schema definition are referenced by
-     * name (and possibly renamed using an alias (as)). In this mode, fields can be reordered and
-     * projected out. This mode can be used for any input type, including POJOs.
-     *
-     * <p>Example:
-     *
-     * <pre>{@code
-     * DataSet<Tuple2<String, Long>> set = ...
-     * // use the original 'f0' field and give a better name to the 'f1' field
-     * tableEnv.registerDataSet("myTable", set, "f0, f1 as name");
-     * }</pre>
-     *
-     * <p>2. Reference input fields by position: In this mode, fields are simply renamed. This mode
-     * can only be used if the input type has a defined field order (tuple, case class, Row) and
-     * none of the {@code fields} references a field of the input type.
-     *
-     * <p>Example:
-     *
-     * <pre>{@code
-     * DataSet<Tuple2<String, Long>> set = ...
-     * // renames the original fields as 'a' and 'b'
-     * tableEnv.registerDataSet("myTable", set, "a, b");
-     * }</pre>
-     *
-     * <p>The view is registered in the namespace of the current catalog and database. To register
-     * the view in a different catalog use {@link #createTemporaryView(String, DataSet)}.
-     *
-     * <p>Temporary objects can shadow permanent ones. If a permanent object in a given path exists,
-     * it will be inaccessible in the current session. To make the permanent object available again
-     * you can drop the corresponding temporary object.
-     *
-     * @param name The name under which the {@link DataSet} is registered in the catalog.
-     * @param dataSet The {@link DataSet} to register.
-     * @param fields The fields expressions to map original fields of the DataSet to the fields of
-     *     the View.
-     * @param <T> The type of the {@link DataSet} to register.
-     * @deprecated use {@link #createTemporaryView(String, DataSet, String)}
-     */
-    @Deprecated
-    <T> void registerDataSet(String name, DataSet<T> dataSet, String fields);
-
-    /**
-     * Creates a view from the given {@link DataSet} in a given path with specified field names.
-     * Registered views can be referenced in SQL queries.
-     *
-     * <p>There are two modes for mapping original fields to the fields of the View:
-     *
-     * <p>1. Reference input fields by name: All fields in the schema definition are referenced by
-     * name (and possibly renamed using an alias (as)). In this mode, fields can be reordered and
-     * projected out. This mode can be used for any input type, including POJOs.
-     *
-     * <p>Example:
-     *
-     * <pre>{@code
-     * DataSet<Tuple2<String, Long>> set = ...
-     * // use the original 'f0' field and give a better name to the 'f1' field
-     * tableEnv.createTemporaryView("cat.db.myTable", set, "f0, f1 as name");
-     * }</pre>
-     *
-     * <p>2. Reference input fields by position: In this mode, fields are simply renamed. This mode
-     * can only be used if the input type has a defined field order (tuple, case class, Row) and
-     * none of the {@code fields} references a field of the input type.
-     *
-     * <p>Example:
-     *
-     * <pre>{@code
-     * DataSet<Tuple2<String, Long>> set = ...
-     * // renames the original fields as 'a' and 'b'
-     * tableEnv.createTemporaryView("cat.db.myTable", set, "a, b");
-     * }</pre>
-     *
-     * <p>Temporary objects can shadow permanent ones. If a permanent object in a given path exists,
-     * it will be inaccessible in the current session. To make the permanent object available again
-     * you can drop the corresponding temporary object.
-     *
-     * @param path The path under which the view is created. See also the {@link TableEnvironment}
-     *     class description for the format of the path.
-     * @param dataSet The {@link DataSet} out of which to create the view.
-     * @param fields The fields expressions to map original fields of the DataSet to the fields of
-     *     the View.
-     * @param <T> The type of the {@link DataSet}.
-     * @deprecated use {@link #createTemporaryView(String, DataSet, Expression...)}
-     */
-    @Deprecated
-    <T> void createTemporaryView(String path, DataSet<T> dataSet, String fields);
-
-    /**
-     * Creates a view from the given {@link DataSet} in a given path with specified field names.
-     * Registered views can be referenced in SQL queries.
-     *
-     * <p>There are two modes for mapping original fields to the fields of the View:
-     *
-     * <p>1. Reference input fields by name: All fields in the schema definition are referenced by
-     * name (and possibly renamed using an alias (as)). In this mode, fields can be reordered and
-     * projected out. This mode can be used for any input type, including POJOs.
-     *
-     * <p>Example:
-     *
-     * <pre>{@code
-     * DataSet<Tuple2<String, Long>> set = ...
-     * tableEnv.createTemporaryView(
-     *    "cat.db.myTable",
-     *    set,
-     *    $("f1"), // reorder and use the original field
-     *    $("f0").as("name") // reorder and give the original field a better name
-     * );
-     * }</pre>
-     *
-     * <p>2. Reference input fields by position: In this mode, fields are simply renamed. This mode
-     * can only be used if the input type has a defined field order (tuple, case class, Row) and
-     * none of the {@code fields} references a field of the input type.
-     *
-     * <p>Example:
-     *
-     * <pre>{@code
-     * DataSet<Tuple2<String, Long>> set = ...
-     * tableEnv.createTemporaryView(
-     *    "cat.db.myTable",
-     *    set,
-     *    $("a"), // renames the first field to 'a'
-     *    $("b") // renames the second field to 'b'
-     * );
-     * }</pre>
-     *
-     * <p>Temporary objects can shadow permanent ones. If a permanent object in a given path exists,
-     * it will be inaccessible in the current session. To make the permanent object available again
-     * you can drop the corresponding temporary object.
-     *
-     * @param path The path under which the view is created. See also the {@link TableEnvironment}
-     *     class description for the format of the path.
-     * @param dataSet The {@link DataSet} out of which to create the view.
-     * @param fields The fields expressions to map original fields of the DataSet to the fields of
-     *     the View.
-     * @param <T> The type of the {@link DataSet}.
-     */
-    <T> void createTemporaryView(String path, DataSet<T> dataSet, Expression... fields);
-
-    /**
-     * Converts the given {@link Table} into a {@link DataSet} of a specified type.
-     *
-     * <p>The fields of the {@link Table} are mapped to {@link DataSet} fields as follows:
-     *
-     * <ul>
-     *   <li>{@link org.apache.flink.types.Row} and {@link org.apache.flink.api.java.tuple.Tuple}
-     *       types: Fields are mapped by position, field types must match.
-     *   <li>POJO {@link DataSet} types: Fields are mapped by field name, field types must match.
-     * </ul>
-     *
-     * @param table The {@link Table} to convert.
-     * @param clazz The class of the type of the resulting {@link DataSet}.
-     * @param <T> The type of the resulting {@link DataSet}.
-     * @return The converted {@link DataSet}.
-     */
-    <T> DataSet<T> toDataSet(Table table, Class<T> clazz);
-
-    /**
-     * Converts the given {@link Table} into a {@link DataSet} of a specified type.
-     *
-     * <p>The fields of the {@link Table} are mapped to {@link DataSet} fields as follows:
-     *
-     * <ul>
-     *   <li>{@link org.apache.flink.types.Row} and {@link org.apache.flink.api.java.tuple.Tuple}
-     *       types: Fields are mapped by position, field types must match.
-     *   <li>POJO {@link DataSet} types: Fields are mapped by field name, field types must match.
-     * </ul>
-     *
-     * @param table The {@link Table} to convert.
-     * @param typeInfo The {@link TypeInformation} that specifies the type of the resulting {@link
-     *     DataSet}.
-     * @param <T> The type of the resulting {@link DataSet}.
-     * @return The converted {@link DataSet}.
-     */
-    <T> DataSet<T> toDataSet(Table table, TypeInformation<T> typeInfo);
-
-    /**
-     * Creates a temporary table from a descriptor.
-     *
-     * <p>Descriptors allow for declaring the communication to external systems in an
-     * implementation-agnostic way. The classpath is scanned for suitable table factories that match
-     * the desired configuration.
-     *
-     * <p>The following example shows how to read from a connector using a JSON format and
-     * registering a temporary table as "MyTable":
-     *
-     * <pre>{@code
-     * tableEnv
-     *   .connect(
-     *     new ExternalSystemXYZ()
-     *       .version("0.11"))
-     *   .withFormat(
-     *     new Json()
-     *       .jsonSchema("{...}")
-     *       .failOnMissingField(false))
-     *   .withSchema(
-     *     new Schema()
-     *       .field("user-name", "VARCHAR").from("u_name")
-     *       .field("count", "DECIMAL"))
-     *   .createTemporaryTable("MyTable")
-     * }</pre>
-     *
-     * @param connectorDescriptor connector descriptor describing the external system
-     * @deprecated The SQL {@code CREATE TABLE} DDL is richer than this part of the API. This method
-     *     might be refactored in the next versions. Please use {@link #executeSql(String)
-     *     executeSql(ddl)} to register a table instead.
-     */
-    @Override
-    @Deprecated
-    BatchTableDescriptor connect(ConnectorDescriptor connectorDescriptor);
-
-    /**
-     * Returns a {@link TableEnvironment} for a Java batch {@link ExecutionEnvironment} that works
-     * with {@link DataSet}s.
-     *
-     * <p>A TableEnvironment can be used to:
-     *
-     * <ul>
-     *   <li>convert a {@link DataSet} to a {@link Table}
-     *   <li>register a {@link DataSet} in the {@link TableEnvironment}'s catalog
-     *   <li>register a {@link Table} in the {@link TableEnvironment}'s catalog
-     *   <li>scan a registered table to obtain a {@link Table}
-     *   <li>specify a SQL query on registered tables to obtain a {@link Table}
-     *   <li>convert a {@link Table} into a {@link DataSet}
-     *   <li>explain the AST and execution plan of a {@link Table}
-     * </ul>
-     *
-     * @param executionEnvironment The Java batch {@link ExecutionEnvironment} of the
-     *     TableEnvironment.
-     */
-    static BatchTableEnvironment create(ExecutionEnvironment executionEnvironment) {
-        Configuration configuration = new Configuration();
-        configuration.set(RUNTIME_MODE, RuntimeExecutionMode.BATCH);
-        TableConfig config = new TableConfig();
-        config.addConfiguration(configuration);
-        return create(executionEnvironment, config);
-    }
-
-    /**
-     * Returns a {@link TableEnvironment} for a Java batch {@link ExecutionEnvironment} that works
-     * with {@link DataSet}s.
-     *
-     * <p>A TableEnvironment can be used to:
-     *
-     * <ul>
-     *   <li>convert a {@link DataSet} to a {@link Table}
-     *   <li>register a {@link DataSet} in the {@link TableEnvironment}'s catalog
-     *   <li>register a {@link Table} in the {@link TableEnvironment}'s catalog
-     *   <li>scan a registered table to obtain a {@link Table}
-     *   <li>specify a SQL query on registered tables to obtain a {@link Table}
-     *   <li>convert a {@link Table} into a {@link DataSet}
-     *   <li>explain the AST and execution plan of a {@link Table}
-     * </ul>
-     *
-     * @param executionEnvironment The Java batch {@link ExecutionEnvironment} of the
-     *     TableEnvironment.
-     * @param tableConfig The configuration of the TableEnvironment.
-     */
-    static BatchTableEnvironment create(
-            ExecutionEnvironment executionEnvironment, TableConfig tableConfig) {
-        try {
-            // temporary solution until FLINK-15635 is fixed
-            ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
-
-            ModuleManager moduleManager = new ModuleManager();
-
-            String defaultCatalog = "default_catalog";
-            CatalogManager catalogManager =
-                    CatalogManager.newBuilder()
-                            .classLoader(classLoader)
-                            .config(tableConfig.getConfiguration())
-                            .defaultCatalog(
-                                    defaultCatalog,
-                                    new GenericInMemoryCatalog(defaultCatalog, "default_database"))
-                            .executionConfig(executionEnvironment.getConfig())
-                            .build();
-
-            Class<?> clazz =
-                    Class.forName(
-                            "org.apache.flink.table.api.bridge.java.internal.BatchTableEnvironmentImpl");
-            Constructor<?> con =
-                    clazz.getConstructor(
-                            ExecutionEnvironment.class,
-                            TableConfig.class,
-                            CatalogManager.class,
-                            ModuleManager.class);
-            return (BatchTableEnvironment)
-                    con.newInstance(
-                            executionEnvironment, tableConfig, catalogManager, moduleManager);
-        } catch (Throwable t) {
-            throw new TableException("Create BatchTableEnvironment failed.", t);
-        }
-    }
-}
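
Migration note: the removed fromDataSet/toDataSet conversions map onto the
DataStream bridge. A sketch, assuming StreamTableEnvironment supports the BATCH
runtime mode as targeted for the 1.14 release (the conversion methods shown are
the existing 1.13+ API; nothing here is introduced by this commit):

    import org.apache.flink.api.common.RuntimeExecutionMode;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;

    public final class DataSetMigrationExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();
            // BATCH mode gives bounded, DataSet-like execution semantics.
            env.setRuntimeMode(RuntimeExecutionMode.BATCH);
            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

            // fromDataStream/toDataStream replace fromDataSet/toDataSet.
            DataStream<Long> input = env.fromSequence(1, 10);
            Table table = tEnv.fromDataStream(input);
            DataStream<Row> result = tEnv.toDataStream(table);
            result.print();
            env.execute();
        }
    }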
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/operations/DataSetQueryOperation.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/operations/DataSetQueryOperation.java
deleted file mode 100644
index edd93e9..0000000
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/operations/DataSetQueryOperation.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.operations;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.catalog.ResolvedSchema;
-
-import java.util.Collections;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Describes a relational operation that reads from a {@link DataSet}.
- *
- * <p>This operation may expose only part, or change the order of the fields available in a {@link
- * org.apache.flink.api.common.typeutils.CompositeType} of the underlying {@link DataSet}. The
- * {@link DataSetQueryOperation#getFieldIndices()} describes the mapping between fields of the
- * {@link TableSchema} to the {@link org.apache.flink.api.common.typeutils.CompositeType}.
- */
-@Internal
-public class DataSetQueryOperation<E> implements QueryOperation {
-
-    private final DataSet<E> dataSet;
-    private final int[] fieldIndices;
-    private final ResolvedSchema resolvedSchema;
-
-    public DataSetQueryOperation(
-            DataSet<E> dataSet, int[] fieldIndices, ResolvedSchema resolvedSchema) {
-        this.dataSet = dataSet;
-        this.resolvedSchema = resolvedSchema;
-        this.fieldIndices = fieldIndices;
-    }
-
-    public DataSet<E> getDataSet() {
-        return dataSet;
-    }
-
-    public int[] getFieldIndices() {
-        return fieldIndices;
-    }
-
-    @Override
-    public ResolvedSchema getResolvedSchema() {
-        return resolvedSchema;
-    }
-
-    @Override
-    public String asSummaryString() {
-        Map<String, Object> args = new LinkedHashMap<>();
-        args.put("fields", resolvedSchema.getColumnNames());
-
-        return OperationUtils.formatWithChildren(
-                "DataSet", args, getChildren(), Operation::asSummaryString);
-    }
-
-    @Override
-    public List<QueryOperation> getChildren() {
-        return Collections.emptyList();
-    }
-
-    @Override
-    public <T> T accept(QueryOperationVisitor<T> visitor) {
-        return visitor.visit(this);
-    }
-}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/BatchTableDescriptor.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/BatchTableDescriptor.java
deleted file mode 100644
index cd4e2b8..0000000
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/BatchTableDescriptor.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.descriptors;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.table.api.internal.Registration;
-
-/**
- * Describes a table connected from a batch environment.
- *
- * <p>This class just exists for backwards compatibility; use {@link ConnectTableDescriptor} for
- * declarations.
- */
-@PublicEvolving
-public final class BatchTableDescriptor extends ConnectTableDescriptor {
-
-    public BatchTableDescriptor(
-            Registration registration, ConnectorDescriptor connectorDescriptor) {
-        super(registration, connectorDescriptor);
-    }
-
-    @Override
-    public BatchTableDescriptor withSchema(Schema schema) {
-        return (BatchTableDescriptor) super.withSchema(schema);
-    }
-}
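
Migration note: the Javadoc of the removed connect() method already pointed to
executeSql(ddl) as the replacement for descriptor-based registration. A sketch
of that path, assuming a TableEnvironment named tEnv as created above; the
connector and format options are illustrative placeholders, not taken from this
commit:

    // Registers "MyTable" via SQL DDL instead of the removed descriptor API.
    tEnv.executeSql(
            "CREATE TEMPORARY TABLE MyTable (\n"
                    + "  `user-name` STRING,\n"
                    + "  `count` DECIMAL(10, 2)\n"
                    + ") WITH (\n"
                    + "  'connector' = 'filesystem',\n"
                    + "  'path' = '/tmp/my-table',\n"
                    + "  'format' = 'json'\n"
                    + ")");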
diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/BatchTableEnvironment.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/BatchTableEnvironment.scala
deleted file mode 100644
index 0aedf4b..0000000
--- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/BatchTableEnvironment.scala
+++ /dev/null
@@ -1,429 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.api.bridge.scala
-
-import org.apache.flink.api.common.{JobExecutionResult, RuntimeExecutionMode}
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE
-import org.apache.flink.table.api.{TableEnvironment, _}
-import org.apache.flink.table.catalog.{CatalogManager, GenericInMemoryCatalog}
-import org.apache.flink.table.descriptors.{BatchTableDescriptor, ConnectorDescriptor}
-import org.apache.flink.table.expressions.Expression
-import org.apache.flink.table.functions.{AggregateFunction, TableFunction}
-import org.apache.flink.table.module.ModuleManager
-
-/**
-  * The [[TableEnvironment]] for a Scala batch [[ExecutionEnvironment]] that works
-  * with [[DataSet]]s.
-  *
-  * A TableEnvironment can be used to:
-  * - convert a [[DataSet]] to a [[Table]]
-  * - register a [[DataSet]] in the [[TableEnvironment]]'s catalog
-  * - register a [[Table]] in the [[TableEnvironment]]'s catalog
-  * - scan a registered table to obtain a [[Table]]
-  * - specify a SQL query on registered tables to obtain a [[Table]]
-  * - convert a [[Table]] into a [[DataSet]]
-  * - explain the AST and execution plan of a [[Table]]
-  *
-  * @deprecated [[BatchTableEnvironment]] will be dropped in Flink 1.14 because it only supports
-  *              the old planner. Use the unified [[TableEnvironment]] instead, which supports both
-  *              batch and streaming. More advanced operations previously covered by the DataSet API
-  *              can now use the DataStream API in BATCH execution mode.
-  */
-@deprecated
-trait BatchTableEnvironment extends TableEnvironment {
-
-  /**
-    * Registers a [[TableFunction]] under a unique name in the TableEnvironment's catalog.
-    * Registered functions can be referenced in Table API and SQL queries.
-    *
-    * @param name The name under which the function is registered.
-    * @param tf The TableFunction to register.
-    * @tparam T The type of the output row.
-    */
-  def registerFunction[T: TypeInformation](name: String, tf: TableFunction[T]): Unit
-
-  /**
-    * Registers an [[AggregateFunction]] under a unique name in the TableEnvironment's catalog.
-    * Registered functions can be referenced in Table API and SQL queries.
-    *
-    * @param name The name under which the function is registered.
-    * @param f The AggregateFunction to register.
-    * @tparam T The type of the output value.
-    * @tparam ACC The type of aggregate accumulator.
-    */
-  def registerFunction[T: TypeInformation, ACC: TypeInformation](
-    name: String,
-    f: AggregateFunction[T, ACC]): Unit
-
-  /**
-    * Converts the given [[DataSet]] into a [[Table]].
-    *
-    * The field names of the [[Table]] are automatically derived from the type of the [[DataSet]].
-    *
-    * @param dataSet The [[DataSet]] to be converted.
-    * @tparam T The type of the [[DataSet]].
-    * @return The converted [[Table]].
-    */
-  def fromDataSet[T](dataSet: DataSet[T]): Table
-
-  /**
-    * Converts the given [[DataSet]] into a [[Table]] with specified field names.
-    *
-    * There are two modes for mapping original fields to the fields of the [[Table]]:
-    *
-    * 1. Reference input fields by name:
-    * All fields in the schema definition are referenced by name
-    * (and possibly renamed using an alias (as)). In this mode, fields can be reordered and
-    * projected out. This mode can be used for any input type, including POJOs.
-    *
-    * Example:
-    *
-    * {{{
-    *   val set: DataSet[(String, Long)] = ...
-    *   val table: Table = tableEnv.fromDataSet(
-    *      set,
-    *      $"_2", // reorder and use the original field
-    *      $"_1" as "name" // reorder and give the original field a better name
-    *   )
-    * }}}
-    *
-    * 2. Reference input fields by position:
-    * In this mode, fields are simply renamed. This mode can only be
-    * used if the input type has a defined field order (tuple, case class, Row) and none of
-    * the `fields` references a field of the input type.
-    *
-    * Example:
-    *
-    * {{{
-    *   val set: DataSet[(String, Long)] = ...
-    *   val table: Table = tableEnv.fromDataSet(
-    *      set,
-    *      $"a", // renames the first field to 'a'
-    *      $"b" // renames the second field to 'b'
-    *   )
-    * }}}
-    *
-    * @param dataSet The [[DataSet]] to be converted.
-    * @param fields The fields expressions to map original fields of the DataSet to the fields of
-    *               the [[Table]].
-    * @tparam T The type of the [[DataSet]].
-    * @return The converted [[Table]].
-    */
-  def fromDataSet[T](dataSet: DataSet[T], fields: Expression*): Table
-
-  /**
-    * Creates a view from the given [[DataSet]].
-    * Registered views can be referenced in SQL queries.
-    *
-    * The field names of the [[Table]] are automatically derived from the type of the [[DataSet]].
-    *
-    * The view is registered in the namespace of the current catalog and database. To register the
-    * view in a different catalog use [[createTemporaryView]].
-    *
-    * Temporary objects can shadow permanent ones. If a permanent object in a given path exists,
-    * it will be inaccessible in the current session. To make the permanent object available again
-    * you can drop the corresponding temporary object.
-    *
-    * @param name The name under which the [[DataSet]] is registered in the catalog.
-    * @param dataSet The [[DataSet]] to register.
-    * @tparam T The type of the [[DataSet]] to register.
-    * @deprecated use [[createTemporaryView]]
-    */
-  @deprecated
-  def registerDataSet[T](name: String, dataSet: DataSet[T]): Unit
-
-  /**
-    * Creates a view from the given [[DataSet]] in a given path.
-    * Registered views can be referenced in SQL queries.
-    *
-    * The field names of the [[Table]] are automatically derived
-    * from the type of the [[DataSet]].
-    *
-    * Temporary objects can shadow permanent ones. If a permanent object in a given path exists,
-    * it will be inaccessible in the current session. To make the permanent object available again
-    * you can drop the corresponding temporary object.
-    *
-    * @param path The path under which the [[DataSet]] is created.
-    *             See also the [[TableEnvironment]] class description for the format of the path.
-    * @param dataSet The [[DataSet]] out of which to create the view.
-    * @tparam T The type of the [[DataSet]].
-    */
-  def createTemporaryView[T](path: String, dataSet: DataSet[T]): Unit
-
-  /**
-    * Creates a view from the given [[DataSet]] in a given path with specified field names.
-    * Registered views can be referenced in SQL queries.
-    *
-    * There are two modes for mapping original fields to the fields of the View:
-    *
-    * 1. Reference input fields by name:
-    * All fields in the schema definition are referenced by name
-    * (and possibly renamed using an alias (as)). In this mode, fields can be reordered and
-    * projected out. This mode can be used for any input type, including POJOs.
-    *
-    * Example:
-    *
-    * {{{
-    *   val set: DataSet[(String, Long)] = ...
-    *   tableEnv.registerDataSet(
-    *      "myTable",
-    *      set,
-    *      $"_2", // reorder and use the original field
-    *      $"_1" as "name" // reorder and give the original field a better name
-    *   );
-    * }}}
-    *
-    * 2. Reference input fields by position:
-    * In this mode, fields are simply renamed. This mode can only be
-    * used if the input type has a defined field order (tuple, case class, Row) and none of
-    * the `fields` references a field of the input type.
-    *
-    * Example:
-    *
-    * {{{
-    *   val set: DataSet[(String, Long)] = ...
-    *   tableEnv.registerDataSet(
-    *      "myTable",
-    *      set,
-    *      $"a", // renames the first field to 'a'
-    *      $"b" // renames the second field to 'b'
-    *   )
-    * }}}
-    *
-    * The view is registered in the namespace of the current catalog and database. To register the
-    * view in a different catalog use [[createTemporaryView]].
-    *
-    * Temporary objects can shadow permanent ones. If a permanent object in a given path exists,
-    * it will be inaccessible in the current session. To make the permanent object available again
-    * you can drop the corresponding temporary object.
-    *
-    * @param name The name under which the [[DataSet]] is registered in the catalog.
-    * @param dataSet The [[DataSet]] to register.
-    * @param fields The fields expressions to map original fields of the DataSet to the fields of
-    *               the View.
-    * @tparam T The type of the [[DataSet]] to register.
-    * @deprecated use [[createTemporaryView]]
-    */
-  @deprecated
-  def registerDataSet[T](name: String, dataSet: DataSet[T], fields: Expression*): Unit
-
-  /**
-    * Creates a view from the given [[DataSet]] in a given path with specified field names.
-    * Registered views can be referenced in SQL queries.
-    *
-    * There are two modes for mapping original fields to the fields of the View:
-    *
-    * 1. Reference input fields by name:
-    * All fields in the schema definition are referenced by name
-    * (and possibly renamed using an alias (as)). In this mode, fields can be reordered and
-    * projected out. This mode can be used for any input type, including POJOs.
-    *
-    * Example:
-    *
-    * {{{
-    *   val set: DataSet[(String, Long)] = ...
-    *   tableEnv.createTemporaryView(
-    *      "cat.db.myTable",
-    *      set,
-    *      $"_2", // reorder and use the original field
-    *      $"_1" as "name" // reorder and give the original field a better name
-    *   )
-    * }}}
-    *
-    * 2. Reference input fields by position:
-    * In this mode, fields are simply renamed. This mode can only be
-    * used if the input type has a defined field order (tuple, case class, Row) and none of
-    * the `fields` references a field of the input type.
-    *
-    * Example:
-    *
-    * {{{
-    *   val set: DataSet[(String, Long)] = ...
-    *   tableEnv.createTemporaryView(
-    *      "cat.db.myTable",
-    *      set,
-    *      $"a", // renames the first field to 'a'
-    *      $"b" // renames the second field to 'b'
-    *   )
-    * }}}
-    *
-    * Temporary objects can shadow permanent ones. If a permanent object in a given path exists,
-    * it will be inaccessible in the current session. To make the permanent object available again
-    * you can drop the corresponding temporary object.
-    *
-    * @param path The path under which the [[DataSet]] is created.
-    *             See also the [[TableEnvironment]] class description for the format of the path.
-    * @param dataSet The [[DataSet]] out of which to create the view.
-    * @param fields The fields expressions to map original fields of the DataSet to the fields of
-    *               the View.
-    * @tparam T The type of the [[DataSet]].
-    */
-  def createTemporaryView[T](path: String, dataSet: DataSet[T], fields: Expression*): Unit
-
-  /**
-    * Converts the given [[Table]] into a [[DataSet]] of a specified type.
-    *
-    * The fields of the [[Table]] are mapped to [[DataSet]] fields as follows:
-    * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
-    * types: Fields are mapped by position, field types must match.
-    * - POJO [[DataSet]] types: Fields are mapped by field name, field types must match.
-    *
-    * @param table The [[Table]] to convert.
-    * @tparam T The type of the resulting [[DataSet]].
-    * @return The converted [[DataSet]].
-    */
-  def toDataSet[T: TypeInformation](table: Table): DataSet[T]
-
-  /**
-    * Triggers the program execution. The environment will execute all parts of
-    * the program.
-    *
-    * The program execution will be logged and displayed with the provided name.
-    *
-    * It calls the ExecutionEnvironment#execute on the underlying
-    * [[ExecutionEnvironment]]. In contrast to the [[TableEnvironment]] this
-    * environment translates queries eagerly.
-    *
-    * @param jobName Desired name of the job
-    * @return The result of the job execution, containing elapsed time and accumulators.
-    * @throws Exception which occurs during job execution.
-    */
-  @throws[Exception]
-  override def execute(jobName: String): JobExecutionResult
-
-  /**
-   * Creates a temporary table from a descriptor.
-   *
-   * Descriptors allow for declaring the communication to external systems in an
-   * implementation-agnostic way. The classpath is scanned for suitable table factories that match
-   * the desired configuration.
-   *
-   * The following example shows how to read from a connector using a JSON format and
-   * registering a temporary table as "MyTable":
-   *
-   * {{{
-   *
-   * tableEnv
-   *   .connect(
-   *     new ExternalSystemXYZ()
-   *       .version("0.11"))
-   *   .withFormat(
-   *     new Json()
-   *       .jsonSchema("{...}")
-   *       .failOnMissingField(false))
-   *   .withSchema(
-   *     new Schema()
-   *       .field("user-name", "VARCHAR").from("u_name")
-   *       .field("count", "DECIMAL")
-   *   .createTemporaryTable("MyTable")
-   * }}}
-   *
-   * @param connectorDescriptor connector descriptor describing the external system
-   * @deprecated The SQL `CREATE TABLE` DDL is richer than this part of the API.
-   *             This method might be refactored in the next versions.
-   *             Please use [[executeSql]] to register a table instead.
-   */
-  @deprecated
-  override def connect(connectorDescriptor: ConnectorDescriptor): BatchTableDescriptor
-}
-
-/**
- * @deprecated [[BatchTableEnvironment]] will be dropped in Flink 1.14 because it only supports
- *              the old planner. Use the unified [[TableEnvironment]] instead, which supports both
- *              batch and streaming. More advanced operations previously covered by the DataSet API
- *              can now use the DataStream API in BATCH execution mode.
- */
-@deprecated
-object BatchTableEnvironment {
-
-  /**
-    * The [[TableEnvironment]] for a Scala batch [[ExecutionEnvironment]] that works
-    * with [[DataSet]]s.
-    *
-    * A TableEnvironment can be used to:
-    * - convert a [[DataSet]] to a [[Table]]
-    * - register a [[DataSet]] in the [[TableEnvironment]]'s catalog
-    * - register a [[Table]] in the [[TableEnvironment]]'s catalog
-    * - scan a registered table to obtain a [[Table]]
-    * - specify a SQL query on registered tables to obtain a [[Table]]
-    * - convert a [[Table]] into a [[DataSet]]
-    * - explain the AST and execution plan of a [[Table]]
-    *
-    * @param executionEnvironment The Scala batch [[ExecutionEnvironment]] of the TableEnvironment.
-    */
-  def create(executionEnvironment: ExecutionEnvironment): BatchTableEnvironment = {
-    val configuration = new Configuration
-    configuration.set(RUNTIME_MODE, RuntimeExecutionMode.BATCH)
-    val config = new TableConfig();
-    config.addConfiguration(configuration)
-    create(executionEnvironment, config)
-  }
-
-  /**
-    * The [[TableEnvironment]] for a Scala batch [[ExecutionEnvironment]] that works
-    * with [[DataSet]]s.
-    *
-    * A TableEnvironment can be used to:
-    * - convert a [[DataSet]] to a [[Table]]
-    * - register a [[DataSet]] in the [[TableEnvironment]]'s catalog
-    * - register a [[Table]] in the [[TableEnvironment]]'s catalog
-    * - scan a registered table to obtain a [[Table]]
-    * - specify a SQL query on registered tables to obtain a [[Table]]
-    * - convert a [[Table]] into a [[DataSet]]
-    * - explain the AST and execution plan of a [[Table]]
-    *
-    * @param executionEnvironment The Scala batch [[ExecutionEnvironment]] of the TableEnvironment.
-    * @param tableConfig The configuration of the TableEnvironment.
-    */
-  def create(executionEnvironment: ExecutionEnvironment, tableConfig: TableConfig)
-  : BatchTableEnvironment = {
-    try {
-      // temporary solution until FLINK-15635 is fixed
-      val classLoader = Thread.currentThread.getContextClassLoader
-
-      val moduleManager = new ModuleManager
-
-      val defaultCatalog = "default_catalog"
-      val catalogManager = CatalogManager.newBuilder
-        .classLoader(classLoader)
-        .config(tableConfig.getConfiguration)
-        .defaultCatalog(
-          defaultCatalog,
-          new GenericInMemoryCatalog(defaultCatalog, "default_database"))
-        .executionConfig(executionEnvironment.getConfig)
-        .build
-
-      val clazz = Class
-        .forName("org.apache.flink.table.api.bridge.scala.internal.BatchTableEnvironmentImpl")
-      val con = clazz
-        .getConstructor(
-          classOf[ExecutionEnvironment],
-          classOf[TableConfig],
-          classOf[CatalogManager],
-          classOf[ModuleManager])
-      con.newInstance(executionEnvironment, tableConfig, catalogManager, moduleManager)
-        .asInstanceOf[BatchTableEnvironment]
-    } catch {
-      case t: Throwable => throw new TableException("Create BatchTableEnvironment failed.", t)
-    }
-  }
-}
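
Implementation note: both removed create() factories (Java above, Scala here)
rely on the same reflective lookup, so the bridge API module does not compile
against the planner-specific implementation class. A generic sketch of that
technique; the helper name and single-constructor assumption are illustrative,
not Flink API:

    import java.lang.reflect.Constructor;

    final class ReflectiveFactory {
        // Loads an implementation class by name at runtime, mirroring the
        // Class.forName(...) + getConstructor(...) pattern in the removed code.
        static <T> T instantiate(String implClassName, Class<T> iface, Object... args)
                throws ReflectiveOperationException {
            Class<?> clazz = Class.forName(implClassName);
            // Assumes the implementation exposes exactly one public constructor
            // that accepts the given arguments in order.
            Constructor<?> con = clazz.getConstructors()[0];
            return iface.cast(con.newInstance(args));
        }
    }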
diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/DataSetConversions.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/DataSetConversions.scala
deleted file mode 100644
index c51e367..0000000
--- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/DataSetConversions.scala
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.api.bridge.scala
-
-import org.apache.flink.annotation.PublicEvolving
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.Table
-import org.apache.flink.table.expressions.Expression
-
-/**
-  * Holds methods to convert a [[DataSet]] into a [[Table]].
-  *
-  * @param dataSet The [[DataSet]] to convert.
-  * @param inputType The [[TypeInformation]] for the type of the [[DataSet]].
-  * @tparam T The type of the [[DataSet]].
-  */
-@PublicEvolving
-class DataSetConversions[T](dataSet: DataSet[T], inputType: TypeInformation[T]) {
-
-  /**
-    * Converts the [[DataSet]] into a [[Table]].
-    *
-    * The field names of the new [[Table]] can be specified like this:
-    *
-    * {{{
-    *   val env = ExecutionEnvironment.getExecutionEnvironment
-    *   val tEnv = BatchTableEnvironment.create(env)
-    *
-    *   val set: DataSet[(String, Int)] = ...
-    *   val table = set.toTable(tEnv, 'name, 'amount)
-    * }}}
-    *
-    * If not explicitly specified, field names are automatically extracted from the type of
-    * the [[DataSet]].
-    *
-    * @param tableEnv The [[BatchTableEnvironment]] in which the new [[Table]] is created.
-    * @param fields The field names of the new [[Table]] (optional).
-    * @return The resulting [[Table]].
-    */
-  def toTable(tableEnv: BatchTableEnvironment, fields: Expression*): Table = {
-    if (fields.isEmpty) {
-      tableEnv.fromDataSet(dataSet)
-    } else {
-      tableEnv.fromDataSet(dataSet, fields: _*)
-    }
-  }
-
-}
-
diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/TableConversions.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/TableConversions.scala
index 592b363..f8ee450 100644
--- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/TableConversions.scala
+++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/TableConversions.scala
@@ -26,7 +26,7 @@ import org.apache.flink.table.api.internal.TableImpl
 import org.apache.flink.table.api.{Table, TableException, ValidationException}
 
 /**
-  * Holds methods to convert a [[Table]] into a [[DataSet]] or a [[DataStream]].
+  * Holds methods to convert a [[Table]] into a [[DataStream]].
   *
   * @param table The table to convert.
   */
@@ -36,28 +36,6 @@ class TableConversions(table: Table) {
   private val internalTable = table.asInstanceOf[TableImpl]
 
   /**
-    * Converts the given [[Table]] into a [[DataSet]] of a specified type.
-    *
-    * The fields of the [[Table]] are mapped to [[DataSet]] fields as follows:
-    * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]]
-    * types: Fields are mapped by position, field types must match.
-    * - POJO [[DataSet]] types: Fields are mapped by field name, field types must match.
-    *
-    * @tparam T The type of the resulting [[DataSet]].
-    * @return The converted [[DataSet]].
-    */
-  def toDataSet[T: TypeInformation]: DataSet[T] = {
-
-    internalTable.getTableEnvironment match {
-      case tEnv: BatchTableEnvironment =>
-        tEnv.toDataSet(table)
-      case _ =>
-        throw new ValidationException(
-          "Only tables that originate from Scala DataSets can be converted to 
Scala DataSets.")
-    }
-  }
-
-  /**
     * Converts the given [[Table]] into an append [[DataStream]] of a specified type.
     *
     * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified
diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/package.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/package.scala
index 6e39054..ec2c6da 100644
--- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/package.scala
+++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/package.scala
@@ -17,7 +17,7 @@
  */
 package org.apache.flink.table.api.bridge
 
-import org.apache.flink.api.scala.{DataSet, _}
+import org.apache.flink.api.scala._
 import org.apache.flink.streaming.api.scala.DataStream
 import org.apache.flink.table.api.internal.TableImpl
 import org.apache.flink.table.api.{ImplicitExpressionConversions, ImplicitExpressionOperations, Table, ValidationException}
@@ -56,23 +56,10 @@ package object scala {
     new TableConversions(table.asInstanceOf[TableImpl])
   }
 
-  implicit def dataSetConversions[T](set: DataSet[T]): DataSetConversions[T] = {
-    new DataSetConversions[T](set, set.getType())
-  }
-
   implicit def dataStreamConversions[T](set: DataStream[T]): DataStreamConversions[T] = {
     new DataStreamConversions[T](set, set.dataType)
   }
 
-  implicit def table2RowDataSet(table: Table): DataSet[Row] = {
-    val tableEnv = table.asInstanceOf[TableImpl].getTableEnvironment
-    if (!tableEnv.isInstanceOf[BatchTableEnvironment]) {
-      throw new ValidationException("Table cannot be converted into a DataSet. " +
-        "It is not part of a batch table environment.")
-    }
-    tableEnv.asInstanceOf[BatchTableEnvironment].toDataSet[Row](table)
-  }
-
   implicit def table2RowDataStream(table: Table): DataStream[Row] = {
     val tableEnv = table.asInstanceOf[TableImpl].getTableEnvironment
     if (!tableEnv.isInstanceOf[StreamTableEnvironment]) {
