[GitHub] [flink] wuchong commented on a change in pull request #11797: [FLINK-17169][table-blink] Refactor BaseRow to use RowKind instead of byte header

2020-04-23 Thread GitBox


wuchong commented on a change in pull request #11797:
URL: https://github.com/apache/flink/pull/11797#discussion_r414241756



##
File path: flink-core/src/main/java/org/apache/flink/types/RowKind.java
##
@@ -47,10 +47,69 @@
 * needs to retract the previous row first. OR it describes an idempotent update, i.e., an update
 * of a row that is uniquely identifiable by a key.
 */
-   UPDATE_AFTER,
+   UPDATE_AFTER("UA", (byte) 2),
 
/**
 * Deletion operation.
 */
-   DELETE
+   DELETE("D", (byte) 3);
+
+   private final String shortString;
+
+   private final byte value;
+
+   /**
+* Creates a {@link RowKind} enum with the given short string and byte value representation of
+* the {@link RowKind}.
+*/
+   RowKind(String shortString, byte value) {
+   this.shortString = shortString;
+   this.value = value;
+   }
+
+   /**
+* Returns a short string representation of this {@link RowKind}.
+*
+* 

Review comment:
   Yes. But end users may care about the short string representation? 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] wuchong commented on a change in pull request #11797: [FLINK-17169][table-blink] Refactor BaseRow to use RowKind instead of byte header

2020-04-23 Thread GitBox


wuchong commented on a change in pull request #11797:
URL: https://github.com/apache/flink/pull/11797#discussion_r414237437



##
File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala
##
@@ -66,7 +66,7 @@ class CalcITCase extends StreamingTestBase {
 result.addSink(sink)
 env.execute()
 
-val expected = List("I(1,1,1)")
+val expected = List("I+(1,1,1)")

Review comment:
   Sounds good to me. Then it will look like this. 
   
   ```
   +I | a | 12 | FDF
   -U | b | 13 | EFC
   +U | b | 14 | CDE
   -D | d | 15 | IKH
   
   +I(a, 12, FDF)
   -U(b, 13, EFC)
   +U(b, 14, CDE)
   -D(d, 15, IKH)
   ```
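A minimal sketch of how a row could be rendered in the two styles proposed above. The enum `ChangeKind` and the helper `ChangelogFormat` are hypothetical stand-ins, not Flink's `RowKind` API; the short strings follow the `+I`/`-U`/`+U`/`-D` convention from this thread.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-in for RowKind, using the "+I"/"-U"/"+U"/"-D" short strings proposed above.
enum ChangeKind {
    INSERT("+I"),
    UPDATE_BEFORE("-U"),
    UPDATE_AFTER("+U"),
    DELETE("-D");

    private final String shortString;

    ChangeKind(String shortString) {
        this.shortString = shortString;
    }

    String shortString() {
        return shortString;
    }
}

// Hypothetical helper that renders a changelog row in both styles shown above.
final class ChangelogFormat {
    // "+I(a, 12, FDF)": the short string followed by the fields in parentheses.
    static String inline(ChangeKind kind, List<?> fields) {
        return fields.stream()
                .map(field -> String.valueOf(field))
                .collect(Collectors.joining(", ", kind.shortString() + "(", ")"));
    }

    // "+I | a | 12 | FDF": the short string as the first column of a table.
    static String tabular(ChangeKind kind, List<?> fields) {
        return fields.stream()
                .map(field -> String.valueOf(field))
                .collect(Collectors.joining(" | ", kind.shortString() + " | ", ""));
    }
}
```

Because every short string has the same length (two characters), the columns in the tabular style stay aligned regardless of the row kind.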










[GitHub] [flink] wuchong commented on a change in pull request #11797: [FLINK-17169][table-blink] Refactor BaseRow to use RowKind instead of byte header

2020-04-23 Thread GitBox


wuchong commented on a change in pull request #11797:
URL: https://github.com/apache/flink/pull/11797#discussion_r413915001



##
File path: flink-core/src/main/java/org/apache/flink/types/RowKind.java
##
@@ -38,7 +38,7 @@
 * to retract the previous row first. It is useful in cases of a non-idempotent update, i.e., an
 * update of a row that is not uniquely identifiable by a key.
 */
-   UPDATE_BEFORE,
+   UPDATE_BEFORE("UB", (byte) 1),

Review comment:
   I thought about this. The reason I picked `UB` and `UA` is that Oracle uses `I`, `D`, `UO`, and `UN` to represent these four kinds. I think the "U" in the "UB" string is more meaningful than a single "B".
   
   Another idea is to append `+` and `-` to `I`, `D`, and `U`, so that all kinds have the same length and can still be distinguished easily. What do you think?
   
   ```
   I+ | a | 12 | FDF
   U- | b | 13 | EFC
   U+ | b | 14 | CDE
   D- | d | 15 | IKH
   ```
   









[GitHub] [flink] wuchong commented on a change in pull request #11797: [FLINK-17169][table-blink] Refactor BaseRow to use RowKind instead of byte header

2020-04-23 Thread GitBox


wuchong commented on a change in pull request #11797:
URL: https://github.com/apache/flink/pull/11797#discussion_r413888446



##
File path: flink-core/src/main/java/org/apache/flink/types/RowKind.java
##
@@ -47,10 +47,69 @@
 * needs to retract the previous row first. OR it describes an idempotent update, i.e., an update
 * of a row that is uniquely identifiable by a key.
 */
-   UPDATE_AFTER,
+   UPDATE_AFTER("UA", (byte) 2),
 
/**
 * Deletion operation.
 */
-   DELETE
+   DELETE("D", (byte) 3);
+
+   private final String shortString;
+
+   private final byte value;
+
+   /**
+* Creates a {@link RowKind} enum with the given short string and byte value representation of
+* the {@link RowKind}.
+*/
+   RowKind(String shortString, byte value) {
+   this.shortString = shortString;
+   this.value = value;
+   }
+
+   /**
+* Returns a short string representation of this {@link RowKind}.
+*
+* 

Review comment:
   That's true. But users browsing the Javadoc website can only see the comment, not the code. That's why I put the list in the Javadoc.
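Since the Javadoc website renders only the comments, one way to keep the mapping visible there is an HTML list in the accessor's Javadoc. A minimal sketch, assuming a hypothetical `DocumentedRowKind` enum and the `+I`/`-U`/`+U`/`-D` strings discussed elsewhere in this thread (the diff excerpt above still uses `UA` and `D`):

```java
/** Hypothetical change kind whose Javadoc documents its short strings. */
enum DocumentedRowKind {
    INSERT("+I"),
    UPDATE_BEFORE("-U"),
    UPDATE_AFTER("+U"),
    DELETE("-D");

    private final String shortString;

    DocumentedRowKind(String shortString) {
        this.shortString = shortString;
    }

    /**
     * Returns a short string representation of this kind.
     *
     * <ul>
     *   <li>"+I" represents {@link #INSERT}.</li>
     *   <li>"-U" represents {@link #UPDATE_BEFORE}.</li>
     *   <li>"+U" represents {@link #UPDATE_AFTER}.</li>
     *   <li>"-D" represents {@link #DELETE}.</li>
     * </ul>
     */
    public String shortString() {
        return shortString;
    }
}
```

The `{@link #...}` tags render as links on the generated page, so readers can jump from each short string to the matching constant.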









[GitHub] [flink] wuchong commented on a change in pull request #11797: [FLINK-17169][table-blink] Refactor BaseRow to use RowKind instead of byte header

2020-04-22 Thread GitBox


wuchong commented on a change in pull request #11797:
URL: https://github.com/apache/flink/pull/11797#discussion_r412925086



##
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionHelper.java
##
@@ -35,26 +35,31 @@
 *
 * @param currentRow latest row received by deduplicate function
 * @param generateUpdateBefore whether need to send UPDATE_BEFORE message for updates
-* @param state state of function
+* @param state state of function, null if generateUpdateBefore is false
 * @param out underlying collector
 */
static void processLastRow(
BaseRow currentRow,
boolean generateUpdateBefore,
ValueState state,
Collector out) throws Exception {
-   // Check message should be accumulate
-   Preconditions.checkArgument(BaseRowUtil.isAccumulateMsg(currentRow));
-   if (generateUpdateBefore) {
-   // state stores complete row if generateUpdateBefore is true
-   BaseRow preRow = state.value();
-   state.update(currentRow);
-   if (preRow != null) {
-   preRow.setHeader(BaseRowUtil.RETRACT_MSG);
+   // check message should be insert only.
+   Preconditions.checkArgument(currentRow.getRowKind() == RowKind.INSERT);
+   BaseRow preRow = state.value();
+   state.update(currentRow);

Review comment:
   After an offline discussion, I will add an optimization option that always sends UPDATE_AFTER instead of INSERT for the first row. This avoids the state access, but it is disabled by default.
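A minimal sketch of the two behaviors for a keep-last-row deduplication, assuming a `HashMap` in place of Flink's keyed `ValueState` and hypothetical names throughout (`KeepLastRowSketch`, `alwaysEmitUpdateAfter`). It also assumes the optimization only applies when UPDATE_BEFORE is not required, since emitting UPDATE_BEFORE needs the previous row from state anyway.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical change kinds and output record; not Flink's RowKind/BaseRow.
enum DedupKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER }

final class DedupChange {
    final DedupKind kind;
    final String key;
    final String value;

    DedupChange(DedupKind kind, String key, String value) {
        this.kind = kind;
        this.key = key;
        this.value = value;
    }

    @Override
    public String toString() {
        return kind + "(" + key + ", " + value + ")";
    }
}

// Keep-last-row deduplication; a HashMap stands in for Flink's keyed ValueState.
final class KeepLastRowSketch {
    private final boolean generateUpdateBefore;
    // Hypothetical switch for the proposed optimization (disabled by default).
    private final boolean alwaysEmitUpdateAfter;
    private final Map<String, String> state = new HashMap<>();

    KeepLastRowSketch(boolean generateUpdateBefore, boolean alwaysEmitUpdateAfter) {
        this.generateUpdateBefore = generateUpdateBefore;
        this.alwaysEmitUpdateAfter = alwaysEmitUpdateAfter;
    }

    void process(String key, String value, List<DedupChange> out) {
        if (alwaysEmitUpdateAfter && !generateUpdateBefore) {
            // No state read or write: every row, including the first, is emitted as UPDATE_AFTER.
            out.add(new DedupChange(DedupKind.UPDATE_AFTER, key, value));
            return;
        }
        // Default path: the state lookup tells us whether this is the first row for the key.
        String previous = state.put(key, value);
        if (previous == null) {
            out.add(new DedupChange(DedupKind.INSERT, key, value));
            return;
        }
        if (generateUpdateBefore) {
            out.add(new DedupChange(DedupKind.UPDATE_BEFORE, key, previous));
        }
        out.add(new DedupChange(DedupKind.UPDATE_AFTER, key, value));
    }
}
```

The trade-off is visible in the first branch: skipping state saves a lookup per record, but downstream operators can no longer tell a first row from an update.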









[GitHub] [flink] wuchong commented on a change in pull request #11797: [FLINK-17169][table-blink] Refactor BaseRow to use RowKind instead of byte header

2020-04-20 Thread GitBox


wuchong commented on a change in pull request #11797:
URL: https://github.com/apache/flink/pull/11797#discussion_r411840734



##
File path: flink-core/src/main/java/org/apache/flink/types/RowKind.java
##
@@ -47,10 +47,69 @@
 * needs to retract the previous row first. OR it describes an idempotent update, i.e., an update
 * of a row that is uniquely identifiable by a key.
 */
-   UPDATE_AFTER,
+   UPDATE_AFTER("UA", (byte) 2),
 
/**
 * Deletion operation.
 */
-   DELETE
+   DELETE("D", (byte) 3);
+
+   private final String shortString;
+
+   private final byte value;
+
+   /**
+* Creates a {@link RowKind} enum with the given short string and byte value representation of
+* the {@link RowKind}.
+*/
+   RowKind(String shortString, byte value) {

Review comment:
   cc @twalthr, could you have a look at the changes for RowKind?
   
   Using a byte value representation is much faster than using the enum ordinal during de/serialization. In my local benchmark, the byte value is about 24x faster than the ordinal. The disadvantage is that IDEA code completion shows some verbose information (`DELETE("D", (byte) 3)`).
   
   Benchmark code: https://github.com/wuchong/my-benchmark/blob/master/src/main/java/myflink/EnumBenchmark.java
   
   Benchmark Result:
   
   ```
   # Run complete. Total time: 00:03:35
   
   Benchmark                  Mode  Cnt      Score      Error   Units
   EnumBenchmark.testOrdinal  thrpt   20    876.048 ±   18.128  ops/ms
   EnumBenchmark.testValue    thrpt   20  20827.764 ± 2084.072  ops/ms
   ```
   
   
![image](https://user-images.githubusercontent.com/5378924/79822112-ff662600-83c2-11ea-9fd6-9ba3fafba927.png)
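A minimal sketch of the two lookup strategies being compared, assuming a hypothetical `ByteValueKind` enum that mirrors the byte values in the diff (`INSERT = 0` is an assumption, since the excerpt does not show it) and a hypothetical one-byte `KindCodec`. It does not reproduce the benchmark. One plausible contributor to the gap is that `values()` returns a new copy of the constants array on every call, while the switch allocates nothing.

```java
import java.nio.ByteBuffer;

// Hypothetical stand-in mirroring the byte values in the diff; INSERT = 0 is an assumption.
enum ByteValueKind {
    INSERT((byte) 0),
    UPDATE_BEFORE((byte) 1),
    UPDATE_AFTER((byte) 2),
    DELETE((byte) 3);

    private final byte value;

    ByteValueKind(byte value) {
        this.value = value;
    }

    byte toByteValue() {
        return value;
    }

    // Byte-value lookup: a plain switch, no array allocation.
    static ByteValueKind fromByteValue(byte value) {
        switch (value) {
            case 0:
                return INSERT;
            case 1:
                return UPDATE_BEFORE;
            case 2:
                return UPDATE_AFTER;
            case 3:
                return DELETE;
            default:
                throw new IllegalArgumentException("Unsupported byte value '" + value + "'.");
        }
    }

    // Ordinal lookup for comparison: values() copies the constants array on every call.
    static ByteValueKind fromOrdinal(int ordinal) {
        return values()[ordinal];
    }
}

// Writes and reads the kind as a single header byte, as a serializer might.
final class KindCodec {
    static byte[] write(ByteValueKind kind) {
        return ByteBuffer.allocate(1).put(kind.toByteValue()).array();
    }

    static ByteValueKind read(byte[] bytes) {
        return ByteValueKind.fromByteValue(ByteBuffer.wrap(bytes).get(0));
    }
}
```

A side benefit of explicit byte values is that the serialized form no longer depends on declaration order, so reordering the constants would not silently change what is written.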
   
   










[GitHub] [flink] wuchong commented on a change in pull request #11797: [FLINK-17169][table-blink] Refactor BaseRow to use RowKind instead of byte header

2020-04-20 Thread GitBox


wuchong commented on a change in pull request #11797:
URL: https://github.com/apache/flink/pull/11797#discussion_r411487551



##
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowEmitStrategy.scala
##
@@ -63,7 +63,7 @@ class WindowEmitStrategy(
 
   def produceUpdates: Boolean = {
 if (isEventTime) {
-  allowLateness > 0 || earlyFireDelayEnabled || lateFireDelayEnabled
+  earlyFireDelayEnabled || lateFireDelayEnabled

Review comment:
   I will still keep this fix in this PR so that all tests pass, but I will move it into a separate commit.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] wuchong commented on a change in pull request #11797: [FLINK-17169][table-blink] Refactor BaseRow to use RowKind instead of byte header

2020-04-20 Thread GitBox


wuchong commented on a change in pull request #11797:
URL: https://github.com/apache/flink/pull/11797#discussion_r411481795



##
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionHelper.java
##
@@ -35,25 +37,29 @@
 *
 * @param currentRow latest row received by deduplicate function
 * @param generateUpdateBefore whether need to send UPDATE_BEFORE message for updates
-* @param state state of function
+* @param state state of function, null if generateUpdateBefore is false
 * @param out underlying collector
 */
static void processLastRow(
BaseRow currentRow,
boolean generateUpdateBefore,
-   ValueState state,
+   @Nullable ValueState state,
Collector out) throws Exception {
-   // Check message should be accumulate
-   Preconditions.checkArgument(BaseRowUtil.isAccumulateMsg(currentRow));
+   // check message should be insert only.
+   Preconditions.checkArgument(currentRow.getRowKind() == RowKind.INSERT);
if (generateUpdateBefore) {
-   // state stores complete row if generateUpdateBefore is true
+   // state is not null when generateUpdateBefore is enabled,
+   // the state stores complete row
BaseRow preRow = state.value();
state.update(currentRow);
if (preRow != null) {
-   preRow.setHeader(BaseRowUtil.RETRACT_MSG);
+   preRow.setRowKind(RowKind.UPDATE_BEFORE);
out.collect(preRow);
}
}
+   // in order for better performance, we don't have state for LastRow
+   // if not generate UPDATE_BEFORE, thus, we can't produce INSERT messages for first row.

Review comment:
   I changed the logic to emit INSERT for the first row, at the cost of an additional state access.









[GitHub] [flink] wuchong commented on a change in pull request #11797: [FLINK-17169][table-blink] Refactor BaseRow to use RowKind instead of byte header

2020-04-20 Thread GitBox


wuchong commented on a change in pull request #11797:
URL: https://github.com/apache/flink/pull/11797#discussion_r411461935



##
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/JoinedRow.java
##
@@ -17,14 +17,16 @@
 
 package org.apache.flink.table.dataformat;
 
+import org.apache.flink.types.RowKind;
+
 /**
  * Join two row to one row.
  */
 public final class JoinedRow implements BaseRow {
 
private BaseRow row1;
private BaseRow row2;
-   private byte header;
+   private RowKind rowKind = RowKind.INSERT;

Review comment:
   This keeps the same behavior as before, defaulting to `INSERT`. It also aligns with the behavior of our new `GenericRowData`. Without a default value, the RowKind would be nullable, and the code could be error-prone.
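A minimal sketch of the non-null default, assuming a hypothetical `JoinedRowSketch` that joins two `Object[]` rows instead of two `BaseRow`s; it is not Flink's `JoinedRow`. The setter rejecting `null` is an extra assumption that keeps the invariant the default establishes.

```java
import java.util.Objects;

// Hypothetical change kinds; not Flink's RowKind.
enum JoinedKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

// Hypothetical simplified joined row: exposes two arrays as one row.
final class JoinedRowSketch {
    private Object[] row1 = new Object[0];
    private Object[] row2 = new Object[0];
    // Defaults to INSERT so getRowKind() never returns null.
    private JoinedKind rowKind = JoinedKind.INSERT;

    JoinedRowSketch replace(Object[] row1, Object[] row2) {
        this.row1 = row1;
        this.row2 = row2;
        return this;
    }

    int getArity() {
        return row1.length + row2.length;
    }

    Object getField(int pos) {
        return pos < row1.length ? row1[pos] : row2[pos - row1.length];
    }

    JoinedKind getRowKind() {
        return rowKind;
    }

    void setRowKind(JoinedKind kind) {
        // Rejecting null keeps the non-null invariant that the default value establishes.
        this.rowKind = Objects.requireNonNull(kind, "rowKind must not be null");
    }
}
```

With this invariant, callers can switch on `getRowKind()` directly without a null check.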









[GitHub] [flink] wuchong commented on a change in pull request #11797: [FLINK-17169][table-blink] Refactor BaseRow to use RowKind instead of byte header

2020-04-20 Thread GitBox


wuchong commented on a change in pull request #11797:
URL: https://github.com/apache/flink/pull/11797#discussion_r411203914



##
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionHelper.java
##
@@ -35,25 +37,29 @@
 *
 * @param currentRow latest row received by deduplicate function
 * @param generateUpdateBefore whether need to send UPDATE_BEFORE message for updates
-* @param state state of function
+* @param state state of function, null if generateUpdateBefore is false
 * @param out underlying collector
 */
static void processLastRow(
BaseRow currentRow,
boolean generateUpdateBefore,
-   ValueState state,
+   @Nullable ValueState state,
Collector out) throws Exception {
-   // Check message should be accumulate
-   Preconditions.checkArgument(BaseRowUtil.isAccumulateMsg(currentRow));
+   // check message should be insert only.
+   Preconditions.checkArgument(currentRow.getRowKind() == RowKind.INSERT);
if (generateUpdateBefore) {
-   // state stores complete row if generateUpdateBefore is true
+   // state is not null when generateUpdateBefore is enabled,
+   // the state stores complete row
BaseRow preRow = state.value();
state.update(currentRow);
if (preRow != null) {
-   preRow.setHeader(BaseRowUtil.RETRACT_MSG);
+   preRow.setRowKind(RowKind.UPDATE_BEFORE);
out.collect(preRow);
}
}
+   // in order for better performance, we don't have state for LastRow
+   // if not generate UPDATE_BEFORE, thus, we can't produce INSERT messages for first row.

Review comment:
   Upsert databases use an `UPSERT` statement (also called a REPLACE statement), which replaces the values if the key exists and inserts a new row otherwise. So it works even if all rows are `UPDATE` messages.
   
   Currently, upsert sinks don't distinguish `INSERT` and `UPDATE` messages; both are executed as `UPSERT` statements.
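A minimal sketch of why an upsert sink does not need to tell INSERT from UPDATE_AFTER, assuming a hypothetical in-memory sink where a `HashMap` plays the role of a table keyed by its primary key. Skipping UPDATE_BEFORE is an assumption of this sketch: the following UPDATE_AFTER overwrites the row by key anyway.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical change kinds; not Flink's RowKind.
enum SinkKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

// Hypothetical in-memory upsert sink; the map plays the role of a table keyed by its primary key.
final class InMemoryUpsertSink {
    private final Map<String, String> table = new HashMap<>();

    void write(SinkKind kind, String key, String value) {
        switch (kind) {
            case INSERT:
            case UPDATE_AFTER:
                // Same as UPSERT/REPLACE: overwrite the row if the key exists, otherwise insert it.
                table.put(key, value);
                break;
            case DELETE:
                table.remove(key);
                break;
            case UPDATE_BEFORE:
                // Assumption: safe to skip, because the following UPDATE_AFTER overwrites the row by key.
                break;
            default:
                throw new IllegalArgumentException("Unknown kind: " + kind);
        }
    }

    String get(String key) {
        return table.get(key);
    }

    int size() {
        return table.size();
    }
}
```

Writing the first row for a key as UPDATE_AFTER therefore produces the same table contents as writing it as INSERT.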









[GitHub] [flink] wuchong commented on a change in pull request #11797: [FLINK-17169][table-blink] Refactor BaseRow to use RowKind instead of byte header

2020-04-20 Thread GitBox


wuchong commented on a change in pull request #11797:
URL: https://github.com/apache/flink/pull/11797#discussion_r411200526



##
File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/GroupAggregateHarnessTest.scala
##
@@ -75,57 +75,57 @@ class GroupAggregateHarnessTest(mode: StateBackendMode) extends HarnessTestBase(
 // register cleanup timer with 3001
 testHarness.setProcessingTime(1)
 
-// accumulate
-testHarness.processElement(new StreamRecord(binaryrow("aaa", 1L: JLong), 1))
-expectedOutput.add(new StreamRecord(binaryrow("aaa", 1L: JLong), 1))
+// insertion
+testHarness.processElement(binaryRecord(INSERT,"aaa", 1L: JLong))

Review comment:
   Yes. It's unnecessary, because this is a processing-time aggregation 
which doesn't depend on event-time. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] wuchong commented on a change in pull request #11797: [FLINK-17169][table-blink] Refactor BaseRow to use RowKind instead of byte header

2020-04-20 Thread GitBox


wuchong commented on a change in pull request #11797:
URL: https://github.com/apache/flink/pull/11797#discussion_r411198620



##
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowEmitStrategy.scala
##
@@ -63,7 +63,7 @@ class WindowEmitStrategy(
 
   def produceUpdates: Boolean = {
 if (isEventTime) {
-  allowLateness > 0 || earlyFireDelayEnabled || lateFireDelayEnabled
+  earlyFireDelayEnabled || lateFireDelayEnabled

Review comment:
   Yes. I think this is a bug introduced when we merged the Blink code. A window aggregation can produce updates only when early fire or late fire is enabled. `allowLateness` is just a hint to the state backend about how long to keep the state after the window is closed. If late data never triggers an emission, the window still produces insert-only messages.
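A minimal simulation of this reasoning, assuming a single hypothetical event-time window computing `COUNT(*)` (`LateFiringWindowSketch` is not Flink's window operator, and the `-U`/`+U` output strings follow the convention proposed in this thread). A positive `allowLateness` only keeps the state alive longer; the output becomes an update stream only when late firing is enabled.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical single event-time window computing COUNT(*); illustrates the reasoning only.
final class LateFiringWindowSketch {
    private final long windowEnd;
    private final long allowLateness;
    private final boolean lateFireEnabled;
    private long watermark = Long.MIN_VALUE;
    private long count;
    private boolean fired;
    final List<String> emitted = new ArrayList<>();

    LateFiringWindowSketch(long windowEnd, long allowLateness, boolean lateFireEnabled) {
        this.windowEnd = windowEnd;
        this.allowLateness = allowLateness;
        this.lateFireEnabled = lateFireEnabled;
    }

    // Called for an element that belongs to this window.
    void onElement() {
        if (watermark >= windowEnd + allowLateness) {
            return; // past allowLateness: the state is cleaned up and the element is dropped
        }
        count++;
        if (fired && lateFireEnabled) {
            // Only late firing re-emits the result, which turns the output into an update stream.
            emitted.add("-U(" + (count - 1) + ")");
            emitted.add("+U(" + count + ")");
        }
        // Without late firing, a late element only changes the retained state.
    }

    void onWatermark(long newWatermark) {
        watermark = Math.max(watermark, newWatermark);
        if (!fired && watermark >= windowEnd) {
            fired = true;
            emitted.add("+I(" + count + ")");
        }
    }
}
```

With `allowLateness = 10` and late firing disabled, a late element still only yields `+I(1)`; enabling late firing yields `+I(1)`, `-U(1)`, `+U(2)`.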





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org