[GitHub] [flink] danny0405 commented on a change in pull request #13331: [FLINK-19079][table-runtime] Import rowtime deduplicate operator

2020-10-30 Thread GitBox


danny0405 commented on a change in pull request #13331:
URL: https://github.com/apache/flink/pull/13331#discussion_r514915286



##
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionHelper.java
##
@@ -147,6 +149,226 @@ static void processFirstRow(
out.collect(currentRow);
}
 
+   /**
+* Processes an element to deduplicate on keys with row-time semantics: sends the current element if it is the last row,
+* and retracts the previous element if needed.
+*
+* @param state                state of function
+* @param currentRow           latest row received by deduplicate function
+* @param out                  underlying collector
+* @param rowtimeIndex         index of row time field
+* @param generateUpdateBefore flag to generate UPDATE_BEFORE message or not
+* @param generateInsert       flag to generate INSERT message or not
+*/
+   static void processLastRowOnRowtime(
+   ValueState<RowData> state,
+   RowData currentRow,
+   Collector<RowData> out,
+   int rowtimeIndex,
+   boolean generateUpdateBefore,
+   boolean generateInsert) throws Exception {
+
+   checkInsertOnly(currentRow);
+   RowData prevRow = state.value();
+   if (!isLastRow(prevRow, currentRow, rowtimeIndex)) {
+   return;
+   }
+   state.update(currentRow);
+
+   // store all needed data to state
+   collectRetractResult(
+   generateUpdateBefore,
+   generateInsert,
+   prevRow,
+   currentRow,
+   out,
+   null
+   );
+   }
+
+   /**
+* Processes buffered elements to deduplicate on keys with row-time semantics: sends the current element if it is the last row,
+* and retracts the previous element if needed.
+*
+* @param state                state of function
+* @param bufferedRows         latest rows received by deduplicate function
+* @param serializer           serializer to serialize the data
+* @param out                  underlying collector
+* @param rowtimeIndex         index of row time field
+* @param generateUpdateBefore flag to generate UPDATE_BEFORE message or not
+* @param generateInsert       flag to generate INSERT message or not
+*/
+   static void processMiniBatchLastRowOnRowtime(
+   ValueState<RowData> state,
+   List<RowData> bufferedRows,
+   TypeSerializer<RowData> serializer,
+   Collector<RowData> out,
+   int rowtimeIndex,
+   boolean generateUpdateBefore,
+   boolean generateInsert) throws Exception {
+
+   if (bufferedRows == null) {
+   return;
+   }
+
+   RowData preRow = state.value();
+   for (RowData currentRow : bufferedRows) {
+   checkInsertOnly(currentRow);
+   if (!isLastRow(preRow, currentRow, rowtimeIndex)) {

Review comment:
   The code is almost the same as `processMiniBatchFirstRowOnRowtime`; we can abstract the common code out here.
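A minimal sketch of the suggested refactoring, assuming the only difference between the first-row and last-row mini-batch paths is the check that decides whether a buffered row replaces the stored one. It uses plain Java stand-ins instead of Flink's `ValueState`/`RowData`, always emits INSERT / UPDATE_BEFORE / UPDATE_AFTER (as if both `generate*` flags were true), and the names `MiniBatchDedupSketch`, `Row`, `isFirstRow`, `isLastRow`, and `processMiniBatchOnRowtime` are hypothetical. The comparison rules are assumptions too: keep-first replaces only on a strictly earlier rowtime, keep-last on a rowtime that is not earlier.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiPredicate;

class MiniBatchDedupSketch {

    // Hypothetical stand-in for RowData: only the rowtime and a payload matter here.
    static final class Row {
        final long rowtime;
        final String payload;

        Row(long rowtime, String payload) {
            this.rowtime = rowtime;
            this.payload = payload;
        }
    }

    // Keep-first rule (assumed): replace the stored row only with a strictly earlier one.
    static boolean isFirstRow(Row prev, Row cur) {
        return prev == null || cur.rowtime < prev.rowtime;
    }

    // Keep-last rule (assumed): replace the stored row with one that is not earlier.
    static boolean isLastRow(Row prev, Row cur) {
        return prev == null || cur.rowtime >= prev.rowtime;
    }

    // Shared loop for both variants; `accept` is the only part that differs.
    // Returns the row that should end up in state (a plain value instead of ValueState).
    static Row processMiniBatchOnRowtime(
            Row stored, List<Row> bufferedRows, BiPredicate<Row, Row> accept, List<String> out) {
        if (bufferedRows == null) {
            return stored;
        }
        Row prev = stored;
        for (Row cur : bufferedRows) {
            if (!accept.test(prev, cur)) {
                continue; // not a replacement under this variant's rule
            }
            if (prev == null) {
                out.add("+I " + cur.payload);  // first row seen for the key
            } else {
                out.add("-U " + prev.payload); // retract the previously emitted row
                out.add("+U " + cur.payload);
            }
            prev = cur;
        }
        return prev;
    }
}
```

Passing `MiniBatchDedupSketch::isLastRow` or `MiniBatchDedupSketch::isFirstRow` selects the variant, so the loop, the null check, and the retraction logic exist only once.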

##
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeMiniBatchDeduplicateKeepFirstRowFunction.java
##
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.deduplicate;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.util.Collector;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;

[GitHub] [flink] danny0405 commented on a change in pull request #13331: [FLINK-19079][table-runtime] Import rowtime deduplicate operator

2020-10-20 Thread GitBox


danny0405 commented on a change in pull request #13331:
URL: https://github.com/apache/flink/pull/13331#discussion_r508472527



##
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecDeduplicate.scala
##
@@ -102,44 +110,109 @@ class StreamExecDeduplicate(
   .asInstanceOf[Transformation[RowData]]
 
 val rowTypeInfo = inputTransform.getOutputType.asInstanceOf[InternalTypeInfo[RowData]]
+val inputFieldTypes = rowTypeInfo.toRowFieldTypes
+val keyFieldTypes = new Array[LogicalType](uniqueKeys.length)
+for (i <- 0 until uniqueKeys.length) {
+  keyFieldTypes(i) = inputFieldTypes(uniqueKeys(i))
+}
+

Review comment:
   You can use `uniqueKeys.map(idx => inputFieldTypes(idx))` directly.
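The suggestion replaces a pre-sized array plus an index loop with a single projection over the key indices. A small Java sketch of the same idea, kept in Java to match the runtime code; `KeyTypeProjection` is hypothetical and `String` type names stand in for Flink's `LogicalType`. Both methods return the same array.

```java
import java.util.Arrays;

class KeyTypeProjection {

    // Loop form, as in the diff: pre-size the array and fill it by index.
    static String[] withLoop(String[] inputFieldTypes, int[] uniqueKeys) {
        String[] keyFieldTypes = new String[uniqueKeys.length];
        for (int i = 0; i < uniqueKeys.length; i++) {
            keyFieldTypes[i] = inputFieldTypes[uniqueKeys[i]];
        }
        return keyFieldTypes;
    }

    // Projection form, the Java analogue of `uniqueKeys.map(idx => inputFieldTypes(idx))`.
    static String[] withMap(String[] inputFieldTypes, int[] uniqueKeys) {
        return Arrays.stream(uniqueKeys)
                .mapToObj(idx -> inputFieldTypes[idx])
                .toArray(String[]::new);
    }
}
```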

##
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecDeduplicate.scala
##
@@ -102,44 +110,109 @@ class StreamExecDeduplicate(
   .asInstanceOf[Transformation[RowData]]
 
 val rowTypeInfo = inputTransform.getOutputType.asInstanceOf[InternalTypeInfo[RowData]]
+val inputFieldTypes = rowTypeInfo.toRowFieldTypes
+val keyFieldTypes = new Array[LogicalType](uniqueKeys.length)
+for (i <- 0 until uniqueKeys.length) {
+  keyFieldTypes(i) = inputFieldTypes(uniqueKeys(i))
+}
+
 val generateUpdateBefore = ChangelogPlanUtils.generateUpdateBefore(this)
 val tableConfig = planner.getTableConfig
 val generateInsert = tableConfig.getConfiguration
   .getBoolean(TABLE_EXEC_INSERT_AND_UPDATE_AFTER_SENSITIVE)
 val isMiniBatchEnabled = tableConfig.getConfiguration.getBoolean(
   ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED)
 val minRetentionTime = tableConfig.getMinIdleStateRetentionTime
-val operator = if (isMiniBatchEnabled) {
-  val exeConfig = planner.getExecEnv.getConfig
-  val rowSerializer = rowTypeInfo.createSerializer(exeConfig)
+
+val rowtimeField = input.getRowType.getFieldList
+  .filter(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType))
+val rowtimeIndex = if (isRowtime) {
+  Preconditions.checkArgument(rowtimeField.nonEmpty)
+  rowtimeField.get(0).getIndex
+} else {
+  -1
+}
+
+val miniBatchsize = if (isMiniBatchEnabled) {
+  val size = tableConfig.getConfiguration.getLong(
+ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE)
+  Preconditions.checkArgument(size > 0)
+  size
+} else {
+  -1L
+}
+val exeConfig = planner.getExecEnv.getConfig
+val rowSerializer = rowTypeInfo.createSerializer(exeConfig)
+
+val operator = if (isRowtime) {
+  if(isMiniBatchEnabled) {
+val processFunction = if (keepLastRow) {
+  new RowTimeMiniBatchDeduplicateKeepLastRowFunction(
+rowTypeInfo,

Review comment:
   Can we have two subclasses here? One for `proctime` and another for `rowtime`; each class can have a method that returns the row function, e.g. `getRowFunction(isMiniBatchEnabled, keepLastRow)`.
   
   There are too many if/else branches here, which makes it hard to maintain.
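A minimal sketch of the proposed split, written in Java for consistency with the runtime code (the planner node itself is Scala, and the structure carries over directly). Strings stand in for the function instances; only the two `RowTimeMiniBatch...` names appear in the diff, and every other class or function name here (`DeduplicateTranslators`, `DeduplicateTranslator`, the proctime and non-mini-batch names, `create`) is a hypothetical placeholder. The design point is that rowtime-only concerns, such as requiring a valid `rowtimeIndex`, live in the rowtime subclass instead of in a shared branch chain.

```java
class DeduplicateTranslators {

    // Hypothetical base class: one subclass per time attribute, each owning its own dispatch.
    abstract static class DeduplicateTranslator {
        abstract String getRowFunction(boolean isMiniBatchEnabled, boolean keepLastRow);
    }

    static final class ProcTimeDeduplicateTranslator extends DeduplicateTranslator {
        @Override
        String getRowFunction(boolean isMiniBatchEnabled, boolean keepLastRow) {
            if (isMiniBatchEnabled) {
                return keepLastRow
                        ? "ProcTimeMiniBatchDeduplicateKeepLastRowFunction"
                        : "ProcTimeMiniBatchDeduplicateKeepFirstRowFunction";
            }
            return keepLastRow
                    ? "ProcTimeDeduplicateKeepLastRowFunction"
                    : "ProcTimeDeduplicateKeepFirstRowFunction";
        }
    }

    static final class RowTimeDeduplicateTranslator extends DeduplicateTranslator {
        private final int rowtimeIndex;

        RowTimeDeduplicateTranslator(int rowtimeIndex) {
            // Rowtime-only validation lives here instead of in the shared if/else chain.
            if (rowtimeIndex < 0) {
                throw new IllegalArgumentException("rowtime deduplication needs a rowtime field");
            }
            this.rowtimeIndex = rowtimeIndex;
        }

        @Override
        String getRowFunction(boolean isMiniBatchEnabled, boolean keepLastRow) {
            String name;
            if (isMiniBatchEnabled) {
                name = keepLastRow
                        ? "RowTimeMiniBatchDeduplicateKeepLastRowFunction"
                        : "RowTimeMiniBatchDeduplicateKeepFirstRowFunction";
            } else {
                name = keepLastRow
                        ? "RowTimeDeduplicateKeepLastRowFunction"
                        : "RowTimeDeduplicateKeepFirstRowFunction";
            }
            return name + "(rowtimeIndex=" + rowtimeIndex + ")";
        }
    }

    // The planner picks the subclass once, based on the time attribute.
    static DeduplicateTranslator create(boolean isRowtime, int rowtimeIndex) {
        return isRowtime
                ? new RowTimeDeduplicateTranslator(rowtimeIndex)
                : new ProcTimeDeduplicateTranslator();
    }
}
```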

##
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionHelper.java
##
@@ -93,6 +95,239 @@ static void processFirstRow(
out.collect(currentRow);
}
 
+   /**
+* Processes an element to deduplicate on keys with row-time semantics: sends the current element if it is the last row,
+* and retracts the previous element if needed.
+*
+* @param state state of function
+* @param currentRow latest row received by deduplicate function
+* @param serializer serializer to serialize the data
+* @param out underlying collector
+* @param rowtimeIndex index of row time field
+* @param generateUpdateBefore flag to generate UPDATE_BEFORE message or not
+* @param generateInsert flag to generate INSERT message or not
+*/
+   static void processLastRowOnRowtime(
+   ValueState<RowData> state,
+   RowData currentRow,
+   TypeSerializer<RowData> serializer,
+   Collector<RowData> out,
+   int rowtimeIndex,
+   boolean generateUpdateBefore,
+   boolean generateInsert) throws Exception {
+
+   checkInsertOnly(currentRow);
+   RowData prevRow = state.value();
+   if (!isLastRow(prevRow, currentRow, rowtimeIndex)) {
+   return;
+   }
+   state.update(currentRow);
+
+   // store all needed data to state
+   if (generateUpdateBefore || generateInsert) {
+   if (prevRow == null) {
+   // the first row, send INSERT message
+