TsReaper commented on a change in pull request #12199:
URL: https://github.com/apache/flink/pull/12199#discussion_r427093164



##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/sinks/SelectTableSinkBase.java
##########
@@ -28,61 +28,102 @@
 import 
org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory;
 import org.apache.flink.streaming.api.operators.collect.CollectStreamSink;
 import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.api.internal.SelectTableSink;
-import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter;
+import org.apache.flink.table.api.internal.SelectResultProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.DataFormatConverters;
+import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo;
+import org.apache.flink.table.sinks.StreamTableSink;
+import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.types.Row;
 
 import java.util.Iterator;
 import java.util.UUID;
+import java.util.stream.Stream;
 
 /**
- * Basic implementation of {@link SelectTableSink}.
+ * Basic implementation of {@link StreamTableSink} for select job to collect 
the result to local.
  */
-public class SelectTableSinkBase implements SelectTableSink {
+public abstract class SelectTableSinkBase<T> implements StreamTableSink<T> {
 
        private final TableSchema tableSchema;
-       private final CollectSinkOperatorFactory<Row> factory;
-       private final CollectResultIterator<Row> iterator;
+       protected final DataFormatConverters.DataFormatConverter<RowData, Row> 
converter;
+
+       private final CollectSinkOperatorFactory<T> factory;
+       private final CollectResultIterator<T> iterator;
 
        @SuppressWarnings("unchecked")
-       public SelectTableSinkBase(TableSchema tableSchema) {
+       public SelectTableSinkBase(TableSchema schema, TypeSerializer<T> 
typeSerializer) {
                this.tableSchema = 
SelectTableSinkSchemaConverter.convertTimeAttributeToRegularTimestamp(
-                       
SelectTableSinkSchemaConverter.changeDefaultConversionClass(tableSchema));
+                               
SelectTableSinkSchemaConverter.changeDefaultConversionClass(schema));
+               this.converter = 
DataFormatConverters.getConverterForDataType(this.tableSchema.toPhysicalRowDataType());
 
-               TypeSerializer<Row> typeSerializer = (TypeSerializer<Row>) 
TypeInfoDataTypeConverter
-                       
.fromDataTypeToTypeInfo(this.tableSchema.toRowDataType())
-                       .createSerializer(new ExecutionConfig());
                String accumulatorName = "tableResultCollect_" + 
UUID.randomUUID();
-
                this.factory = new CollectSinkOperatorFactory<>(typeSerializer, 
accumulatorName);
                CollectSinkOperator<Row> operator = (CollectSinkOperator<Row>) 
factory.getOperator();
                this.iterator = new 
CollectResultIterator<>(operator.getOperatorIdFuture(), typeSerializer, 
accumulatorName);
        }
 
        @Override
-       public DataType getConsumedDataType() {
-               return tableSchema.toRowDataType();
+       public TableSchema getTableSchema() {
+               return tableSchema;
        }
 
        @Override
-       public TableSchema getTableSchema() {
-               return tableSchema;
+       public TableSink<T> configure(String[] fieldNames, TypeInformation<?>[] 
fieldTypes) {
+               throw new UnsupportedOperationException();
        }
 
-       protected DataStreamSink<?> consumeDataStream(DataStream<Row> 
dataStream) {
-               CollectStreamSink<Row> sink = new 
CollectStreamSink<>(dataStream, factory);
+       @Override
+       public DataStreamSink<?> consumeDataStream(DataStream<T> dataStream) {
+               CollectStreamSink<?> sink = new CollectStreamSink<>(dataStream, 
factory);
                
dataStream.getExecutionEnvironment().addOperator(sink.getTransformation());
                return sink.name("Select table sink");
        }
 
-       @Override
-       public void setJobClient(JobClient jobClient) {
-               iterator.setJobClient(jobClient);
+       public SelectResultProvider getSelectResultProvider() {
+               return new SelectResultProvider() {
+                       @Override
+                       public void setJobClient(JobClient jobClient) {
+                               iterator.setJobClient(jobClient);
+                       }
+
+                       @Override
+                       public Iterator<Row> getResultIterator() {
+                               return new RowIteratorWrapper(iterator);
+                       }
+               };
        }
 
-       @Override
-       public Iterator<Row> getResultIterator() {
-               return iterator;
+       /**
+        * An Iterator wrapper class that converts Iterator&lt;T&gt; to 
Iterator&lt;Row&gt;.
+        */
+       private class RowIteratorWrapper implements Iterator<Row> {

Review comment:
       Should also implement `AutoClosable`, otherwise job related resources 
will not be disposed if user reads only a partial of results.

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sinks/StreamSelectTableSink.java
##########
@@ -69,19 +72,54 @@ public TableSchema getTableSchema() {
        }
 
        @Override
-       public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
-               CollectStreamSink<Row> sink = new 
CollectStreamSink<>(dataStream, factory);
+       public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames, 
TypeInformation<?>[] fieldTypes) {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, 
Row>> dataStream) {
+               CollectStreamSink<?> sink = new CollectStreamSink<>(dataStream, 
factory);
                
dataStream.getExecutionEnvironment().addOperator(sink.getTransformation());
                return sink.name("Streaming select table sink");
        }
 
-       @Override
-       public void setJobClient(JobClient jobClient) {
-               iterator.setJobClient(jobClient);
+       public SelectResultProvider getSelectResultProvider() {
+               return new SelectResultProvider() {
+
+                       @Override
+                       public void setJobClient(JobClient jobClient) {
+                               iterator.setJobClient(jobClient);
+                       }
+
+                       @Override
+                       public Iterator<Row> getResultIterator() {
+                               return new RowIteratorWrapper(iterator);
+                       }
+               };
        }
 
-       @Override
-       public Iterator<Row> getResultIterator() {
-               return iterator;
+       /**
+        * An Iterator wrapper class that converts 
Iterator&lt;Tuple2&lt;Boolean, Row&gt;&gt; to Iterator&lt;Row&gt;.
+        */
+       private static class RowIteratorWrapper implements Iterator<Row> {

Review comment:
       ditto.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to