[GitHub] [flink] wuchong commented on a change in pull request #11797: [FLINK-17169][table-blink] Refactor BaseRow to use RowKind instead of byte header
wuchong commented on a change in pull request #11797:
URL: https://github.com/apache/flink/pull/11797#discussion_r414241756

## File path: flink-core/src/main/java/org/apache/flink/types/RowKind.java

```diff
@@ -47,10 +47,69 @@
  * needs to retract the previous row first. OR it describes an idempotent update, i.e., an update
  * of a row that is uniquely identifiable by a key.
  */
-	UPDATE_AFTER,
+	UPDATE_AFTER("UA", (byte) 2),

 	/**
 	 * Deletion operation.
 	 */
-	DELETE
+	DELETE("D", (byte) 3);
+
+	private final String shortString;
+
+	private final byte value;
+
+	/**
+	 * Creates a {@link RowKind} enum with the given short string and byte value representation of
+	 * the {@link RowKind}.
+	 */
+	RowKind(String shortString, byte value) {
+		this.shortString = shortString;
+		this.value = value;
+	}
+
+	/**
+	 * Returns a short string representation of this {@link RowKind}.
+	 *
```

Review comment:
Yes. But end users may care about the short string representation?

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
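The diff above only shows part of the enum. As a minimal, self-contained sketch of the pattern being discussed (the name `RowKindSketch` and the `fromByteValue` restore method are illustrative, not necessarily Flink's final API), the enum could look like this:

```java
// Sketch of an enum carrying both a short string and a byte value, in the
// spirit of the RowKind refactoring discussed above. Names are illustrative.
enum RowKindSketch {
    INSERT("I", (byte) 0),
    UPDATE_BEFORE("UB", (byte) 1),
    UPDATE_AFTER("UA", (byte) 2),
    DELETE("D", (byte) 3);

    private final String shortString;
    private final byte value;

    RowKindSketch(String shortString, byte value) {
        this.shortString = shortString;
        this.value = value;
    }

    /** Short string representation, e.g. for printing changelog rows. */
    String shortString() {
        return shortString;
    }

    /** Byte value used for serialization. */
    byte toByteValue() {
        return value;
    }

    /** Restores the enum from its serialized byte value. */
    static RowKindSketch fromByteValue(byte value) {
        switch (value) {
            case 0: return INSERT;
            case 1: return UPDATE_BEFORE;
            case 2: return UPDATE_AFTER;
            case 3: return DELETE;
            default:
                throw new IllegalArgumentException("Unsupported byte value: " + value);
        }
    }
}
```

The explicit byte value (rather than the ordinal) keeps the wire format stable even if enum constants are later reordered.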
wuchong commented on a change in pull request #11797:
URL: https://github.com/apache/flink/pull/11797#discussion_r414237437

## File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala

```diff
@@ -66,7 +66,7 @@ class CalcITCase extends StreamingTestBase {
     result.addSink(sink)
     env.execute()

-    val expected = List("I(1,1,1)")
+    val expected = List("I+(1,1,1)")
```

Review comment:
Sounds good to me. Then it will look like this.

```
+I | a | 12 | FDF
-U | b | 13 | EFC
+U | b | 14 | CDE
-D | d | 15 | IKH
```

```
+I(a, 12, FDF)
-U(b, 13, EFC)
+U(b, 14, CDE)
-D(d, 15, IKH)
```
wuchong commented on a change in pull request #11797:
URL: https://github.com/apache/flink/pull/11797#discussion_r413915001

## File path: flink-core/src/main/java/org/apache/flink/types/RowKind.java

```diff
@@ -38,7 +38,7 @@
  * to retract the previous row first. It is useful in cases of a non-idempotent update, i.e., an
  * update of a row that is not uniquely identifiable by a key.
  */
-	UPDATE_BEFORE,
+	UPDATE_BEFORE("UB", (byte) 1),
```

Review comment:
I thought about this. The reason I picked `UB` and `UA` is that Oracle uses `I`, `D`, `UO`, `UN` to represent these 4 kinds, and I think the "U" in "UB" is more meaningful than just a single "B". Another idea is to pad `I`, `U`, `D` with `+` and `-` so that they all have the same length and can still be distinguished easily. What do you think?

```
I+ | a | 12 | FDF
U- | b | 13 | EFC
U+ | b | 14 | CDE
D- | d | 15 | IKH
```
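The padding idea can be sketched as a tiny formatter. Note this sketch puts the sign first (`+I`, `-U`), following the `+I(...)`/`-U(...)` examples elsewhere in this thread; the comment above proposes the sign last, and either choice gives fixed-width tags:

```java
// Sketch of the "pad with + and -" idea: every row kind gets a two-character
// tag so changelog rows line up in columns. Class and method names, and the
// sign-first ordering, are assumptions for illustration.
class ChangelogTags {
    static String tag(String kind) {
        switch (kind) {
            case "INSERT":        return "+I";
            case "UPDATE_BEFORE": return "-U";
            case "UPDATE_AFTER":  return "+U";
            case "DELETE":        return "-D";
            default: throw new IllegalArgumentException("Unknown kind: " + kind);
        }
    }

    // Formats one changelog row, e.g. "+I | a | 12 | FDF".
    static String formatRow(String kind, String... fields) {
        return tag(kind) + " | " + String.join(" | ", fields);
    }
}
```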
wuchong commented on a change in pull request #11797:
URL: https://github.com/apache/flink/pull/11797#discussion_r413888446

## File path: flink-core/src/main/java/org/apache/flink/types/RowKind.java

```diff
@@ -47,10 +47,69 @@
  * needs to retract the previous row first. OR it describes an idempotent update, i.e., an update
  * of a row that is uniquely identifiable by a key.
  */
-	UPDATE_AFTER,
+	UPDATE_AFTER("UA", (byte) 2),

 	/**
 	 * Deletion operation.
 	 */
-	DELETE
+	DELETE("D", (byte) 3);
+
+	private final String shortString;
+
+	private final byte value;
+
+	/**
+	 * Creates a {@link RowKind} enum with the given short string and byte value representation of
+	 * the {@link RowKind}.
+	 */
+	RowKind(String shortString, byte value) {
+		this.shortString = shortString;
+		this.value = value;
+	}
+
+	/**
+	 * Returns a short string representation of this {@link RowKind}.
+	 *
```

Review comment:
That's true. But if users are looking at the Javadoc website, the comment is the only thing they can see, not the code. That's why I put the list in the Javadoc.
wuchong commented on a change in pull request #11797:
URL: https://github.com/apache/flink/pull/11797#discussion_r412925086

## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionHelper.java

```diff
@@ -35,26 +35,31 @@
  *
  * @param currentRow latest row received by deduplicate function
  * @param generateUpdateBefore whether need to send UPDATE_BEFORE message for updates
- * @param state state of function
+ * @param state state of function, null if generateUpdateBefore is false
  * @param out underlying collector
  */
 static void processLastRow(
 		BaseRow currentRow,
 		boolean generateUpdateBefore,
 		ValueState state,
 		Collector out) throws Exception {
-	// Check message should be accumulate
-	Preconditions.checkArgument(BaseRowUtil.isAccumulateMsg(currentRow));
-	if (generateUpdateBefore) {
-		// state stores complete row if generateUpdateBefore is true
-		BaseRow preRow = state.value();
-		state.update(currentRow);
-		if (preRow != null) {
-			preRow.setHeader(BaseRowUtil.RETRACT_MSG);
+	// check message should be insert only.
+	Preconditions.checkArgument(currentRow.getRowKind() == RowKind.INSERT);
+	BaseRow preRow = state.value();
+	state.update(currentRow);
```

Review comment:
After offline discussion, I will add an optimization configuration to always send UPDATE_AFTER instead of INSERT for the first row. This avoids the state access, but it is disabled by default.
wuchong commented on a change in pull request #11797:
URL: https://github.com/apache/flink/pull/11797#discussion_r411840734

## File path: flink-core/src/main/java/org/apache/flink/types/RowKind.java

```diff
@@ -47,10 +47,69 @@
  * needs to retract the previous row first. OR it describes an idempotent update, i.e., an update
  * of a row that is uniquely identifiable by a key.
  */
-	UPDATE_AFTER,
+	UPDATE_AFTER("UA", (byte) 2),

 	/**
 	 * Deletion operation.
 	 */
-	DELETE
+	DELETE("D", (byte) 3);
+
+	private final String shortString;
+
+	private final byte value;
+
+	/**
+	 * Creates a {@link RowKind} enum with the given short string and byte value representation of
+	 * the {@link RowKind}.
+	 */
+	RowKind(String shortString, byte value) {
```

Review comment:
cc @twalthr, could you have a look at the changes to RowKind? Using a byte value representation is much faster than using the enum ordinal during de/serialization. In my local benchmark, the byte value is 24x faster than the ordinal. The disadvantage is that the IDEA code completion shows some verbose information (`DELETE("D", (byte) 3)`).

Benchmark code: https://github.com/wuchong/my-benchmark/blob/master/src/main/java/myflink/EnumBenchmark.java

Benchmark result:

```
# Run complete. Total time: 00:03:35

Benchmark                   Mode  Cnt      Score      Error   Units
EnumBenchmark.testOrdinal  thrpt   20    876.048 ±   18.128  ops/ms
EnumBenchmark.testValue    thrpt   20  20827.764 ± 2084.072  ops/ms
```

![image](https://user-images.githubusercontent.com/5378924/79822112-ff662600-83c2-11ea-9fd6-9ba3fafba927.png)
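One plausible explanation for the gap (an assumption on my part, not stated in the thread; the linked benchmark is authoritative) is that the naive ordinal restore path calls `Enum.values()`, which defensively clones its backing array on every call, while a byte-based switch or a cached lookup array allocates nothing:

```java
// Sketch contrasting the per-call allocation of Enum.values() with a cached
// lookup. Names are illustrative; only standard library behavior is used.
class EnumLookup {
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    // Naive restore path: values() clones a fresh Kind[] on every call,
    // which is costly inside a hot de/serialization loop.
    static Kind slowFromOrdinal(int ordinal) {
        return Kind.values()[ordinal];
    }

    // Clone once, reuse forever: removes the per-call allocation.
    private static final Kind[] LOOKUP = Kind.values();

    static Kind fastFromOrdinal(int ordinal) {
        return LOOKUP[ordinal];
    }
}
```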
wuchong commented on a change in pull request #11797:
URL: https://github.com/apache/flink/pull/11797#discussion_r411487551

## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowEmitStrategy.scala

```diff
@@ -63,7 +63,7 @@ class WindowEmitStrategy(
   def produceUpdates: Boolean = {
     if (isEventTime) {
-      allowLateness > 0 || earlyFireDelayEnabled || lateFireDelayEnabled
+      earlyFireDelayEnabled || lateFireDelayEnabled
```

Review comment:
I will still keep this fix in this PR so that the whole test suite passes, but I will split it into a separate commit.
wuchong commented on a change in pull request #11797:
URL: https://github.com/apache/flink/pull/11797#discussion_r411481795

## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionHelper.java

```diff
@@ -35,25 +37,29 @@
  *
  * @param currentRow latest row received by deduplicate function
  * @param generateUpdateBefore whether need to send UPDATE_BEFORE message for updates
- * @param state state of function
+ * @param state state of function, null if generateUpdateBefore is false
  * @param out underlying collector
  */
 static void processLastRow(
 		BaseRow currentRow,
 		boolean generateUpdateBefore,
-		ValueState state,
+		@Nullable ValueState state,
 		Collector out) throws Exception {
-	// Check message should be accumulate
-	Preconditions.checkArgument(BaseRowUtil.isAccumulateMsg(currentRow));
+	// check message should be insert only.
+	Preconditions.checkArgument(currentRow.getRowKind() == RowKind.INSERT);
 	if (generateUpdateBefore) {
-		// state stores complete row if generateUpdateBefore is true
+		// state is not null when generateUpdateBefore is enabled,
+		// the state stores complete row
 		BaseRow preRow = state.value();
 		state.update(currentRow);
 		if (preRow != null) {
-			preRow.setHeader(BaseRowUtil.RETRACT_MSG);
+			preRow.setRowKind(RowKind.UPDATE_BEFORE);
 			out.collect(preRow);
 		}
 	}
+	// in order for better performance, we don't have state for LastRow
+	// if not generate UPDATE_BEFORE, thus, we can't produce INSERT messages for first row.
```

Review comment:
I changed the logic to emit INSERT for the first row, at the cost of an additional state access.
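The resulting behavior can be sketched without any Flink dependencies. Here a plain `Map` stands in for the keyed `ValueState`, and rows are rendered as strings like `+I(key,value)`; all names are illustrative, not Flink's actual API:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Framework-free sketch of the "emit INSERT for the first row" variant of
// last-row deduplication discussed above.
class LastRowDedup {
    private final Map<String, String> state = new HashMap<>(); // stand-in for ValueState per key

    List<String> processLastRow(String key, String value, boolean generateUpdateBefore) {
        List<String> out = new ArrayList<>();
        String prev = state.put(key, value); // read the previous row, store the current one
        if (prev == null) {
            out.add("+I(" + key + "," + value + ")");    // first row for this key: INSERT
        } else {
            if (generateUpdateBefore) {
                out.add("-U(" + key + "," + prev + ")"); // retract the old value first
            }
            out.add("+U(" + key + "," + value + ")");    // then emit the new value
        }
        return out;
    }
}
```

The extra cost mentioned in the comment is visible here: even when `generateUpdateBefore` is false, the state must be read to decide between `+I` and `+U`.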
wuchong commented on a change in pull request #11797:
URL: https://github.com/apache/flink/pull/11797#discussion_r411461935

## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/JoinedRow.java

```diff
@@ -17,14 +17,16 @@
 package org.apache.flink.table.dataformat;

+import org.apache.flink.types.RowKind;
+
 /**
  * Join two row to one row.
  */
 public final class JoinedRow implements BaseRow {

 	private BaseRow row1;
 	private BaseRow row2;
-	private byte header;
+	private RowKind rowKind = RowKind.INSERT;
```

Review comment:
This keeps the same behavior as before, defaulting to `INSERT`, and it also aligns with the behavior of our new `GenericRowData`. If we didn't have a default value, the RowKind would be nullable and the code could be error-prone.
wuchong commented on a change in pull request #11797:
URL: https://github.com/apache/flink/pull/11797#discussion_r411203914

## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionHelper.java

```diff
@@ -35,25 +37,29 @@
  *
  * @param currentRow latest row received by deduplicate function
  * @param generateUpdateBefore whether need to send UPDATE_BEFORE message for updates
- * @param state state of function
+ * @param state state of function, null if generateUpdateBefore is false
  * @param out underlying collector
  */
 static void processLastRow(
 		BaseRow currentRow,
 		boolean generateUpdateBefore,
-		ValueState state,
+		@Nullable ValueState state,
 		Collector out) throws Exception {
-	// Check message should be accumulate
-	Preconditions.checkArgument(BaseRowUtil.isAccumulateMsg(currentRow));
+	// check message should be insert only.
+	Preconditions.checkArgument(currentRow.getRowKind() == RowKind.INSERT);
 	if (generateUpdateBefore) {
-		// state stores complete row if generateUpdateBefore is true
+		// state is not null when generateUpdateBefore is enabled,
+		// the state stores complete row
 		BaseRow preRow = state.value();
 		state.update(currentRow);
 		if (preRow != null) {
-			preRow.setHeader(BaseRowUtil.RETRACT_MSG);
+			preRow.setRowKind(RowKind.UPDATE_BEFORE);
 			out.collect(preRow);
 		}
 	}
+	// in order for better performance, we don't have state for LastRow
+	// if not generate UPDATE_BEFORE, thus, we can't produce INSERT messages for first row.
```

Review comment:
Upsert databases use an `UPSERT` statement (also called a REPLACE statement), which replaces the value if the key exists and inserts a new row otherwise, so it works even if all rows are `UPDATE` messages. Currently, the upsert sinks don't distinguish `INSERT` from `UPDATE` messages; both are executed as `UPSERT` statements.
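The upsert-sink behavior described in that comment can be sketched with a `Map` standing in for the external database (class and method names are illustrative):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of why an upsert sink need not distinguish INSERT from UPDATE_AFTER:
// both become the same "put by key" (an UPSERT), and DELETE becomes a removal
// by key. The Map stands in for the external upsert database.
class UpsertSink {
    private final Map<String, String> table = new HashMap<>();

    void apply(String rowKind, String key, String value) {
        if ("DELETE".equals(rowKind)) {
            table.remove(key);     // delete by key
        } else {
            table.put(key, value); // INSERT and UPDATE_AFTER are the same UPSERT
        }
    }

    String get(String key) {
        return table.get(key);
    }
}
```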
wuchong commented on a change in pull request #11797:
URL: https://github.com/apache/flink/pull/11797#discussion_r411200526

## File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/GroupAggregateHarnessTest.scala

```diff
@@ -75,57 +75,57 @@ class GroupAggregateHarnessTest(mode: StateBackendMode) extends HarnessTestBase(
     // register cleanup timer with 3001
     testHarness.setProcessingTime(1)

-    // accumulate
-    testHarness.processElement(new StreamRecord(binaryrow("aaa", 1L: JLong), 1))
-    expectedOutput.add(new StreamRecord(binaryrow("aaa", 1L: JLong), 1))
+    // insertion
+    testHarness.processElement(binaryRecord(INSERT, "aaa", 1L: JLong))
```

Review comment:
Yes. It's unnecessary, because this is a processing-time aggregation, which doesn't depend on event time.
wuchong commented on a change in pull request #11797:
URL: https://github.com/apache/flink/pull/11797#discussion_r411198620

## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowEmitStrategy.scala

```diff
@@ -63,7 +63,7 @@ class WindowEmitStrategy(
   def produceUpdates: Boolean = {
     if (isEventTime) {
-      allowLateness > 0 || earlyFireDelayEnabled || lateFireDelayEnabled
+      earlyFireDelayEnabled || lateFireDelayEnabled
```

Review comment:
Yes. I think this is a bug introduced when we merged the Blink code. A window aggregation can produce updates only when early fire or late fire is enabled. `allowLateness` is just a hint to the state backend for how long to keep the state after the window is closed. If late data never triggers emission, the window still produces insert-only messages.