twalthr commented on code in PR #27502: URL: https://github.com/apache/flink/pull/27502#discussion_r2758541227
########## flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/WatermarkCompactingSinkMaterializerTest.java: ########## @@ -0,0 +1,586 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.operators.sink; + +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; +import org.apache.flink.table.api.InsertConflictStrategy; +import org.apache.flink.table.api.InsertConflictStrategy.ConflictBehavior; +import org.apache.flink.table.api.TableRuntimeException; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser; +import org.apache.flink.table.runtime.generated.RecordEqualiser; +import org.apache.flink.table.types.logical.BigIntType; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.VarCharType; +import org.apache.flink.table.utils.HandwrittenSelectorUtil; +import 
org.apache.flink.types.RowKind; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +import static org.apache.flink.table.runtime.util.StreamRecordUtils.deleteRecord; +import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord; +import static org.apache.flink.table.runtime.util.StreamRecordUtils.rowOfKind; +import static org.apache.flink.table.runtime.util.StreamRecordUtils.updateAfterRecord; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Tests for {@link WatermarkCompactingSinkMaterializer}. */ +class WatermarkCompactingSinkMaterializerTest { + + private static final int PRIMARY_KEY_INDEX = 1; + + private static final LogicalType[] LOGICAL_TYPES = + new LogicalType[] {new BigIntType(), new IntType(), new VarCharType()}; + + private static final GeneratedRecordEqualiser RECORD_EQUALISER = + new GeneratedRecordEqualiser("", "", new Object[0]) { + @Override + public RecordEqualiser newInstance(ClassLoader classLoader) { + return new TestRecordEqualiser(); + } + }; + + private static final GeneratedRecordEqualiser UPSERT_KEY_EQUALISER = + new GeneratedRecordEqualiser("", "", new Object[0]) { + @Override + public RecordEqualiser newInstance(ClassLoader classLoader) { + return new TestUpsertKeyEqualiser(); + } + }; + + @ParameterizedTest + @EnumSource( + value = ConflictBehavior.class, + names = {"ERROR", "NOTHING"}) + void testBasicInsertWithWatermarkProgression(ConflictBehavior behavior) throws Exception { + try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness = + createHarness(behavior)) { + harness.open(); + + // Insert first record (watermark is MIN_VALUE) + harness.processElement(insertRecord(1L, 1, "a1")); + assertEmitsNothing(harness); // Buffered, waiting 
for watermark + + // Advance watermark to trigger compaction + harness.processWatermark(100L); + assertEmits(harness, rowOfKind(RowKind.INSERT, 1L, 1, "a1")); + + // Update with same upsert key (this is the expected pattern for single-source updates) + harness.processElement(updateAfterRecord(1L, 1, "a2")); + assertEmitsNothing(harness); + + // Advance watermark again + harness.processWatermark(200L); + assertEmits(harness, rowOfKind(RowKind.UPDATE_AFTER, 1L, 1, "a2")); + } + } + + @ParameterizedTest + @EnumSource( + value = ConflictBehavior.class, + names = {"ERROR", "NOTHING"}) + void testDeleteAfterInsert(ConflictBehavior behavior) throws Exception { + try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness = + createHarness(behavior)) { + harness.open(); + + // Insert and compact + harness.processElement(insertRecord(1L, 1, "a1")); + harness.processWatermark(100L); + assertEmits(harness, rowOfKind(RowKind.INSERT, 1L, 1, "a1")); + + // Delete and compact + harness.processElement(deleteRecord(1L, 1, "a1")); + harness.processWatermark(200L); + assertEmits(harness, rowOfKind(RowKind.DELETE, 1L, 1, "a1")); + } + } + + @ParameterizedTest + @EnumSource( + value = ConflictBehavior.class, + names = {"ERROR", "NOTHING"}) + void testInsertAndDeleteInSameWindow(ConflictBehavior behavior) throws Exception { + try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness = + createHarness(behavior)) { + harness.open(); + + // Insert and delete before watermark advances + harness.processElement(insertRecord(1L, 1, "a1")); + harness.processElement(deleteRecord(1L, 1, "a1")); + + // Compact - should emit nothing since insert and delete cancel out + harness.processWatermark(100L); + assertEmitsNothing(harness); + } + } + + @Test + void testDoNothingKeepsFirstRecord() throws Exception { + try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness = + createHarness(ConflictBehavior.NOTHING)) { + harness.open(); + + // 
Insert two records with different upsert keys but same primary key + harness.processElement(insertRecord(1L, 1, "first")); + harness.processElement(insertRecord(2L, 1, "second")); + + // Compact - should keep the first record + harness.processWatermark(100L); + assertEmits(harness, rowOfKind(RowKind.INSERT, 1L, 1, "first")); + } + } + + @Test + void testDoErrorThrowsOnConflict() throws Exception { + try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness = + createHarness(ConflictBehavior.ERROR)) { + harness.open(); + + // Insert two records with different upsert keys but same primary key + harness.processElement(insertRecord(1L, 1, "first")); + harness.processElement(insertRecord(2L, 1, "second")); + + // Compact - should throw exception + assertThatThrownBy(() -> harness.processWatermark(100L)) + .isInstanceOf(TableRuntimeException.class) + .hasMessageContaining("Primary key constraint violation"); + } + } + + @Test + void testDoErrorAllowsSameUpsertKey() throws Exception { + try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness = + createHarness(ConflictBehavior.ERROR)) { + harness.open(); + + // Insert two records with same upsert key (updates to same source) + harness.processElement(insertRecord(1L, 1, "v1")); + harness.processElement(updateAfterRecord(1L, 1, "v2")); + + // Compact - should not throw, just keep the latest + harness.processWatermark(100L); + assertEmits(harness, rowOfKind(RowKind.INSERT, 1L, 1, "v2")); + } + } + + @ParameterizedTest + @EnumSource( + value = ConflictBehavior.class, + names = {"ERROR", "NOTHING"}) + void testChangelogDisorderHandling(ConflictBehavior behavior) throws Exception { + try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness = + createHarness(behavior)) { + harness.open(); + + // Simulate changelog disorder from FLIP-558 example: + // Records from different sources (different upsert keys: 1L and 2L) map to same PK (1) + // Ideal order: +I(1,1,a1), 
-U(1,1,a1), +U(2,1,b1) + // Disordered: +U(2,1,b1), +I(1,1,a1), -U(1,1,a1) + + // The +U from source 2 arrives first (upsert key = 2L) + harness.processElement(updateAfterRecord(2L, 1, "b1")); + // Then +I and -U from source 1 arrive (upsert key = 1L) + harness.processElement(insertRecord(1L, 1, "a1")); + harness.processElement(deleteRecord(1L, 1, "a1")); + + // Net result: only (2L, 1, "b1") remains after cancellation, no conflict + harness.processWatermark(100L); + assertEmits(harness, rowOfKind(RowKind.INSERT, 2L, 1, "b1")); + } + } + + @ParameterizedTest + @EnumSource( + value = ConflictBehavior.class, + names = {"ERROR", "NOTHING"}) + void testNoEmissionWhenValueUnchanged(ConflictBehavior behavior) throws Exception { + try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness = + createHarness(behavior)) { + harness.open(); + + // Insert and compact + harness.processElement(insertRecord(1L, 1, "value")); + harness.processWatermark(100L); + assertEmits(harness, rowOfKind(RowKind.INSERT, 1L, 1, "value")); + + // Insert same value again (same upsert key) + harness.processElement(updateAfterRecord(1L, 1, "value")); + harness.processWatermark(200L); + // Should not emit since value is the same + assertEmitsNothing(harness); + } + } + + /** + * Tests that record timestamps are handled correctly when multiple inputs send records that + * arrive out of timestamp order. This simulates the case where records from different upstream + * tasks have different timestamps and arrive interleaved. + * + * <p>Input 1 uses upsert key = 1L, Input 2 uses upsert key = 2L. All records have same primary + * key (1). 
+ * + * <p>Sequence: + * + * <ol> + * <li>INSERT(input=1, t=2) + * <li>watermark=3 -> emits INSERT + * <li>UPDATE_BEFORE(input=1, t=4) + * <li>UPDATE_AFTER(input=1, t=6) + * <li>UPDATE_AFTER(input=2, t=4) - arrives after t=6 record but has smaller timestamp + * <li>watermark=5 -> compacts t<=5 records + * <li>UPDATE_BEFORE(input=2, t=6) + * <li>watermark=10 -> compacts remaining t=6 records + * </ol> + */ + @Test + void testTwoUpstreamTasksWithDisorderedWatermarks() throws Exception { Review Comment: Can we add a test case and play through the case when the pipeline got stopped and resumed? Since watermarks are not checkpointed, it might happen that Long.MIN_VALUE enters the operator again. Even though compaction has happened before. What guarantees can we give in this case? ########## flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/WatermarkCompactingSinkMaterializerTest.java: ########## @@ -0,0 +1,586 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.operators.sink; + +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; +import org.apache.flink.table.api.InsertConflictStrategy; +import org.apache.flink.table.api.InsertConflictStrategy.ConflictBehavior; +import org.apache.flink.table.api.TableRuntimeException; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser; +import org.apache.flink.table.runtime.generated.RecordEqualiser; +import org.apache.flink.table.types.logical.BigIntType; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.VarCharType; +import org.apache.flink.table.utils.HandwrittenSelectorUtil; +import org.apache.flink.types.RowKind; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +import static org.apache.flink.table.runtime.util.StreamRecordUtils.deleteRecord; +import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord; +import static org.apache.flink.table.runtime.util.StreamRecordUtils.rowOfKind; +import static org.apache.flink.table.runtime.util.StreamRecordUtils.updateAfterRecord; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Tests for {@link WatermarkCompactingSinkMaterializer}. 
*/ +class WatermarkCompactingSinkMaterializerTest { + + private static final int PRIMARY_KEY_INDEX = 1; + + private static final LogicalType[] LOGICAL_TYPES = + new LogicalType[] {new BigIntType(), new IntType(), new VarCharType()}; + + private static final GeneratedRecordEqualiser RECORD_EQUALISER = + new GeneratedRecordEqualiser("", "", new Object[0]) { + @Override + public RecordEqualiser newInstance(ClassLoader classLoader) { + return new TestRecordEqualiser(); + } + }; + + private static final GeneratedRecordEqualiser UPSERT_KEY_EQUALISER = + new GeneratedRecordEqualiser("", "", new Object[0]) { + @Override + public RecordEqualiser newInstance(ClassLoader classLoader) { + return new TestUpsertKeyEqualiser(); + } + }; + + @ParameterizedTest + @EnumSource( + value = ConflictBehavior.class, + names = {"ERROR", "NOTHING"}) + void testBasicInsertWithWatermarkProgression(ConflictBehavior behavior) throws Exception { + try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness = + createHarness(behavior)) { + harness.open(); + + // Insert first record (watermark is MIN_VALUE) + harness.processElement(insertRecord(1L, 1, "a1")); + assertEmitsNothing(harness); // Buffered, waiting for watermark + + // Advance watermark to trigger compaction + harness.processWatermark(100L); + assertEmits(harness, rowOfKind(RowKind.INSERT, 1L, 1, "a1")); + + // Update with same upsert key (this is the expected pattern for single-source updates) + harness.processElement(updateAfterRecord(1L, 1, "a2")); + assertEmitsNothing(harness); + + // Advance watermark again + harness.processWatermark(200L); + assertEmits(harness, rowOfKind(RowKind.UPDATE_AFTER, 1L, 1, "a2")); + } + } + + @ParameterizedTest + @EnumSource( + value = ConflictBehavior.class, + names = {"ERROR", "NOTHING"}) + void testDeleteAfterInsert(ConflictBehavior behavior) throws Exception { + try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness = + createHarness(behavior)) { + 
harness.open(); + + // Insert and compact + harness.processElement(insertRecord(1L, 1, "a1")); + harness.processWatermark(100L); + assertEmits(harness, rowOfKind(RowKind.INSERT, 1L, 1, "a1")); + + // Delete and compact + harness.processElement(deleteRecord(1L, 1, "a1")); + harness.processWatermark(200L); + assertEmits(harness, rowOfKind(RowKind.DELETE, 1L, 1, "a1")); + } + } + + @ParameterizedTest + @EnumSource( + value = ConflictBehavior.class, + names = {"ERROR", "NOTHING"}) + void testInsertAndDeleteInSameWindow(ConflictBehavior behavior) throws Exception { + try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness = + createHarness(behavior)) { + harness.open(); + + // Insert and delete before watermark advances + harness.processElement(insertRecord(1L, 1, "a1")); + harness.processElement(deleteRecord(1L, 1, "a1")); + + // Compact - should emit nothing since insert and delete cancel out + harness.processWatermark(100L); + assertEmitsNothing(harness); + } + } + + @Test + void testDoNothingKeepsFirstRecord() throws Exception { + try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness = + createHarness(ConflictBehavior.NOTHING)) { + harness.open(); + + // Insert two records with different upsert keys but same primary key + harness.processElement(insertRecord(1L, 1, "first")); + harness.processElement(insertRecord(2L, 1, "second")); + + // Compact - should keep the first record + harness.processWatermark(100L); + assertEmits(harness, rowOfKind(RowKind.INSERT, 1L, 1, "first")); + } + } + + @Test + void testDoErrorThrowsOnConflict() throws Exception { + try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness = + createHarness(ConflictBehavior.ERROR)) { + harness.open(); + + // Insert two records with different upsert keys but same primary key + harness.processElement(insertRecord(1L, 1, "first")); + harness.processElement(insertRecord(2L, 1, "second")); + + // Compact - should throw exception + 
assertThatThrownBy(() -> harness.processWatermark(100L)) + .isInstanceOf(TableRuntimeException.class) + .hasMessageContaining("Primary key constraint violation"); + } + } + + @Test + void testDoErrorAllowsSameUpsertKey() throws Exception { + try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness = + createHarness(ConflictBehavior.ERROR)) { + harness.open(); + + // Insert two records with same upsert key (updates to same source) + harness.processElement(insertRecord(1L, 1, "v1")); + harness.processElement(updateAfterRecord(1L, 1, "v2")); + + // Compact - should not throw, just keep the latest + harness.processWatermark(100L); + assertEmits(harness, rowOfKind(RowKind.INSERT, 1L, 1, "v2")); + } + } + + @ParameterizedTest + @EnumSource( + value = ConflictBehavior.class, + names = {"ERROR", "NOTHING"}) + void testChangelogDisorderHandling(ConflictBehavior behavior) throws Exception { + try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness = + createHarness(behavior)) { + harness.open(); + + // Simulate changelog disorder from FLIP-558 example: + // Records from different sources (different upsert keys: 1L and 2L) map to same PK (1) + // Ideal order: +I(1,1,a1), -U(1,1,a1), +U(2,1,b1) + // Disordered: +U(2,1,b1), +I(1,1,a1), -U(1,1,a1) + + // The +U from source 2 arrives first (upsert key = 2L) + harness.processElement(updateAfterRecord(2L, 1, "b1")); + // Then +I and -U from source 1 arrive (upsert key = 1L) + harness.processElement(insertRecord(1L, 1, "a1")); + harness.processElement(deleteRecord(1L, 1, "a1")); + + // Net result: only (2L, 1, "b1") remains after cancellation, no conflict + harness.processWatermark(100L); + assertEmits(harness, rowOfKind(RowKind.INSERT, 2L, 1, "b1")); Review Comment: very nice! 
########## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/WatermarkCompactingSinkMaterializer.java: ########## @@ -0,0 +1,341 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.operators.sink; + +import org.apache.flink.api.common.state.MapState; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.ListSerializer; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.runtime.state.KeyedStateBackend; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.table.api.InsertConflictStrategy; +import 
org.apache.flink.table.api.InsertConflictStrategy.ConflictBehavior; +import org.apache.flink.table.api.TableRuntimeException; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.utils.ProjectedRowData; +import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser; +import org.apache.flink.table.runtime.generated.RecordEqualiser; +import org.apache.flink.table.runtime.operators.TableStreamOperator; +import org.apache.flink.table.runtime.typeutils.InternalSerializers; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.types.RowKind; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.flink.types.RowKind.DELETE; +import static org.apache.flink.types.RowKind.INSERT; +import static org.apache.flink.types.RowKind.UPDATE_AFTER; + +/** + * A sink materializer that buffers records and compacts them on watermark progression. + * + * <p>This operator implements the watermark-based compaction algorithm from FLIP-558 for handling + * changelog disorder when the upsert key differs from the sink's primary key. + */ +public class WatermarkCompactingSinkMaterializer extends TableStreamOperator<RowData> + implements OneInputStreamOperator<RowData, RowData> { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = + LoggerFactory.getLogger(WatermarkCompactingSinkMaterializer.class); + + private static final String STATE_CLEARED_WARN_MSG = + "The state is cleared because of state ttl. This will result in incorrect result. 
" + + "You can increase the state ttl to avoid this."; + private static final Set<String> ORDERED_STATE_BACKENDS = Set.of("rocksdb", "forst"); + + private final InsertConflictStrategy conflictStrategy; + private final TypeSerializer<RowData> serializer; + private final GeneratedRecordEqualiser generatedRecordEqualiser; + private final GeneratedRecordEqualiser generatedUpsertKeyEqualiser; + private final int[] inputUpsertKey; + private final boolean hasUpsertKey; + + private transient MapStateDescriptor<Long, List<RowData>> bufferDescriptor; + private transient MapState<Long, List<RowData>> buffer; + private transient ValueState<RowData> currentValue; + private transient RecordEqualiser equaliser; + private transient RecordEqualiser upsertKeyEqualiser; + private transient TimestampedCollector<RowData> collector; + private transient boolean isOrderedStateBackend; + + // Reused ProjectedRowData for comparing upsertKey if hasUpsertKey. + private transient ProjectedRowData upsertKeyProjectedRow1; + private transient ProjectedRowData upsertKeyProjectedRow2; + + public WatermarkCompactingSinkMaterializer( + InsertConflictStrategy conflictStrategy, + TypeSerializer<RowData> serializer, + GeneratedRecordEqualiser generatedRecordEqualiser, + @Nullable GeneratedRecordEqualiser generatedUpsertKeyEqualiser, + @Nullable int[] inputUpsertKey) { + validateConflictStrategy(conflictStrategy); + this.conflictStrategy = conflictStrategy; + this.serializer = serializer; + this.generatedRecordEqualiser = generatedRecordEqualiser; + this.generatedUpsertKeyEqualiser = generatedUpsertKeyEqualiser; + this.inputUpsertKey = inputUpsertKey; + this.hasUpsertKey = inputUpsertKey != null && inputUpsertKey.length > 0; + } + + private static void validateConflictStrategy(InsertConflictStrategy strategy) { + Preconditions.checkArgument( + strategy.getBehavior() == ConflictBehavior.ERROR + || strategy.getBehavior() == ConflictBehavior.NOTHING, + "Only ERROR and NOTHING strategies are supported, got: 
%s", + strategy); + } + + @Override + public void open() throws Exception { + super.open(); + initializeEqualisers(); + detectOrderedStateBackend(); + initializeState(); + this.collector = new TimestampedCollector<>(output); + } + + private void initializeEqualisers() { + if (hasUpsertKey) { + this.upsertKeyEqualiser = + generatedUpsertKeyEqualiser.newInstance( + getRuntimeContext().getUserCodeClassLoader()); + upsertKeyProjectedRow1 = ProjectedRowData.from(inputUpsertKey); + upsertKeyProjectedRow2 = ProjectedRowData.from(inputUpsertKey); + } + this.equaliser = + generatedRecordEqualiser.newInstance(getRuntimeContext().getUserCodeClassLoader()); + } + + private void detectOrderedStateBackend() { + KeyedStateBackend<?> keyedStateBackend = getKeyedStateBackend(); + String backendType = + keyedStateBackend != null ? keyedStateBackend.getBackendTypeIdentifier() : ""; + this.isOrderedStateBackend = ORDERED_STATE_BACKENDS.contains(backendType); + + if (isOrderedStateBackend) { + LOG.info("Using ordered state backend optimization for {} backend", backendType); + } + } + + private void initializeState() { + this.bufferDescriptor = + new MapStateDescriptor<>( + "watermark-buffer", + LongSerializer.INSTANCE, + new ListSerializer<>(serializer)); + this.buffer = getRuntimeContext().getMapState(bufferDescriptor); + + ValueStateDescriptor<RowData> currentValueDescriptor = + new ValueStateDescriptor<>("current-value", serializer); + this.currentValue = getRuntimeContext().getState(currentValueDescriptor); + } + + @Override + public void processElement(StreamRecord<RowData> element) throws Exception { + RowData row = element.getValue(); + long assignedTimestamp = element.getTimestamp(); + bufferRecord(assignedTimestamp, row); Review Comment: we should eagerly compact to reduce the batches during processWatermark ########## flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/WatermarkCompactingSinkMaterializerTest.java: ########## @@ -0,0 
+1,586 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.operators.sink; + +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; +import org.apache.flink.table.api.InsertConflictStrategy; +import org.apache.flink.table.api.InsertConflictStrategy.ConflictBehavior; +import org.apache.flink.table.api.TableRuntimeException; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser; +import org.apache.flink.table.runtime.generated.RecordEqualiser; +import org.apache.flink.table.types.logical.BigIntType; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.VarCharType; +import org.apache.flink.table.utils.HandwrittenSelectorUtil; +import org.apache.flink.types.RowKind; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; + 
+import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +import static org.apache.flink.table.runtime.util.StreamRecordUtils.deleteRecord; +import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord; +import static org.apache.flink.table.runtime.util.StreamRecordUtils.rowOfKind; +import static org.apache.flink.table.runtime.util.StreamRecordUtils.updateAfterRecord; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Tests for {@link WatermarkCompactingSinkMaterializer}. */ +class WatermarkCompactingSinkMaterializerTest { + + private static final int PRIMARY_KEY_INDEX = 1; + + private static final LogicalType[] LOGICAL_TYPES = + new LogicalType[] {new BigIntType(), new IntType(), new VarCharType()}; + + private static final GeneratedRecordEqualiser RECORD_EQUALISER = + new GeneratedRecordEqualiser("", "", new Object[0]) { + @Override + public RecordEqualiser newInstance(ClassLoader classLoader) { + return new TestRecordEqualiser(); + } + }; + + private static final GeneratedRecordEqualiser UPSERT_KEY_EQUALISER = + new GeneratedRecordEqualiser("", "", new Object[0]) { + @Override + public RecordEqualiser newInstance(ClassLoader classLoader) { + return new TestUpsertKeyEqualiser(); + } + }; + + @ParameterizedTest + @EnumSource( + value = ConflictBehavior.class, + names = {"ERROR", "NOTHING"}) + void testBasicInsertWithWatermarkProgression(ConflictBehavior behavior) throws Exception { + try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness = + createHarness(behavior)) { + harness.open(); + + // Insert first record (watermark is MIN_VALUE) + harness.processElement(insertRecord(1L, 1, "a1")); + assertEmitsNothing(harness); // Buffered, waiting for watermark + + // Advance watermark to trigger compaction + harness.processWatermark(100L); + assertEmits(harness, rowOfKind(RowKind.INSERT, 1L, 1, "a1")); + + // Update with 
same upsert key (this is the expected pattern for single-source updates) + harness.processElement(updateAfterRecord(1L, 1, "a2")); + assertEmitsNothing(harness); + + // Advance watermark again + harness.processWatermark(200L); + assertEmits(harness, rowOfKind(RowKind.UPDATE_AFTER, 1L, 1, "a2")); + } + } + + @ParameterizedTest + @EnumSource( + value = ConflictBehavior.class, + names = {"ERROR", "NOTHING"}) + void testDeleteAfterInsert(ConflictBehavior behavior) throws Exception { + try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness = + createHarness(behavior)) { + harness.open(); + + // Insert and compact + harness.processElement(insertRecord(1L, 1, "a1")); + harness.processWatermark(100L); + assertEmits(harness, rowOfKind(RowKind.INSERT, 1L, 1, "a1")); + + // Delete and compact + harness.processElement(deleteRecord(1L, 1, "a1")); + harness.processWatermark(200L); + assertEmits(harness, rowOfKind(RowKind.DELETE, 1L, 1, "a1")); + } + } + + @ParameterizedTest + @EnumSource( + value = ConflictBehavior.class, + names = {"ERROR", "NOTHING"}) + void testInsertAndDeleteInSameWindow(ConflictBehavior behavior) throws Exception { + try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness = + createHarness(behavior)) { + harness.open(); + + // Insert and delete before watermark advances + harness.processElement(insertRecord(1L, 1, "a1")); + harness.processElement(deleteRecord(1L, 1, "a1")); + + // Compact - should emit nothing since insert and delete cancel out + harness.processWatermark(100L); + assertEmitsNothing(harness); + } + } + + @Test + void testDoNothingKeepsFirstRecord() throws Exception { + try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness = + createHarness(ConflictBehavior.NOTHING)) { + harness.open(); + + // Insert two records with different upsert keys but same primary key + harness.processElement(insertRecord(1L, 1, "first")); + harness.processElement(insertRecord(2L, 1, "second")); + 
+ // Compact - should keep the first record + harness.processWatermark(100L); + assertEmits(harness, rowOfKind(RowKind.INSERT, 1L, 1, "first")); + } + } + + @Test + void testDoErrorThrowsOnConflict() throws Exception { + try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness = + createHarness(ConflictBehavior.ERROR)) { + harness.open(); + + // Insert two records with different upsert keys but same primary key + harness.processElement(insertRecord(1L, 1, "first")); + harness.processElement(insertRecord(2L, 1, "second")); + + // Compact - should throw exception + assertThatThrownBy(() -> harness.processWatermark(100L)) + .isInstanceOf(TableRuntimeException.class) + .hasMessageContaining("Primary key constraint violation"); + } + } + + @Test + void testDoErrorAllowsSameUpsertKey() throws Exception { + try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness = + createHarness(ConflictBehavior.ERROR)) { + harness.open(); + + // Insert two records with same upsert key (updates to same source) + harness.processElement(insertRecord(1L, 1, "v1")); + harness.processElement(updateAfterRecord(1L, 1, "v2")); + + // Compact - should not throw, just keep the latest + harness.processWatermark(100L); + assertEmits(harness, rowOfKind(RowKind.INSERT, 1L, 1, "v2")); + } + } + + @ParameterizedTest + @EnumSource( + value = ConflictBehavior.class, + names = {"ERROR", "NOTHING"}) + void testChangelogDisorderHandling(ConflictBehavior behavior) throws Exception { + try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness = + createHarness(behavior)) { + harness.open(); + + // Simulate changelog disorder from FLIP-558 example: + // Records from different sources (different upsert keys: 1L and 2L) map to same PK (1) + // Ideal order: +I(1,1,a1), -U(1,1,a1), +U(2,1,b1) + // Disordered: +U(2,1,b1), +I(1,1,a1), -U(1,1,a1) + + // The +U from source 2 arrives first (upsert key = 2L) + harness.processElement(updateAfterRecord(2L, 
1, "b1")); + // Then +I and -U from source 1 arrive (upsert key = 1L) + harness.processElement(insertRecord(1L, 1, "a1")); + harness.processElement(deleteRecord(1L, 1, "a1")); + + // Net result: only (2L, 1, "b1") remains after cancellation, no conflict + harness.processWatermark(100L); + assertEmits(harness, rowOfKind(RowKind.INSERT, 2L, 1, "b1")); + } + } + + @ParameterizedTest + @EnumSource( + value = ConflictBehavior.class, + names = {"ERROR", "NOTHING"}) + void testNoEmissionWhenValueUnchanged(ConflictBehavior behavior) throws Exception { + try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness = + createHarness(behavior)) { + harness.open(); + + // Insert and compact + harness.processElement(insertRecord(1L, 1, "value")); + harness.processWatermark(100L); + assertEmits(harness, rowOfKind(RowKind.INSERT, 1L, 1, "value")); + + // Insert same value again (same upsert key) + harness.processElement(updateAfterRecord(1L, 1, "value")); + harness.processWatermark(200L); + // Should not emit since value is the same + assertEmitsNothing(harness); + } + } + + /** + * Tests that record timestamps are handled correctly when multiple inputs send records that + * arrive out of timestamp order. This simulates the case where records from different upstream + * tasks have different timestamps and arrive interleaved. + * + * <p>Input 1 uses upsert key = 1L, Input 2 uses upsert key = 2L. All records have same primary + * key (1). 
+ * + * <p>Sequence: + * + * <ol> + * <li>INSERT(input=1, t=2) + * <li>watermark=3 -> emits INSERT + * <li>UPDATE_BEFORE(input=1, t=4) + * <li>UPDATE_AFTER(input=1, t=6) + * <li>UPDATE_AFTER(input=2, t=4) - arrives after t=6 record but has smaller timestamp + * <li>watermark=5 -> compacts t<=5 records + * <li>UPDATE_BEFORE(input=2, t=6) + * <li>watermark=10 -> compacts remaining t=6 records + * </ol> + */ + @Test + void testTwoUpstreamTasksWithDisorderedWatermarks() throws Exception { + try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness = + createHarness(ConflictBehavior.NOTHING)) { + harness.open(); + + // INSERT from input 1 with timestamp 2 + harness.processElement(recordWithTimestamp(RowKind.INSERT, 1L, 1, "v1", 2L)); + assertEmitsNothing(harness); + + // watermark=3: compacts records with t<=3, emits INSERT(t=2) + harness.processWatermark(3L); + assertEmits(harness, rowOfKind(RowKind.INSERT, 1L, 1, "v1")); + + // UPDATE_BEFORE from input 1 with timestamp 4 + harness.processElement(recordWithTimestamp(RowKind.UPDATE_BEFORE, 1L, 1, "v1", 4L)); + assertEmitsNothing(harness); + + // UPDATE_AFTER from input 1 with timestamp 6 + harness.processElement(recordWithTimestamp(RowKind.UPDATE_AFTER, 1L, 1, "v4", 6L)); + assertEmitsNothing(harness); + + // UPDATE_AFTER from input 2 with timestamp 4 + // This record arrives after the t=6 record but has a smaller timestamp + harness.processElement(recordWithTimestamp(RowKind.UPDATE_AFTER, 2L, 1, "v3", 4L)); + + // watermark=5: compacts records with t<=5 + // Buffered: UPDATE_BEFORE(1L, t=4) cancels out input 1's previously emitted state; + // UPDATE_AFTER(2L, t=4) is emitted + harness.processWatermark(5L); + assertEmits(harness, rowOfKind(RowKind.UPDATE_AFTER, 2L, 1, "v3")); + + // UPDATE_BEFORE from input 2 with timestamp 6 + harness.processElement(recordWithTimestamp(RowKind.UPDATE_BEFORE, 2L, 1, "v3", 6L)); + assertEmitsNothing(harness); + + // Final watermark to flush all remaining buffered 
records (t=6) + // Buffered: UPDATE_AFTER(1L, t=6) and UPDATE_BEFORE(2L, t=6) + // After compaction: UPDATE_AFTER(1L, "v4") remains as final value + harness.processWatermark(10L); + assertEmits(harness, rowOfKind(RowKind.UPDATE_AFTER, 1L, 1, "v4")); + } + } + + // --- Tests without upsert key --- + + @ParameterizedTest + @EnumSource( + value = ConflictBehavior.class, + names = {"ERROR", "NOTHING"}) + void testBasicInsertWithoutUpsertKey(ConflictBehavior behavior) throws Exception { + try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness = + createHarnessWithoutUpsertKey(behavior)) { + harness.open(); + + // Insert first record + harness.processElement(insertRecord(1L, 1, "a1")); + assertEmitsNothing(harness); + + // Advance watermark to trigger compaction + harness.processWatermark(100L); + assertEmits(harness, rowOfKind(RowKind.INSERT, 1L, 1, "a1")); + } + } + + @Test + void testUpdateWithoutUpsertKeyNothingStrategy() throws Exception { + // Without upsert key, UPDATE_AFTER on existing value causes conflict (two rows accumulate) + // NOTHING strategy keeps the first value + try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness = + createHarnessWithoutUpsertKey(ConflictBehavior.NOTHING)) { + harness.open(); + + harness.processElement(insertRecord(1L, 1, "a1")); + harness.processWatermark(100L); + assertEmits(harness, rowOfKind(RowKind.INSERT, 1L, 1, "a1")); + + // UPDATE_AFTER with different value - creates conflict, NOTHING keeps first + harness.processElement(updateAfterRecord(2L, 1, "a2")); + harness.processWatermark(200L); + // NOTHING keeps the previous value (1L, 1, "a1"), no emission since unchanged + assertEmitsNothing(harness); + } + } + + @Test + void testUpdateWithoutUpsertKeyErrorStrategy() throws Exception { + // Without upsert key, UPDATE_AFTER on existing value causes conflict (two rows accumulate) + // ERROR strategy throws + try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> 
harness = + createHarnessWithoutUpsertKey(ConflictBehavior.ERROR)) { + harness.open(); + + harness.processElement(insertRecord(1L, 1, "a1")); + harness.processWatermark(100L); + assertEmits(harness, rowOfKind(RowKind.INSERT, 1L, 1, "a1")); + + // UPDATE_AFTER with different value - creates conflict, ERROR throws + harness.processElement(updateAfterRecord(2L, 1, "a2")); + assertThatThrownBy(() -> harness.processWatermark(200L)) + .isInstanceOf(TableRuntimeException.class) + .hasMessageContaining("Primary key constraint violation"); + } + } + + @ParameterizedTest + @EnumSource( + value = ConflictBehavior.class, + names = {"ERROR", "NOTHING"}) + void testDeleteAfterInsertWithoutUpsertKey(ConflictBehavior behavior) throws Exception { + try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness = + createHarnessWithoutUpsertKey(behavior)) { + harness.open(); + + // Insert and compact + harness.processElement(insertRecord(1L, 1, "a1")); + harness.processWatermark(100L); + assertEmits(harness, rowOfKind(RowKind.INSERT, 1L, 1, "a1")); + + // Delete with exact same row and compact + harness.processElement(deleteRecord(1L, 1, "a1")); + harness.processWatermark(200L); + assertEmits(harness, rowOfKind(RowKind.DELETE, 1L, 1, "a1")); + } + } + + @ParameterizedTest + @EnumSource( + value = ConflictBehavior.class, + names = {"ERROR", "NOTHING"}) + void testInsertAndDeleteInSameWindowWithoutUpsertKey(ConflictBehavior behavior) + throws Exception { + try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness = + createHarnessWithoutUpsertKey(behavior)) { + harness.open(); + + // Insert and delete with exact same row before watermark advances + harness.processElement(insertRecord(1L, 1, "a1")); + harness.processElement(deleteRecord(1L, 1, "a1")); + + // Compact - should emit nothing since insert and delete cancel out + harness.processWatermark(100L); + assertEmitsNothing(harness); + } + } + + @Test + void 
testIdenticalInsertsWithoutUpsertKeyNothingKeepsFirst() throws Exception { + // Without upsert key, even identical inserts are separate entries + // NOTHING strategy just keeps the first one + try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness = + createHarnessWithoutUpsertKey(ConflictBehavior.NOTHING)) { + harness.open(); + + // Insert two identical records (same full row) + harness.processElement(insertRecord(1L, 1, "same")); + harness.processElement(insertRecord(1L, 1, "same")); + + // Compact - NOTHING keeps first record + harness.processWatermark(100L); + assertEmits(harness, rowOfKind(RowKind.INSERT, 1L, 1, "same")); + } + } + + @Test + void testIdenticalInsertsWithoutUpsertKeyErrorThrows() throws Exception { + // Without upsert key, even identical inserts are separate entries + // ERROR strategy throws because there are multiple records + try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness = + createHarnessWithoutUpsertKey(ConflictBehavior.ERROR)) { + harness.open(); + + // Insert two identical records (same full row) + harness.processElement(insertRecord(1L, 1, "same")); + harness.processElement(insertRecord(1L, 1, "same")); + + // Compact - ERROR throws because there are multiple pending records + assertThatThrownBy(() -> harness.processWatermark(100L)) + .isInstanceOf(TableRuntimeException.class) + .hasMessageContaining("Primary key constraint violation"); + } + } + + @ParameterizedTest + @EnumSource( + value = ConflictBehavior.class, + names = {"ERROR", "NOTHING"}) + void testInsertUpdateDeleteWithoutUpsertKey(ConflictBehavior behavior) throws Exception { + try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness = + createHarnessWithoutUpsertKey(behavior)) { + harness.open(); + + // Insert, then update_before + update_after sequence + harness.processElement(insertRecord(1L, 1, "v1")); + harness.processWatermark(100L); + assertEmits(harness, rowOfKind(RowKind.INSERT, 1L, 1, 
"v1")); + + // Update: retract old value, insert new value + harness.processElement( + new StreamRecord<>(rowOfKind(RowKind.UPDATE_BEFORE, 1L, 1, "v1"))); Review Comment: nit: same pattern as other test ```suggestion updateBeforeRecord(1L, 1, "v1"))); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
