twalthr commented on code in PR #27502:
URL: https://github.com/apache/flink/pull/27502#discussion_r2758541227


##########
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/WatermarkCompactingSinkMaterializerTest.java:
##########
@@ -0,0 +1,586 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.sink;
+
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.table.api.InsertConflictStrategy;
+import org.apache.flink.table.api.InsertConflictStrategy.ConflictBehavior;
+import org.apache.flink.table.api.TableRuntimeException;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
+import org.apache.flink.table.runtime.generated.RecordEqualiser;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.utils.HandwrittenSelectorUtil;
+import org.apache.flink.types.RowKind;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+import static org.apache.flink.table.runtime.util.StreamRecordUtils.deleteRecord;
+import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord;
+import static org.apache.flink.table.runtime.util.StreamRecordUtils.rowOfKind;
+import static org.apache.flink.table.runtime.util.StreamRecordUtils.updateAfterRecord;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link WatermarkCompactingSinkMaterializer}. */
+class WatermarkCompactingSinkMaterializerTest {
+
+    private static final int PRIMARY_KEY_INDEX = 1;
+
+    private static final LogicalType[] LOGICAL_TYPES =
+            new LogicalType[] {new BigIntType(), new IntType(), new VarCharType()};
+
+    private static final GeneratedRecordEqualiser RECORD_EQUALISER =
+            new GeneratedRecordEqualiser("", "", new Object[0]) {
+                @Override
+                public RecordEqualiser newInstance(ClassLoader classLoader) {
+                    return new TestRecordEqualiser();
+                }
+            };
+
+    private static final GeneratedRecordEqualiser UPSERT_KEY_EQUALISER =
+            new GeneratedRecordEqualiser("", "", new Object[0]) {
+                @Override
+                public RecordEqualiser newInstance(ClassLoader classLoader) {
+                    return new TestUpsertKeyEqualiser();
+                }
+            };
+
+    @ParameterizedTest
+    @EnumSource(
+            value = ConflictBehavior.class,
+            names = {"ERROR", "NOTHING"})
+    void testBasicInsertWithWatermarkProgression(ConflictBehavior behavior) throws Exception {
+        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
+                createHarness(behavior)) {
+            harness.open();
+
+            // Insert first record (watermark is MIN_VALUE)
+            harness.processElement(insertRecord(1L, 1, "a1"));
+            assertEmitsNothing(harness); // Buffered, waiting for watermark
+
+            // Advance watermark to trigger compaction
+            harness.processWatermark(100L);
+            assertEmits(harness, rowOfKind(RowKind.INSERT, 1L, 1, "a1"));
+
+            // Update with same upsert key (this is the expected pattern for single-source updates)
+            harness.processElement(updateAfterRecord(1L, 1, "a2"));
+            assertEmitsNothing(harness);
+
+            // Advance watermark again
+            harness.processWatermark(200L);
+            assertEmits(harness, rowOfKind(RowKind.UPDATE_AFTER, 1L, 1, "a2"));
+        }
+    }
+
+    @ParameterizedTest
+    @EnumSource(
+            value = ConflictBehavior.class,
+            names = {"ERROR", "NOTHING"})
+    void testDeleteAfterInsert(ConflictBehavior behavior) throws Exception {
+        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
+                createHarness(behavior)) {
+            harness.open();
+
+            // Insert and compact
+            harness.processElement(insertRecord(1L, 1, "a1"));
+            harness.processWatermark(100L);
+            assertEmits(harness, rowOfKind(RowKind.INSERT, 1L, 1, "a1"));
+
+            // Delete and compact
+            harness.processElement(deleteRecord(1L, 1, "a1"));
+            harness.processWatermark(200L);
+            assertEmits(harness, rowOfKind(RowKind.DELETE, 1L, 1, "a1"));
+        }
+    }
+
+    @ParameterizedTest
+    @EnumSource(
+            value = ConflictBehavior.class,
+            names = {"ERROR", "NOTHING"})
+    void testInsertAndDeleteInSameWindow(ConflictBehavior behavior) throws Exception {
+        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
+                createHarness(behavior)) {
+            harness.open();
+
+            // Insert and delete before watermark advances
+            harness.processElement(insertRecord(1L, 1, "a1"));
+            harness.processElement(deleteRecord(1L, 1, "a1"));
+
+            // Compact - should emit nothing since insert and delete cancel out
+            harness.processWatermark(100L);
+            assertEmitsNothing(harness);
+        }
+    }
+
+    @Test
+    void testDoNothingKeepsFirstRecord() throws Exception {
+        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
+                createHarness(ConflictBehavior.NOTHING)) {
+            harness.open();
+
+            // Insert two records with different upsert keys but same primary key
+            harness.processElement(insertRecord(1L, 1, "first"));
+            harness.processElement(insertRecord(2L, 1, "second"));
+
+            // Compact - should keep the first record
+            harness.processWatermark(100L);
+            assertEmits(harness, rowOfKind(RowKind.INSERT, 1L, 1, "first"));
+        }
+    }
+
+    @Test
+    void testDoErrorThrowsOnConflict() throws Exception {
+        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
+                createHarness(ConflictBehavior.ERROR)) {
+            harness.open();
+
+            // Insert two records with different upsert keys but same primary key
+            harness.processElement(insertRecord(1L, 1, "first"));
+            harness.processElement(insertRecord(2L, 1, "second"));
+
+            // Compact - should throw exception
+            assertThatThrownBy(() -> harness.processWatermark(100L))
+                    .isInstanceOf(TableRuntimeException.class)
+                    .hasMessageContaining("Primary key constraint violation");
+        }
+    }
+
+    @Test
+    void testDoErrorAllowsSameUpsertKey() throws Exception {
+        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
+                createHarness(ConflictBehavior.ERROR)) {
+            harness.open();
+
+            // Insert two records with same upsert key (updates to same source)
+            harness.processElement(insertRecord(1L, 1, "v1"));
+            harness.processElement(updateAfterRecord(1L, 1, "v2"));
+
+            // Compact - should not throw, just keep the latest
+            harness.processWatermark(100L);
+            assertEmits(harness, rowOfKind(RowKind.INSERT, 1L, 1, "v2"));
+        }
+    }
+
+    @ParameterizedTest
+    @EnumSource(
+            value = ConflictBehavior.class,
+            names = {"ERROR", "NOTHING"})
+    void testChangelogDisorderHandling(ConflictBehavior behavior) throws Exception {
+        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
+                createHarness(behavior)) {
+            harness.open();
+
+            // Simulate changelog disorder from FLIP-558 example:
+            // Records from different sources (different upsert keys: 1L and 2L) map to same PK (1)
+            // Ideal order: +I(1,1,a1), -U(1,1,a1), +U(2,1,b1)
+            // Disordered: +U(2,1,b1), +I(1,1,a1), -U(1,1,a1)
+
+            // The +U from source 2 arrives first (upsert key = 2L)
+            harness.processElement(updateAfterRecord(2L, 1, "b1"));
+            // Then +I and -U from source 1 arrive (upsert key = 1L)
+            harness.processElement(insertRecord(1L, 1, "a1"));
+            harness.processElement(deleteRecord(1L, 1, "a1"));
+
+            // Net result: only (2L, 1, "b1") remains after cancellation, no conflict
+            harness.processWatermark(100L);
+            assertEmits(harness, rowOfKind(RowKind.INSERT, 2L, 1, "b1"));
+        }
+    }
+
+    @ParameterizedTest
+    @EnumSource(
+            value = ConflictBehavior.class,
+            names = {"ERROR", "NOTHING"})
+    void testNoEmissionWhenValueUnchanged(ConflictBehavior behavior) throws Exception {
+        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
+                createHarness(behavior)) {
+            harness.open();
+
+            // Insert and compact
+            harness.processElement(insertRecord(1L, 1, "value"));
+            harness.processWatermark(100L);
+            assertEmits(harness, rowOfKind(RowKind.INSERT, 1L, 1, "value"));
+
+            // Insert same value again (same upsert key)
+            harness.processElement(updateAfterRecord(1L, 1, "value"));
+            harness.processWatermark(200L);
+            // Should not emit since value is the same
+            assertEmitsNothing(harness);
+        }
+    }
+
+    /**
+     * Tests that record timestamps are handled correctly when multiple inputs send records that
+     * arrive out of timestamp order. This simulates the case where records from different upstream
+     * tasks have different timestamps and arrive interleaved.
+     *
+     * <p>Input 1 uses upsert key = 1L, Input 2 uses upsert key = 2L. All records have same primary
+     * key (1).
+     *
+     * <p>Sequence:
+     *
+     * <ol>
+     *   <li>INSERT(input=1, t=2)
+     *   <li>watermark=3 -> emits INSERT
+     *   <li>UPDATE_BEFORE(input=1, t=4)
+     *   <li>UPDATE_AFTER(input=1, t=6)
+     *   <li>UPDATE_AFTER(input=2, t=4) - arrives after t=6 record but has smaller timestamp
+     *   <li>watermark=5 -> compacts t<=5 records
+     *   <li>UPDATE_BEFORE(input=2, t=6)
+     *   <li>watermark=10 -> compacts remaining t=6 records
+     * </ol>
+     */
+    @Test
+    void testTwoUpstreamTasksWithDisorderedWatermarks() throws Exception {

Review Comment:
   Can we add a test case that plays through the scenario where the pipeline is stopped and resumed? Since watermarks are not checkpointed, Long.MIN_VALUE might enter the operator again, even though compaction has already happened. What guarantees can we give in this case?
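
   For illustration, one possible shape for such a restart test (a sketch only: it reuses the `createHarness`/`insertRecord`/`assertEmits` helpers from this test class plus the harness snapshot/restore API, `OperatorSubtaskState` comes from `org.apache.flink.runtime.checkpoint`, and the post-restore assertion is left open because it depends on exactly the guarantee we decide to give):

   ```java
   @Test
   void testRestoreResetsWatermarkToMinValue() throws Exception {
       OperatorSubtaskState snapshot;
       try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
               createHarness(ConflictBehavior.NOTHING)) {
           harness.open();
           // The record is compacted and emitted before the "stop".
           harness.processElement(insertRecord(1L, 1, "a1"));
           harness.processWatermark(100L);
           assertEmits(harness, rowOfKind(RowKind.INSERT, 1L, 1, "a1"));
           snapshot = harness.snapshot(1L, 100L);
       }

       try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
               createHarness(ConflictBehavior.NOTHING)) {
           // After restore, the operator observes Long.MIN_VALUE as the current
           // watermark again, even though compaction happened before the snapshot.
           harness.initializeState(snapshot);
           harness.open();
           harness.processElement(updateAfterRecord(1L, 1, "a2"));
           harness.processWatermark(100L);
           // TODO: assert the guaranteed behavior here, e.g. a single UPDATE_AFTER
           // without re-emitting the pre-snapshot INSERT.
       }
   }
   ```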



##########
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/WatermarkCompactingSinkMaterializerTest.java:
##########
@@ -0,0 +1,586 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.sink;
+
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.table.api.InsertConflictStrategy;
+import org.apache.flink.table.api.InsertConflictStrategy.ConflictBehavior;
+import org.apache.flink.table.api.TableRuntimeException;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
+import org.apache.flink.table.runtime.generated.RecordEqualiser;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.utils.HandwrittenSelectorUtil;
+import org.apache.flink.types.RowKind;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+import static org.apache.flink.table.runtime.util.StreamRecordUtils.deleteRecord;
+import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord;
+import static org.apache.flink.table.runtime.util.StreamRecordUtils.rowOfKind;
+import static org.apache.flink.table.runtime.util.StreamRecordUtils.updateAfterRecord;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link WatermarkCompactingSinkMaterializer}. */
+class WatermarkCompactingSinkMaterializerTest {
+
+    private static final int PRIMARY_KEY_INDEX = 1;
+
+    private static final LogicalType[] LOGICAL_TYPES =
+            new LogicalType[] {new BigIntType(), new IntType(), new VarCharType()};
+
+    private static final GeneratedRecordEqualiser RECORD_EQUALISER =
+            new GeneratedRecordEqualiser("", "", new Object[0]) {
+                @Override
+                public RecordEqualiser newInstance(ClassLoader classLoader) {
+                    return new TestRecordEqualiser();
+                }
+            };
+
+    private static final GeneratedRecordEqualiser UPSERT_KEY_EQUALISER =
+            new GeneratedRecordEqualiser("", "", new Object[0]) {
+                @Override
+                public RecordEqualiser newInstance(ClassLoader classLoader) {
+                    return new TestUpsertKeyEqualiser();
+                }
+            };
+
+    @ParameterizedTest
+    @EnumSource(
+            value = ConflictBehavior.class,
+            names = {"ERROR", "NOTHING"})
+    void testBasicInsertWithWatermarkProgression(ConflictBehavior behavior) throws Exception {
+        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
+                createHarness(behavior)) {
+            harness.open();
+
+            // Insert first record (watermark is MIN_VALUE)
+            harness.processElement(insertRecord(1L, 1, "a1"));
+            assertEmitsNothing(harness); // Buffered, waiting for watermark
+
+            // Advance watermark to trigger compaction
+            harness.processWatermark(100L);
+            assertEmits(harness, rowOfKind(RowKind.INSERT, 1L, 1, "a1"));
+
+            // Update with same upsert key (this is the expected pattern for single-source updates)
+            harness.processElement(updateAfterRecord(1L, 1, "a2"));
+            assertEmitsNothing(harness);
+
+            // Advance watermark again
+            harness.processWatermark(200L);
+            assertEmits(harness, rowOfKind(RowKind.UPDATE_AFTER, 1L, 1, "a2"));
+        }
+    }
+
+    @ParameterizedTest
+    @EnumSource(
+            value = ConflictBehavior.class,
+            names = {"ERROR", "NOTHING"})
+    void testDeleteAfterInsert(ConflictBehavior behavior) throws Exception {
+        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
+                createHarness(behavior)) {
+            harness.open();
+
+            // Insert and compact
+            harness.processElement(insertRecord(1L, 1, "a1"));
+            harness.processWatermark(100L);
+            assertEmits(harness, rowOfKind(RowKind.INSERT, 1L, 1, "a1"));
+
+            // Delete and compact
+            harness.processElement(deleteRecord(1L, 1, "a1"));
+            harness.processWatermark(200L);
+            assertEmits(harness, rowOfKind(RowKind.DELETE, 1L, 1, "a1"));
+        }
+    }
+
+    @ParameterizedTest
+    @EnumSource(
+            value = ConflictBehavior.class,
+            names = {"ERROR", "NOTHING"})
+    void testInsertAndDeleteInSameWindow(ConflictBehavior behavior) throws Exception {
+        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
+                createHarness(behavior)) {
+            harness.open();
+
+            // Insert and delete before watermark advances
+            harness.processElement(insertRecord(1L, 1, "a1"));
+            harness.processElement(deleteRecord(1L, 1, "a1"));
+
+            // Compact - should emit nothing since insert and delete cancel out
+            harness.processWatermark(100L);
+            assertEmitsNothing(harness);
+        }
+    }
+
+    @Test
+    void testDoNothingKeepsFirstRecord() throws Exception {
+        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
+                createHarness(ConflictBehavior.NOTHING)) {
+            harness.open();
+
+            // Insert two records with different upsert keys but same primary key
+            harness.processElement(insertRecord(1L, 1, "first"));
+            harness.processElement(insertRecord(2L, 1, "second"));
+
+            // Compact - should keep the first record
+            harness.processWatermark(100L);
+            assertEmits(harness, rowOfKind(RowKind.INSERT, 1L, 1, "first"));
+        }
+    }
+
+    @Test
+    void testDoErrorThrowsOnConflict() throws Exception {
+        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
+                createHarness(ConflictBehavior.ERROR)) {
+            harness.open();
+
+            // Insert two records with different upsert keys but same primary key
+            harness.processElement(insertRecord(1L, 1, "first"));
+            harness.processElement(insertRecord(2L, 1, "second"));
+
+            // Compact - should throw exception
+            assertThatThrownBy(() -> harness.processWatermark(100L))
+                    .isInstanceOf(TableRuntimeException.class)
+                    .hasMessageContaining("Primary key constraint violation");
+        }
+    }
+
+    @Test
+    void testDoErrorAllowsSameUpsertKey() throws Exception {
+        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
+                createHarness(ConflictBehavior.ERROR)) {
+            harness.open();
+
+            // Insert two records with same upsert key (updates to same source)
+            harness.processElement(insertRecord(1L, 1, "v1"));
+            harness.processElement(updateAfterRecord(1L, 1, "v2"));
+
+            // Compact - should not throw, just keep the latest
+            harness.processWatermark(100L);
+            assertEmits(harness, rowOfKind(RowKind.INSERT, 1L, 1, "v2"));
+        }
+    }
+
+    @ParameterizedTest
+    @EnumSource(
+            value = ConflictBehavior.class,
+            names = {"ERROR", "NOTHING"})
+    void testChangelogDisorderHandling(ConflictBehavior behavior) throws Exception {
+        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
+                createHarness(behavior)) {
+            harness.open();
+
+            // Simulate changelog disorder from FLIP-558 example:
+            // Records from different sources (different upsert keys: 1L and 2L) map to same PK (1)
+            // Ideal order: +I(1,1,a1), -U(1,1,a1), +U(2,1,b1)
+            // Disordered: +U(2,1,b1), +I(1,1,a1), -U(1,1,a1)
+
+            // The +U from source 2 arrives first (upsert key = 2L)
+            harness.processElement(updateAfterRecord(2L, 1, "b1"));
+            // Then +I and -U from source 1 arrive (upsert key = 1L)
+            harness.processElement(insertRecord(1L, 1, "a1"));
+            harness.processElement(deleteRecord(1L, 1, "a1"));
+
+            // Net result: only (2L, 1, "b1") remains after cancellation, no conflict
+            harness.processWatermark(100L);
+            assertEmits(harness, rowOfKind(RowKind.INSERT, 2L, 1, "b1"));

Review Comment:
   very nice!



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/WatermarkCompactingSinkMaterializer.java:
##########
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.sink;
+
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.api.InsertConflictStrategy;
+import org.apache.flink.table.api.InsertConflictStrategy.ConflictBehavior;
+import org.apache.flink.table.api.TableRuntimeException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.utils.ProjectedRowData;
+import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
+import org.apache.flink.table.runtime.generated.RecordEqualiser;
+import org.apache.flink.table.runtime.operators.TableStreamOperator;
+import org.apache.flink.table.runtime.typeutils.InternalSerializers;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.types.RowKind.DELETE;
+import static org.apache.flink.types.RowKind.INSERT;
+import static org.apache.flink.types.RowKind.UPDATE_AFTER;
+
+/**
+ * A sink materializer that buffers records and compacts them on watermark progression.
+ *
+ * <p>This operator implements the watermark-based compaction algorithm from FLIP-558 for handling
+ * changelog disorder when the upsert key differs from the sink's primary key.
+ */
+public class WatermarkCompactingSinkMaterializer extends TableStreamOperator<RowData>
+        implements OneInputStreamOperator<RowData, RowData> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(WatermarkCompactingSinkMaterializer.class);
+
+    private static final String STATE_CLEARED_WARN_MSG =
+            "The state is cleared because of state ttl. This will result in 
incorrect result. "
+                    + "You can increase the state ttl to avoid this.";
+    private static final Set<String> ORDERED_STATE_BACKENDS = Set.of("rocksdb", "forst");
+
+    private final InsertConflictStrategy conflictStrategy;
+    private final TypeSerializer<RowData> serializer;
+    private final GeneratedRecordEqualiser generatedRecordEqualiser;
+    private final GeneratedRecordEqualiser generatedUpsertKeyEqualiser;
+    private final int[] inputUpsertKey;
+    private final boolean hasUpsertKey;
+
+    private transient MapStateDescriptor<Long, List<RowData>> bufferDescriptor;
+    private transient MapState<Long, List<RowData>> buffer;
+    private transient ValueState<RowData> currentValue;
+    private transient RecordEqualiser equaliser;
+    private transient RecordEqualiser upsertKeyEqualiser;
+    private transient TimestampedCollector<RowData> collector;
+    private transient boolean isOrderedStateBackend;
+
+    // Reused ProjectedRowData for comparing upsertKey if hasUpsertKey.
+    private transient ProjectedRowData upsertKeyProjectedRow1;
+    private transient ProjectedRowData upsertKeyProjectedRow2;
+
+    public WatermarkCompactingSinkMaterializer(
+            InsertConflictStrategy conflictStrategy,
+            TypeSerializer<RowData> serializer,
+            GeneratedRecordEqualiser generatedRecordEqualiser,
+            @Nullable GeneratedRecordEqualiser generatedUpsertKeyEqualiser,
+            @Nullable int[] inputUpsertKey) {
+        validateConflictStrategy(conflictStrategy);
+        this.conflictStrategy = conflictStrategy;
+        this.serializer = serializer;
+        this.generatedRecordEqualiser = generatedRecordEqualiser;
+        this.generatedUpsertKeyEqualiser = generatedUpsertKeyEqualiser;
+        this.inputUpsertKey = inputUpsertKey;
+        this.hasUpsertKey = inputUpsertKey != null && inputUpsertKey.length > 0;
+    }
+
+    private static void validateConflictStrategy(InsertConflictStrategy strategy) {
+        Preconditions.checkArgument(
+                strategy.getBehavior() == ConflictBehavior.ERROR
+                        || strategy.getBehavior() == ConflictBehavior.NOTHING,
+                "Only ERROR and NOTHING strategies are supported, got: %s",
+                strategy);
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+        initializeEqualisers();
+        detectOrderedStateBackend();
+        initializeState();
+        this.collector = new TimestampedCollector<>(output);
+    }
+
+    private void initializeEqualisers() {
+        if (hasUpsertKey) {
+            this.upsertKeyEqualiser =
+                    generatedUpsertKeyEqualiser.newInstance(
+                            getRuntimeContext().getUserCodeClassLoader());
+            upsertKeyProjectedRow1 = ProjectedRowData.from(inputUpsertKey);
+            upsertKeyProjectedRow2 = ProjectedRowData.from(inputUpsertKey);
+        }
+        this.equaliser =
+                generatedRecordEqualiser.newInstance(getRuntimeContext().getUserCodeClassLoader());
+    }
+
+    private void detectOrderedStateBackend() {
+        KeyedStateBackend<?> keyedStateBackend = getKeyedStateBackend();
+        String backendType =
+                keyedStateBackend != null ? keyedStateBackend.getBackendTypeIdentifier() : "";
+        this.isOrderedStateBackend = ORDERED_STATE_BACKENDS.contains(backendType);
+
+        if (isOrderedStateBackend) {
+            LOG.info("Using ordered state backend optimization for {} 
backend", backendType);
+        }
+    }
+
+    private void initializeState() {
+        this.bufferDescriptor =
+                new MapStateDescriptor<>(
+                        "watermark-buffer",
+                        LongSerializer.INSTANCE,
+                        new ListSerializer<>(serializer));
+        this.buffer = getRuntimeContext().getMapState(bufferDescriptor);
+
+        ValueStateDescriptor<RowData> currentValueDescriptor =
+                new ValueStateDescriptor<>("current-value", serializer);
+        this.currentValue = getRuntimeContext().getState(currentValueDescriptor);
+    }
+
+    @Override
+    public void processElement(StreamRecord<RowData> element) throws Exception {
+        RowData row = element.getValue();
+        long assignedTimestamp = element.getTimestamp();
+        bufferRecord(assignedTimestamp, row);

Review Comment:
   we should eagerly compact in `processElement` to reduce the size of the batches that `processWatermark` has to handle
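
   A rough sketch of what that eager folding could look like (hypothetical, not this PR's implementation: `findSameUpsertKey` and `isAddition` are assumed helpers, the former comparing the projected upsert keys of buffered rows, and records are only folded within the bucket of their own timestamp):

   ```java
   @Override
   public void processElement(StreamRecord<RowData> element) throws Exception {
       final RowData row = element.getValue();
       final long timestamp = element.getTimestamp();

       List<RowData> bucket = buffer.get(timestamp);
       if (bucket == null) {
           bucket = new ArrayList<>();
       }

       // Fold the record into its bucket right away so that processWatermark
       // only has to merge small, already-compacted batches.
       final int sameKeyIndex = hasUpsertKey ? findSameUpsertKey(bucket, row) : -1;
       if (sameKeyIndex >= 0 && isAddition(bucket.get(sameKeyIndex))) {
           if (isAddition(row)) {
               // A newer +I/+U replaces the buffered value for the same upsert key.
               bucket.set(sameKeyIndex, row);
           } else {
               // A -D/-U cancels the buffered addition for the same upsert key.
               bucket.remove(sameKeyIndex);
           }
       } else {
           bucket.add(row);
       }

       if (bucket.isEmpty()) {
           buffer.remove(timestamp);
       } else {
           buffer.put(timestamp, bucket);
       }
   }

   private static boolean isAddition(RowData row) {
       return row.getRowKind() == RowKind.INSERT || row.getRowKind() == RowKind.UPDATE_AFTER;
   }
   ```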



##########
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/WatermarkCompactingSinkMaterializerTest.java:
##########
@@ -0,0 +1,586 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.sink;
+
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.table.api.InsertConflictStrategy;
+import org.apache.flink.table.api.InsertConflictStrategy.ConflictBehavior;
+import org.apache.flink.table.api.TableRuntimeException;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
+import org.apache.flink.table.runtime.generated.RecordEqualiser;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.utils.HandwrittenSelectorUtil;
+import org.apache.flink.types.RowKind;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+import static org.apache.flink.table.runtime.util.StreamRecordUtils.deleteRecord;
+import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord;
+import static org.apache.flink.table.runtime.util.StreamRecordUtils.rowOfKind;
+import static org.apache.flink.table.runtime.util.StreamRecordUtils.updateAfterRecord;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link WatermarkCompactingSinkMaterializer}. */
+class WatermarkCompactingSinkMaterializerTest {
+
+    private static final int PRIMARY_KEY_INDEX = 1;
+
+    private static final LogicalType[] LOGICAL_TYPES =
+            new LogicalType[] {new BigIntType(), new IntType(), new VarCharType()};
+
+    private static final GeneratedRecordEqualiser RECORD_EQUALISER =
+            new GeneratedRecordEqualiser("", "", new Object[0]) {
+                @Override
+                public RecordEqualiser newInstance(ClassLoader classLoader) {
+                    return new TestRecordEqualiser();
+                }
+            };
+
+    private static final GeneratedRecordEqualiser UPSERT_KEY_EQUALISER =
+            new GeneratedRecordEqualiser("", "", new Object[0]) {
+                @Override
+                public RecordEqualiser newInstance(ClassLoader classLoader) {
+                    return new TestUpsertKeyEqualiser();
+                }
+            };
+
+    @ParameterizedTest
+    @EnumSource(
+            value = ConflictBehavior.class,
+            names = {"ERROR", "NOTHING"})
+    void testBasicInsertWithWatermarkProgression(ConflictBehavior behavior) throws Exception {
+        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
+                createHarness(behavior)) {
+            harness.open();
+
+            // Insert first record (watermark is MIN_VALUE)
+            harness.processElement(insertRecord(1L, 1, "a1"));
+            assertEmitsNothing(harness); // Buffered, waiting for watermark
+
+            // Advance watermark to trigger compaction
+            harness.processWatermark(100L);
+            assertEmits(harness, rowOfKind(RowKind.INSERT, 1L, 1, "a1"));
+
+            // Update with same upsert key (this is the expected pattern for single-source updates)
+            harness.processElement(updateAfterRecord(1L, 1, "a2"));
+            assertEmitsNothing(harness);
+
+            // Advance watermark again
+            harness.processWatermark(200L);
+            assertEmits(harness, rowOfKind(RowKind.UPDATE_AFTER, 1L, 1, "a2"));
+        }
+    }
+
+    @ParameterizedTest
+    @EnumSource(
+            value = ConflictBehavior.class,
+            names = {"ERROR", "NOTHING"})
+    void testDeleteAfterInsert(ConflictBehavior behavior) throws Exception {
+        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
+                createHarness(behavior)) {
+            harness.open();
+
+            // Insert and compact
+            harness.processElement(insertRecord(1L, 1, "a1"));
+            harness.processWatermark(100L);
+            assertEmits(harness, rowOfKind(RowKind.INSERT, 1L, 1, "a1"));
+
+            // Delete and compact
+            harness.processElement(deleteRecord(1L, 1, "a1"));
+            harness.processWatermark(200L);
+            assertEmits(harness, rowOfKind(RowKind.DELETE, 1L, 1, "a1"));
+        }
+    }
+
+    @ParameterizedTest
+    @EnumSource(
+            value = ConflictBehavior.class,
+            names = {"ERROR", "NOTHING"})
+    void testInsertAndDeleteInSameWindow(ConflictBehavior behavior) throws Exception {
+        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
+                createHarness(behavior)) {
+            harness.open();
+
+            // Insert and delete before watermark advances
+            harness.processElement(insertRecord(1L, 1, "a1"));
+            harness.processElement(deleteRecord(1L, 1, "a1"));
+
+            // Compact - should emit nothing since insert and delete cancel out
+            harness.processWatermark(100L);
+            assertEmitsNothing(harness);
+        }
+    }
+
+    @Test
+    void testDoNothingKeepsFirstRecord() throws Exception {
+        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
+                createHarness(ConflictBehavior.NOTHING)) {
+            harness.open();
+
+            // Insert two records with different upsert keys but same primary key
+            harness.processElement(insertRecord(1L, 1, "first"));
+            harness.processElement(insertRecord(2L, 1, "second"));
+
+            // Compact - should keep the first record
+            harness.processWatermark(100L);
+            assertEmits(harness, rowOfKind(RowKind.INSERT, 1L, 1, "first"));
+        }
+    }
+
+    @Test
+    void testDoErrorThrowsOnConflict() throws Exception {
+        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
+                createHarness(ConflictBehavior.ERROR)) {
+            harness.open();
+
+            // Insert two records with different upsert keys but same primary key
+            harness.processElement(insertRecord(1L, 1, "first"));
+            harness.processElement(insertRecord(2L, 1, "second"));
+
+            // Compact - should throw exception
+            assertThatThrownBy(() -> harness.processWatermark(100L))
+                    .isInstanceOf(TableRuntimeException.class)
+                    .hasMessageContaining("Primary key constraint violation");
+        }
+    }
+
+    @Test
+    void testDoErrorAllowsSameUpsertKey() throws Exception {
+        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
+                createHarness(ConflictBehavior.ERROR)) {
+            harness.open();
+
+            // Insert two records with same upsert key (updates to same source)
+            harness.processElement(insertRecord(1L, 1, "v1"));
+            harness.processElement(updateAfterRecord(1L, 1, "v2"));
+
+            // Compact - should not throw, just keep the latest
+            harness.processWatermark(100L);
+            assertEmits(harness, rowOfKind(RowKind.INSERT, 1L, 1, "v2"));
+        }
+    }
+
+    @ParameterizedTest
+    @EnumSource(
+            value = ConflictBehavior.class,
+            names = {"ERROR", "NOTHING"})
+    void testChangelogDisorderHandling(ConflictBehavior behavior) throws Exception {
+        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
+                createHarness(behavior)) {
+            harness.open();
+
+            // Simulate changelog disorder from FLIP-558 example:
+            // Records from different sources (different upsert keys: 1L and 2L) map to same PK (1)
+            // Ideal order: +I(1,1,a1), -U(1,1,a1), +U(2,1,b1)
+            // Disordered: +U(2,1,b1), +I(1,1,a1), -U(1,1,a1)
+
+            // The +U from source 2 arrives first (upsert key = 2L)
+            harness.processElement(updateAfterRecord(2L, 1, "b1"));
+            // Then +I and -U from source 1 arrive (upsert key = 1L)
+            harness.processElement(insertRecord(1L, 1, "a1"));
+            harness.processElement(deleteRecord(1L, 1, "a1"));
+
+            // Net result: only (2L, 1, "b1") remains after cancellation, no conflict
+            harness.processWatermark(100L);
+            assertEmits(harness, rowOfKind(RowKind.INSERT, 2L, 1, "b1"));
+        }
+    }
+
+    @ParameterizedTest
+    @EnumSource(
+            value = ConflictBehavior.class,
+            names = {"ERROR", "NOTHING"})
+    void testNoEmissionWhenValueUnchanged(ConflictBehavior behavior) throws Exception {
+        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
+                createHarness(behavior)) {
+            harness.open();
+
+            // Insert and compact
+            harness.processElement(insertRecord(1L, 1, "value"));
+            harness.processWatermark(100L);
+            assertEmits(harness, rowOfKind(RowKind.INSERT, 1L, 1, "value"));
+
+            // Insert same value again (same upsert key)
+            harness.processElement(updateAfterRecord(1L, 1, "value"));
+            harness.processWatermark(200L);
+            // Should not emit since value is the same
+            assertEmitsNothing(harness);
+        }
+    }
+
+    /**
+     * Tests that record timestamps are handled correctly when multiple inputs send records that
+     * arrive out of timestamp order. This simulates the case where records from different upstream
+     * tasks have different timestamps and arrive interleaved.
+     *
+     * <p>Input 1 uses upsert key = 1L, Input 2 uses upsert key = 2L. All records have same primary
+     * key (1).
+     *
+     * <p>Sequence:
+     *
+     * <ol>
+     *   <li>INSERT(input=1, t=2)
+     *   <li>watermark=3 -> emits INSERT
+     *   <li>UPDATE_BEFORE(input=1, t=4)
+     *   <li>UPDATE_AFTER(input=1, t=6)
+     *   <li>UPDATE_AFTER(input=2, t=4) - arrives after t=6 record but has smaller timestamp
+     *   <li>watermark=5 -> compacts t<=5 records
+     *   <li>UPDATE_BEFORE(input=2, t=6)
+     *   <li>watermark=10 -> compacts remaining t=6 records
+     * </ol>
+     */
+    @Test
+    void testTwoUpstreamTasksWithDisorderedWatermarks() throws Exception {
+        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
+                createHarness(ConflictBehavior.NOTHING)) {
+            harness.open();
+
+            // INSERT from input 1 with timestamp 2
+            harness.processElement(recordWithTimestamp(RowKind.INSERT, 1L, 1, "v1", 2L));
+            assertEmitsNothing(harness);
+
+            // watermark=3: compacts records with t<=3, emits INSERT(t=2)
+            harness.processWatermark(3L);
+            assertEmits(harness, rowOfKind(RowKind.INSERT, 1L, 1, "v1"));
+
+            // UPDATE_BEFORE from input 1 with timestamp 4
+            harness.processElement(recordWithTimestamp(RowKind.UPDATE_BEFORE, 1L, 1, "v1", 4L));
+            assertEmitsNothing(harness);
+
+            // UPDATE_AFTER from input 1 with timestamp 6
+            harness.processElement(recordWithTimestamp(RowKind.UPDATE_AFTER, 1L, 1, "v4", 6L));
+            assertEmitsNothing(harness);
+
+            // UPDATE_AFTER from input 2 with timestamp 4
+            // This record arrives after the t=6 record but has a smaller timestamp
+            harness.processElement(recordWithTimestamp(RowKind.UPDATE_AFTER, 2L, 1, "v3", 4L));
+
+            // watermark=5: compacts records with t<=5
+            // Buffered: UPDATE_BEFORE(1L, t=4) and UPDATE_AFTER(2L, t=4) cancel out for input 1,
+            // UPDATE_AFTER(2L, t=4) is emitted
+            harness.processWatermark(5L);
+            assertEmits(harness, rowOfKind(RowKind.UPDATE_AFTER, 2L, 1, "v3"));
+
+            // UPDATE_BEFORE from input 2 with timestamp 6
+            harness.processElement(recordWithTimestamp(RowKind.UPDATE_BEFORE, 2L, 1, "v3", 6L));
+            assertEmitsNothing(harness);
+
+            // Final watermark to flush all remaining buffered records (t=6)
+            // Buffered: UPDATE_AFTER(1L, t=6) and UPDATE_BEFORE(2L, t=6)
+            // After compaction: UPDATE_AFTER(1L, "v4") remains as final value
+            harness.processWatermark(10L);
+            assertEmits(harness, rowOfKind(RowKind.UPDATE_AFTER, 1L, 1, "v4"));
+        }
+    }
+
+    // --- Tests without upsert key ---
+
+    @ParameterizedTest
+    @EnumSource(
+            value = ConflictBehavior.class,
+            names = {"ERROR", "NOTHING"})
+    void testBasicInsertWithoutUpsertKey(ConflictBehavior behavior) throws Exception {
+        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
+                createHarnessWithoutUpsertKey(behavior)) {
+            harness.open();
+
+            // Insert first record
+            harness.processElement(insertRecord(1L, 1, "a1"));
+            assertEmitsNothing(harness);
+
+            // Advance watermark to trigger compaction
+            harness.processWatermark(100L);
+            assertEmits(harness, rowOfKind(RowKind.INSERT, 1L, 1, "a1"));
+        }
+    }
+
+    @Test
+    void testUpdateWithoutUpsertKeyNothingStrategy() throws Exception {
+        // Without upsert key, UPDATE_AFTER on existing value causes conflict (two rows accumulate)
+        // NOTHING strategy keeps the first value
+        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
+                createHarnessWithoutUpsertKey(ConflictBehavior.NOTHING)) {
+            harness.open();
+
+            harness.processElement(insertRecord(1L, 1, "a1"));
+            harness.processWatermark(100L);
+            assertEmits(harness, rowOfKind(RowKind.INSERT, 1L, 1, "a1"));
+
+            // UPDATE_AFTER with different value - creates conflict, NOTHING keeps first
+            harness.processElement(updateAfterRecord(2L, 1, "a2"));
+            harness.processWatermark(200L);
+            // NOTHING keeps the previous value (1L, 1, "a1"), no emission since unchanged
+            assertEmitsNothing(harness);
+        }
+    }
+
+    @Test
+    void testUpdateWithoutUpsertKeyErrorStrategy() throws Exception {
+        // Without upsert key, UPDATE_AFTER on existing value causes conflict (two rows accumulate)
+        // ERROR strategy throws
+        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
+                createHarnessWithoutUpsertKey(ConflictBehavior.ERROR)) {
+            harness.open();
+
+            harness.processElement(insertRecord(1L, 1, "a1"));
+            harness.processWatermark(100L);
+            assertEmits(harness, rowOfKind(RowKind.INSERT, 1L, 1, "a1"));
+
+            // UPDATE_AFTER with different value - creates conflict, ERROR throws
+            harness.processElement(updateAfterRecord(2L, 1, "a2"));
+            assertThatThrownBy(() -> harness.processWatermark(200L))
+                    .isInstanceOf(TableRuntimeException.class)
+                    .hasMessageContaining("Primary key constraint violation");
+        }
+    }
+
+    @ParameterizedTest
+    @EnumSource(
+            value = ConflictBehavior.class,
+            names = {"ERROR", "NOTHING"})
+    void testDeleteAfterInsertWithoutUpsertKey(ConflictBehavior behavior) throws Exception {
+        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
+                createHarnessWithoutUpsertKey(behavior)) {
+            harness.open();
+
+            // Insert and compact
+            harness.processElement(insertRecord(1L, 1, "a1"));
+            harness.processWatermark(100L);
+            assertEmits(harness, rowOfKind(RowKind.INSERT, 1L, 1, "a1"));
+
+            // Delete with exact same row and compact
+            harness.processElement(deleteRecord(1L, 1, "a1"));
+            harness.processWatermark(200L);
+            assertEmits(harness, rowOfKind(RowKind.DELETE, 1L, 1, "a1"));
+        }
+    }
+
+    @ParameterizedTest
+    @EnumSource(
+            value = ConflictBehavior.class,
+            names = {"ERROR", "NOTHING"})
+    void testInsertAndDeleteInSameWindowWithoutUpsertKey(ConflictBehavior behavior)
+            throws Exception {
+        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
+                createHarnessWithoutUpsertKey(behavior)) {
+            harness.open();
+
+            // Insert and delete with exact same row before watermark advances
+            harness.processElement(insertRecord(1L, 1, "a1"));
+            harness.processElement(deleteRecord(1L, 1, "a1"));
+
+            // Compact - should emit nothing since insert and delete cancel out
+            harness.processWatermark(100L);
+            assertEmitsNothing(harness);
+        }
+    }
+
+    @Test
+    void testIdenticalInsertsWithoutUpsertKeyNothingKeepsFirst() throws Exception {
+        // Without upsert key, even identical inserts are separate entries
+        // NOTHING strategy just keeps the first one
+        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
+                createHarnessWithoutUpsertKey(ConflictBehavior.NOTHING)) {
+            harness.open();
+
+            // Insert two identical records (same full row)
+            harness.processElement(insertRecord(1L, 1, "same"));
+            harness.processElement(insertRecord(1L, 1, "same"));
+
+            // Compact - NOTHING keeps first record
+            harness.processWatermark(100L);
+            assertEmits(harness, rowOfKind(RowKind.INSERT, 1L, 1, "same"));
+        }
+    }
+
+    @Test
+    void testIdenticalInsertsWithoutUpsertKeyErrorThrows() throws Exception {
+        // Without upsert key, even identical inserts are separate entries
+        // ERROR strategy throws because there are multiple records
+        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
+                createHarnessWithoutUpsertKey(ConflictBehavior.ERROR)) {
+            harness.open();
+
+            // Insert two identical records (same full row)
+            harness.processElement(insertRecord(1L, 1, "same"));
+            harness.processElement(insertRecord(1L, 1, "same"));
+
+            // Compact - ERROR throws because there are multiple pending records
+            assertThatThrownBy(() -> harness.processWatermark(100L))
+                    .isInstanceOf(TableRuntimeException.class)
+                    .hasMessageContaining("Primary key constraint violation");
+        }
+    }
+
+    @ParameterizedTest
+    @EnumSource(
+            value = ConflictBehavior.class,
+            names = {"ERROR", "NOTHING"})
+    void testInsertUpdateDeleteWithoutUpsertKey(ConflictBehavior behavior) throws Exception {
+        try (KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> harness =
+                createHarnessWithoutUpsertKey(behavior)) {
+            harness.open();
+
+            // Insert, then update_before + update_after sequence
+            harness.processElement(insertRecord(1L, 1, "v1"));
+            harness.processWatermark(100L);
+            assertEmits(harness, rowOfKind(RowKind.INSERT, 1L, 1, "v1"));
+
+            // Update: retract old value, insert new value
+            harness.processElement(
+                    new StreamRecord<>(rowOfKind(RowKind.UPDATE_BEFORE, 1L, 1, "v1")));

Review Comment:
   nit: same pattern as the other tests
   ```suggestion
                       updateBeforeRecord(1L, 1, "v1")));
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
