danny0405 commented on a change in pull request #13331: URL: https://github.com/apache/flink/pull/13331#discussion_r514915286
########## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionHelper.java ########## @@ -147,6 +149,226 @@ static void processFirstRow( out.collect(currentRow); } + /** + * Processes element to deduplicate on keys with row time semantic, sends current element if it is last row, + * retracts previous element if needed. + * + * @param state state of function + * @param currentRow latest row received by deduplicate function + * @param out underlying collector + * @param rowtimeIndex index of row time field + * @param generateUpdateBefore flag to generate UPDATE_BEFORE message or not + * @param generateInsert flag to gennerate INSERT message or not + */ + static void processLastRowOnRowtime( + ValueState<RowData> state, + RowData currentRow, + Collector<RowData> out, + int rowtimeIndex, + boolean generateUpdateBefore, + boolean generateInsert) throws Exception { + + checkInsertOnly(currentRow); + RowData prevRow = state.value(); + if (!isLastRow(prevRow, currentRow, rowtimeIndex)) { + return; + } + state.update(currentRow); + + // store all needed data to state + collectRetractResult( + generateUpdateBefore, + generateInsert, + prevRow, + currentRow, + out, + null + ); + } + + /** + * Processes element to deduplicate on keys with row time semantic, sends current element if it is last row, + * retracts previous element if needed. 
+ * + * @param state state of function + * @param bufferedRows latest rows received by deduplicate function + * @param serializer serializer to serialize the data + * @param out underlying collector + * @param rowtimeIndex index of row time field + * @param generateUpdateBefore flag to generate UPDATE_BEFORE message or not + * @param generateInsert flag to gennerate INSERT message or not + */ + static void processMiniBatchLastRowOnRowtime( + ValueState<RowData> state, + List<RowData> bufferedRows, + TypeSerializer<RowData> serializer, + Collector<RowData> out, + int rowtimeIndex, + boolean generateUpdateBefore, + boolean generateInsert) throws Exception { + + if (bufferedRows == null) { + return; + } + + RowData preRow = state.value(); + for (RowData currentRow : bufferedRows) { + checkInsertOnly(currentRow); + if (!isLastRow(preRow, currentRow, rowtimeIndex)) { Review comment: The code is almost the same as `processMiniBatchFirstRowOnRowtime`; we can abstract the common code out here. ########## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeMiniBatchDeduplicateKeepFirstRowFunction.java ########## @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.operators.deduplicate; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.util.Collector; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionHelper.processMiniBatchFirstRowOnRowTime; + +/** + * This function is used to get the first row for every key partition in miniBatch mode. + */ +public class RowTimeMiniBatchDeduplicateKeepFirstRowFunction + extends MiniBatchDeduplicateFunctionBase<RowData, RowData, List<RowData>, RowData, RowData> { + + private static final long serialVersionUID = 1L; + + private final int rowtimeIndex; + private final boolean generateUpdateBefore; + private final boolean generateInsert; + + public RowTimeMiniBatchDeduplicateKeepFirstRowFunction( + InternalTypeInfo<RowData> typeInfo, + TypeSerializer<RowData> serializer, + long minRetentionTime, + int rowtimeIndex, + boolean generateUpdateBefore, + boolean generateInsert) { + super(typeInfo, serializer, minRetentionTime); + this.rowtimeIndex = rowtimeIndex; + this.generateUpdateBefore = generateUpdateBefore; + this.generateInsert = generateInsert; + } + + @Override + public List<RowData> addInput(@Nullable List<RowData> value, RowData input) throws Exception { + if (value == null) { + value = new ArrayList<>(); + } + value.add(serializer.copy(input)); + return value; + } + + @Override + public void finishBundle(Map<RowData, List<RowData>> buffer, Collector<RowData> out) throws Exception { + for (Map.Entry<RowData, List<RowData>> entry : buffer.entrySet()) { + RowData currentKey = entry.getKey(); + List<RowData> bufferedRows = entry.getValue(); 
Review comment: Both `addInput` and `finishBundle` can be moved to the base class. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org