This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit e46999388d844939bbc9e66254f53765722dd358 Author: Timo Walther <twal...@apache.org> AuthorDate: Fri Jun 4 09:48:43 2021 +0200 [FLINK-22877][table] Remove BatchTableEnvironment and related classes --- flink-python/pyflink/table/descriptors.py | 13 - .../api/bridge/java/BatchTableEnvironment.java | 521 --------------------- .../table/operations/DataSetQueryOperation.java | 84 ---- .../table/descriptors/BatchTableDescriptor.java | 42 -- .../api/bridge/scala/BatchTableEnvironment.scala | 429 ----------------- .../api/bridge/scala/DataSetConversions.scala | 65 --- .../table/api/bridge/scala/TableConversions.scala | 24 +- .../flink/table/api/bridge/scala/package.scala | 15 +- 8 files changed, 2 insertions(+), 1191 deletions(-) diff --git a/flink-python/pyflink/table/descriptors.py b/flink-python/pyflink/table/descriptors.py index 22c1ccb..dd647af 100644 --- a/flink-python/pyflink/table/descriptors.py +++ b/flink-python/pyflink/table/descriptors.py @@ -38,7 +38,6 @@ __all__ = [ 'Json', 'ConnectTableDescriptor', 'StreamTableDescriptor', - 'BatchTableDescriptor', 'CustomConnectorDescriptor', 'CustomFormatDescriptor' ] @@ -1714,15 +1713,3 @@ class StreamTableDescriptor(ConnectTableDescriptor): """ self._j_stream_table_descriptor = self._j_stream_table_descriptor.inUpsertMode() return self - - -class BatchTableDescriptor(ConnectTableDescriptor): - """ - Descriptor for specifying a table source and/or sink in a batch environment. - - .. seealso:: parent class: :class:`ConnectTableDescriptor` - """ - - def __init__(self, j_batch_table_descriptor): - self._j_batch_table_descriptor = j_batch_table_descriptor - super(BatchTableDescriptor, self).__init__(self._j_batch_table_descriptor) diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/BatchTableEnvironment.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/BatchTableEnvironment.java deleted file mode 100644 index 04dff4a..0000000 --- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/BatchTableEnvironment.java +++ /dev/null @@ -1,521 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.api.bridge.java; - -import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.api.common.RuntimeExecutionMode; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.table.api.Table; -import org.apache.flink.table.api.TableConfig; -import org.apache.flink.table.api.TableEnvironment; -import org.apache.flink.table.api.TableException; -import org.apache.flink.table.catalog.CatalogManager; -import org.apache.flink.table.catalog.GenericInMemoryCatalog; -import org.apache.flink.table.descriptors.BatchTableDescriptor; -import org.apache.flink.table.descriptors.ConnectorDescriptor; -import org.apache.flink.table.expressions.Expression; -import org.apache.flink.table.functions.AggregateFunction; -import org.apache.flink.table.functions.TableFunction; -import org.apache.flink.table.module.ModuleManager; - -import java.lang.reflect.Constructor; - -import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE; - -/** - * The {@link TableEnvironment} for a Java batch {@link ExecutionEnvironment} that works with {@link - * DataSet}s. - * - * <p>A TableEnvironment can be used to: - * - * <ul> - * <li>convert a {@link DataSet} to a {@link Table} - * <li>register a {@link DataSet} in the {@link TableEnvironment}'s catalog - * <li>register a {@link Table} in the {@link TableEnvironment}'s catalog - * <li>scan a registered table to obtain a {@link Table} - * <li>specify a SQL query on registered tables to obtain a {@link Table} - * <li>convert a {@link Table} into a {@link DataSet} - * <li>explain the AST and execution plan of a {@link Table} - * </ul> - * - * @deprecated {@link BatchTableEnvironment} will be dropped in Flink 1.14 because it only supports - * the old planner. Use the unified {@link TableEnvironment} instead, which supports both batch - * and streaming. More advanced operations previously covered by the DataSet API can now use the - * DataStream API in BATCH execution mode. - */ -@Deprecated -@PublicEvolving -public interface BatchTableEnvironment extends TableEnvironment { - - /** - * Registers a {@link TableFunction} under a unique name in the TableEnvironment's catalog. - * Registered functions can be referenced in Table API and SQL queries. - * - * @param name The name under which the function is registered. - * @param tableFunction The TableFunction to register. - * @param <T> The type of the output row. - */ - <T> void registerFunction(String name, TableFunction<T> tableFunction); - - /** - * Registers an {@link AggregateFunction} under a unique name in the TableEnvironment's catalog. - * Registered functions can be referenced in Table API and SQL queries. - * - * @param name The name under which the function is registered. - * @param aggregateFunction The AggregateFunction to register. - * @param <T> The type of the output value. - * @param <ACC> The type of aggregate accumulator. - */ - <T, ACC> void registerFunction(String name, AggregateFunction<T, ACC> aggregateFunction); - - /** - * Converts the given {@link DataSet} into a {@link Table}. - * - * <p>The field names of the {@link Table} are automatically derived from the type of the {@link - * DataSet}. - * - * @param dataSet The {@link DataSet} to be converted. - * @param <T> The type of the {@link DataSet}. - * @return The converted {@link Table}. - */ - <T> Table fromDataSet(DataSet<T> dataSet); - - /** - * Converts the given {@link DataSet} into a {@link Table} with specified field names. - * - * <p>There are two modes for mapping original fields to the fields of the {@link Table}: - * - * <p>1. Reference input fields by name: All fields in the schema definition are referenced by - * name (and possibly renamed using an alias (as). In this mode, fields can be reordered and - * projected out. This mode can be used for any input type, including POJOs. - * - * <p>Example: - * - * <pre>{@code - * DataSet<Tuple2<String, Long>> set = ... - * // use the original 'f0' field and give a better name to the 'f1' field - * Table table = tableEnv.fromTable(set, "f0, f1 as name"); - * }</pre> - * - * <p>2. Reference input fields by position: In this mode, fields are simply renamed. This mode - * can only be used if the input type has a defined field order (tuple, case class, Row) and - * none of the {@code fields} references a field of the input type. - * - * <p>Example: - * - * <pre>{@code - * DataSet<Tuple2<String, Long>> set = ... - * // renames the original fields as 'a' and 'b' - * Table table = tableEnv.fromDataSet(set, "a, b"); - * }</pre> - * - * @param dataSet The {@link DataSet} to be converted. - * @param fields The fields expressions to map original fields of the DataSet to the fields of - * the {@link Table}. - * @param <T> The type of the {@link DataSet}. - * @return The converted {@link Table}. - * @deprecated use {@link #fromDataSet(DataSet, Expression...)} - */ - @Deprecated - <T> Table fromDataSet(DataSet<T> dataSet, String fields); - - /** - * Converts the given {@link DataSet} into a {@link Table} with specified field names. - * - * <p>There are two modes for mapping original fields to the fields of the {@link Table}: - * - * <p>1. Reference input fields by name: All fields in the schema definition are referenced by - * name (and possibly renamed using an alias (as). In this mode, fields can be reordered and - * projected out. This mode can be used for any input type, including POJOs. - * - * <p>Example: - * - * <pre>{@code - * DataSet<Tuple2<String, Long>> set = ... - * Table table = tableEnv.fromDataSet( - * set, - * $("f1"), // reorder and use the original field - * $("f0").as("name") // reorder and give the original field a better name - * ); - * }</pre> - * - * <p>2. Reference input fields by position: In this mode, fields are simply renamed. This mode - * can only be used if the input type has a defined field order (tuple, case class, Row) and - * none of the {@code fields} references a field of the input type. - * - * <p>Example: - * - * <pre>{@code - * DataSet<Tuple2<String, Long>> set = ... - * Table table = tableEnv.fromDataSet( - * set, - * $("a"), // renames the first field to 'a' - * $("b") // renames the second field to 'b' - * ); - * }</pre> - * - * @param dataSet The {@link DataSet} to be converted. - * @param fields The fields expressions to map original fields of the DataSet to the fields of - * the {@link Table}. - * @param <T> The type of the {@link DataSet}. - * @return The converted {@link Table}. - */ - <T> Table fromDataSet(DataSet<T> dataSet, Expression... fields); - - /** - * Creates a view from the given {@link DataSet}. Registered views can be referenced in SQL - * queries. - * - * <p>The field names of the {@link Table} are automatically derived from the type of the {@link - * DataSet}. - * - * <p>The view is registered in the namespace of the current catalog and database. To register - * the view in a different catalog use {@link #createTemporaryView(String, DataSet)}. - * - * <p>Temporary objects can shadow permanent ones. If a permanent object in a given path exists, - * it will be inaccessible in the current session. To make the permanent object available again - * you can drop the corresponding temporary object. - * - * @param name The name under which the {@link DataSet} is registered in the catalog. - * @param dataSet The {@link DataSet} to register. - * @param <T> The type of the {@link DataSet} to register. - * @deprecated use {@link #createTemporaryView(String, DataSet)} - */ - @Deprecated - <T> void registerDataSet(String name, DataSet<T> dataSet); - - /** - * Creates a view from the given {@link DataSet} in a given path. Registered views can be - * referenced in SQL queries. - * - * <p>The field names of the {@link Table} are automatically derived from the type of the {@link - * DataSet}. - * - * <p>Temporary objects can shadow permanent ones. If a permanent object in a given path exists, - * it will be inaccessible in the current session. To make the permanent object available again - * you can drop the corresponding temporary object. - * - * @param path The path under which the view is created. See also the {@link TableEnvironment} - * class description for the format of the path. - * @param dataSet The {@link DataSet} out of which to create the view. - * @param <T> The type of the {@link DataSet}. - */ - <T> void createTemporaryView(String path, DataSet<T> dataSet); - - /** - * Creates a view from the given {@link DataSet} in a given path with specified field names. - * Registered views can be referenced in SQL queries. - * - * <p>There are two modes for mapping original fields to the fields of the View: - * - * <p>1. Reference input fields by name: All fields in the schema definition are referenced by - * name (and possibly renamed using an alias (as). In this mode, fields can be reordered and - * projected out. This mode can be used for any input type, including POJOs. - * - * <p>Example: - * - * <pre>{@code - * DataSet<Tuple2<String, Long>> set = ... - * // use the original 'f0' field and give a better name to the 'f1' field - * tableEnv.registerDataSet("myTable", set, "f0, f1 as name"); - * }</pre> - * - * <p>2. Reference input fields by position: In this mode, fields are simply renamed. This mode - * can only be used if the input type has a defined field order (tuple, case class, Row) and - * none of the {@code fields} references a field of the input type. - * - * <p>Example: - * - * <pre>{@code - * DataSet<Tuple2<String, Long>> set = ... - * // renames the original fields as 'a' and 'b' - * tableEnv.registerDataSet("myTable", set, "a, b"); - * }</pre> - * - * <p>The view is registered in the namespace of the current catalog and database. To register - * the view in a different catalog use {@link #createTemporaryView(String, DataSet)}. - * - * <p>Temporary objects can shadow permanent ones. If a permanent object in a given path exists, - * it will be inaccessible in the current session. To make the permanent object available again - * you can drop the corresponding temporary object. - * - * @param name The name under which the {@link DataSet} is registered in the catalog. - * @param dataSet The {@link DataSet} to register. - * @param fields The fields expressions to map original fields of the DataSet to the fields of - * the View. - * @param <T> The type of the {@link DataSet} to register. - * @deprecated use {@link #createTemporaryView(String, DataSet, String)} - */ - @Deprecated - <T> void registerDataSet(String name, DataSet<T> dataSet, String fields); - - /** - * Creates a view from the given {@link DataSet} in a given path with specified field names. - * Registered views can be referenced in SQL queries. - * - * <p>There are two modes for mapping original fields to the fields of the View: - * - * <p>1. Reference input fields by name: All fields in the schema definition are referenced by - * name (and possibly renamed using an alias (as). In this mode, fields can be reordered and - * projected out. This mode can be used for any input type, including POJOs. - * - * <p>Example: - * - * <pre>{@code - * DataSet<Tuple2<String, Long>> set = ... - * // use the original 'f0' field and give a better name to the 'f1' field - * tableEnv.createTemporaryView("cat.db.myTable", set, "f0, f1 as name"); - * }</pre> - * - * <p>2. Reference input fields by position: In this mode, fields are simply renamed. This mode - * can only be used if the input type has a defined field order (tuple, case class, Row) and - * none of the {@code fields} references a field of the input type. - * - * <p>Example: - * - * <pre>{@code - * DataSet<Tuple2<String, Long>> set = ... - * // renames the original fields as 'a' and 'b' - * tableEnv.createTemporaryView("cat.db.myTable", set, "a, b"); - * }</pre> - * - * <p>Temporary objects can shadow permanent ones. If a permanent object in a given path exists, - * it will be inaccessible in the current session. To make the permanent object available again - * you can drop the corresponding temporary object. - * - * @param path The path under which the view is created. See also the {@link TableEnvironment} - * class description for the format of the path. - * @param dataSet The {@link DataSet} out of which to create the view. - * @param fields The fields expressions to map original fields of the DataSet to the fields of - * the View. - * @param <T> The type of the {@link DataSet}. - * @deprecated use {@link #createTemporaryView(String, DataSet, Expression...)} - */ - @Deprecated - <T> void createTemporaryView(String path, DataSet<T> dataSet, String fields); - - /** - * Creates a view from the given {@link DataSet} in a given path with specified field names. - * Registered views can be referenced in SQL queries. - * - * <p>There are two modes for mapping original fields to the fields of the View: - * - * <p>1. Reference input fields by name: All fields in the schema definition are referenced by - * name (and possibly renamed using an alias (as). In this mode, fields can be reordered and - * projected out. This mode can be used for any input type, including POJOs. - * - * <p>Example: - * - * <pre>{@code - * DataSet<Tuple2<String, Long>> set = ... - * tableEnv.createTemporaryView( - * "cat.db.myTable", - * set, - * $("f1"), // reorder and use the original field - * $("f0").as("name") // reorder and give the original field a better name - * ); - * }</pre> - * - * <p>2. Reference input fields by position: In this mode, fields are simply renamed. This mode - * can only be used if the input type has a defined field order (tuple, case class, Row) and - * none of the {@code fields} references a field of the input type. - * - * <p>Example: - * - * <pre>{@code - * DataSet<Tuple2<String, Long>> set = ... - * tableEnv.createTemporaryView( - * "cat.db.myTable", - * set, - * $("a"), // renames the first field to 'a' - * $("b") // renames the second field to 'b' - * ); - * }</pre> - * - * <p>Temporary objects can shadow permanent ones. If a permanent object in a given path exists, - * it will be inaccessible in the current session. To make the permanent object available again - * you can drop the corresponding temporary object. - * - * @param path The path under which the view is created. See also the {@link TableEnvironment} - * class description for the format of the path. - * @param dataSet The {@link DataSet} out of which to create the view. - * @param fields The fields expressions to map original fields of the DataSet to the fields of - * the View. - * @param <T> The type of the {@link DataSet}. - */ - <T> void createTemporaryView(String path, DataSet<T> dataSet, Expression... fields); - - /** - * Converts the given {@link Table} into a {@link DataSet} of a specified type. - * - * <p>The fields of the {@link Table} are mapped to {@link DataSet} fields as follows: - * - * <ul> - * <li>{@link org.apache.flink.types.Row} and {@link org.apache.flink.api.java.tuple.Tuple} - * types: Fields are mapped by position, field types must match. - * <li>POJO {@link DataSet} types: Fields are mapped by field name, field types must match. - * </ul> - * - * @param table The {@link Table} to convert. - * @param clazz The class of the type of the resulting {@link DataSet}. - * @param <T> The type of the resulting {@link DataSet}. - * @return The converted {@link DataSet}. - */ - <T> DataSet<T> toDataSet(Table table, Class<T> clazz); - - /** - * Converts the given {@link Table} into a {@link DataSet} of a specified type. - * - * <p>The fields of the {@link Table} are mapped to {@link DataSet} fields as follows: - * - * <ul> - * <li>{@link org.apache.flink.types.Row} and {@link org.apache.flink.api.java.tuple.Tuple} - * types: Fields are mapped by position, field types must match. - * <li>POJO {@link DataSet} types: Fields are mapped by field name, field types must match. - * </ul> - * - * @param table The {@link Table} to convert. - * @param typeInfo The {@link TypeInformation} that specifies the type of the resulting {@link - * DataSet}. - * @param <T> The type of the resulting {@link DataSet}. - * @return The converted {@link DataSet}. - */ - <T> DataSet<T> toDataSet(Table table, TypeInformation<T> typeInfo); - - /** - * Creates a temporary table from a descriptor. - * - * <p>Descriptors allow for declaring the communication to external systems in an - * implementation-agnostic way. The classpath is scanned for suitable table factories that match - * the desired configuration. - * - * <p>The following example shows how to read from a connector using a JSON format and - * registering a temporary table as "MyTable": - * - * <pre>{@code - * tableEnv - * .connect( - * new ExternalSystemXYZ() - * .version("0.11")) - * .withFormat( - * new Json() - * .jsonSchema("{...}") - * .failOnMissingField(false)) - * .withSchema( - * new Schema() - * .field("user-name", "VARCHAR").from("u_name") - * .field("count", "DECIMAL") - * .createTemporaryTable("MyTable") - * }</pre> - * - * @param connectorDescriptor connector descriptor describing the external system - * @deprecated The SQL {@code CREATE TABLE} DDL is richer than this part of the API. This method - * might be refactored in the next versions. Please use {@link #executeSql(String) - * executeSql(ddl)} to register a table instead. - */ - @Override - @Deprecated - BatchTableDescriptor connect(ConnectorDescriptor connectorDescriptor); - - /** - * Returns a {@link TableEnvironment} for a Java batch {@link ExecutionEnvironment} that works - * with {@link DataSet}s. - * - * <p>A TableEnvironment can be used to: - * - * <ul> - * <li>convert a {@link DataSet} to a {@link Table} - * <li>register a {@link DataSet} in the {@link TableEnvironment}'s catalog - * <li>register a {@link Table} in the {@link TableEnvironment}'s catalog - * <li>scan a registered table to obtain a {@link Table} - * <li>specify a SQL query on registered tables to obtain a {@link Table} - * <li>convert a {@link Table} into a {@link DataSet} - * <li>explain the AST and execution plan of a {@link Table} - * </ul> - * - * @param executionEnvironment The Java batch {@link ExecutionEnvironment} of the - * TableEnvironment. - */ - static BatchTableEnvironment create(ExecutionEnvironment executionEnvironment) { - Configuration configuration = new Configuration(); - configuration.set(RUNTIME_MODE, RuntimeExecutionMode.BATCH); - TableConfig config = new TableConfig(); - config.addConfiguration(configuration); - return create(executionEnvironment, config); - } - - /** - * Returns a {@link TableEnvironment} for a Java batch {@link ExecutionEnvironment} that works - * with {@link DataSet}s. - * - * <p>A TableEnvironment can be used to: - * - * <ul> - * <li>convert a {@link DataSet} to a {@link Table} - * <li>register a {@link DataSet} in the {@link TableEnvironment}'s catalog - * <li>register a {@link Table} in the {@link TableEnvironment}'s catalog - * <li>scan a registered table to obtain a {@link Table} - * <li>specify a SQL query on registered tables to obtain a {@link Table} - * <li>convert a {@link Table} into a {@link DataSet} - * <li>explain the AST and execution plan of a {@link Table} - * </ul> - * - * @param executionEnvironment The Java batch {@link ExecutionEnvironment} of the - * TableEnvironment. - * @param tableConfig The configuration of the TableEnvironment. - */ - static BatchTableEnvironment create( - ExecutionEnvironment executionEnvironment, TableConfig tableConfig) { - try { - // temporary solution until FLINK-15635 is fixed - ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); - - ModuleManager moduleManager = new ModuleManager(); - - String defaultCatalog = "default_catalog"; - CatalogManager catalogManager = - CatalogManager.newBuilder() - .classLoader(classLoader) - .config(tableConfig.getConfiguration()) - .defaultCatalog( - defaultCatalog, - new GenericInMemoryCatalog(defaultCatalog, "default_database")) - .executionConfig(executionEnvironment.getConfig()) - .build(); - - Class<?> clazz = - Class.forName( - "org.apache.flink.table.api.bridge.java.internal.BatchTableEnvironmentImpl"); - Constructor<?> con = - clazz.getConstructor( - ExecutionEnvironment.class, - TableConfig.class, - CatalogManager.class, - ModuleManager.class); - return (BatchTableEnvironment) - con.newInstance( - executionEnvironment, tableConfig, catalogManager, moduleManager); - } catch (Throwable t) { - throw new TableException("Create BatchTableEnvironment failed.", t); - } - } -} diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/operations/DataSetQueryOperation.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/operations/DataSetQueryOperation.java deleted file mode 100644 index edd93e9..0000000 --- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/operations/DataSetQueryOperation.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.operations; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.table.api.TableSchema; -import org.apache.flink.table.catalog.ResolvedSchema; - -import java.util.Collections; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; - -/** - * Describes a relational operation that reads from a {@link DataSet}. - * - * <p>This operation may expose only part, or change the order of the fields available in a {@link - * org.apache.flink.api.common.typeutils.CompositeType} of the underlying {@link DataSet}. The - * {@link DataSetQueryOperation#getFieldIndices()} describes the mapping between fields of the - * {@link TableSchema} to the {@link org.apache.flink.api.common.typeutils.CompositeType}. - */ -@Internal -public class DataSetQueryOperation<E> implements QueryOperation { - - private final DataSet<E> dataSet; - private final int[] fieldIndices; - private final ResolvedSchema resolvedSchema; - - public DataSetQueryOperation( - DataSet<E> dataSet, int[] fieldIndices, ResolvedSchema resolvedSchema) { - this.dataSet = dataSet; - this.resolvedSchema = resolvedSchema; - this.fieldIndices = fieldIndices; - } - - public DataSet<E> getDataSet() { - return dataSet; - } - - public int[] getFieldIndices() { - return fieldIndices; - } - - @Override - public ResolvedSchema getResolvedSchema() { - return resolvedSchema; - } - - @Override - public String asSummaryString() { - Map<String, Object> args = new LinkedHashMap<>(); - args.put("fields", resolvedSchema.getColumnNames()); - - return OperationUtils.formatWithChildren( - "DataSet", args, getChildren(), Operation::asSummaryString); - } - - @Override - public List<QueryOperation> getChildren() { - return Collections.emptyList(); - } - - @Override - public <T> T accept(QueryOperationVisitor<T> visitor) { - return visitor.visit(this); - } -} diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/BatchTableDescriptor.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/BatchTableDescriptor.java deleted file mode 100644 index cd4e2b8..0000000 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/BatchTableDescriptor.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.descriptors; - -import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.table.api.internal.Registration; - -/** - * Describes a table connected from a batch environment. - * - * <p>This class just exists for backwards compatibility use {@link ConnectTableDescriptor} for - * declarations. - */ -@PublicEvolving -public final class BatchTableDescriptor extends ConnectTableDescriptor { - - public BatchTableDescriptor( - Registration registration, ConnectorDescriptor connectorDescriptor) { - super(registration, connectorDescriptor); - } - - @Override - public BatchTableDescriptor withSchema(Schema schema) { - return (BatchTableDescriptor) super.withSchema(schema); - } -} diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/BatchTableEnvironment.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/BatchTableEnvironment.scala deleted file mode 100644 index 0aedf4b..0000000 --- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/BatchTableEnvironment.scala +++ /dev/null @@ -1,429 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.table.api.bridge.scala - -import org.apache.flink.api.common.{JobExecutionResult, RuntimeExecutionMode} -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment} -import org.apache.flink.configuration.Configuration -import org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE -import org.apache.flink.table.api.{TableEnvironment, _} -import org.apache.flink.table.catalog.{CatalogManager, GenericInMemoryCatalog} -import org.apache.flink.table.descriptors.{BatchTableDescriptor, ConnectorDescriptor} -import org.apache.flink.table.expressions.Expression -import org.apache.flink.table.functions.{AggregateFunction, TableFunction} -import org.apache.flink.table.module.ModuleManager - -/** - * The [[TableEnvironment]] for a Scala batch [[ExecutionEnvironment]] that works - * with [[DataSet]]s. - * - * A TableEnvironment can be used to: - * - convert a [[DataSet]] to a [[Table]] - * - register a [[DataSet]] in the [[TableEnvironment]]'s catalog - * - register a [[Table]] in the [[TableEnvironment]]'s catalog - * - scan a registered table to obtain a [[Table]] - * - specify a SQL query on registered tables to obtain a [[Table]] - * - convert a [[Table]] into a [[DataSet]] - * - explain the AST and execution plan of a [[Table]] - * - * @deprecated [[BatchTableEnvironment]] will be dropped in Flink 1.14 because it only supports - * the old planner. Use the unified [[TableEnvironment]] instead, which supports both - * batch and streaming. More advanced operations previously covered by the DataSet API - * can now use the DataStream API in BATCH execution mode. - */ -@deprecated -trait BatchTableEnvironment extends TableEnvironment { - - /** - * Registers a [[TableFunction]] under a unique name in the TableEnvironment's catalog. - * Registered functions can be referenced in Table API and SQL queries. - * - * @param name The name under which the function is registered. - * @param tf The TableFunction to register. - * @tparam T The type of the output row. - */ - def registerFunction[T: TypeInformation](name: String, tf: TableFunction[T]): Unit - - /** - * Registers an [[AggregateFunction]] under a unique name in the TableEnvironment's catalog. - * Registered functions can be referenced in Table API and SQL queries. - * - * @param name The name under which the function is registered. - * @param f The AggregateFunction to register. - * @tparam T The type of the output value. - * @tparam ACC The type of aggregate accumulator. - */ - def registerFunction[T: TypeInformation, ACC: TypeInformation]( - name: String, - f: AggregateFunction[T, ACC]): Unit - - /** - * Converts the given [[DataSet]] into a [[Table]]. - * - * The field names of the [[Table]] are automatically derived from the type of the [[DataSet]]. - * - * @param dataSet The [[DataSet]] to be converted. - * @tparam T The type of the [[DataSet]]. - * @return The converted [[Table]]. - */ - def fromDataSet[T](dataSet: DataSet[T]): Table - - /** - * Converts the given [[DataSet]] into a [[Table]] with specified field names. - * - * There are two modes for mapping original fields to the fields of the [[Table]]: - * - * 1. Reference input fields by name: - * All fields in the schema definition are referenced by name - * (and possibly renamed using an alias (as). In this mode, fields can be reordered and - * projected out. This mode can be used for any input type, including POJOs. - * - * Example: - * - * {{{ - * val set: DataSet[(String, Long)] = ... - * val table: Table = tableEnv.fromDataSet( - * set, - * $"_2", // reorder and use the original field - * $"_1" as "name" // reorder and give the original field a better name - * ) - * }}} - * - * 2. Reference input fields by position: - * In this mode, fields are simply renamed. This mode can only be - * used if the input type has a defined field order (tuple, case class, Row) and none of - * the `fields` references a field of the input type. - * - * Example: - * - * {{{ - * val set: DataSet[(String, Long)] = ... - * val table: Table = tableEnv.fromDataSet( - * set, - * $"a", // renames the first field to 'a' - * $"b" // renames the second field to 'b' - * ) - * }}} - * - * @param dataSet The [[DataSet]] to be converted. - * @param fields The fields expressions to map original fields of the DataSet to the fields of - * the [[Table]]. - * @tparam T The type of the [[DataSet]]. - * @return The converted [[Table]]. - */ - def fromDataSet[T](dataSet: DataSet[T], fields: Expression*): Table - - /** - * Creates a view from the given [[DataSet]]. - * Registered views can be referenced in SQL queries. - * - * The field names of the [[Table]] are automatically derived from the type of the [[DataSet]]. - * - * The view is registered in the namespace of the current catalog and database. To register the - * view in a different catalog use [[createTemporaryView]]. - * - * Temporary objects can shadow permanent ones. If a permanent object in a given path exists, - * it will be inaccessible in the current session. To make the permanent object available again - * you can drop the corresponding temporary object. - * - * @param name The name under which the [[DataSet]] is registered in the catalog. - * @param dataSet The [[DataSet]] to register. - * @tparam T The type of the [[DataSet]] to register. - * @deprecated use [[createTemporaryView]] - */ - @deprecated - def registerDataSet[T](name: String, dataSet: DataSet[T]): Unit - - /** - * Creates a view from the given [[DataSet]] in a given path. - * Registered tables can be referenced in SQL queries. - * - * The field names of the [[Table]] are automatically derived - * from the type of the [[DataSet]]. - * - * Temporary objects can shadow permanent ones. If a permanent object in a given path exists, - * it will be inaccessible in the current session. To make the permanent object available again - * you can drop the corresponding temporary object. - * - * @param path The path under which the [[DataSet]] is created. - * See also the [[TableEnvironment]] class description for the format of the path. - * @param dataSet The [[DataSet]] out of which to create the view. - * @tparam T The type of the [[DataSet]]. - */ - def createTemporaryView[T](path: String, dataSet: DataSet[T]): Unit - - /** - * Creates a view from the given [[DataSet]] in a given path with specified field names. - * Registered views can be referenced in SQL queries. - * - * There are two modes for mapping original fields to the fields of the View: - * - * 1. Reference input fields by name: - * All fields in the schema definition are referenced by name - * (and possibly renamed using an alias (as). In this mode, fields can be reordered and - * projected out. This mode can be used for any input type, including POJOs. - * - * Example: - * - * {{{ - * val set: DataSet[(String, Long)] = ... - * tableEnv.registerDataSet( - * "myTable", - * set, - * $"_2", // reorder and use the original field - * $"_1" as "name" // reorder and give the original field a better name - * ); - * }}} - * - * 2. Reference input fields by position: - * In this mode, fields are simply renamed. This mode can only be - * used if the input type has a defined field order (tuple, case class, Row) and none of - * the `fields` references a field of the input type. - * - * Example: - * - * {{{ - * val set: DataSet[(String, Long)] = ... - * tableEnv.registerDataSet( - * "myTable", - * set, - * $"a", // renames the first field to 'a' - * $"b" // renames the second field to 'b' - * ) - * }}} - * - * The view is registered in the namespace of the current catalog and database. To register the - * view in a different catalog use [[createTemporaryView]]. - * - * Temporary objects can shadow permanent ones. If a permanent object in a given path exists, - * it will be inaccessible in the current session. To make the permanent object available again - * you can drop the corresponding temporary object. - * - * @param name The name under which the [[DataSet]] is registered in the catalog. - * @param dataSet The [[DataSet]] to register. - * @param fields The fields expressions to map original fields of the DataSet to the fields of - * the View. - * @tparam T The type of the [[DataSet]] to register. - * @deprecated use [[createTemporaryView]] - */ - @deprecated - def registerDataSet[T](name: String, dataSet: DataSet[T], fields: Expression*): Unit - - /** - * Creates a view from the given [[DataSet]] in a given path with specified field names. - * Registered views can be referenced in SQL queries. - * - * There are two modes for mapping original fields to the fields of the View: - * - * 1. Reference input fields by name: - * All fields in the schema definition are referenced by name - * (and possibly renamed using an alias (as). In this mode, fields can be reordered and - * projected out. This mode can be used for any input type, including POJOs. - * - * Example: - * - * {{{ - * val set: DataSet[(String, Long)] = ... - * tableEnv.createTemporaryView( - * "cat.db.myTable", - * set, - * $"_2", // reorder and use the original field - * $"_1" as "name" // reorder and give the original field a better name - * ) - * }}} - * - * 2. Reference input fields by position: - * In this mode, fields are simply renamed. This mode can only be - * used if the input type has a defined field order (tuple, case class, Row) and none of - * the `fields` references a field of the input type. - * - * Example: - * - * {{{ - * val set: DataSet[(String, Long)] = ... - * tableEnv.createTemporaryView( - * "cat.db.myTable", - * set, - * $"a", // renames the first field to 'a' - * $"b" // renames the second field to 'b' - * ) - * }}} - * - * Temporary objects can shadow permanent ones. If a permanent object in a given path exists, - * it will be inaccessible in the current session. To make the permanent object available again - * you can drop the corresponding temporary object. - * - * @param path The path under which the [[DataSet]] is created. - * See also the [[TableEnvironment]] class description for the format of the - * path. - * @param dataSet The [[DataSet]] out of which to create the view. - * @param fields The fields expressions to map original fields of the DataSet to the fields of - * the View. - * @tparam T The type of the [[DataSet]]. - */ - def createTemporaryView[T](path: String, dataSet: DataSet[T], fields: Expression*): Unit - - /** - * Converts the given [[Table]] into a [[DataSet]] of a specified type. - * - * The fields of the [[Table]] are mapped to [[DataSet]] fields as follows: - * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]] - * types: Fields are mapped by position, field types must match. - * - POJO [[DataSet]] types: Fields are mapped by field name, field types must match. - * - * @param table The [[Table]] to convert. - * @tparam T The type of the resulting [[DataSet]]. - * @return The converted [[DataSet]]. - */ - def toDataSet[T: TypeInformation](table: Table): DataSet[T] - - /** - * Triggers the program execution. The environment will execute all parts of - * the program. - * - * The program execution will be logged and displayed with the provided name - * - * It calls the ExecutionEnvironment#execute on the underlying - * [[ExecutionEnvironment]]. In contrast to the [[TableEnvironment]] this - * environment translates queries eagerly. - * - * @param jobName Desired name of the job - * @return The result of the job execution, containing elapsed time and accumulators. - * @throws Exception which occurs during job execution. - */ - @throws[Exception] - override def execute(jobName: String): JobExecutionResult - - /** - * Creates a temporary table from a descriptor. - * - * Descriptors allow for declaring the communication to external systems in an - * implementation-agnostic way. The classpath is scanned for suitable table factories that match - * the desired configuration. - * - * The following example shows how to read from a connector using a JSON format and - * registering a temporary table as "MyTable": - * - * {{{ - * - * tableEnv - * .connect( - * new ExternalSystemXYZ() - * .version("0.11")) - * .withFormat( - * new Json() - * .jsonSchema("{...}") - * .failOnMissingField(false)) - * .withSchema( - * new Schema() - * .field("user-name", "VARCHAR").from("u_name") - * .field("count", "DECIMAL") - * .createTemporaryTable("MyTable") - * }}} - * - * @param connectorDescriptor connector descriptor describing the external system - * @deprecated The SQL `CREATE TABLE` DDL is richer than this part of the API. - * This method might be refactored in the next versions. - * Please use [[executeSql]] to register a table instead. - */ - @deprecated - override def connect(connectorDescriptor: ConnectorDescriptor): BatchTableDescriptor -} - -/** - * @deprecated [[BatchTableEnvironment]] will be dropped in Flink 1.14 because it only supports - * the old planner. Use the unified [[TableEnvironment]] instead, which supports both - * batch and streaming. More advanced operations previously covered by the DataSet API - * can now use the DataStream API in BATCH execution mode. - */ -@deprecated -object BatchTableEnvironment { - - /** - * The [[TableEnvironment]] for a Scala batch [[ExecutionEnvironment]] that works - * with [[DataSet]]s. - * - * A TableEnvironment can be used to: - * - convert a [[DataSet]] to a [[Table]] - * - register a [[DataSet]] in the [[TableEnvironment]]'s catalog - * - register a [[Table]] in the [[TableEnvironment]]'s catalog - * - scan a registered table to obtain a [[Table]] - * - specify a SQL query on registered tables to obtain a [[Table]] - * - convert a [[Table]] into a [[DataSet]] - * - explain the AST and execution plan of a [[Table]] - * - * @param executionEnvironment The Scala batch [[ExecutionEnvironment]] of the TableEnvironment. - */ - def create(executionEnvironment: ExecutionEnvironment): BatchTableEnvironment = { - val configuration = new Configuration - configuration.set(RUNTIME_MODE, RuntimeExecutionMode.BATCH) - val config = new TableConfig(); - config.addConfiguration(configuration) - create(executionEnvironment, config) - } - - /** - * The [[TableEnvironment]] for a Scala batch [[ExecutionEnvironment]] that works - * with [[DataSet]]s. - * - * A TableEnvironment can be used to: - * - convert a [[DataSet]] to a [[Table]] - * - register a [[DataSet]] in the [[TableEnvironment]]'s catalog - * - register a [[Table]] in the [[TableEnvironment]]'s catalog - * - scan a registered table to obtain a [[Table]] - * - specify a SQL query on registered tables to obtain a [[Table]] - * - convert a [[Table]] into a [[DataSet]] - * - explain the AST and execution plan of a [[Table]] - * - * @param executionEnvironment The Scala batch [[ExecutionEnvironment]] of the TableEnvironment. - * @param tableConfig The configuration of the TableEnvironment. - */ - def create(executionEnvironment: ExecutionEnvironment, tableConfig: TableConfig) - : BatchTableEnvironment = { - try { - // temporary solution until FLINK-15635 is fixed - val classLoader = Thread.currentThread.getContextClassLoader - - val moduleManager = new ModuleManager - - val defaultCatalog = "default_catalog" - val catalogManager = CatalogManager.newBuilder - .classLoader(classLoader) - .config(tableConfig.getConfiguration) - .defaultCatalog( - defaultCatalog, - new GenericInMemoryCatalog(defaultCatalog, "default_database")) - .executionConfig(executionEnvironment.getConfig) - .build - - val clazz = Class - .forName("org.apache.flink.table.api.bridge.scala.internal.BatchTableEnvironmentImpl") - val con = clazz - .getConstructor( - classOf[ExecutionEnvironment], - classOf[TableConfig], - classOf[CatalogManager], - classOf[ModuleManager]) - con.newInstance(executionEnvironment, tableConfig, catalogManager, moduleManager) - .asInstanceOf[BatchTableEnvironment] - } catch { - case t: Throwable => throw new TableException("Create BatchTableEnvironment failed.", t) - } - } -} diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/DataSetConversions.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/DataSetConversions.scala deleted file mode 100644 index c51e367..0000000 --- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/DataSetConversions.scala +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.table.api.bridge.scala - -import org.apache.flink.annotation.PublicEvolving -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.scala._ -import org.apache.flink.table.api.Table -import org.apache.flink.table.expressions.Expression - -/** - * Holds methods to convert a [[DataSet]] into a [[Table]]. - * - * @param dataSet The [[DataSet]] to convert. - * @param inputType The [[TypeInformation]] for the type of the [[DataSet]]. - * @tparam T The type of the [[DataSet]]. - */ -@PublicEvolving -class DataSetConversions[T](dataSet: DataSet[T], inputType: TypeInformation[T]) { - - /** - * Converts the [[DataSet]] into a [[Table]]. - * - * The field names of the new [[Table]] can be specified like this: - * - * {{{ - * val env = ExecutionEnvironment.getExecutionEnvironment - * val tEnv = BatchTableEnvironment.create(env) - * - * val set: DataSet[(String, Int)] = ... - * val table = set.toTable(tEnv, 'name, 'amount) - * }}} - * - * If not explicitly specified, field names are automatically extracted from the type of - * the [[DataSet]]. - * - * @param tableEnv The [[BatchTableEnvironment]] in which the new [[Table]] is created. - * @param fields The field names of the new [[Table]] (optional). - * @return The resulting [[Table]]. - */ - def toTable(tableEnv: BatchTableEnvironment, fields: Expression*): Table = { - if (fields.isEmpty) { - tableEnv.fromDataSet(dataSet) - } else { - tableEnv.fromDataSet(dataSet, fields: _*) - } - } - -} - diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/TableConversions.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/TableConversions.scala index 592b363..f8ee450 100644 --- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/TableConversions.scala +++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/TableConversions.scala @@ -26,7 +26,7 @@ import org.apache.flink.table.api.internal.TableImpl import org.apache.flink.table.api.{Table, TableException, ValidationException} /** - * Holds methods to convert a [[Table]] into a [[DataSet]] or a [[DataStream]]. + * Holds methods to convert a [[Table]] into a [[DataStream]]. * * @param table The table to convert. */ @@ -36,28 +36,6 @@ class TableConversions(table: Table) { private val internalTable = table.asInstanceOf[TableImpl] /** - * Converts the given [[Table]] into a [[DataSet]] of a specified type. - * - * The fields of the [[Table]] are mapped to [[DataSet]] fields as follows: - * - [[org.apache.flink.types.Row]] and [[org.apache.flink.api.java.tuple.Tuple]] - * types: Fields are mapped by position, field types must match. - * - POJO [[DataSet]] types: Fields are mapped by field name, field types must match. - * - * @tparam T The type of the resulting [[DataSet]]. - * @return The converted [[DataSet]]. - */ - def toDataSet[T: TypeInformation]: DataSet[T] = { - - internalTable.getTableEnvironment match { - case tEnv: BatchTableEnvironment => - tEnv.toDataSet(table) - case _ => - throw new ValidationException( - "Only tables that originate from Scala DataSets can be converted to Scala DataSets.") - } - } - - /** * Converts the given [[Table]] into an append [[DataStream]] of a specified type. * * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/package.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/package.scala index 6e39054..ec2c6da 100644 --- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/package.scala +++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/package.scala @@ -17,7 +17,7 @@ */ package org.apache.flink.table.api.bridge -import org.apache.flink.api.scala.{DataSet, _} +import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.scala.DataStream import org.apache.flink.table.api.internal.TableImpl import org.apache.flink.table.api.{ImplicitExpressionConversions, ImplicitExpressionOperations, Table, ValidationException} @@ -56,23 +56,10 @@ package object scala { new TableConversions(table.asInstanceOf[TableImpl]) } - implicit def dataSetConversions[T](set: DataSet[T]): DataSetConversions[T] = { - new DataSetConversions[T](set, set.getType()) - } - implicit def dataStreamConversions[T](set: DataStream[T]): DataStreamConversions[T] = { new DataStreamConversions[T](set, set.dataType) } - implicit def table2RowDataSet(table: Table): DataSet[Row] = { - val tableEnv = table.asInstanceOf[TableImpl].getTableEnvironment - if (!tableEnv.isInstanceOf[BatchTableEnvironment]) { - throw new ValidationException("Table cannot be converted into a DataSet. " + - "It is not part of a batch table environment.") - } - tableEnv.asInstanceOf[BatchTableEnvironment].toDataSet[Row](table) - } - implicit def table2RowDataStream(table: Table): DataStream[Row] = { val tableEnv = table.asInstanceOf[TableImpl].getTableEnvironment if (!tableEnv.isInstanceOf[StreamTableEnvironment]) {