danny0405 commented on a change in pull request #13331:
URL: https://github.com/apache/flink/pull/13331#discussion_r514915286



##########
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionHelper.java
##########
@@ -147,6 +149,226 @@ static void processFirstRow(
                out.collect(currentRow);
        }
 
+       /**
+        * Processes element to deduplicate on keys with row time semantic, 
sends current element if it is last row,
+        * retracts previous element if needed.
+        *
+        * @param state                state of function
+        * @param currentRow           latest row received by deduplicate 
function
+        * @param out                  underlying collector
+        * @param rowtimeIndex         index of row time field
+        * @param generateUpdateBefore flag to generate UPDATE_BEFORE message 
or not
+        * @param generateInsert       flag to gennerate INSERT message or not
+        */
+       static void processLastRowOnRowtime(
+                       ValueState<RowData> state,
+                       RowData currentRow,
+                       Collector<RowData> out,
+                       int rowtimeIndex,
+                       boolean generateUpdateBefore,
+                       boolean generateInsert) throws Exception {
+
+               checkInsertOnly(currentRow);
+               RowData prevRow = state.value();
+               if (!isLastRow(prevRow, currentRow, rowtimeIndex)) {
+                       return;
+               }
+               state.update(currentRow);
+
+               // store all needed data to state
+               collectRetractResult(
+                               generateUpdateBefore,
+                               generateInsert,
+                               prevRow,
+                               currentRow,
+                               out,
+                               null
+               );
+       }
+
+       /**
+        * Processes element to deduplicate on keys with row time semantic, 
sends current element if it is last row,
+        * retracts previous element if needed.
+        *
+        * @param state                state of function
+        * @param bufferedRows         latest rows received by deduplicate 
function
+        * @param serializer           serializer to serialize the data
+        * @param out                  underlying collector
+        * @param rowtimeIndex         index of row time field
+        * @param generateUpdateBefore flag to generate UPDATE_BEFORE message 
or not
+        * @param generateInsert       flag to gennerate INSERT message or not
+        */
+       static void processMiniBatchLastRowOnRowtime(
+                       ValueState<RowData> state,
+                       List<RowData> bufferedRows,
+                       TypeSerializer<RowData> serializer,
+                       Collector<RowData> out,
+                       int rowtimeIndex,
+                       boolean generateUpdateBefore,
+                       boolean generateInsert) throws Exception {
+
+               if (bufferedRows == null) {
+                       return;
+               }
+
+               RowData preRow = state.value();
+               for (RowData currentRow : bufferedRows) {
+                       checkInsertOnly(currentRow);
+                       if (!isLastRow(preRow, currentRow, rowtimeIndex)) {

Review comment:
       The code is almost same with `processMiniBatchFirstRowOnRowtime`, we can 
abstract the common code out here.

##########
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeMiniBatchDeduplicateKeepFirstRowFunction.java
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.deduplicate;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.util.Collector;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.processMiniBatchFirstRowOnRowTime;
+
+/**
+ * This function is used to get the first row for every key partition in 
miniBatch mode.
+ */
+public class RowTimeMiniBatchDeduplicateKeepFirstRowFunction
+               extends MiniBatchDeduplicateFunctionBase<RowData, RowData, 
List<RowData>, RowData, RowData> {
+
+       private static final long serialVersionUID = 1L;
+
+       private final int rowtimeIndex;
+       private final boolean generateUpdateBefore;
+       private final boolean generateInsert;
+
+       public RowTimeMiniBatchDeduplicateKeepFirstRowFunction(
+                       InternalTypeInfo<RowData> typeInfo,
+                       TypeSerializer<RowData> serializer,
+                       long minRetentionTime,
+                       int rowtimeIndex,
+                       boolean generateUpdateBefore,
+                       boolean generateInsert) {
+               super(typeInfo, serializer, minRetentionTime);
+               this.rowtimeIndex = rowtimeIndex;
+               this.generateUpdateBefore = generateUpdateBefore;
+               this.generateInsert = generateInsert;
+       }
+
+       @Override
+       public List<RowData> addInput(@Nullable List<RowData> value, RowData 
input) throws Exception {
+               if (value == null) {
+                       value = new ArrayList<>();
+               }
+               value.add(serializer.copy(input));
+               return value;
+       }
+
+       @Override
+       public void finishBundle(Map<RowData, List<RowData>> buffer, 
Collector<RowData> out) throws Exception {
+               for (Map.Entry<RowData, List<RowData>> entry : 
buffer.entrySet()) {
+                       RowData currentKey = entry.getKey();
+                       List<RowData> bufferedRows = entry.getValue();

Review comment:
       Both `addInput` and `finishBundle` can be moved to the base class.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to