[GitHub] [flink] danny0405 commented on a change in pull request #13331: [FLINK-19079][table-runtime] Import rowtime deduplicate operator
danny0405 commented on a change in pull request #13331:
URL: https://github.com/apache/flink/pull/13331#discussion_r514915286

## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionHelper.java ##

@@ -147,6 +149,226 @@ static void processFirstRow(
         out.collect(currentRow);
     }
+    /**
+     * Processes element to deduplicate on keys with row time semantic, sends current element if it is last row,
+     * retracts previous element if needed.
+     *
+     * @param state state of function
+     * @param currentRow latest row received by deduplicate function
+     * @param out underlying collector
+     * @param rowtimeIndex index of row time field
+     * @param generateUpdateBefore flag to generate UPDATE_BEFORE message or not
+     * @param generateInsert flag to generate INSERT message or not
+     */
+    static void processLastRowOnRowtime(
+            ValueState<RowData> state,
+            RowData currentRow,
+            Collector<RowData> out,
+            int rowtimeIndex,
+            boolean generateUpdateBefore,
+            boolean generateInsert) throws Exception {
+
+        checkInsertOnly(currentRow);
+        RowData prevRow = state.value();
+        if (!isLastRow(prevRow, currentRow, rowtimeIndex)) {
+            return;
+        }
+        state.update(currentRow);
+
+        // store all needed data to state
+        collectRetractResult(
+            generateUpdateBefore,
+            generateInsert,
+            prevRow,
+            currentRow,
+            out,
+            null
+        );
+    }
+
+    /**
+     * Processes element to deduplicate on keys with row time semantic, sends current element if it is last row,
+     * retracts previous element if needed.
+     *
+     * @param state state of function
+     * @param bufferedRows latest rows received by deduplicate function
+     * @param serializer serializer to serialize the data
+     * @param out underlying collector
+     * @param rowtimeIndex index of row time field
+     * @param generateUpdateBefore flag to generate UPDATE_BEFORE message or not
+     * @param generateInsert flag to generate INSERT message or not
+     */
+    static void processMiniBatchLastRowOnRowtime(
+            ValueState<RowData> state,
+            List<RowData> bufferedRows,
+            TypeSerializer<RowData> serializer,
+            Collector<RowData> out,
+            int rowtimeIndex,
+            boolean generateUpdateBefore,
+            boolean generateInsert) throws Exception {
+
+        if (bufferedRows == null) {
+            return;
+        }
+
+        RowData preRow = state.value();
+        for (RowData currentRow : bufferedRows) {
+            checkInsertOnly(currentRow);
+            if (!isLastRow(preRow, currentRow, rowtimeIndex)) {

Review comment:
   This code is almost the same as `processMiniBatchFirstRowOnRowtime`; we can abstract the common code out here.

## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeMiniBatchDeduplicateKeepFirstRowFunction.java ##

@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.deduplicate;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.util.Collector;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
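
The review comment above on `processMiniBatchLastRowOnRowtime` asks for the code shared with `processMiniBatchFirstRowOnRowtime` to be factored out. One possible shape for that refactoring is sketched below, using only the JDK. It is not the code from the pull request: `DedupSketch`, `Row`, `KEEP_LAST`, `KEEP_FIRST` and `selectSurvivor` are hypothetical names, `Row` is a stand-in for `RowData`, and the two comparison rules are assumptions about what `isLastRow` and its first-row counterpart check. The idea is that the loop over the buffered rows is written once and the keep-first or keep-last decision is passed in as a predicate.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BiPredicate;

/** Hypothetical sketch; none of these names come from the Flink code base. */
public class DedupSketch {

    /** Stand-in for RowData: carries only a rowtime and a payload. */
    static final class Row {
        final long rowtime;
        final String payload;

        Row(long rowtime, String payload) {
            this.rowtime = rowtime;
            this.payload = payload;
        }
    }

    /** Assumed keep-last rule: replace if there is no previous row or the current row is not older. */
    static final BiPredicate<Row, Row> KEEP_LAST =
        (prev, cur) -> prev == null || cur.rowtime >= prev.rowtime;

    /** Assumed keep-first rule: replace if there is no previous row or the current row is strictly older. */
    static final BiPredicate<Row, Row> KEEP_FIRST =
        (prev, cur) -> prev == null || cur.rowtime < prev.rowtime;

    /**
     * Shared mini-batch loop: folds the buffered rows into the single row that
     * survives deduplication. Returns stateRow unchanged if nothing replaces it.
     */
    static Row selectSurvivor(Row stateRow, List<Row> bufferedRows, BiPredicate<Row, Row> shouldReplace) {
        if (bufferedRows == null) {
            return stateRow;
        }
        Row survivor = stateRow;
        for (Row cur : bufferedRows) {
            if (shouldReplace.test(survivor, cur)) {
                survivor = cur;
            }
        }
        return survivor;
    }

    public static void main(String[] args) {
        List<Row> batch = Arrays.asList(new Row(3L, "a"), new Row(1L, "b"), new Row(5L, "c"));
        System.out.println(selectSurvivor(null, batch, KEEP_LAST).payload);  // prints "c"
        System.out.println(selectSurvivor(null, batch, KEEP_FIRST).payload); // prints "b"
    }
}
```

In the real helper the caller would still have to update the state and emit the INSERT / UPDATE_BEFORE / UPDATE_AFTER messages; the sketch only covers the row-selection loop that the two mini-batch methods appear to have in common.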
[GitHub] [flink] danny0405 commented on a change in pull request #13331: [FLINK-19079][table-runtime] Import rowtime deduplicate operator
danny0405 commented on a change in pull request #13331:
URL: https://github.com/apache/flink/pull/13331#discussion_r508472527

## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecDeduplicate.scala ##

@@ -102,44 +110,109 @@ class StreamExecDeduplicate(
       .asInstanceOf[Transformation[RowData]]
     val rowTypeInfo = inputTransform.getOutputType.asInstanceOf[InternalTypeInfo[RowData]]
+    val inputFieldTypes = rowTypeInfo.toRowFieldTypes
+    val keyFieldTypes = new Array[LogicalType](uniqueKeys.length)
+    for (i <- 0 until uniqueKeys.length) {
+      keyFieldTypes(i) = inputFieldTypes(uniqueKeys(i))
+    }
+

Review comment:
   You can use `uniqueKeys.map(idx => inputFieldTypes(idx))` directly.

## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecDeduplicate.scala ##

@@ -102,44 +110,109 @@ class StreamExecDeduplicate(
       .asInstanceOf[Transformation[RowData]]
     val rowTypeInfo = inputTransform.getOutputType.asInstanceOf[InternalTypeInfo[RowData]]
+    val inputFieldTypes = rowTypeInfo.toRowFieldTypes
+    val keyFieldTypes = new Array[LogicalType](uniqueKeys.length)
+    for (i <- 0 until uniqueKeys.length) {
+      keyFieldTypes(i) = inputFieldTypes(uniqueKeys(i))
+    }
+
     val generateUpdateBefore = ChangelogPlanUtils.generateUpdateBefore(this)
     val tableConfig = planner.getTableConfig
     val generateInsert = tableConfig.getConfiguration
       .getBoolean(TABLE_EXEC_INSERT_AND_UPDATE_AFTER_SENSITIVE)
     val isMiniBatchEnabled = tableConfig.getConfiguration.getBoolean(
       ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED)
     val minRetentionTime = tableConfig.getMinIdleStateRetentionTime
-    val operator = if (isMiniBatchEnabled) {
-      val exeConfig = planner.getExecEnv.getConfig
-      val rowSerializer = rowTypeInfo.createSerializer(exeConfig)
+
+    val rowtimeField = input.getRowType.getFieldList
+      .filter(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType))
+    val rowtimeIndex = if (isRowtime) {
+      Preconditions.checkArgument(rowtimeField.nonEmpty)
+      rowtimeField.get(0).getIndex
+    } else {
+      -1
+    }
+
+    val miniBatchsize = if (isMiniBatchEnabled) {
+      val size = tableConfig.getConfiguration.getLong(
+        ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE)
+      Preconditions.checkArgument(size > 0)
+      size
+    } else {
+      -1L
+    }
+    val exeConfig = planner.getExecEnv.getConfig
+    val rowSerializer = rowTypeInfo.createSerializer(exeConfig)
+
+    val operator = if (isRowtime) {
+      if(isMiniBatchEnabled) {
+        val processFunction = if (keepLastRow) {
+          new RowTimeMiniBatchDeduplicateKeepLastRowFunction(
+            rowTypeInfo,

Review comment:
   Can we have two subclasses here? One for `proctime` and another for `rowtime`; each class can have a method that returns the row function, e.g. `getRowFunction(isMiniBatchEnabled, keepLastRow)`. There are too many if-else branches here, which makes the code hard to maintain.

## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionHelper.java ##

@@ -93,6 +95,239 @@ static void processFirstRow(
         out.collect(currentRow);
     }
+    /**
+     * Processes element to deduplicate on keys with row time semantic, sends current element if it is last row,
+     * retracts previous element if needed.
+     *
+     * @param state state of function
+     * @param currentRow latest row received by deduplicate function
+     * @param serializer serializer to serialize the data
+     * @param out underlying collector
+     * @param rowtimeIndex index of row time field
+     * @param generateUpdateBefore flag to generate UPDATE_BEFORE message or not
+     * @param generateInsert flag to generate INSERT message or not
+     */
+    static void processLastRowOnRowtime(
+            ValueState<RowData> state,
+            RowData currentRow,
+            TypeSerializer<RowData> serializer,
+            Collector<RowData> out,
+            int rowtimeIndex,
+            boolean generateUpdateBefore,
+            boolean generateInsert) throws Exception {
+
+        checkInsertOnly(currentRow);
+        RowData prevRow = state.value();
+        if (!isLastRow(prevRow, currentRow, rowtimeIndex)) {
+            return;
+        }
+        state.update(currentRow);
+
+        // store all needed data to state
+        if (generateUpdateBefore || generateInsert) {
+            if (prevRow == null) {
+                // the first row, send INSERT message
+