twalthr commented on code in PR #23884:
URL: https://github.com/apache/flink/pull/23884#discussion_r1420282783


##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/DistinctQueryOperation.java:
##########
@@ -45,6 +45,13 @@ public String asSummaryString() {
                 "Distinct", Collections.emptyMap(), getChildren(), 
Operation::asSummaryString);
     }
 
+    @Override
+    public String asSerializableString() {
+        return String.format(
+                "SELECT DISTINCT * FROM (%s\n)",

Review Comment:
   Can you double-check whether we need to expand all stars in operations? Otherwise this could interfere with virtual metadata columns, given that the expansion behavior is configurable. Check `ColumnExpansionTest#testExcludeDefaultVirtualMetadataColumns`.
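   For illustration, a hypothetical table with a virtual metadata column where `*` and the expanded column list can diverge:
   
   ```
   CREATE TABLE t (a BIGINT, ts TIMESTAMP_LTZ(3) METADATA VIRTUAL);
   -- With the expansion strategy excluding default virtual metadata columns,
   -- `SELECT * FROM t` resolves to only `a`, while an explicitly expanded
   -- `SELECT `a`, `ts` FROM t` would still include `ts`.
   ```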



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/WindowAggregateQueryOperation.java:
##########
@@ -225,5 +249,27 @@ public String asSummaryString() {
                     throw new IllegalStateException("Unknown window type: " + type);
             }
         }
+
+        public String asSerializableString(String table) {
+            switch (type) {
+                case SLIDE:
+                    return String.format(
+                            "HOP((%s), DESCRIPTOR(%s), %s, %s)",
+                            table,

Review Comment:
   Since `%s` can become large, should we also add some line breaks and indentation here?
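   For example, the serialized call could be wrapped roughly like this (a hypothetical layout with placeholder arguments):
   
   ```
   HOP(
       (SELECT `a`, `ts` FROM `t`),
       DESCRIPTOR(`ts`),
       INTERVAL '5' MINUTE,
       INTERVAL '10' MINUTE)
   ```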



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/SourceQueryOperation.java:
##########
@@ -76,6 +78,25 @@ public String asSummaryString() {
                 "CatalogTable", args, getChildren(), 
Operation::asSummaryString);
     }
 
+    @Override
+    public String asSerializableString() {
+        final StringBuilder stringBuilder = new StringBuilder();
+        stringBuilder.append(
+                String.format(
+                        "SELECT %s FROM %s",
+                        getResolvedSchema().getColumnNames().stream()
+                                .map(EncodingUtils::escapeIdentifier)
+                                .collect(Collectors.joining(", ")),
+                        getContextResolvedTable().getIdentifier().asSerializableString()));
+
+        if (dynamicOptions != null) {
+            stringBuilder.append(
+                    String.format("OPTIONS (%s)", OperationUtils.formatProperties(dynamicOptions)));

Review Comment:
   This doesn't look like correct SQL. Did you mean SQL hints here?
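   For reference, dynamic table options in Flink SQL are attached as a hint after the table reference rather than an `OPTIONS` clause (sketch with placeholder keys):
   
   ```
   SELECT `a`, `b`
   FROM `default_catalog`.`default_database`.`t` /*+ OPTIONS('k' = 'v') */
   ```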



##########
flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableApiTestStep.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.test.program;
+
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.types.AbstractDataType;
+
+import java.util.function.Function;
+
+/** Test step for executing a Table API query. Similar to {@link SqlTestStep}. */
+public class TableApiTestStep implements TestStep {
+    private final Function<TableEnvAccessor, Table> tableQuery;
+    private final String sinkName;
+
+    TableApiTestStep(Function<TableEnvAccessor, Table> tableQuery, String sinkName) {

Review Comment:
   We recently introduced `TablePipeline`; maybe this is a better parameter and avoids having the additional `sinkName` parameter:
   
   ```
   (Function<TableEnvAccessor, TablePipeline> tablePipeline)
   ```
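   A step could then build the pipeline directly, e.g. (a sketch, assuming the builder is adapted accordingly; `Table#insertInto(String)` already returns a `TablePipeline`):
   
   ```
   .runTableApi(t -> t.from("s").insertInto("sink"))
   ```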



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ValuesQueryOperation.java:
##########
@@ -63,6 +65,26 @@ public String asSummaryString() {
                 "Values", args, getChildren(), Operation::asSummaryString);
     }
 
+    @Override
+    public String asSerializableString() {
+        return String.format(
+                "SELECT * FROM (VALUES %s\n) $VAL0(%s)",

Review Comment:
   nit: we could also follow the Calcite approach like `EXPR$0` and do `VAL$0`
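   The serialized form would then read, for example:
   
   ```
   SELECT * FROM (VALUES (1, 'abc')) VAL$0(`f0`, `f1`)
   ```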



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/JoinQueryOperation.java:
##########
@@ -106,6 +107,32 @@ public String asSummaryString() {
                 "Join", args, getChildren(), Operation::asSummaryString);
     }
 
+    @Override
+    public String asSerializableString() {
+        // TODO: correlated

Review Comment:
   resolve TODO?



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationTestPrograms.java:
##########
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.operations.QueryOperation;
+import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedTableFunctions;
+import org.apache.flink.table.test.program.SinkTestStep;
+import org.apache.flink.table.test.program.SourceTestStep;
+import org.apache.flink.table.test.program.TableTestProgram;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+
+import static org.apache.flink.table.api.Expressions.$;
+import static org.apache.flink.table.api.Expressions.call;
+import static org.apache.flink.table.api.Expressions.lit;
+import static org.apache.flink.table.api.Expressions.row;
+
+/**
+ * Collection of {@link TableTestProgram TableTestPrograms} for basic {@link QueryOperation
+ * QueryOperations}.
+ */
+@Internal
+public class QueryOperationTestPrograms {
+    static final TableTestProgram SOURCE_QUERY_OPERATION =
+            TableTestProgram.of("source-query-operation", "verifies sql 
serialization")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("s")
+                                    .addSchema("a bigint", "b string")
+                                    .producedValues(Row.of(1L, "abc"), 
Row.of(2L, "cde"))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema("a bigint", "b string")
+                                    .consumedValues(Row.of(1L, "abc"), 
Row.of(2L, "cde"))
+                                    .build())
+                    .runTableApi(t -> t.from("s"), "sink")
+                    .runSql("SELECT `a`, `b` FROM 
`default_catalog`.`default_database`.`s`")
+                    .build();
+    static final TableTestProgram VALUES_QUERY_OPERATION =

Review Comment:
   nit: can we add an empty line between the test programs? It improves readability.



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationTestPrograms.java:
##########
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.operations.QueryOperation;
+import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedTableFunctions;
+import org.apache.flink.table.test.program.SinkTestStep;
+import org.apache.flink.table.test.program.SourceTestStep;
+import org.apache.flink.table.test.program.TableTestProgram;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+
+import static org.apache.flink.table.api.Expressions.$;
+import static org.apache.flink.table.api.Expressions.call;
+import static org.apache.flink.table.api.Expressions.lit;
+import static org.apache.flink.table.api.Expressions.row;
+
+/**
+ * Collection of {@link TableTestProgram TableTestPrograms} for basic {@link QueryOperation
+ * QueryOperations}.
+ */
+@Internal
+public class QueryOperationTestPrograms {
+    static final TableTestProgram SOURCE_QUERY_OPERATION =
+            TableTestProgram.of("source-query-operation", "verifies sql 
serialization")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("s")
+                                    .addSchema("a bigint", "b string")
+                                    .producedValues(Row.of(1L, "abc"), 
Row.of(2L, "cde"))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema("a bigint", "b string")
+                                    .consumedValues(Row.of(1L, "abc"), 
Row.of(2L, "cde"))
+                                    .build())
+                    .runTableApi(t -> t.from("s"), "sink")
+                    .runSql("SELECT `a`, `b` FROM 
`default_catalog`.`default_database`.`s`")

Review Comment:
   This abuses the test base a little bit, but I'm fine with this hack.



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/OperationUtils.java:
##########
@@ -43,9 +43,7 @@ public class OperationUtils {
      * @return string with increased indentation
      */
     static String indent(String item) {
-        return "\n"
-                + OPERATION_INDENT
-                + item.replace("\n" + OPERATION_INDENT, "\n" + OPERATION_INDENT + OPERATION_INDENT);
+        return "\n" + OPERATION_INDENT + item.replaceAll("\n", "\n" + OPERATION_INDENT);

Review Comment:
   Is it guaranteed that `item.replaceAll` has no impact on string literals? Maybe we should use SQL escaping in literals to avoid this?
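   A hypothetical case where the blanket replacement leaks into a literal:
   
   ```
   // a serialized child that contains a newline inside a SQL string literal
   String item = "SELECT 'line1\nline2' FROM t";
   // replaceAll("\n", "\n" + OPERATION_INDENT) also rewrites the newline
   // inside the literal, changing its value to "line1\n    line2"
   ```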



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationSqlExecutionTest.java:
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api;
+
+import org.apache.flink.table.operations.QueryOperation;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.GroupAggregateTestPrograms;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.GroupWindowAggregateTestPrograms;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.SortTestPrograms;
+import org.apache.flink.table.planner.plan.nodes.exec.testutils.JoinTestPrograms;
+import org.apache.flink.table.test.program.TableApiTestStep;
+import org.apache.flink.table.test.program.TableTestProgram;
+import org.apache.flink.table.test.program.TableTestProgramRunner;
+import org.apache.flink.table.test.program.TestStep.TestKind;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.types.Row;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for executing results of {@link QueryOperation#asSerializableString()}. */
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+@ExtendWith(MiniClusterExtension.class)
+public class QueryOperationSqlExecutionTest implements TableTestProgramRunner {
+
+    @AfterEach
+    protected void after() {
+        TestValuesTableFactory.clearAllData();
+    }
+
+    @Override
+    public List<TableTestProgram> programs() {
+        return Arrays.asList(
+                QueryOperationTestPrograms.SOURCE_QUERY_OPERATION,
+                QueryOperationTestPrograms.VALUES_QUERY_OPERATION,
+                QueryOperationTestPrograms.FILTER_QUERY_OPERATION,
+                QueryOperationTestPrograms.AGGREGATE_QUERY_OPERATION,
+                QueryOperationTestPrograms.DISTINCT_QUERY_OPERATION,
+                QueryOperationTestPrograms.JOIN_QUERY_OPERATION,
+                QueryOperationTestPrograms.ORDER_BY_QUERY_OPERATION,
+                QueryOperationTestPrograms.WINDOW_AGGREGATE_QUERY_OPERATION,
+                QueryOperationTestPrograms.UNION_ALL_QUERY_OPERATION,
+                QueryOperationTestPrograms.LATERAL_JOIN_QUERY_OPERATION,
+                GroupWindowAggregateTestPrograms.GROUP_HOP_WINDOW_EVENT_TIME,
+                SortTestPrograms.SORT_LIMIT_DESC,
+                GroupAggregateTestPrograms.GROUP_BY_UDF_WITH_MERGE,
+                JoinTestPrograms.NON_WINDOW_INNER_JOIN);
+    }
+
+    @ParameterizedTest
+    @MethodSource("supportedPrograms")
+    void testSerializedSqlExecution(TableTestProgram program)
+            throws ExecutionException, InterruptedException {
+        final TableEnvironment env = setupEnv(program);
+
+        final TableApiTestStep tableApiStep =
+                (TableApiTestStep)
+                        program.runSteps.stream()
+                                .filter(s -> s instanceof TableApiTestStep)
+                                .findFirst()
+                                .get();
+
+        final TableResult tableResult = tableApiStep.applyAsSql(env);
+        tableResult.await();
+
+        program.getSetupSinkTestSteps()
+                .forEach(
+                        s -> {
+                            assertThat(TestValuesTableFactory.getRawResultsAsStrings(s.name))
+                                    .containsExactlyInAnyOrderElementsOf(s.getExpectedAsStrings());
+                        });
+    }
+
+    private static TableEnvironment setupEnv(TableTestProgram program) {
+        final TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
+        final Map<String, String> connectorOptions = new HashMap<>();
+        connectorOptions.put("connector", "values");
+        connectorOptions.put("sink-insert-only", "false");
+        connectorOptions.put("runtime-source", "NewSource");
+        program.getSetupSourceTestSteps()
+                .forEach(
+                        s -> {
+                            final List<Row> data = new ArrayList<>(s.dataBeforeRestore);
+                            data.addAll(s.dataAfterRestore);
+                            final String id = TestValuesTableFactory.registerData(data);
+                            connectorOptions.put("data-id", id);
+                            s.apply(env, connectorOptions);
+                        });
+        program.getSetupSinkTestSteps().forEach(s -> s.apply(env, connectorOptions));
+        program.getSetupFunctionTestSteps().forEach(f -> f.apply(env));
+        return env;
+    }
+
+    @Override
+    public EnumSet<TestKind> supportedSetupSteps() {
+        return EnumSet.of(
+                TestKind.FUNCTION,
+                TestKind.SOURCE_WITH_DATA,
+                TestKind.SOURCE_WITHOUT_DATA,
+                TestKind.SOURCE_WITH_RESTORE_DATA, // restore data is ignored
+                TestKind.SINK_WITH_DATA,
+                TestKind.SINK_WITH_RESTORE_DATA // restore data is ignored
+                );
+    }
+
+    @Override
+    public EnumSet<TestKind> supportedRunSteps() {
+        return EnumSet.of(TestKind.TABLE_API, TestKind.SQL);

Review Comment:
   There is no code for supporting TestKind.SQL.
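   If the SQL run steps are really not exercised here, the declaration could be narrowed accordingly (a sketch):
   
   ```
   return EnumSet.of(TestKind.TABLE_API);
   ```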



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateTestPrograms.java:
##########
@@ -252,6 +255,15 @@ public class GroupAggregateTestPrograms {
                                     + "my_avg(e, a) as s1, "
                                     + "my_concat(d) as c1 "
                                     + "FROM source_t GROUP BY e")
+                    .runTableApi(

Review Comment:
   So far all ExecNode tests are SQL-based; can we remove this? Otherwise it encourages people to mix Table API and SQL. We can also copy those 4 tests over to `QueryOperationTestPrograms`.



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/QueryOperationTestPrograms.java:
##########
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.operations.QueryOperation;
+import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedTableFunctions;
+import org.apache.flink.table.test.program.SinkTestStep;
+import org.apache.flink.table.test.program.SourceTestStep;
+import org.apache.flink.table.test.program.TableTestProgram;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+
+import static org.apache.flink.table.api.Expressions.$;
+import static org.apache.flink.table.api.Expressions.call;
+import static org.apache.flink.table.api.Expressions.lit;
+import static org.apache.flink.table.api.Expressions.row;
+
+/**
+ * Collection of {@link TableTestProgram TableTestPrograms} for basic {@link QueryOperation
+ * QueryOperations}.
+ */
+@Internal
+public class QueryOperationTestPrograms {
+    static final TableTestProgram SOURCE_QUERY_OPERATION =
+            TableTestProgram.of("source-query-operation", "verifies sql 
serialization")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("s")
+                                    .addSchema("a bigint", "b string")
+                                    .producedValues(Row.of(1L, "abc"), 
Row.of(2L, "cde"))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema("a bigint", "b string")
+                                    .consumedValues(Row.of(1L, "abc"), 
Row.of(2L, "cde"))
+                                    .build())
+                    .runTableApi(t -> t.from("s"), "sink")
+                    .runSql("SELECT `a`, `b` FROM 
`default_catalog`.`default_database`.`s`")
+                    .build();
+    static final TableTestProgram VALUES_QUERY_OPERATION =
+            TableTestProgram.of("values-query-operation", "verifies sql 
serialization")
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema("a bigint", "b string")
+                                    .consumedValues(Row.of(1L, "abc"), 
Row.of(2L, "cde"))
+                                    .build())
+                    .runTableApi(t -> t.fromValues(row(1L, "abc"), row(2L, 
"cde")), "sink")
+                    .runSql(
+                            "SELECT * FROM (VALUES \n"
+                                    + "    (CAST(1 AS BIGINT), 'abc'),\n"
+                                    + "    (CAST(2 AS BIGINT), 'cde')\n"
+                                    + ") $VAL0(`f0`, `f1`)")
+                    .build();
+    static final TableTestProgram FILTER_QUERY_OPERATION =
+            TableTestProgram.of("filter-query-operation", "verifies sql 
serialization")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("s")
+                                    .addSchema("a bigint", "b string")
+                                    .producedValues(Row.of(10L, "abc"), 
Row.of(20L, "cde"))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema("a bigint", "b string")
+                                    .consumedValues(Row.of(20L, "cde"))
+                                    .build())
+                    .runTableApi(t -> t.from("s").where($("a").isGreaterOrEqual(15)), "sink")
+                    .runSql(
+                            "SELECT * FROM (\n"
+                                    + "    SELECT `a`, `b` FROM `default_catalog`.`default_database`.`s`\n"
+                                    + ") WHERE `a` >= 15")
+                    .build();
+    static final TableTestProgram DISTINCT_QUERY_OPERATION =
+            TableTestProgram.of("distinct-query-operation", "verifies sql 
serialization")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("s")
+                                    .addSchema("a bigint", "b string")
+                                    .producedValues(
+                                            Row.of(20L, "apple"),
+                                            Row.of(20L, "apple"),
+                                            Row.of(5L, "pear"))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema("a bigint", "b string")
+                                    .consumedValues(Row.of(20L, "apple"))
+                                    .build())
+                    .runTableApi(
+                            t -> t.from("s").where($("a").isGreaterOrEqual(15)).distinct(), "sink")
+                    .runSql(
+                            "SELECT DISTINCT * FROM (\n"
+                                    + "    SELECT * FROM (\n"
+                                    + "        SELECT `a`, `b` FROM `default_catalog`.`default_database`.`s`\n"
+                                    + "    ) WHERE `a` >= 15\n"
+                                    + ")")
+                    .build();
+    static final TableTestProgram AGGREGATE_QUERY_OPERATION =
+            TableTestProgram.of("aggregate-query-operation", "verifies sql 
serialization")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("s")
+                                    .addSchema("a bigint", "b string")
+                                    .producedValues(
+                                            Row.of(10L, "apple"),
+                                            Row.of(20L, "apple"),
+                                            Row.of(5L, "pear"),
+                                            Row.of(15L, "pear"))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema("a string", "b bigint")
+                                    .consumedValues(
+                                            Row.ofKind(RowKind.INSERT, "apple", 10L),
+                                            Row.ofKind(RowKind.UPDATE_BEFORE, "apple", 10L),

Review Comment:
   Let's only define the materialized result here. We don't care about the changelog if this is not an ExecNode test; if we make the engine smarter later, we would need to update all these tests.
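   A sketch of what that could look like, with hypothetical final values (assuming the aggregate sums `a` per `b`):
   
   ```
   SinkTestStep.newBuilder("sink")
           .addSchema("a string", "b bigint")
           // only the materialized result, no UPDATE_BEFORE/UPDATE_AFTER rows
           .consumedValues(Row.of("apple", 30L), Row.of("pear", 20L))
           .build()
   ```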


