This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit eb5f094e1805f0a689e29deb35f242c79abc8676
Author: Timo Walther <twal...@apache.org>
AuthorDate: Mon Jun 7 11:15:16 2021 +0200

    [FLINK-22877][table] Remove BatchTableSink and related classes
---
 .../legacyutils/TestCollectionTableFactory.scala   | 19 ++------
 .../table/factories/BatchTableSinkFactory.java     | 54 ----------------------
 .../apache/flink/table/sinks/BatchTableSink.java   | 40 ----------------
 .../table/sinks/CsvBatchTableSinkFactory.java      |  6 +--
 .../org/apache/flink/table/sinks/CsvTableSink.java | 25 +---------
 .../java/org/apache/flink/table/api/Table.java     | 38 +++++----------
 .../utils/TestCollectionTableFactory.scala         | 12 ++---
 .../table/planner/utils/testTableSourceSinks.scala | 17 ++-----
 8 files changed, 28 insertions(+), 183 deletions(-)

diff --git a/flink-python/src/test/scala/org/apache/flink/table/legacyutils/TestCollectionTableFactory.scala b/flink-python/src/test/scala/org/apache/flink/table/legacyutils/TestCollectionTableFactory.scala
index 3fd1f51..0d8e03a 100644
--- a/flink-python/src/test/scala/org/apache/flink/table/legacyutils/TestCollectionTableFactory.scala
+++ b/flink-python/src/test/scala/org/apache/flink/table/legacyutils/TestCollectionTableFactory.scala
@@ -21,10 +21,8 @@ package org.apache.flink.table.legacyutils
 import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeutils.TypeSerializer
-import org.apache.flink.api.java.io.{CollectionInputFormat, LocalCollectionOutputFormat}
-import org.apache.flink.api.java.operators.DataSink
+import org.apache.flink.api.java.io.CollectionInputFormat
 import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink, DataStreamSource}
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
@@ -32,10 +30,10 @@ import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
 import org.apache.flink.table.api.TableSchema
 import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR
 import org.apache.flink.table.descriptors.{DescriptorProperties, Schema}
-import org.apache.flink.table.factories.{BatchTableSinkFactory, StreamTableSinkFactory, StreamTableSourceFactory}
+import org.apache.flink.table.factories.{StreamTableSinkFactory, StreamTableSourceFactory}
 import org.apache.flink.table.functions.{AsyncTableFunction, TableFunction}
 import org.apache.flink.table.legacyutils.TestCollectionTableFactory.{getCollectionSink, getCollectionSource}
-import org.apache.flink.table.sinks.{AppendStreamTableSink, BatchTableSink, StreamTableSink, TableSink}
+import org.apache.flink.table.sinks.{AppendStreamTableSink, StreamTableSink, TableSink}
 import org.apache.flink.table.sources.{LookupableTableSource, StreamTableSource, TableSource}
 import org.apache.flink.types.Row
 
@@ -52,7 +50,6 @@ import scala.collection.JavaConversions._
 class TestCollectionTableFactory
   extends StreamTableSourceFactory[Row]
   with StreamTableSinkFactory[Row]
-  with BatchTableSinkFactory[Row]
 {
 
   override def createTableSource(properties: JMap[String, String]): TableSource[Row] = {
@@ -71,10 +68,6 @@ class TestCollectionTableFactory
     getCollectionSink(properties)
   }
 
-  override def createBatchTableSink(properties: JMap[String, String]): BatchTableSink[Row] = {
-    getCollectionSink(properties)
-  }
-
   override def requiredContext(): JMap[String, String] = {
     val context = new util.HashMap[String, String]()
     context.put(CONNECTOR, "COLLECTION")
@@ -173,11 +166,7 @@ object TestCollectionTableFactory {
     * Table sink of collection.
     */
   class CollectionTableSink(val outputType: RowTypeInfo)
-      extends BatchTableSink[Row]
-      with AppendStreamTableSink[Row] {
-    override def consumeDataSet(dataSet: DataSet[Row]): DataSink[_] = {
-      dataSet.output(new LocalCollectionOutputFormat[Row](RESULT)).setParallelism(1)
-    }
+      extends AppendStreamTableSink[Row] {
 
     override def getOutputType: RowTypeInfo = outputType
 
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/BatchTableSinkFactory.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/BatchTableSinkFactory.java
deleted file mode 100644
index f1737ee..0000000
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/BatchTableSinkFactory.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.factories;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.table.connector.sink.DynamicTableSink;
-import org.apache.flink.table.sinks.BatchTableSink;
-import org.apache.flink.table.sinks.TableSink;
-
-import java.util.Map;
-
-/**
- * A factory to create configured table sink instances in a batch environment based on string-based
- * properties. See also {@link TableSinkFactory} for more information.
- *
- * @param <T> type of records that the factory consumes
- * @deprecated This interface has been replaced by {@link DynamicTableSinkFactory}. The new
- *     interface creates instances of {@link DynamicTableSink} and only works with the Blink
- *     planner. See FLIP-95 for more information.
- */
-@Deprecated
-@PublicEvolving
-public interface BatchTableSinkFactory<T> extends TableSinkFactory<T> {
-
-    /**
-     * Creates and configures a {@link BatchTableSink} using the given properties.
-     *
-     * @param properties normalized properties describing a table sink.
-     * @return the configured table sink.
-     */
-    BatchTableSink<T> createBatchTableSink(Map<String, String> properties);
-
-    /** Only create batch table sink. */
-    @Override
-    default TableSink<T> createTableSink(Map<String, String> properties) {
-        return createBatchTableSink(properties);
-    }
-}
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/BatchTableSink.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/BatchTableSink.java
deleted file mode 100644
index 1613e33..0000000
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/BatchTableSink.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.sinks;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.operators.DataSink;
-import org.apache.flink.table.api.Table;
-
-/**
- * Defines an external {@link TableSink} to emit a batch {@link Table}.
- *
- * @param <T> Type of {@link DataSet} that this {@link TableSink} expects and supports.
- * @deprecated use {@link OutputFormatTableSink} instead.
- */
-@Deprecated
-public interface BatchTableSink<T> extends TableSink<T> {
-
-    /**
-     * Consumes the DataSet and return the {@link DataSink}. The returned {@link DataSink} will be
-     * used to generate {@link Plan}.
-     */
-    DataSink<?> consumeDataSet(DataSet<T> dataSet);
-}
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvBatchTableSinkFactory.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvBatchTableSinkFactory.java
index e7cb8e0..277921e 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvBatchTableSinkFactory.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvBatchTableSinkFactory.java
@@ -19,7 +19,7 @@
 package org.apache.flink.table.sinks;
 
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.table.factories.BatchTableSinkFactory;
+import org.apache.flink.table.factories.StreamTableSinkFactory;
 import org.apache.flink.types.Row;
 
 import java.util.Map;
@@ -29,10 +29,10 @@ import java.util.Map;
  */
 @PublicEvolving
 public class CsvBatchTableSinkFactory extends CsvTableSinkFactoryBase
-        implements BatchTableSinkFactory<Row> {
+        implements StreamTableSinkFactory<Row> {
 
     @Override
-    public BatchTableSink<Row> createBatchTableSink(Map<String, String> properties) {
+    public StreamTableSink<Row> createStreamTableSink(Map<String, String> properties) {
         return createTableSink(false, properties);
     }
 }
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvTableSink.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvTableSink.java
index 304a62d..451a2ab 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvTableSink.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/CsvTableSink.java
@@ -20,9 +20,6 @@ package org.apache.flink.table.sinks;
 
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.operators.DataSink;
-import org.apache.flink.api.java.operators.MapOperator;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
@@ -36,7 +33,7 @@ import org.apache.flink.types.Row;
 import java.util.Arrays;
 
 /** A simple {@link TableSink} to emit data as CSV files. */
-public class CsvTableSink implements BatchTableSink<Row>, AppendStreamTableSink<Row> {
+public class CsvTableSink implements AppendStreamTableSink<Row> {
+public class CsvTableSink implements AppendStreamTableSink<Row> {
     private String path;
     private String fieldDelim;
     private int numFiles = -1;
@@ -108,26 +105,6 @@ public class CsvTableSink implements BatchTableSink<Row>, AppendStreamTableSink<
     }
 
     @Override
-    public DataSink<?> consumeDataSet(DataSet<Row> dataSet) {
-        MapOperator<Row, String> csvRows =
-                dataSet.map(new CsvFormatter(fieldDelim == null ? "," : fieldDelim));
-
-        DataSink<String> sink;
-        if (writeMode != null) {
-            sink = csvRows.writeAsText(path, writeMode);
-        } else {
-            sink = csvRows.writeAsText(path);
-        }
-
-        if (numFiles > 0) {
-            csvRows.setParallelism(numFiles);
-            sink.setParallelism(numFiles);
-        }
-
-        return sink.name(TableConnectorUtils.generateRuntimeName(CsvTableSink.class, fieldNames));
-    }
-
-    @Override
     public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
         SingleOutputStreamOperator<String> csvRows =
                 dataStream.map(new CsvFormatter(fieldDelim == null ? "," : fieldDelim));
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
index a9b2795..150d65f 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.api;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.functions.TableFunction;
 import org.apache.flink.table.functions.TemporalTableFunction;
@@ -923,14 +924,9 @@ public interface Table {
     }
 
     /**
-     * Writes the {@link Table} to a {@link TableSink} that was registered under the specified path.
-     * For the path resolution algorithm see {@link TableEnvironment#useDatabase(String)}.
-     *
-     * <p>A batch {@link Table} can only be written to a {@code
-     * org.apache.flink.table.sinks.BatchTableSink}, a streaming {@link Table} requires a {@code
-     * org.apache.flink.table.sinks.AppendStreamTableSink}, a {@code
-     * org.apache.flink.table.sinks.RetractStreamTableSink}, or an {@code
-     * org.apache.flink.table.sinks.UpsertStreamTableSink}.
+     * Writes the {@link Table} to a {@link DynamicTableSink} that was registered under the
+     * specified path. For the path resolution algorithm see {@link
+     * TableEnvironment#useDatabase(String)}.
      *
      * @param tablePath The path of the registered {@link TableSink} to which the {@link Table} is
      *     written.
@@ -1299,18 +1295,13 @@ public interface Table {
     FlatAggregateTable flatAggregate(Expression tableAggregateFunction);
 
     /**
-     * Writes the {@link Table} to a {@link TableSink} that was registered under the specified path,
-     * and then execute the insert operation.
+     * Declares that the pipeline defined by the given {@link Table} object should be written to a
+     * table (backed by a {@link DynamicTableSink}) that was registered under the specified path. It
+     * executes the insert operation.
      *
      * <p>See the documentation of {@link TableEnvironment#useDatabase(String)} or {@link
      * TableEnvironment#useCatalog(String)} for the rules on the path resolution.
      *
-     * <p>A batch {@link Table} can only be written to a {@code
-     * org.apache.flink.table.sinks.BatchTableSink}, a streaming {@link Table} requires a {@code
-     * org.apache.flink.table.sinks.AppendStreamTableSink}, a {@code
-     * org.apache.flink.table.sinks.RetractStreamTableSink}, or an {@code
-     * org.apache.flink.table.sinks.UpsertStreamTableSink}.
-     *
      * <p>Example:
      *
      * <pre>{@code
@@ -1319,24 +1310,19 @@ public interface Table {
      * tableResult...
      * }</pre>
      *
-     * @param tablePath The path of the registered TableSink to which the Table is written.
+     * @param tablePath The path of the registered table (backed by a {@link DynamicTableSink}).
      * @return The insert operation execution result.
      */
     TableResult executeInsert(String tablePath);
 
     /**
-     * Writes the {@link Table} to a {@link TableSink} that was registered under the specified path,
-     * and then execute the insert operation.
+     * Declares that the pipeline defined by the given {@link Table} object should be written to a
+     * table (backed by a {@link DynamicTableSink}) that was registered under the specified path. It
+     * executes the insert operation.
      *
      * <p>See the documentation of {@link TableEnvironment#useDatabase(String)} or {@link
      * TableEnvironment#useCatalog(String)} for the rules on the path resolution.
      *
-     * <p>A batch {@link Table} can only be written to a {@code
-     * org.apache.flink.table.sinks.BatchTableSink}, a streaming {@link Table} requires a {@code
-     * org.apache.flink.table.sinks.AppendStreamTableSink}, a {@code
-     * org.apache.flink.table.sinks.RetractStreamTableSink}, or an {@code
-     * org.apache.flink.table.sinks.UpsertStreamTableSink}.
-     *
      * <p>Example:
      *
      * <pre>{@code
@@ -1345,7 +1331,7 @@ public interface Table {
      * tableResult...
      * }</pre>
      *
-     * @param tablePath The path of the registered TableSink to which the Table is written.
+     * @param tablePath The path of the registered table (backed by a {@link DynamicTableSink}).
      * @param overwrite The flag that indicates whether the insert should overwrite existing data or
      *     not.
      * @return The insert operation execution result.
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/factories/utils/TestCollectionTableFactory.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/factories/utils/TestCollectionTableFactory.scala
index 35c6c4c..22fd537 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/factories/utils/TestCollectionTableFactory.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/factories/utils/TestCollectionTableFactory.scala
@@ -21,9 +21,7 @@ package org.apache.flink.table.planner.factories.utils
 import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeutils.TypeSerializer
-import org.apache.flink.api.java.io.{CollectionInputFormat, LocalCollectionOutputFormat}
-import org.apache.flink.api.java.operators.DataSink
-import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
+import org.apache.flink.api.java.io.CollectionInputFormat
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink, DataStreamSource}
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
@@ -34,7 +32,7 @@ import org.apache.flink.table.factories.{TableSinkFactory, TableSourceFactory}
 import org.apache.flink.table.functions.{AsyncTableFunction, TableFunction}
 import org.apache.flink.table.planner.factories.utils.TestCollectionTableFactory.{getCollectionSink, getCollectionSource}
 import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter.fromDataTypeToTypeInfo
-import org.apache.flink.table.sinks.{AppendStreamTableSink, BatchTableSink, StreamTableSink, TableSink}
+import org.apache.flink.table.sinks.{AppendStreamTableSink, StreamTableSink, TableSink}
 import org.apache.flink.table.sources.{LookupableTableSource, StreamTableSource}
 import org.apache.flink.table.types.DataType
 import org.apache.flink.table.utils.TableSchemaUtils.getPhysicalSchema
@@ -152,11 +150,7 @@ object TestCollectionTableFactory {
     * Table sink of collection.
     */
   class CollectionTableSink(val schema: TableSchema)
-    extends BatchTableSink[Row]
-      with AppendStreamTableSink[Row] {
-    override def consumeDataSet(dataSet: DataSet[Row]): DataSink[_] = {
-      dataSet.output(new LocalCollectionOutputFormat[Row](RESULT)).setParallelism(1)
-    }
+      extends AppendStreamTableSink[Row] {
 
     override def getConsumedDataType: DataType = schema.toRowDataType
 
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/testTableSourceSinks.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/testTableSourceSinks.scala
index 13edf80..46fe517 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/testTableSourceSinks.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/testTableSourceSinks.scala
@@ -23,16 +23,14 @@ import org.apache.flink.api.common.io.InputFormat
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
 import org.apache.flink.api.common.typeutils.TypeSerializer
 import org.apache.flink.api.java.io.{CollectionInputFormat, RowCsvInputFormat}
-import org.apache.flink.api.java.operators.DataSink
 import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
 import org.apache.flink.core.io.InputSplit
 import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink}
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.table.api.{DataTypes, TableEnvironment, TableSchema}
 import org.apache.flink.table.catalog.{CatalogPartitionImpl, CatalogPartitionSpec, CatalogTableImpl, ObjectPath}
 import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR, CONNECTOR_TYPE}
-import org.apache.flink.table.descriptors.{CustomConnectorDescriptor, DescriptorProperties, FileSystem, OldCsv, Schema, SchemaValidator}
+import org.apache.flink.table.descriptors._
 import org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedCall
 import org.apache.flink.table.expressions.{CallExpression, Expression, FieldReferenceExpression, ValueLiteralExpression}
 import org.apache.flink.table.factories.{StreamTableSourceFactory, TableSinkFactory, TableSourceFactory}
@@ -41,9 +39,9 @@ import org.apache.flink.table.functions.BuiltInFunctionDefinitions.AND
 import org.apache.flink.table.planner.plan.hint.OptionsHintTest.IS_BOUNDED
 import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
 import org.apache.flink.table.planner.runtime.utils.TimeTestUtil.EventTimeSourceFunction
-import org.apache.flink.table.planner.{JArrayList, JInt, JList, JLong, JMap}
+import org.apache.flink.table.planner._
 import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter.fromDataTypeToTypeInfo
-import org.apache.flink.table.sinks.{BatchTableSink, StreamTableSink, TableSink}
+import org.apache.flink.table.sinks.{StreamTableSink, TableSink}
 import org.apache.flink.table.sources._
 import org.apache.flink.table.sources.tsextractors.ExistingField
 import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, PreserveWatermarks}
@@ -1309,7 +1307,7 @@ class TestOptionsTableFactory
     createPropertiesSource(context.getTable.toProperties)
   }
 
-  override def createTableSink(context: TableSinkFactory.Context): BatchTableSink[Row] = {
+  override def createTableSink(context: TableSinkFactory.Context): TableSink[Row] = {
     createPropertiesSink(context.getTable.toProperties)
   }
 }
@@ -1338,8 +1336,7 @@ class OptionsTableSource(
 class OptionsTableSink(
     tableSchema: TableSchema,
     val props: JMap[String, String])
-  extends BatchTableSink[Row]
-    with StreamTableSink[Row] {
+  extends StreamTableSink[Row] {
 
   override def consumeDataStream(dataStream: DataStream[Row]): DataStreamSink[_] = {
     None.asInstanceOf[DataStreamSink[Row]]
@@ -1353,10 +1350,6 @@ class OptionsTableSink(
   override def getConsumedDataType: DataType = {
     getPhysicalSchema(tableSchema).toRowDataType
   }
-
-  override def consumeDataSet(dataSet: DataSet[Row]): DataSink[_] = {
-    None.asInstanceOf[DataSink[Row]]
-  }
 }
 
 object TestOptionsTableFactory {
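
For anyone still implementing the removed interfaces, the deprecation javadoc
quoted above points to DynamicTableSink (FLIP-95) as the replacement. Below is
a minimal, illustrative sketch of an append-only sink on the new unified
interface; the class name and the PrintSinkFunction body are stand-ins for
this note and are not part of this commit.

import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.table.data.RowData;

/** Hypothetical append-only sink on the unified interface. */
public class ExampleDynamicSink implements DynamicTableSink {

    @Override
    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
        // Insert-only, matching the semantics of the removed
        // BatchTableSink/AppendStreamTableSink pair.
        return ChangelogMode.insertOnly();
    }

    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
        // One runtime provider serves both batch and streaming jobs, so the
        // old consumeDataSet/consumeDataStream split no longer exists.
        return SinkFunctionProvider.of(new PrintSinkFunction<>());
    }

    @Override
    public DynamicTableSink copy() {
        return new ExampleDynamicSink();
    }

    @Override
    public String asSummaryString() {
        return "example print sink";
    }
}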
