wuchong commented on a change in pull request #12199:
URL: https://github.com/apache/flink/pull/12199#discussion_r428432638



##########
File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableITCase.scala
##########
@@ -104,14 +104,35 @@ class TableITCase(tableEnvName: String, isStreaming: Boolean) extends TestLogger
 
   @Test
   def testExecuteWithUpdateChanges(): Unit = {

Review comment:
       Is this the same as `TableEnvironmentITCase#testExecuteSelectWithUpdateChanges`?

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
##########
@@ -186,6 +186,21 @@ abstract class PlannerBase(
           "UnregisteredSink",
           ConnectorCatalogTable.sink(s.getSink, !isStreamingMode))
 
+      case s: SelectSinkOperation =>
+        val input = getRelBuilder.queryOperation(s.getChild).build()
+        // convert query schema to sink schema
+        val sinkSchema = SelectTableSinkSchemaConverter.convertTimeAttributeToRegularTimestamp(
+          SelectTableSinkSchemaConverter.changeDefaultConversionClass(s.getChild.getTableSchema))

Review comment:
       Why do we need to use the default conversion class here?

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/sinks/SelectTableSinkBase.java
##########
@@ -28,61 +28,106 @@
 import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory;
 import org.apache.flink.streaming.api.operators.collect.CollectStreamSink;
 import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.api.internal.SelectTableSink;
-import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter;
+import org.apache.flink.table.api.internal.SelectResultProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.DataFormatConverters;
+import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo;
+import org.apache.flink.table.sinks.StreamTableSink;
+import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.types.Row;
 
 import java.util.Iterator;
 import java.util.UUID;
+import java.util.stream.Stream;
 
 /**
- * Basic implementation of {@link SelectTableSink}.
+ * Basic implementation of {@link StreamTableSink} for select job to collect 
the result to local.
  */
-public class SelectTableSinkBase implements SelectTableSink {
+public abstract class SelectTableSinkBase<T> implements StreamTableSink<T> {
 
        private final TableSchema tableSchema;
-       private final CollectSinkOperatorFactory<Row> factory;
-       private final CollectResultIterator<Row> iterator;
+       protected final DataFormatConverters.DataFormatConverter<RowData, Row> converter;
+
+       private final CollectSinkOperatorFactory<T> factory;
+       private final CollectResultIterator<T> iterator;
 
        @SuppressWarnings("unchecked")
-       public SelectTableSinkBase(TableSchema tableSchema) {
-               this.tableSchema = SelectTableSinkSchemaConverter.convertTimeAttributeToRegularTimestamp(
-                       SelectTableSinkSchemaConverter.changeDefaultConversionClass(tableSchema));
+       public SelectTableSinkBase(TableSchema schema, TypeSerializer<T> typeSerializer) {
+               this.tableSchema = schema;
+               this.converter = DataFormatConverters.getConverterForDataType(this.tableSchema.toPhysicalRowDataType());

Review comment:
       A better way is to use `DataStructureConverters.getConverter`, which covers the full set of data types (but requires calling `open()` first at runtime).
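
For context on the `open()` caveat: the converter returned by `DataStructureConverters.getConverter` follows a two-phase lifecycle — it is obtained at planning time, but must be opened once at runtime (with the user class loader) before any per-record conversion. A minimal self-contained mock of that contract (the `StructureConverter` and `IntToStringConverter` names here are illustrative stand-ins, not Flink's actual classes):

```java
// Hypothetical stand-in for the open-then-convert lifecycle of a
// data structure converter: obtain at planning time, open() once at
// runtime, then convert per record.
public class ConverterContractSketch {

    /** Illustrative converter interface; mirrors the open-then-convert contract. */
    interface StructureConverter<I, E> {
        // Must be called exactly once at runtime before toExternal().
        void open(ClassLoader classLoader);

        E toExternal(I internal);
    }

    /** Toy converter: internal Integer -> external String. */
    static class IntToStringConverter implements StructureConverter<Integer, String> {
        private boolean opened = false;

        @Override
        public void open(ClassLoader classLoader) {
            // Real converters would resolve user classes here.
            opened = true;
        }

        @Override
        public String toExternal(Integer internal) {
            if (!opened) {
                throw new IllegalStateException("open() must be called before converting");
            }
            return String.valueOf(internal);
        }
    }

    public static void main(String[] args) {
        StructureConverter<Integer, String> converter = new IntToStringConverter();
        converter.open(ConverterContractSketch.class.getClassLoader()); // runtime open
        System.out.println(converter.toExternal(42)); // prints "42"
    }
}
```

The design point is that planning-time construction stays serializable and cheap, while class-loading work is deferred to the runtime `open()` call.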

##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableResultImpl.java
##########
@@ -197,13 +198,13 @@ public TableResult build() {
         */
        public interface PrintStyle {
                /**
-                * Create a tableau print style with given max column width and null column,
+                * Create a tableau print style with given max column width, null column and change mode indicator,
                 * which prints the result schema and content as tableau form.
                 */
-               static PrintStyle tableau(int maxColumnWidth, String nullColumn) {
+               static PrintStyle tableau(int maxColumnWidth, String nullColumn, boolean printChangeMode) {

Review comment:
       Could you change all occurrences of `printChangeMode` to `printRowKind`? That would align better with our current concepts; "change mode" sounds more like `ChangelogMode`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

