This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push: new d36a2b3 [FLINK-17944][sql-client] Wrong output in SQL Client's table mode d36a2b3 is described below commit d36a2b3f02e9e31ab61d296ae76c3cffd48b4569 Author: Jeff Zhang <zjf...@apache.org> AuthorDate: Tue May 26 22:01:39 2020 +0800 [FLINK-17944][sql-client] Wrong output in SQL Client's table mode This is a temporary workaround until we don't use Tuple2<Boolean, Row> to represent changelogs anymore. This closes #12346. --- .../local/result/MaterializedCollectStreamResult.java | 5 +++++ .../local/result/MaterializedCollectStreamResultTest.java | 15 ++++++++------- 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResult.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResult.java index 40dd676..2aec400 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResult.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResult.java @@ -25,6 +25,7 @@ import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.client.gateway.SqlExecutionException; import org.apache.flink.table.client.gateway.TypedResult; import org.apache.flink.types.Row; +import org.apache.flink.types.RowKind; import java.net.InetAddress; import java.util.ArrayList; @@ -186,6 +187,10 @@ public class MaterializedCollectStreamResult<C> extends CollectStreamResult<C> i @Override protected void processRecord(Tuple2<Boolean, Row> change) { synchronized (resultLock) { + // Always set the RowKind to INSERT, so that we can compare rows correctly (RowKind will be ignored), + // just use the Boolean of Tuple2<Boolean, Row> to figure out whether it is insert or delete. + change.f1.setKind(RowKind.INSERT); + // insert if (change.f0) { processInsert(change.f1); diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResultTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResultTest.java index 0abd0d5..e31372d 100644 --- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResultTest.java +++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResultTest.java @@ -25,6 +25,7 @@ import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.client.gateway.TypedResult; import org.apache.flink.table.types.DataType; import org.apache.flink.types.Row; +import org.apache.flink.types.RowKind; import org.junit.Test; @@ -57,10 +58,10 @@ public class MaterializedCollectStreamResultTest { result.isRetrieving = true; - result.processRecord(Tuple2.of(true, Row.of("A", 1))); - result.processRecord(Tuple2.of(true, Row.of("B", 1))); - result.processRecord(Tuple2.of(true, Row.of("A", 1))); - result.processRecord(Tuple2.of(true, Row.of("C", 2))); + result.processRecord(Tuple2.of(true, Row.ofKind(RowKind.INSERT, "A", 1))); + result.processRecord(Tuple2.of(true, Row.ofKind(RowKind.INSERT, "B", 1))); + result.processRecord(Tuple2.of(true, Row.ofKind(RowKind.INSERT, "A", 1))); + result.processRecord(Tuple2.of(true, Row.ofKind(RowKind.INSERT, "C", 2))); assertEquals(TypedResult.payload(4), result.snapshot(1)); @@ -69,7 +70,7 @@ public class MaterializedCollectStreamResultTest { assertEquals(Collections.singletonList(Row.of("A", 1)), result.retrievePage(3)); assertEquals(Collections.singletonList(Row.of("C", 2)), result.retrievePage(4)); - result.processRecord(Tuple2.of(false, Row.of("A", 1))); + result.processRecord(Tuple2.of(false, Row.ofKind(RowKind.UPDATE_BEFORE, "A", 1))); assertEquals(TypedResult.payload(3), result.snapshot(1)); @@ -77,8 +78,8 @@ public class MaterializedCollectStreamResultTest { assertEquals(Collections.singletonList(Row.of("B", 1)), result.retrievePage(2)); assertEquals(Collections.singletonList(Row.of("C", 2)), result.retrievePage(3)); - result.processRecord(Tuple2.of(false, Row.of("C", 2))); - result.processRecord(Tuple2.of(false, Row.of("A", 1))); + result.processRecord(Tuple2.of(false, Row.ofKind(RowKind.UPDATE_BEFORE, "C", 2))); + result.processRecord(Tuple2.of(false, Row.ofKind(RowKind.UPDATE_BEFORE, "A", 1))); assertEquals(TypedResult.payload(1), result.snapshot(1));