This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit eb5f094e1805f0a689e29deb35f242c79abc8676
Author: Timo Walther <twal...@apache.org>
AuthorDate: Mon Jun 7 11:15:16 2021 +0200

    [FLINK-22877][table] Remove BatchTableSink and related classes
---
 .../legacyutils/TestCollectionTableFactory.scala   | 19 ++------
 .../table/factories/BatchTableSinkFactory.java     | 54 ----------------------
 .../apache/flink/table/sinks/BatchTableSink.java   | 40 ----------------
 .../table/sinks/CsvBatchTableSinkFactory.java      |  6 +--
 .../org/apache/flink/table/sinks/CsvTableSink.java | 25 +---------
 .../java/org/apache/flink/table/api/Table.java     | 38 +++++----------
 .../utils/TestCollectionTableFactory.scala         | 12 ++---
 .../table/planner/utils/testTableSourceSinks.scala | 17 ++-----
 8 files changed, 28 insertions(+), 183 deletions(-)

diff --git a/flink-python/src/test/scala/org/apache/flink/table/legacyutils/TestCollectionTableFactory.scala b/flink-python/src/test/scala/org/apache/flink/table/legacyutils/TestCollectionTableFactory.scala
index 3fd1f51..0d8e03a 100644
--- a/flink-python/src/test/scala/org/apache/flink/table/legacyutils/TestCollectionTableFactory.scala
+++ b/flink-python/src/test/scala/org/apache/flink/table/legacyutils/TestCollectionTableFactory.scala
@@ -21,10 +21,8 @@ package org.apache.flink.table.legacyutils
 import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeutils.TypeSerializer
-import org.apache.flink.api.java.io.{CollectionInputFormat, LocalCollectionOutputFormat}
-import org.apache.flink.api.java.operators.DataSink
+import org.apache.flink.api.java.io.CollectionInputFormat
 import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink, DataStreamSource}
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
@@ -32,10 +30,10 @@ import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
 import org.apache.flink.table.api.TableSchema
 import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR
 import org.apache.flink.table.descriptors.{DescriptorProperties, Schema}
-import org.apache.flink.table.factories.{BatchTableSinkFactory, StreamTableSinkFactory, StreamTableSourceFactory}
+import org.apache.flink.table.factories.{StreamTableSinkFactory, StreamTableSourceFactory}
 import org.apache.flink.table.functions.{AsyncTableFunction, TableFunction}
 import org.apache.flink.table.legacyutils.TestCollectionTableFactory.{getCollectionSink, getCollectionSource}
-import org.apache.flink.table.sinks.{AppendStreamTableSink, BatchTableSink, StreamTableSink, TableSink}
+import org.apache.flink.table.sinks.{AppendStreamTableSink, StreamTableSink, TableSink}
 import org.apache.flink.table.sources.{LookupableTableSource, StreamTableSource, TableSource}
 import org.apache.flink.types.Row
 
@@ -52,7 +50,6 @@ import scala.collection.JavaConversions._
 
 class TestCollectionTableFactory
   extends StreamTableSourceFactory[Row]
   with StreamTableSinkFactory[Row]
-  with BatchTableSinkFactory[Row] {
 
   override def createTableSource(properties: JMap[String, String]): TableSource[Row] = {
@@ -71,10 +68,6 @@ class TestCollectionTableFactory
     getCollectionSink(properties)
   }
 
-  override def createBatchTableSink(properties: JMap[String, String]): BatchTableSink[Row] = {
-    getCollectionSink(properties)
-  }
-
   override def requiredContext(): JMap[String, String] = {
     val context = new util.HashMap[String, String]()
     context.put(CONNECTOR, "COLLECTION")
@@ -173,11 +166,7 @@ object TestCollectionTableFactory {
    * Table sink of collection.
    */
   class CollectionTableSink(val outputType: RowTypeInfo)
-    extends BatchTableSink[Row]
-    with AppendStreamTableSink[Row] {
-    override def consumeDataSet(dataSet: DataSet[Row]): DataSink[_] = {
-      dataSet.output(new LocalCollectionOutputFormat[Row](RESULT)).setParallelism(1)
-    }
+    extends AppendStreamTableSink[Row] {
 
     override def getOutputType: RowTypeInfo = outputType
 
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/BatchTableSinkFactory.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/BatchTableSinkFactory.java
deleted file mode 100644
index f1737ee..0000000
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/BatchTableSinkFactory.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.factories;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.table.connector.sink.DynamicTableSink;
-import org.apache.flink.table.sinks.BatchTableSink;
-import org.apache.flink.table.sinks.TableSink;
-
-import java.util.Map;
-
-/**
- * A factory to create configured table sink instances in a batch environment based on string-based
- * properties. See also {@link TableSinkFactory} for more information.
- *
- * @param <T> type of records that the factory consumes
- * @deprecated This interface has been replaced by {@link DynamicTableSinkFactory}. The new
- *     interface creates instances of {@link DynamicTableSink} and only works with the Blink
- *     planner. See FLIP-95 for more information.
- */
-@Deprecated
-@PublicEvolving
-public interface BatchTableSinkFactory<T> extends TableSinkFactory<T> {
-
-    /**
-     * Creates and configures a {@link BatchTableSink} using the given properties.
-     *
-     * @param properties normalized properties describing a table sink.
-     * @return the configured table sink.
-     */
-    BatchTableSink<T> createBatchTableSink(Map<String, String> properties);
-
-    /** Only create batch table sink. */
-    @Override
-    default TableSink<T> createTableSink(Map<String, String> properties) {
-        return createBatchTableSink(properties);
-    }
-}
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/BatchTableSink.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/BatchTableSink.java
deleted file mode 100644
index 1613e33..0000000
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/BatchTableSink.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.sinks;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.operators.DataSink;
-import org.apache.flink.table.api.Table;
-
-/**
- * Defines an external {@link TableSink} to emit a batch {@link Table}.
- *
- * @param <T> Type of {@link DataSet} that this {@link TableSink} expects and supports.
- * @deprecated use {@link OutputFormatTableSink} instead.
- */
-@Deprecated
-public interface BatchTableSink<T> extends TableSink<T> {
-
-    /**
-     * Consumes the DataSet and return the {@link DataSink}. The returned {@link DataSink} will be
-     * used to generate {@link Plan}.
-     */
-    DataSink<?> consumeDataSet(DataSet<T> dataSet);
-}
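Migration sketch: the deprecation javadoc removed above points to FLIP-95's DynamicTableSinkFactory / DynamicTableSink as the replacement. A minimal sketch of a migrated sink, assuming the connector already has an OutputFormat<RowData> at hand (the class name and constructor below are invented for illustration; DynamicTableSink, ChangelogMode and OutputFormatProvider are the actual Flink interfaces):

    import org.apache.flink.api.common.io.OutputFormat;
    import org.apache.flink.table.connector.ChangelogMode;
    import org.apache.flink.table.connector.sink.DynamicTableSink;
    import org.apache.flink.table.connector.sink.OutputFormatProvider;
    import org.apache.flink.table.data.RowData;

    /** Hypothetical FLIP-95 sink covering the use case of the removed BatchTableSink. */
    public class ExampleDynamicSink implements DynamicTableSink {

        private final OutputFormat<RowData> format; // assumed to be supplied by the connector

        public ExampleDynamicSink(OutputFormat<RowData> format) {
            this.format = format;
        }

        @Override
        public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
            // insert-only changelog, the closest analogue to a batch-only sink
            return ChangelogMode.insertOnly();
        }

        @Override
        public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
            // the same provider serves batch and streaming execution
            return OutputFormatProvider.of(format);
        }

        @Override
        public DynamicTableSink copy() {
            return new ExampleDynamicSink(format);
        }

        @Override
        public String asSummaryString() {
            return "example-sink";
        }
    }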
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvBatchTableSinkFactory.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvBatchTableSinkFactory.java
index e7cb8e0..277921e 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvBatchTableSinkFactory.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvBatchTableSinkFactory.java
@@ -19,7 +19,7 @@
 package org.apache.flink.table.sinks;
 
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.table.factories.BatchTableSinkFactory;
+import org.apache.flink.table.factories.StreamTableSinkFactory;
 import org.apache.flink.types.Row;
 
 import java.util.Map;
@@ -29,10 +29,10 @@ import java.util.Map;
  */
 @PublicEvolving
 public class CsvBatchTableSinkFactory extends CsvTableSinkFactoryBase
-        implements BatchTableSinkFactory<Row> {
+        implements StreamTableSinkFactory<Row> {
 
     @Override
-    public BatchTableSink<Row> createBatchTableSink(Map<String, String> properties) {
+    public StreamTableSink<Row> createStreamTableSink(Map<String, String> properties) {
         return createTableSink(false, properties);
     }
 }
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvTableSink.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvTableSink.java
index 304a62d..451a2ab 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvTableSink.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvTableSink.java
@@ -20,9 +20,6 @@ package org.apache.flink.table.sinks;
 
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.operators.DataSink;
-import org.apache.flink.api.java.operators.MapOperator;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
@@ -36,7 +33,7 @@ import org.apache.flink.types.Row;
 import java.util.Arrays;
 
 /** A simple {@link TableSink} to emit data as CSV files. */
-public class CsvTableSink implements BatchTableSink<Row>, AppendStreamTableSink<Row> {
+public class CsvTableSink implements AppendStreamTableSink<Row> {
     private String path;
     private String fieldDelim;
     private int numFiles = -1;
@@ -108,26 +105,6 @@ public class CsvTableSink implements AppendStreamTableSink<
     }
 
     @Override
-    public DataSink<?> consumeDataSet(DataSet<Row> dataSet) {
-        MapOperator<Row, String> csvRows =
-                dataSet.map(new CsvFormatter(fieldDelim == null ? "," : fieldDelim));
-
-        DataSink<String> sink;
-        if (writeMode != null) {
-            sink = csvRows.writeAsText(path, writeMode);
-        } else {
-            sink = csvRows.writeAsText(path);
-        }
-
-        if (numFiles > 0) {
-            csvRows.setParallelism(numFiles);
-            sink.setParallelism(numFiles);
-        }
-
-        return sink.name(TableConnectorUtils.generateRuntimeName(CsvTableSink.class, fieldNames));
-    }
-
-    @Override
     public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
         SingleOutputStreamOperator<String> csvRows =
                 dataStream.map(new CsvFormatter(fieldDelim == null ? "," : fieldDelim));
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
index a9b2795..150d65f 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.api;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.functions.TableFunction;
 import org.apache.flink.table.functions.TemporalTableFunction;
@@ -923,14 +924,9 @@ public interface Table {
     }
 
     /**
-     * Writes the {@link Table} to a {@link TableSink} that was registered under the specified path.
-     * For the path resolution algorithm see {@link TableEnvironment#useDatabase(String)}.
-     *
-     * <p>A batch {@link Table} can only be written to a {@code
-     * org.apache.flink.table.sinks.BatchTableSink}, a streaming {@link Table} requires a {@code
-     * org.apache.flink.table.sinks.AppendStreamTableSink}, a {@code
-     * org.apache.flink.table.sinks.RetractStreamTableSink}, or an {@code
-     * org.apache.flink.table.sinks.UpsertStreamTableSink}.
+     * Writes the {@link Table} to a {@link DynamicTableSink} that was registered under the
+     * specified path. For the path resolution algorithm see {@link
+     * TableEnvironment#useDatabase(String)}.
      *
      * @param tablePath The path of the registered {@link TableSink} to which the {@link Table} is
      *     written.
@@ -1299,18 +1295,13 @@
     FlatAggregateTable flatAggregate(Expression tableAggregateFunction);
 
     /**
-     * Writes the {@link Table} to a {@link TableSink} that was registered under the specified path,
-     * and then execute the insert operation.
+     * Declares that the pipeline defined by the given {@link Table} object should be written to a
+     * table (backed by a {@link DynamicTableSink}) that was registered under the specified path. It
+     * executes the insert operation.
      *
      * <p>See the documentation of {@link TableEnvironment#useDatabase(String)} or {@link
     * TableEnvironment#useCatalog(String)} for the rules on the path resolution.
      *
-     * <p>A batch {@link Table} can only be written to a {@code
-     * org.apache.flink.table.sinks.BatchTableSink}, a streaming {@link Table} requires a {@code
-     * org.apache.flink.table.sinks.AppendStreamTableSink}, a {@code
-     * org.apache.flink.table.sinks.RetractStreamTableSink}, or an {@code
-     * org.apache.flink.table.sinks.UpsertStreamTableSink}.
-     *
      * <p>Example:
      *
      * <pre>{@code
@@ -1319,24 +1310,19 @@
      * tableResult...
      * }</pre>
      *
-     * @param tablePath The path of the registered TableSink to which the Table is written.
+     * @param tablePath The path of the registered table (backed by a {@link DynamicTableSink}).
      * @return The insert operation execution result.
      */
     TableResult executeInsert(String tablePath);
 
     /**
-     * Writes the {@link Table} to a {@link TableSink} that was registered under the specified path,
-     * and then execute the insert operation.
+     * Declares that the pipeline defined by the given {@link Table} object should be written to a
+     * table (backed by a {@link DynamicTableSink}) that was registered under the specified path. It
+     * executes the insert operation.
      *
      * <p>See the documentation of {@link TableEnvironment#useDatabase(String)} or {@link
      * TableEnvironment#useCatalog(String)} for the rules on the path resolution.
      *
-     * <p>A batch {@link Table} can only be written to a {@code
-     * org.apache.flink.table.sinks.BatchTableSink}, a streaming {@link Table} requires a {@code
-     * org.apache.flink.table.sinks.AppendStreamTableSink}, a {@code
-     * org.apache.flink.table.sinks.RetractStreamTableSink}, or an {@code
-     * org.apache.flink.table.sinks.UpsertStreamTableSink}.
-     *
     * <p>Example:
      *
      * <pre>{@code
@@ -1345,7 +1331,7 @@
      * tableResult...
      * }</pre>
      *
-     * @param tablePath The path of the registered TableSink to which the Table is written.
+     * @param tablePath The path of the registered table (backed by a {@link DynamicTableSink}).
      * @param overwrite The flag that indicates whether the insert should overwrite existing data or
      *     not.
      * @return The insert operation execution result.
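Usage of the overwrite variant documented above might look as follows; "MySink" is the placeholder path from the javadoc, and executeInsert(String, boolean) and TableResult.await() are the actual Flink API:

    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.TableResult;

    static void overwriteSink(Table table) throws Exception {
        // overwrite = true replaces the data already stored under "MySink"
        TableResult result = table.executeInsert("MySink", true);
        result.await(); // optional: block until the insert job finishes
    }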
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/factories/utils/TestCollectionTableFactory.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/factories/utils/TestCollectionTableFactory.scala
index 35c6c4c..22fd537 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/factories/utils/TestCollectionTableFactory.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/factories/utils/TestCollectionTableFactory.scala
@@ -21,9 +21,7 @@ package org.apache.flink.table.planner.factories.utils
 import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeutils.TypeSerializer
-import org.apache.flink.api.java.io.{CollectionInputFormat, LocalCollectionOutputFormat}
-import org.apache.flink.api.java.operators.DataSink
-import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
+import org.apache.flink.api.java.io.CollectionInputFormat
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink, DataStreamSource}
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
@@ -34,7 +32,7 @@ import org.apache.flink.table.factories.{TableSinkFactory, TableSourceFactory}
 import org.apache.flink.table.functions.{AsyncTableFunction, TableFunction}
 import org.apache.flink.table.planner.factories.utils.TestCollectionTableFactory.{getCollectionSink, getCollectionSource}
 import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter.fromDataTypeToTypeInfo
-import org.apache.flink.table.sinks.{AppendStreamTableSink, BatchTableSink, StreamTableSink, TableSink}
+import org.apache.flink.table.sinks.{AppendStreamTableSink, StreamTableSink, TableSink}
 import org.apache.flink.table.sources.{LookupableTableSource, StreamTableSource}
 import org.apache.flink.table.types.DataType
 import org.apache.flink.table.utils.TableSchemaUtils.getPhysicalSchema
@@ -152,11 +150,7 @@ object TestCollectionTableFactory {
    * Table sink of collection.
    */
   class CollectionTableSink(val schema: TableSchema)
-    extends BatchTableSink[Row]
-    with AppendStreamTableSink[Row] {
-    override def consumeDataSet(dataSet: DataSet[Row]): DataSink[_] = {
-      dataSet.output(new LocalCollectionOutputFormat[Row](RESULT)).setParallelism(1)
-    }
+    extends AppendStreamTableSink[Row] {
 
     override def getConsumedDataType: DataType = schema.toRowDataType
 
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/testTableSourceSinks.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/testTableSourceSinks.scala
index 13edf80..46fe517 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/testTableSourceSinks.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/testTableSourceSinks.scala
@@ -23,16 +23,14 @@ import org.apache.flink.api.common.io.InputFormat
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
 import org.apache.flink.api.common.typeutils.TypeSerializer
 import org.apache.flink.api.java.io.{CollectionInputFormat, RowCsvInputFormat}
-import org.apache.flink.api.java.operators.DataSink
 import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
 import org.apache.flink.core.io.InputSplit
 import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink}
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.table.api.{DataTypes, TableEnvironment, TableSchema}
 import org.apache.flink.table.catalog.{CatalogPartitionImpl, CatalogPartitionSpec, CatalogTableImpl, ObjectPath}
 import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR, CONNECTOR_TYPE}
-import org.apache.flink.table.descriptors.{CustomConnectorDescriptor, DescriptorProperties, FileSystem, OldCsv, Schema, SchemaValidator}
+import org.apache.flink.table.descriptors._
 import org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedCall
 import org.apache.flink.table.expressions.{CallExpression, Expression, FieldReferenceExpression, ValueLiteralExpression}
 import org.apache.flink.table.factories.{StreamTableSourceFactory, TableSinkFactory, TableSourceFactory}
@@ -41,9 +39,9 @@ import org.apache.flink.table.functions.BuiltInFunctionDefinitions.AND
 import org.apache.flink.table.planner.plan.hint.OptionsHintTest.IS_BOUNDED
 import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
 import org.apache.flink.table.planner.runtime.utils.TimeTestUtil.EventTimeSourceFunction
-import org.apache.flink.table.planner.{JArrayList, JInt, JList, JLong, JMap}
+import org.apache.flink.table.planner._
 import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter.fromDataTypeToTypeInfo
-import org.apache.flink.table.sinks.{BatchTableSink, StreamTableSink, TableSink}
+import org.apache.flink.table.sinks.{StreamTableSink, TableSink}
 import org.apache.flink.table.sources._
 import org.apache.flink.table.sources.tsextractors.ExistingField
 import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, PreserveWatermarks}
@@ -1309,7 +1307,7 @@ class TestOptionsTableFactory
     createPropertiesSource(context.getTable.toProperties)
   }
 
-  override def createTableSink(context: TableSinkFactory.Context): BatchTableSink[Row] = {
+  override def createTableSink(context: TableSinkFactory.Context): TableSink[Row] = {
     createPropertiesSink(context.getTable.toProperties)
   }
 }
@@ -1338,8 +1336,7 @@ class OptionsTableSource(
 class OptionsTableSink(
     tableSchema: TableSchema,
     val props: JMap[String, String])
-  extends BatchTableSink[Row]
-  with StreamTableSink[Row] {
+  extends StreamTableSink[Row] {
 
   override def consumeDataStream(dataStream: DataStream[Row]): DataStreamSink[_] = {
     None.asInstanceOf[DataStreamSink[Row]]
@@ -1353,10 +1350,6 @@ class OptionsTableSink(
   override def getConsumedDataType: DataType = {
     getPhysicalSchema(tableSchema).toRowDataType
   }
-
-  override def consumeDataSet(dataSet: DataSet[Row]): DataSink[_] = {
-    None.asInstanceOf[DataSink[Row]]
-  }
 }

 object TestOptionsTableFactory {
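For completeness, the test sinks above now keep only the streaming path; a collection-style test sink boils down to a plain SinkFunction. A minimal Java sketch (the class name and RESULT buffer are invented for illustration; the Scala tests keep their own helpers):

    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    import org.apache.flink.types.Row;

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    /** Hypothetical collecting sink, mirroring what getCollectionSink wires up for tests. */
    public class CollectingSinkFunction extends RichSinkFunction<Row> {

        // shared buffer that the test asserts against
        public static final List<Row> RESULT = Collections.synchronizedList(new ArrayList<>());

        @Override
        public void invoke(Row value, Context context) {
            RESULT.add(value);
        }
    }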