pnowojski commented on code in PR #27602:
URL: https://github.com/apache/flink/pull/27602#discussion_r2821508827


##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/WatermarkCompactingSinkMaterializer.java:
##########
@@ -115,6 +117,14 @@ public class WatermarkCompactingSinkMaterializer extends TableStreamOperator<Row
     // Tracks the current watermark. Used to detect in-flight records after restore.
     private transient long currentWatermark = Long.MIN_VALUE;
 
+    private transient InternalTimerService<VoidNamespace> timerService;
+
+    // Tracks the checkpoint ID this operator was restored from (empty if fresh start)
+    private transient Long restoredCheckpointId;

Review Comment:
   nit: mark `@Nullable`
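
   For reference, a minimal sketch of the annotated field (`javax.annotation.Nullable` is already imported in this file):

   ```java
   // Null on a fresh start; only set when the operator was restored from a checkpoint.
   @Nullable private transient Long restoredCheckpointId;
   ```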



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/WatermarkCompactingSinkMaterializer.java:
##########
@@ -0,0 +1,537 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.sink;
+
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.api.InsertConflictStrategy;
+import org.apache.flink.table.api.InsertConflictStrategy.ConflictBehavior;
+import org.apache.flink.table.api.TableRuntimeException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.utils.ProjectedRowData;
+import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
+import org.apache.flink.table.runtime.generated.RecordEqualiser;
+import org.apache.flink.table.runtime.operators.TableStreamOperator;
+import org.apache.flink.table.runtime.typeutils.InternalSerializers;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.types.RowKind.DELETE;
+import static org.apache.flink.types.RowKind.INSERT;
+import static org.apache.flink.types.RowKind.UPDATE_AFTER;
+
+/**
+ * A sink materializer that buffers records and compacts them on watermark progression.
+ *
+ * <p>This operator implements the watermark-based compaction algorithm from FLIP-558 for handling
+ * changelog disorder when the upsert key differs from the sink's primary key.
+ */
+public class WatermarkCompactingSinkMaterializer extends TableStreamOperator<RowData>
+        implements OneInputStreamOperator<RowData, RowData>, Triggerable<RowData, VoidNamespace> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(WatermarkCompactingSinkMaterializer.class);
+
+    private static final String STATE_CLEARED_WARN_MSG =
+            "The state is cleared because of state TTL. This will lead to 
incorrect results. "
+                    + "You can increase the state TTL to avoid this.";
+    private static final Set<String> ORDERED_STATE_BACKENDS = Set.of("rocksdb", "forst");
+
+    private final InsertConflictStrategy conflictStrategy;
+    private final TypeSerializer<RowData> serializer;
+    private final GeneratedRecordEqualiser generatedRecordEqualiser;
+    private final GeneratedRecordEqualiser generatedUpsertKeyEqualiser;
+    private final int[] inputUpsertKey;
+    private final boolean hasUpsertKey;
+    private final RowType keyType;
+    private final String[] primaryKeyNames;
+
+    // Buffers incoming changelog records (INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE) keyed by
+    // their timestamp. Watermarks act as compaction barriers: when a watermark arrives, we know
+    // that UPDATE_BEFORE and its corresponding UPDATE_AFTER have both been received and can be
+    // compacted together. This solves the out-of-order problem where a later UPDATE_AFTER may
+    // arrive before the UPDATE_BEFORE of a previous change.
+    private transient MapState<Long, List<RowData>> buffer;
+
+    // Stores the last emitted value for the current primary key. Used to detect duplicates
+    // and determine the correct RowKind (INSERT vs UPDATE_AFTER) on subsequent compactions.
+    private transient ValueState<RowData> currentValue;
+    private transient RecordEqualiser equaliser;
+    private transient RecordEqualiser upsertKeyEqualiser;
+    private transient TimestampedCollector<RowData> collector;
+    private transient boolean isOrderedStateBackend;
+
+    // Reused ProjectedRowData for comparing upsertKey if hasUpsertKey.
+    private transient ProjectedRowData upsertKeyProjectedRow1;
+    private transient ProjectedRowData upsertKeyProjectedRow2;
+
+    // Field getters for formatting the primary key in error messages.
+    private transient RowData.FieldGetter[] keyFieldGetters;
+
+    // Tracks the current watermark. Used to detect in-flight records after restore.
+    private transient long currentWatermark = Long.MIN_VALUE;
+
+    private transient InternalTimerService<VoidNamespace> timerService;
+
+    // Tracks the checkpoint ID this operator was restored from (empty if fresh start)
+    private transient Long restoredCheckpointId;
+
+    // Per-key state tracking the checkpoint ID for which consolidation was done
+    private transient ValueState<Long> consolidatedCheckpointId;
+
+    public WatermarkCompactingSinkMaterializer(
+            InsertConflictStrategy conflictStrategy,
+            TypeSerializer<RowData> serializer,
+            GeneratedRecordEqualiser generatedRecordEqualiser,
+            @Nullable GeneratedRecordEqualiser generatedUpsertKeyEqualiser,
+            @Nullable int[] inputUpsertKey,
+            RowType keyType,
+            String[] primaryKeyNames) {
+        validateConflictStrategy(conflictStrategy);
+        this.conflictStrategy = conflictStrategy;
+        this.serializer = serializer;
+        this.generatedRecordEqualiser = generatedRecordEqualiser;
+        this.generatedUpsertKeyEqualiser = generatedUpsertKeyEqualiser;
+        this.inputUpsertKey = inputUpsertKey;
+        this.hasUpsertKey = inputUpsertKey != null && inputUpsertKey.length > 0;
+        this.keyType = keyType;
+        this.primaryKeyNames = primaryKeyNames;
+    }
+
+    private static void validateConflictStrategy(InsertConflictStrategy strategy) {
+        Preconditions.checkArgument(
+                strategy.getBehavior() == ConflictBehavior.ERROR
+                        || strategy.getBehavior() == ConflictBehavior.NOTHING,
+                "Only ERROR and NOTHING strategies are supported, got: %s",
+                strategy);
+    }
+
+    @Override
+    public void initializeState(StateInitializationContext context) throws Exception {
+        super.initializeState(context);
+
+        // Initialize state descriptors and handles
+        this.buffer =
+                context.getKeyedStateStore()
+                        .getMapState(
+                                new MapStateDescriptor<>(
+                                        "watermark-buffer",
+                                        SortedLongSerializer.INSTANCE,
+                                        new ListSerializer<>(serializer)));
+
+        ValueStateDescriptor<RowData> currentValueDescriptor =
+                new ValueStateDescriptor<>("current-value", serializer);
+        this.currentValue = context.getKeyedStateStore().getState(currentValueDescriptor);
+
+        // Descriptor for per-key consolidation tracking
+        ValueStateDescriptor<Long> consolidatedCheckpointIdDescriptor =
+                new ValueStateDescriptor<>("consolidated-checkpoint-id", Long.class);
+        this.consolidatedCheckpointId =
+                context.getKeyedStateStore().getState(consolidatedCheckpointIdDescriptor);
+
+        this.restoredCheckpointId =
+                context.getRestoredCheckpointId().isEmpty()
+                        ? null
+                        : context.getRestoredCheckpointId().getAsLong();
+    }
+
+    private void consolidateBufferToMinValue() throws Exception {
+        List<RowData> consolidated = new ArrayList<>();
+
+        if (isOrderedStateBackend) {
+            // RocksDB/ForSt: entries are already sorted by timestamp
+            Iterator<Map.Entry<Long, List<RowData>>> iterator = buffer.entries().iterator();
+            while (iterator.hasNext()) {
+                consolidated.addAll(iterator.next().getValue());
+                iterator.remove();
+            }
+        } else {
+            // Other backends: collect, sort by timestamp, then consolidate
+            List<Map.Entry<Long, List<RowData>>> entries = new ArrayList<>();
+            Iterator<Map.Entry<Long, List<RowData>>> iterator = buffer.entries().iterator();
+            while (iterator.hasNext()) {
+                entries.add(iterator.next());
+                iterator.remove();
+            }
+
+            entries.sort(Map.Entry.comparingByKey());
+
+            for (Map.Entry<Long, List<RowData>> entry : entries) {
+                consolidated.addAll(entry.getValue());
+            }
+        }
+
+        if (!consolidated.isEmpty()) {
+            buffer.put(Long.MIN_VALUE, consolidated);
+        }
+    }
+
+    private void maybeConsolidateAfterRestore() throws Exception {
+        if (restoredCheckpointId == null) {
+            return; // Fresh start, no restore
+        }
+
+        Long storedId = this.consolidatedCheckpointId.value();
+
+        if (storedId != null && storedId.equals(restoredCheckpointId)) {
+            return; // Already consolidated for this restore
+        }
+
+        consolidateBufferToMinValue();
+        this.consolidatedCheckpointId.update(restoredCheckpointId);
+        timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, Long.MIN_VALUE);
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+        initializeEqualisers();
+        initializeKeyFieldGetters();
+        detectOrderedStateBackend();
+        this.collector = new TimestampedCollector<>(output);
+
+        this.timerService =
+                getInternalTimerService(
+                        "compaction-timers", VoidNamespaceSerializer.INSTANCE, 
this);
+    }
+
+    private void initializeKeyFieldGetters() {
+        this.keyFieldGetters = new RowData.FieldGetter[primaryKeyNames.length];
+        for (int i = 0; i < primaryKeyNames.length; i++) {
+            LogicalType fieldType = keyType.getTypeAt(i);
+            keyFieldGetters[i] = RowData.createFieldGetter(fieldType, i);
+        }
+    }
+
+    private void initializeEqualisers() {
+        if (hasUpsertKey) {
+            this.upsertKeyEqualiser =
+                    generatedUpsertKeyEqualiser.newInstance(
+                            getRuntimeContext().getUserCodeClassLoader());
+            upsertKeyProjectedRow1 = ProjectedRowData.from(inputUpsertKey);
+            upsertKeyProjectedRow2 = ProjectedRowData.from(inputUpsertKey);
+        }
+        this.equaliser =
+                generatedRecordEqualiser.newInstance(getRuntimeContext().getUserCodeClassLoader());
+    }
+
+    private void detectOrderedStateBackend() {
+        KeyedStateBackend<?> keyedStateBackend = getKeyedStateBackend();
+        String backendType =
+                keyedStateBackend != null ? keyedStateBackend.getBackendTypeIdentifier() : "";
+        this.isOrderedStateBackend = ORDERED_STATE_BACKENDS.contains(backendType);
+
+        if (isOrderedStateBackend) {
+            LOG.info("Using ordered state backend optimization for {} backend", backendType);
+        }
+    }
+
+    @Override
+    public void processElement(StreamRecord<RowData> element) throws Exception {
+        RowData row = element.getValue();
+        long assignedTimestamp = element.getTimestamp();
+
+        maybeConsolidateAfterRestore();
+
+        // If we haven't received any watermark yet (still at MIN_VALUE after restore)
+        // and the timestamp is beyond MIN_VALUE, it's from in-flight data that was
+        // checkpointed before restore. Assign to MIN_VALUE.
+        if (currentWatermark == Long.MIN_VALUE && assignedTimestamp > Long.MIN_VALUE) {
+            assignedTimestamp = Long.MIN_VALUE;
+        }
+
+        bufferRecord(assignedTimestamp, row);
+    }
+
+    private void bufferRecord(long timestamp, RowData row) throws Exception {
+        List<RowData> records = buffer.get(timestamp);
+        if (records == null) {
+            records = new ArrayList<>();
+        }
+        switch (row.getRowKind()) {
+            case INSERT:
+            case UPDATE_AFTER:
+                // Try to cancel out a pending retraction; if none, just append
+                if (!tryCancelRetraction(records, row)) {
+                    records.add(row);
+                }
+                break;
+            case UPDATE_BEFORE:
+            case DELETE:
+                // Try to cancel out an existing addition; if none, keep for cross-bucket
+                if (!tryCancelAddition(records, row)) {
+                    records.add(row);
+                }
+                break;
+        }
+        buffer.put(timestamp, records);
+
+        timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, timestamp);

Review Comment:
   Are you sure there is no off-by-1ms error? We are assigning `currentWatermark` as the timestamp and registering a timer for that same value. Doesn't `Watermark(x)` fire timers up to and including `x`?
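
   For context: `InternalTimerService#advanceWatermark(x)` fires all event-time timers with `timestamp <= x`, i.e. inclusive. If that inclusivity is indeed a problem here, a minimal sketch of one possible fix (assuming the intent is to compact a bucket only once the watermark has strictly passed it):

   ```java
   // Register the timer 1ms past the bucket timestamp so that Watermark(timestamp)
   // alone does not fire it; guard against overflow at Long.MAX_VALUE.
   long triggerTime = timestamp == Long.MAX_VALUE ? timestamp : timestamp + 1;
   timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, triggerTime);
   ```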



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/WatermarkCompactingSinkMaterializer.java:
##########
@@ -0,0 +1,537 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.sink;
+
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.api.InsertConflictStrategy;
+import org.apache.flink.table.api.InsertConflictStrategy.ConflictBehavior;
+import org.apache.flink.table.api.TableRuntimeException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.utils.ProjectedRowData;
+import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
+import org.apache.flink.table.runtime.generated.RecordEqualiser;
+import org.apache.flink.table.runtime.operators.TableStreamOperator;
+import org.apache.flink.table.runtime.typeutils.InternalSerializers;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.types.RowKind.DELETE;
+import static org.apache.flink.types.RowKind.INSERT;
+import static org.apache.flink.types.RowKind.UPDATE_AFTER;
+
+/**
+ * A sink materializer that buffers records and compacts them on watermark progression.
+ *
+ * <p>This operator implements the watermark-based compaction algorithm from FLIP-558 for handling
+ * changelog disorder when the upsert key differs from the sink's primary key.
+ */
+public class WatermarkCompactingSinkMaterializer extends TableStreamOperator<RowData>
+        implements OneInputStreamOperator<RowData, RowData>, Triggerable<RowData, VoidNamespace> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(WatermarkCompactingSinkMaterializer.class);
+
+    private static final String STATE_CLEARED_WARN_MSG =
+            "The state is cleared because of state TTL. This will lead to 
incorrect results. "
+                    + "You can increase the state TTL to avoid this.";
+    private static final Set<String> ORDERED_STATE_BACKENDS = Set.of("rocksdb", "forst");
+
+    private final InsertConflictStrategy conflictStrategy;
+    private final TypeSerializer<RowData> serializer;
+    private final GeneratedRecordEqualiser generatedRecordEqualiser;
+    private final GeneratedRecordEqualiser generatedUpsertKeyEqualiser;
+    private final int[] inputUpsertKey;
+    private final boolean hasUpsertKey;
+    private final RowType keyType;
+    private final String[] primaryKeyNames;
+
+    // Buffers incoming changelog records (INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE) keyed by
+    // their timestamp. Watermarks act as compaction barriers: when a watermark arrives, we know
+    // that UPDATE_BEFORE and its corresponding UPDATE_AFTER have both been received and can be
+    // compacted together. This solves the out-of-order problem where a later UPDATE_AFTER may
+    // arrive before the UPDATE_BEFORE of a previous change.
+    private transient MapState<Long, List<RowData>> buffer;
+
+    // Stores the last emitted value for the current primary key. Used to detect duplicates
+    // and determine the correct RowKind (INSERT vs UPDATE_AFTER) on subsequent compactions.
+    private transient ValueState<RowData> currentValue;
+    private transient RecordEqualiser equaliser;
+    private transient RecordEqualiser upsertKeyEqualiser;
+    private transient TimestampedCollector<RowData> collector;
+    private transient boolean isOrderedStateBackend;
+
+    // Reused ProjectedRowData for comparing upsertKey if hasUpsertKey.
+    private transient ProjectedRowData upsertKeyProjectedRow1;
+    private transient ProjectedRowData upsertKeyProjectedRow2;
+
+    // Field getters for formatting the primary key in error messages.
+    private transient RowData.FieldGetter[] keyFieldGetters;
+
+    // Tracks the current watermark. Used to detect in-flight records after restore.
+    private transient long currentWatermark = Long.MIN_VALUE;
+
+    private transient InternalTimerService<VoidNamespace> timerService;
+
+    // Tracks the checkpoint ID this operator was restored from (empty if fresh start)
+    private transient Long restoredCheckpointId;
+
+    // Per-key state tracking the checkpoint ID for which consolidation was done
+    private transient ValueState<Long> consolidatedCheckpointId;
+
+    public WatermarkCompactingSinkMaterializer(
+            InsertConflictStrategy conflictStrategy,
+            TypeSerializer<RowData> serializer,
+            GeneratedRecordEqualiser generatedRecordEqualiser,
+            @Nullable GeneratedRecordEqualiser generatedUpsertKeyEqualiser,
+            @Nullable int[] inputUpsertKey,
+            RowType keyType,
+            String[] primaryKeyNames) {
+        validateConflictStrategy(conflictStrategy);
+        this.conflictStrategy = conflictStrategy;
+        this.serializer = serializer;
+        this.generatedRecordEqualiser = generatedRecordEqualiser;
+        this.generatedUpsertKeyEqualiser = generatedUpsertKeyEqualiser;
+        this.inputUpsertKey = inputUpsertKey;
+        this.hasUpsertKey = inputUpsertKey != null && inputUpsertKey.length > 0;
+        this.keyType = keyType;
+        this.primaryKeyNames = primaryKeyNames;
+    }
+
+    private static void validateConflictStrategy(InsertConflictStrategy strategy) {
+        Preconditions.checkArgument(
+                strategy.getBehavior() == ConflictBehavior.ERROR
+                        || strategy.getBehavior() == ConflictBehavior.NOTHING,
+                "Only ERROR and NOTHING strategies are supported, got: %s",
+                strategy);
+    }
+
+    @Override
+    public void initializeState(StateInitializationContext context) throws Exception {
+        super.initializeState(context);
+
+        // Initialize state descriptors and handles
+        this.buffer =
+                context.getKeyedStateStore()
+                        .getMapState(
+                                new MapStateDescriptor<>(
+                                        "watermark-buffer",
+                                        SortedLongSerializer.INSTANCE,
+                                        new ListSerializer<>(serializer)));
+
+        ValueStateDescriptor<RowData> currentValueDescriptor =
+                new ValueStateDescriptor<>("current-value", serializer);
+        this.currentValue = context.getKeyedStateStore().getState(currentValueDescriptor);
+
+        // Descriptor for per-key consolidation tracking
+        ValueStateDescriptor<Long> consolidatedCheckpointIdDescriptor =
+                new ValueStateDescriptor<>("consolidated-checkpoint-id", Long.class);
+        this.consolidatedCheckpointId =
+                context.getKeyedStateStore().getState(consolidatedCheckpointIdDescriptor);
+
+        this.restoredCheckpointId =
+                context.getRestoredCheckpointId().isEmpty()
+                        ? null
+                        : context.getRestoredCheckpointId().getAsLong();
+    }
+
+    private void consolidateBufferToMinValue() throws Exception {
+        List<RowData> consolidated = new ArrayList<>();
+
+        if (isOrderedStateBackend) {
+            // RocksDB/ForSt: entries are already sorted by timestamp
+            Iterator<Map.Entry<Long, List<RowData>>> iterator = buffer.entries().iterator();
+            while (iterator.hasNext()) {
+                consolidated.addAll(iterator.next().getValue());
+                iterator.remove();
+            }
+        } else {
+            // Other backends: collect, sort by timestamp, then consolidate
+            List<Map.Entry<Long, List<RowData>>> entries = new ArrayList<>();
+            Iterator<Map.Entry<Long, List<RowData>>> iterator = buffer.entries().iterator();
+            while (iterator.hasNext()) {
+                entries.add(iterator.next());
+                iterator.remove();
+            }
+
+            entries.sort(Map.Entry.comparingByKey());
+
+            for (Map.Entry<Long, List<RowData>> entry : entries) {
+                consolidated.addAll(entry.getValue());
+            }
+        }
+
+        if (!consolidated.isEmpty()) {
+            buffer.put(Long.MIN_VALUE, consolidated);
+        }
+    }
+
+    private void maybeConsolidateAfterRestore() throws Exception {
+        if (restoredCheckpointId == null) {
+            return; // Fresh start, no restore
+        }
+
+        Long storedId = this.consolidatedCheckpointId.value();
+
+        if (storedId != null && storedId.equals(restoredCheckpointId)) {
+            return; // Already consolidated for this restore
+        }
+
+        consolidateBufferToMinValue();
+        this.consolidatedCheckpointId.update(restoredCheckpointId);
+        timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, Long.MIN_VALUE);
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+        initializeEqualisers();
+        initializeKeyFieldGetters();
+        detectOrderedStateBackend();
+        this.collector = new TimestampedCollector<>(output);
+
+        this.timerService =
+                getInternalTimerService(
+                        "compaction-timers", VoidNamespaceSerializer.INSTANCE, 
this);
+    }
+
+    private void initializeKeyFieldGetters() {
+        this.keyFieldGetters = new RowData.FieldGetter[primaryKeyNames.length];
+        for (int i = 0; i < primaryKeyNames.length; i++) {
+            LogicalType fieldType = keyType.getTypeAt(i);
+            keyFieldGetters[i] = RowData.createFieldGetter(fieldType, i);
+        }
+    }
+
+    private void initializeEqualisers() {
+        if (hasUpsertKey) {
+            this.upsertKeyEqualiser =
+                    generatedUpsertKeyEqualiser.newInstance(
+                            getRuntimeContext().getUserCodeClassLoader());
+            upsertKeyProjectedRow1 = ProjectedRowData.from(inputUpsertKey);
+            upsertKeyProjectedRow2 = ProjectedRowData.from(inputUpsertKey);
+        }
+        this.equaliser =
+                generatedRecordEqualiser.newInstance(getRuntimeContext().getUserCodeClassLoader());
+    }
+
+    private void detectOrderedStateBackend() {
+        KeyedStateBackend<?> keyedStateBackend = getKeyedStateBackend();
+        String backendType =
+                keyedStateBackend != null ? keyedStateBackend.getBackendTypeIdentifier() : "";
+        this.isOrderedStateBackend = ORDERED_STATE_BACKENDS.contains(backendType);
+
+        if (isOrderedStateBackend) {
+            LOG.info("Using ordered state backend optimization for {} backend", backendType);
+        }
+    }
+
+    @Override
+    public void processElement(StreamRecord<RowData> element) throws Exception {
+        RowData row = element.getValue();
+        long assignedTimestamp = element.getTimestamp();

Review Comment:
   nit: maybe add a comment that this depends on the `WatermarkTimestampAssigner` assigning the watermark as the timestamp?
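
   E.g. something along these lines (exact wording is just a suggestion):

   ```java
   // NOTE: relies on the upstream WatermarkTimestampAssigner stamping each record
   // with the watermark that was current when the record passed through it.
   long assignedTimestamp = element.getTimestamp();
   ```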



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/WatermarkCompactingSinkMaterializer.java:
##########
@@ -0,0 +1,537 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.sink;
+
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.api.InsertConflictStrategy;
+import org.apache.flink.table.api.InsertConflictStrategy.ConflictBehavior;
+import org.apache.flink.table.api.TableRuntimeException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.utils.ProjectedRowData;
+import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
+import org.apache.flink.table.runtime.generated.RecordEqualiser;
+import org.apache.flink.table.runtime.operators.TableStreamOperator;
+import org.apache.flink.table.runtime.typeutils.InternalSerializers;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.types.RowKind.DELETE;
+import static org.apache.flink.types.RowKind.INSERT;
+import static org.apache.flink.types.RowKind.UPDATE_AFTER;
+
+/**
+ * A sink materializer that buffers records and compacts them on watermark progression.
+ *
+ * <p>This operator implements the watermark-based compaction algorithm from FLIP-558 for handling
+ * changelog disorder when the upsert key differs from the sink's primary key.
+ */
+public class WatermarkCompactingSinkMaterializer extends TableStreamOperator<RowData>
+        implements OneInputStreamOperator<RowData, RowData>, Triggerable<RowData, VoidNamespace> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(WatermarkCompactingSinkMaterializer.class);
+
+    private static final String STATE_CLEARED_WARN_MSG =
+            "The state is cleared because of state TTL. This will lead to 
incorrect results. "
+                    + "You can increase the state TTL to avoid this.";
+    private static final Set<String> ORDERED_STATE_BACKENDS = Set.of("rocksdb", "forst");
+
+    private final InsertConflictStrategy conflictStrategy;
+    private final TypeSerializer<RowData> serializer;
+    private final GeneratedRecordEqualiser generatedRecordEqualiser;
+    private final GeneratedRecordEqualiser generatedUpsertKeyEqualiser;
+    private final int[] inputUpsertKey;
+    private final boolean hasUpsertKey;
+    private final RowType keyType;
+    private final String[] primaryKeyNames;
+
+    // Buffers incoming changelog records (INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE) keyed by
+    // their timestamp. Watermarks act as compaction barriers: when a watermark arrives, we know
+    // that UPDATE_BEFORE and its corresponding UPDATE_AFTER have both been received and can be
+    // compacted together. This solves the out-of-order problem where a later UPDATE_AFTER may
+    // arrive before the UPDATE_BEFORE of a previous change.
+    private transient MapState<Long, List<RowData>> buffer;
+
+    // Stores the last emitted value for the current primary key. Used to detect duplicates
+    // and determine the correct RowKind (INSERT vs UPDATE_AFTER) on subsequent compactions.
+    private transient ValueState<RowData> currentValue;
+    private transient RecordEqualiser equaliser;
+    private transient RecordEqualiser upsertKeyEqualiser;
+    private transient TimestampedCollector<RowData> collector;
+    private transient boolean isOrderedStateBackend;
+
+    // Reused ProjectedRowData for comparing upsertKey if hasUpsertKey.
+    private transient ProjectedRowData upsertKeyProjectedRow1;
+    private transient ProjectedRowData upsertKeyProjectedRow2;
+
+    // Field getters for formatting the primary key in error messages.
+    private transient RowData.FieldGetter[] keyFieldGetters;
+
+    // Tracks the current watermark. Used to detect in-flight records after restore.
+    private transient long currentWatermark = Long.MIN_VALUE;
+
+    private transient InternalTimerService<VoidNamespace> timerService;
+
+    // Tracks the checkpoint ID this operator was restored from (empty if fresh start)
+    private transient Long restoredCheckpointId;
+
+    // Per-key state tracking the checkpoint ID for which consolidation was done
+    private transient ValueState<Long> consolidatedCheckpointId;
+
+    public WatermarkCompactingSinkMaterializer(
+            InsertConflictStrategy conflictStrategy,
+            TypeSerializer<RowData> serializer,
+            GeneratedRecordEqualiser generatedRecordEqualiser,
+            @Nullable GeneratedRecordEqualiser generatedUpsertKeyEqualiser,
+            @Nullable int[] inputUpsertKey,
+            RowType keyType,
+            String[] primaryKeyNames) {
+        validateConflictStrategy(conflictStrategy);
+        this.conflictStrategy = conflictStrategy;
+        this.serializer = serializer;
+        this.generatedRecordEqualiser = generatedRecordEqualiser;
+        this.generatedUpsertKeyEqualiser = generatedUpsertKeyEqualiser;
+        this.inputUpsertKey = inputUpsertKey;
+        this.hasUpsertKey = inputUpsertKey != null && inputUpsertKey.length > 0;
+        this.keyType = keyType;
+        this.primaryKeyNames = primaryKeyNames;
+    }
+
+    private static void validateConflictStrategy(InsertConflictStrategy strategy) {
+        Preconditions.checkArgument(
+                strategy.getBehavior() == ConflictBehavior.ERROR
+                        || strategy.getBehavior() == ConflictBehavior.NOTHING,
+                "Only ERROR and NOTHING strategies are supported, got: %s",
+                strategy);
+    }
+
+    @Override
+    public void initializeState(StateInitializationContext context) throws Exception {
+        super.initializeState(context);
+
+        // Initialize state descriptors and handles
+        this.buffer =
+                context.getKeyedStateStore()
+                        .getMapState(
+                                new MapStateDescriptor<>(
+                                        "watermark-buffer",
+                                        SortedLongSerializer.INSTANCE,
+                                        new ListSerializer<>(serializer)));
+
+        ValueStateDescriptor<RowData> currentValueDescriptor =
+                new ValueStateDescriptor<>("current-value", serializer);
+        this.currentValue = context.getKeyedStateStore().getState(currentValueDescriptor);
+
+        // Descriptor for per-key consolidation tracking
+        ValueStateDescriptor<Long> consolidatedCheckpointIdDescriptor =
+                new ValueStateDescriptor<>("consolidated-checkpoint-id", Long.class);
+        this.consolidatedCheckpointId =
+                context.getKeyedStateStore().getState(consolidatedCheckpointIdDescriptor);
+
+        this.restoredCheckpointId =
+                context.getRestoredCheckpointId().isEmpty()
+                        ? null
+                        : context.getRestoredCheckpointId().getAsLong();
+    }
+
+    private void consolidateBufferToMinValue() throws Exception {
+        List<RowData> consolidated = new ArrayList<>();
+
+        if (isOrderedStateBackend) {
+            // RocksDB/ForSt: entries are already sorted by timestamp
+            Iterator<Map.Entry<Long, List<RowData>>> iterator = buffer.entries().iterator();
+            while (iterator.hasNext()) {
+                consolidated.addAll(iterator.next().getValue());
+                iterator.remove();
+            }
+        } else {
+            // Other backends: collect, sort by timestamp, then consolidate
+            List<Map.Entry<Long, List<RowData>>> entries = new ArrayList<>();
+            Iterator<Map.Entry<Long, List<RowData>>> iterator = buffer.entries().iterator();
+            while (iterator.hasNext()) {
+                entries.add(iterator.next());
+                iterator.remove();
+            }
+
+            entries.sort(Map.Entry.comparingByKey());
+
+            for (Map.Entry<Long, List<RowData>> entry : entries) {
+                consolidated.addAll(entry.getValue());
+            }
+        }
+
+        if (!consolidated.isEmpty()) {
+            buffer.put(Long.MIN_VALUE, consolidated);
+        }
+    }
+
+    private void maybeConsolidateAfterRestore() throws Exception {
+        if (restoredCheckpointId == null) {
+            return; // Fresh start, no restore
+        }
+
+        Long storedId = this.consolidatedCheckpointId.value();
+
+        if (storedId != null && storedId.equals(restoredCheckpointId)) {
+            return; // Already consolidated for this restore
+        }
+
+        consolidateBufferToMinValue();
+        this.consolidatedCheckpointId.update(restoredCheckpointId);
+        timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, Long.MIN_VALUE);
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+        initializeEqualisers();
+        initializeKeyFieldGetters();
+        detectOrderedStateBackend();
+        this.collector = new TimestampedCollector<>(output);
+
+        this.timerService =
+                getInternalTimerService(
+                        "compaction-timers", VoidNamespaceSerializer.INSTANCE, 
this);
+    }
+
+    private void initializeKeyFieldGetters() {
+        this.keyFieldGetters = new RowData.FieldGetter[primaryKeyNames.length];
+        for (int i = 0; i < primaryKeyNames.length; i++) {
+            LogicalType fieldType = keyType.getTypeAt(i);
+            keyFieldGetters[i] = RowData.createFieldGetter(fieldType, i);
+        }
+    }
+
+    private void initializeEqualisers() {
+        if (hasUpsertKey) {
+            this.upsertKeyEqualiser =
+                    generatedUpsertKeyEqualiser.newInstance(
+                            getRuntimeContext().getUserCodeClassLoader());
+            upsertKeyProjectedRow1 = ProjectedRowData.from(inputUpsertKey);
+            upsertKeyProjectedRow2 = ProjectedRowData.from(inputUpsertKey);
+        }
+        this.equaliser =
+                generatedRecordEqualiser.newInstance(getRuntimeContext().getUserCodeClassLoader());
+    }
+
+    private void detectOrderedStateBackend() {
+        KeyedStateBackend<?> keyedStateBackend = getKeyedStateBackend();
+        String backendType =
+                keyedStateBackend != null ? keyedStateBackend.getBackendTypeIdentifier() : "";
+        this.isOrderedStateBackend = ORDERED_STATE_BACKENDS.contains(backendType);
+
+        if (isOrderedStateBackend) {
+            LOG.info("Using ordered state backend optimization for {} backend", backendType);
+        }
+    }
+
+    @Override
+    public void processElement(StreamRecord<RowData> element) throws Exception {
+        RowData row = element.getValue();
+        long assignedTimestamp = element.getTimestamp();
+
+        maybeConsolidateAfterRestore();

Review Comment:
   Do we have to consolidate it during `processElement`? Isn't it enough to do it when the timer fires? 🤔 If so, that would save us some state access on the hot path.
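
   A rough sketch of the idea (`onEventTime` already calls `maybeConsolidateAfterRestore()` before compacting, so the per-element call may be redundant; `assignTimestamp` below is a hypothetical extraction of the existing timestamp logic):

   ```java
   @Override
   public void processElement(StreamRecord<RowData> element) throws Exception {
       // Hot path: buffer only, no restore bookkeeping.
       bufferRecord(assignTimestamp(element), element.getValue());
   }

   @Override
   public void onEventTime(InternalTimer<RowData, VoidNamespace> timer) throws Exception {
       // Restore handling happens at most once per key, off the hot path.
       maybeConsolidateAfterRestore();
       compactAndEmit(currentWatermark);
   }
   ```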



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/WatermarkCompactingSinkMaterializer.java:
##########
@@ -0,0 +1,537 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.sink;
+
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.api.InsertConflictStrategy;
+import org.apache.flink.table.api.InsertConflictStrategy.ConflictBehavior;
+import org.apache.flink.table.api.TableRuntimeException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.utils.ProjectedRowData;
+import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
+import org.apache.flink.table.runtime.generated.RecordEqualiser;
+import org.apache.flink.table.runtime.operators.TableStreamOperator;
+import org.apache.flink.table.runtime.typeutils.InternalSerializers;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.types.RowKind.DELETE;
+import static org.apache.flink.types.RowKind.INSERT;
+import static org.apache.flink.types.RowKind.UPDATE_AFTER;
+
+/**
+ * A sink materializer that buffers records and compacts them on watermark progression.
+ *
+ * <p>This operator implements the watermark-based compaction algorithm from FLIP-558 for handling
+ * changelog disorder when the upsert key differs from the sink's primary key.
+ */
+public class WatermarkCompactingSinkMaterializer extends TableStreamOperator<RowData>
+        implements OneInputStreamOperator<RowData, RowData>, Triggerable<RowData, VoidNamespace> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(WatermarkCompactingSinkMaterializer.class);
+
+    private static final String STATE_CLEARED_WARN_MSG =
+            "The state is cleared because of state TTL. This will lead to 
incorrect results. "
+                    + "You can increase the state TTL to avoid this.";
+    private static final Set<String> ORDERED_STATE_BACKENDS = Set.of("rocksdb", "forst");
+
+    private final InsertConflictStrategy conflictStrategy;
+    private final TypeSerializer<RowData> serializer;
+    private final GeneratedRecordEqualiser generatedRecordEqualiser;
+    private final GeneratedRecordEqualiser generatedUpsertKeyEqualiser;
+    private final int[] inputUpsertKey;
+    private final boolean hasUpsertKey;
+    private final RowType keyType;
+    private final String[] primaryKeyNames;
+
+    // Buffers incoming changelog records (INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE) keyed by
+    // their timestamp. Watermarks act as compaction barriers: when a watermark arrives, we know
+    // that UPDATE_BEFORE and its corresponding UPDATE_AFTER have both been received and can be
+    // compacted together. This solves the out-of-order problem where a later UPDATE_AFTER may
+    // arrive before the UPDATE_BEFORE of a previous change.
+    private transient MapState<Long, List<RowData>> buffer;
+
+    // Stores the last emitted value for the current primary key. Used to detect duplicates
+    // and determine the correct RowKind (INSERT vs UPDATE_AFTER) on subsequent compactions.
+    private transient ValueState<RowData> currentValue;
+    private transient RecordEqualiser equaliser;
+    private transient RecordEqualiser upsertKeyEqualiser;
+    private transient TimestampedCollector<RowData> collector;
+    private transient boolean isOrderedStateBackend;
+
+    // Reused ProjectedRowData for comparing upsertKey if hasUpsertKey.
+    private transient ProjectedRowData upsertKeyProjectedRow1;
+    private transient ProjectedRowData upsertKeyProjectedRow2;
+
+    // Field getters for formatting the primary key in error messages.
+    private transient RowData.FieldGetter[] keyFieldGetters;
+
+    // Tracks the current watermark. Used to detect in-flight records after restore.
+    private transient long currentWatermark = Long.MIN_VALUE;
+
+    private transient InternalTimerService<VoidNamespace> timerService;
+
+    // Tracks the checkpoint ID this operator was restored from (empty if fresh start)
+    private transient Long restoredCheckpointId;
+
+    // Per-key state tracking the checkpoint ID for which consolidation was done
+    private transient ValueState<Long> consolidatedCheckpointId;
+
+    public WatermarkCompactingSinkMaterializer(
+            InsertConflictStrategy conflictStrategy,
+            TypeSerializer<RowData> serializer,
+            GeneratedRecordEqualiser generatedRecordEqualiser,
+            @Nullable GeneratedRecordEqualiser generatedUpsertKeyEqualiser,
+            @Nullable int[] inputUpsertKey,
+            RowType keyType,
+            String[] primaryKeyNames) {
+        validateConflictStrategy(conflictStrategy);
+        this.conflictStrategy = conflictStrategy;
+        this.serializer = serializer;
+        this.generatedRecordEqualiser = generatedRecordEqualiser;
+        this.generatedUpsertKeyEqualiser = generatedUpsertKeyEqualiser;
+        this.inputUpsertKey = inputUpsertKey;
+        this.hasUpsertKey = inputUpsertKey != null && inputUpsertKey.length > 0;
+        this.keyType = keyType;
+        this.primaryKeyNames = primaryKeyNames;
+    }
+
+    private static void validateConflictStrategy(InsertConflictStrategy strategy) {
+        Preconditions.checkArgument(
+                strategy.getBehavior() == ConflictBehavior.ERROR
+                        || strategy.getBehavior() == ConflictBehavior.NOTHING,
+                "Only ERROR and NOTHING strategies are supported, got: %s",
+                strategy);
+    }
+
+    @Override
+    public void initializeState(StateInitializationContext context) throws Exception {
+        super.initializeState(context);
+
+        // Initialize state descriptors and handles
+        this.buffer =
+                context.getKeyedStateStore()
+                        .getMapState(
+                                new MapStateDescriptor<>(
+                                        "watermark-buffer",
+                                        SortedLongSerializer.INSTANCE,
+                                        new ListSerializer<>(serializer)));
+
+        ValueStateDescriptor<RowData> currentValueDescriptor =
+                new ValueStateDescriptor<>("current-value", serializer);
+        this.currentValue = context.getKeyedStateStore().getState(currentValueDescriptor);
+
+        // Descriptor for per-key consolidation tracking
+        ValueStateDescriptor<Long> consolidatedCheckpointIdDescriptor =
+                new ValueStateDescriptor<>("consolidated-checkpoint-id", Long.class);
+        this.consolidatedCheckpointId =
+                context.getKeyedStateStore().getState(consolidatedCheckpointIdDescriptor);
+
+        this.restoredCheckpointId =
+                context.getRestoredCheckpointId().isEmpty()
+                        ? null
+                        : context.getRestoredCheckpointId().getAsLong();
+    }
+
+    private void consolidateBufferToMinValue() throws Exception {
+        List<RowData> consolidated = new ArrayList<>();
+
+        if (isOrderedStateBackend) {
+            // RocksDB/ForSt: entries are already sorted by timestamp
+            Iterator<Map.Entry<Long, List<RowData>>> iterator = buffer.entries().iterator();
+            while (iterator.hasNext()) {
+                consolidated.addAll(iterator.next().getValue());
+                iterator.remove();
+            }
+        } else {
+            // Other backends: collect, sort by timestamp, then consolidate
+            List<Map.Entry<Long, List<RowData>>> entries = new ArrayList<>();
+            Iterator<Map.Entry<Long, List<RowData>>> iterator = buffer.entries().iterator();
+            while (iterator.hasNext()) {
+                entries.add(iterator.next());
+                iterator.remove();
+            }
+
+            entries.sort(Map.Entry.comparingByKey());
+
+            for (Map.Entry<Long, List<RowData>> entry : entries) {
+                consolidated.addAll(entry.getValue());
+            }
+        }
+
+        if (!consolidated.isEmpty()) {
+            buffer.put(Long.MIN_VALUE, consolidated);
+        }
+    }
+
+    private void maybeConsolidateAfterRestore() throws Exception {
+        if (restoredCheckpointId == null) {
+            return; // Fresh start, no restore
+        }
+
+        Long storedId = this.consolidatedCheckpointId.value();
+
+        if (storedId != null && storedId.equals(restoredCheckpointId)) {
+            return; // Already consolidated for this restore
+        }
+
+        consolidateBufferToMinValue();
+        this.consolidatedCheckpointId.update(restoredCheckpointId);
+        timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, Long.MIN_VALUE);
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+        initializeEqualisers();
+        initializeKeyFieldGetters();
+        detectOrderedStateBackend();
+        this.collector = new TimestampedCollector<>(output);
+
+        this.timerService =
+                getInternalTimerService(
+                        "compaction-timers", VoidNamespaceSerializer.INSTANCE, 
this);
+    }
+
+    private void initializeKeyFieldGetters() {
+        this.keyFieldGetters = new RowData.FieldGetter[primaryKeyNames.length];
+        for (int i = 0; i < primaryKeyNames.length; i++) {
+            LogicalType fieldType = keyType.getTypeAt(i);
+            keyFieldGetters[i] = RowData.createFieldGetter(fieldType, i);
+        }
+    }
+
+    private void initializeEqualisers() {
+        if (hasUpsertKey) {
+            this.upsertKeyEqualiser =
+                    generatedUpsertKeyEqualiser.newInstance(
+                            getRuntimeContext().getUserCodeClassLoader());
+            upsertKeyProjectedRow1 = ProjectedRowData.from(inputUpsertKey);
+            upsertKeyProjectedRow2 = ProjectedRowData.from(inputUpsertKey);
+        }
+        this.equaliser =
+                generatedRecordEqualiser.newInstance(getRuntimeContext().getUserCodeClassLoader());
+    }
+
+    private void detectOrderedStateBackend() {
+        KeyedStateBackend<?> keyedStateBackend = getKeyedStateBackend();
+        String backendType =
+                keyedStateBackend != null ? keyedStateBackend.getBackendTypeIdentifier() : "";
+        this.isOrderedStateBackend = ORDERED_STATE_BACKENDS.contains(backendType);
+
+        if (isOrderedStateBackend) {
+            LOG.info("Using ordered state backend optimization for {} backend", backendType);
+        }
+    }
+
+    @Override
+    public void processElement(StreamRecord<RowData> element) throws Exception {
+        RowData row = element.getValue();
+        long assignedTimestamp = element.getTimestamp();
+
+        maybeConsolidateAfterRestore();
+
+        // If we haven't received any watermark yet (still at MIN_VALUE after restore)
+        // and the timestamp is beyond MIN_VALUE, it's from in-flight data that was
+        // checkpointed before restore. Assign to MIN_VALUE.
+        if (currentWatermark == Long.MIN_VALUE && assignedTimestamp > Long.MIN_VALUE) {
+            assignedTimestamp = Long.MIN_VALUE;
+        }
+
+        bufferRecord(assignedTimestamp, row);
+    }
+
+    private void bufferRecord(long timestamp, RowData row) throws Exception {
+        List<RowData> records = buffer.get(timestamp);
+        if (records == null) {
+            records = new ArrayList<>();
+        }
+        switch (row.getRowKind()) {
+            case INSERT:
+            case UPDATE_AFTER:
+                // Try to cancel out a pending retraction; if none, just append
+                if (!tryCancelRetraction(records, row)) {
+                    records.add(row);
+                }
+                break;
+            case UPDATE_BEFORE:
+            case DELETE:
+                // Try to cancel out an existing addition; if none, keep for cross-bucket
+                if (!tryCancelAddition(records, row)) {
+                    records.add(row);
+                }
+                break;
+        }
+        buffer.put(timestamp, records);
+
+        timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, timestamp);
+    }
+
+    @Override
+    public void processWatermark(Watermark mark) throws Exception {
+        this.currentWatermark = mark.getTimestamp();
+        super.processWatermark(mark);
+    }
+
+    @Override
+    public void onEventTime(InternalTimer<RowData, VoidNamespace> timer) throws Exception {
+        maybeConsolidateAfterRestore();
+        compactAndEmit(currentWatermark);
+    }
+
+    @Override
+    public void onProcessingTime(InternalTimer<RowData, VoidNamespace> timer) 
throws Exception {
+        throw new UnsupportedOperationException("Processing-time timers are 
not supported");
+    }
+
+    private void compactAndEmit(long newWatermark) throws Exception {
+        RowData previousValue = currentValue.value();
+        List<RowData> pendingRecords = collectPendingRecords(previousValue, newWatermark);
+
+        if (pendingRecords.size() > 1) {
+            if (conflictStrategy.getBehavior() == ConflictBehavior.ERROR) {
+                RowData key = (RowData) getKeyedStateBackend().getCurrentKey();
+                throw new TableRuntimeException(
+                        "Primary key constraint violation: multiple distinct records with "
+                                + "the same primary key detected. Conflicting key: ["
+                                + formatKey(key)
+                                + "]. Use ON CONFLICT DO NOTHING to keep the first record.");
+            } else if (previousValue == null) {
+                final RowData newValue = pendingRecords.get(0);
+                emit(newValue, INSERT);
+                currentValue.update(newValue);
+            }

Review Comment:
   Have we considered an `ON CONFLICT KEEP LATEST` option?
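
   For illustration, a hypothetical `KEEP_LATEST` branch in `compactAndEmit` could look roughly like this (`ConflictBehavior.KEEP_LATEST` does not exist today; this is just a sketch):
   ```java
   } else if (conflictStrategy.getBehavior() == ConflictBehavior.KEEP_LATEST) {
       // Hypothetical: resolve the conflict by keeping the last buffered record.
       final RowData newValue = pendingRecords.get(pendingRecords.size() - 1);
       emit(newValue, previousValue == null ? INSERT : UPDATE_AFTER);
       currentValue.update(newValue);
   }
   ```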



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/WatermarkCompactingSinkMaterializer.java:
##########
@@ -0,0 +1,537 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.sink;
+
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.api.InsertConflictStrategy;
+import org.apache.flink.table.api.InsertConflictStrategy.ConflictBehavior;
+import org.apache.flink.table.api.TableRuntimeException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.utils.ProjectedRowData;
+import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
+import org.apache.flink.table.runtime.generated.RecordEqualiser;
+import org.apache.flink.table.runtime.operators.TableStreamOperator;
+import org.apache.flink.table.runtime.typeutils.InternalSerializers;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.types.RowKind.DELETE;
+import static org.apache.flink.types.RowKind.INSERT;
+import static org.apache.flink.types.RowKind.UPDATE_AFTER;
+
+/**
+ * A sink materializer that buffers records and compacts them on watermark progression.
+ *
+ * <p>This operator implements the watermark-based compaction algorithm from FLIP-558 for handling
+ * changelog disorder when the upsert key differs from the sink's primary key.
+ */
+public class WatermarkCompactingSinkMaterializer extends TableStreamOperator<RowData>
+        implements OneInputStreamOperator<RowData, RowData>, Triggerable<RowData, VoidNamespace> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(WatermarkCompactingSinkMaterializer.class);
+
+    private static final String STATE_CLEARED_WARN_MSG =
+            "The state is cleared because of state TTL. This will lead to incorrect results. "
+                    + "You can increase the state TTL to avoid this.";
+    private static final Set<String> ORDERED_STATE_BACKENDS = Set.of("rocksdb", "forst");
+
+    private final InsertConflictStrategy conflictStrategy;
+    private final TypeSerializer<RowData> serializer;
+    private final GeneratedRecordEqualiser generatedRecordEqualiser;
+    private final GeneratedRecordEqualiser generatedUpsertKeyEqualiser;
+    private final int[] inputUpsertKey;
+    private final boolean hasUpsertKey;
+    private final RowType keyType;
+    private final String[] primaryKeyNames;
+
+    // Buffers incoming changelog records (INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE) keyed by
+    // their timestamp. Watermarks act as compaction barriers: when a watermark arrives, we know
+    // that UPDATE_BEFORE and its corresponding UPDATE_AFTER have both been received and can be
+    // compacted together. This solves the out-of-order problem where a later UPDATE_AFTER may
+    // arrive before the UPDATE_BEFORE of a previous change.
+    private transient MapState<Long, List<RowData>> buffer;
+
+    // Stores the last emitted value for the current primary key. Used to detect duplicates
+    // and determine the correct RowKind (INSERT vs UPDATE_AFTER) on subsequent compactions.
+    private transient ValueState<RowData> currentValue;
+    private transient RecordEqualiser equaliser;
+    private transient RecordEqualiser upsertKeyEqualiser;
+    private transient TimestampedCollector<RowData> collector;
+    private transient boolean isOrderedStateBackend;
+
+    // Reused ProjectedRowData for comparing upsertKey if hasUpsertKey.
+    private transient ProjectedRowData upsertKeyProjectedRow1;
+    private transient ProjectedRowData upsertKeyProjectedRow2;
+
+    // Field getters for formatting the primary key in error messages.
+    private transient RowData.FieldGetter[] keyFieldGetters;
+
+    // Tracks the current watermark. Used to detect in-flight records after restore.
+    private transient long currentWatermark = Long.MIN_VALUE;
+
+    private transient InternalTimerService<VoidNamespace> timerService;
+
+    // Tracks the checkpoint ID this operator was restored from (empty if fresh start)
+    private transient Long restoredCheckpointId;
+
+    // Per-key state tracking the checkpoint ID for which consolidation was done
+    private transient ValueState<Long> consolidatedCheckpointId;
+
+    public WatermarkCompactingSinkMaterializer(
+            InsertConflictStrategy conflictStrategy,
+            TypeSerializer<RowData> serializer,
+            GeneratedRecordEqualiser generatedRecordEqualiser,
+            @Nullable GeneratedRecordEqualiser generatedUpsertKeyEqualiser,
+            @Nullable int[] inputUpsertKey,
+            RowType keyType,
+            String[] primaryKeyNames) {
+        validateConflictStrategy(conflictStrategy);
+        this.conflictStrategy = conflictStrategy;
+        this.serializer = serializer;
+        this.generatedRecordEqualiser = generatedRecordEqualiser;
+        this.generatedUpsertKeyEqualiser = generatedUpsertKeyEqualiser;
+        this.inputUpsertKey = inputUpsertKey;
+        this.hasUpsertKey = inputUpsertKey != null && inputUpsertKey.length > 0;
+        this.keyType = keyType;
+        this.primaryKeyNames = primaryKeyNames;
+    }
+
+    private static void validateConflictStrategy(InsertConflictStrategy strategy) {
+        Preconditions.checkArgument(
+                strategy.getBehavior() == ConflictBehavior.ERROR
+                        || strategy.getBehavior() == ConflictBehavior.NOTHING,
+                "Only ERROR and NOTHING strategies are supported, got: %s",
+                strategy);
+    }
+
+    @Override
+    public void initializeState(StateInitializationContext context) throws Exception {
+        super.initializeState(context);
+
+        // Initialize state descriptors and handles
+        this.buffer =
+                context.getKeyedStateStore()
+                        .getMapState(
+                                new MapStateDescriptor<>(
+                                        "watermark-buffer",
+                                        SortedLongSerializer.INSTANCE,
+                                        new ListSerializer<>(serializer)));
+
+        ValueStateDescriptor<RowData> currentValueDescriptor =
+                new ValueStateDescriptor<>("current-value", serializer);
+        this.currentValue = context.getKeyedStateStore().getState(currentValueDescriptor);
+
+        // Descriptor for per-key consolidation tracking
+        ValueStateDescriptor<Long> consolidatedCheckpointIdDescriptor =
+                new ValueStateDescriptor<>("consolidated-checkpoint-id", Long.class);
+        this.consolidatedCheckpointId =
+                context.getKeyedStateStore().getState(consolidatedCheckpointIdDescriptor);
+
+        this.restoredCheckpointId =
+                context.getRestoredCheckpointId().isEmpty()
+                        ? null
+                        : context.getRestoredCheckpointId().getAsLong();
+    }
+
+    private void consolidateBufferToMinValue() throws Exception {
+        List<RowData> consolidated = new ArrayList<>();
+
+        if (isOrderedStateBackend) {
+            // RocksDB/ForSt: entries are already sorted by timestamp
+            Iterator<Map.Entry<Long, List<RowData>>> iterator = buffer.entries().iterator();
+            while (iterator.hasNext()) {
+                consolidated.addAll(iterator.next().getValue());
+                iterator.remove();
+            }
+        } else {
+            // Other backends: collect, sort by timestamp, then consolidate
+            List<Map.Entry<Long, List<RowData>>> entries = new ArrayList<>();
+            Iterator<Map.Entry<Long, List<RowData>>> iterator = buffer.entries().iterator();
+            while (iterator.hasNext()) {
+                entries.add(iterator.next());
+                iterator.remove();
+            }
+
+            entries.sort(Map.Entry.comparingByKey());
+
+            for (Map.Entry<Long, List<RowData>> entry : entries) {
+                consolidated.addAll(entry.getValue());
+            }
+        }
+
+        if (!consolidated.isEmpty()) {
+            buffer.put(Long.MIN_VALUE, consolidated);
+        }
+    }
+
+    private void maybeConsolidateAfterRestore() throws Exception {
+        if (restoredCheckpointId == null) {
+            return; // Fresh start, no restore
+        }
+
+        Long storedId = this.consolidatedCheckpointId.value();
+
+        if (storedId != null && storedId.equals(restoredCheckpointId)) {
+            return; // Already consolidated for this restore
+        }
+
+        consolidateBufferToMinValue();
+        this.consolidatedCheckpointId.update(restoredCheckpointId);
+        timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, Long.MIN_VALUE);
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+        initializeEqualisers();
+        initializeKeyFieldGetters();
+        detectOrderedStateBackend();
+        this.collector = new TimestampedCollector<>(output);
+
+        this.timerService =
+                getInternalTimerService(
+                        "compaction-timers", VoidNamespaceSerializer.INSTANCE, this);
+    }
+
+    private void initializeKeyFieldGetters() {
+        this.keyFieldGetters = new RowData.FieldGetter[primaryKeyNames.length];
+        for (int i = 0; i < primaryKeyNames.length; i++) {
+            LogicalType fieldType = keyType.getTypeAt(i);
+            keyFieldGetters[i] = RowData.createFieldGetter(fieldType, i);
+        }
+    }
+
+    private void initializeEqualisers() {
+        if (hasUpsertKey) {
+            this.upsertKeyEqualiser =
+                    generatedUpsertKeyEqualiser.newInstance(
+                            getRuntimeContext().getUserCodeClassLoader());
+            upsertKeyProjectedRow1 = ProjectedRowData.from(inputUpsertKey);
+            upsertKeyProjectedRow2 = ProjectedRowData.from(inputUpsertKey);
+        }
+        this.equaliser =
+                generatedRecordEqualiser.newInstance(getRuntimeContext().getUserCodeClassLoader());
+    }
+
+    private void detectOrderedStateBackend() {
+        KeyedStateBackend<?> keyedStateBackend = getKeyedStateBackend();
+        String backendType =
+                keyedStateBackend != null ? keyedStateBackend.getBackendTypeIdentifier() : "";
+        this.isOrderedStateBackend = ORDERED_STATE_BACKENDS.contains(backendType);
+
+        if (isOrderedStateBackend) {
+            LOG.info("Using ordered state backend optimization for {} backend", backendType);
+        }
+    }
+
+    @Override
+    public void processElement(StreamRecord<RowData> element) throws Exception {
+        RowData row = element.getValue();
+        long assignedTimestamp = element.getTimestamp();
+
+        maybeConsolidateAfterRestore();
+
+        // If we haven't received any watermark yet (still at MIN_VALUE after restore)
+        // and the timestamp is beyond MIN_VALUE, it's from in-flight data that was
+        // checkpointed before restore. Assign to MIN_VALUE.
+        if (currentWatermark == Long.MIN_VALUE && assignedTimestamp > Long.MIN_VALUE) {
+            assignedTimestamp = Long.MIN_VALUE;
+        }
+
+        bufferRecord(assignedTimestamp, row);
+    }
+
+    private void bufferRecord(long timestamp, RowData row) throws Exception {
+        List<RowData> records = buffer.get(timestamp);
+        if (records == null) {
+            records = new ArrayList<>();
+        }
+        switch (row.getRowKind()) {
+            case INSERT:
+            case UPDATE_AFTER:
+                // Try to cancel out a pending retraction; if none, just append
+                if (!tryCancelRetraction(records, row)) {
+                    records.add(row);
+                }
+                break;
+            case UPDATE_BEFORE:
+            case DELETE:
+                // Try to cancel out an existing addition; if none, keep for cross-bucket
+                if (!tryCancelAddition(records, row)) {
+                    records.add(row);
+                }
+                break;
+        }
+        buffer.put(timestamp, records);
+
+        timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, timestamp);
+    }
+
+    @Override
+    public void processWatermark(Watermark mark) throws Exception {
+        this.currentWatermark = mark.getTimestamp();
+        super.processWatermark(mark);
+    }
+
+    @Override
+    public void onEventTime(InternalTimer<RowData, VoidNamespace> timer) 
throws Exception {
+        maybeConsolidateAfterRestore();
+        compactAndEmit(currentWatermark);
+    }
+
+    @Override
+    public void onProcessingTime(InternalTimer<RowData, VoidNamespace> timer) 
throws Exception {
+        throw new UnsupportedOperationException("Processing-time timers are 
not supported");
+    }
+
+    private void compactAndEmit(long newWatermark) throws Exception {
+        RowData previousValue = currentValue.value();
+        List<RowData> pendingRecords = collectPendingRecords(previousValue, newWatermark);
+
+        if (pendingRecords.size() > 1) {
+            if (conflictStrategy.getBehavior() == ConflictBehavior.ERROR) {
+                RowData key = (RowData) getKeyedStateBackend().getCurrentKey();
+                throw new TableRuntimeException(
+                        "Primary key constraint violation: multiple distinct records with "
+                                + "the same primary key detected. Conflicting key: ["
+                                + formatKey(key)
+                                + "]. Use ON CONFLICT DO NOTHING to keep the first record.");
+            } else if (previousValue == null) {
+                final RowData newValue = pendingRecords.get(0);
+                emit(newValue, INSERT);
+                currentValue.update(newValue);
+            }
+        } else if (pendingRecords.isEmpty()) {
+            if (previousValue != null) {
+                emit(previousValue, DELETE);
+                currentValue.clear();
+            }
+        } else {
+            final RowData newValue = pendingRecords.get(0);
+            if (previousValue == null) {
+                emit(newValue, INSERT);
+                currentValue.update(newValue);
+            } else if (!recordEquals(previousValue, newValue)) {
+                emit(newValue, UPDATE_AFTER);
+                currentValue.update(newValue);
+            }
+        }
+    }
+
+    private List<RowData> collectPendingRecords(RowData previousValue, long newWatermark)

Review Comment:
   nit: `@Nullable` for `previousValue`
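
   For reference, the annotated signature would then read (sketch):
   ```java
   private List<RowData> collectPendingRecords(@Nullable RowData previousValue, long newWatermark)
   ```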



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/WatermarkCompactingSinkMaterializer.java:
##########
@@ -0,0 +1,537 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.sink;
+
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.api.InsertConflictStrategy;
+import org.apache.flink.table.api.InsertConflictStrategy.ConflictBehavior;
+import org.apache.flink.table.api.TableRuntimeException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.utils.ProjectedRowData;
+import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
+import org.apache.flink.table.runtime.generated.RecordEqualiser;
+import org.apache.flink.table.runtime.operators.TableStreamOperator;
+import org.apache.flink.table.runtime.typeutils.InternalSerializers;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.types.RowKind.DELETE;
+import static org.apache.flink.types.RowKind.INSERT;
+import static org.apache.flink.types.RowKind.UPDATE_AFTER;
+
+/**
+ * A sink materializer that buffers records and compacts them on watermark progression.
+ *
+ * <p>This operator implements the watermark-based compaction algorithm from FLIP-558 for handling
+ * changelog disorder when the upsert key differs from the sink's primary key.
+ */
+public class WatermarkCompactingSinkMaterializer extends TableStreamOperator<RowData>
+        implements OneInputStreamOperator<RowData, RowData>, Triggerable<RowData, VoidNamespace> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(WatermarkCompactingSinkMaterializer.class);
+
+    private static final String STATE_CLEARED_WARN_MSG =
+            "The state is cleared because of state TTL. This will lead to incorrect results. "
+                    + "You can increase the state TTL to avoid this.";
+    private static final Set<String> ORDERED_STATE_BACKENDS = Set.of("rocksdb", "forst");
+
+    private final InsertConflictStrategy conflictStrategy;
+    private final TypeSerializer<RowData> serializer;
+    private final GeneratedRecordEqualiser generatedRecordEqualiser;
+    private final GeneratedRecordEqualiser generatedUpsertKeyEqualiser;
+    private final int[] inputUpsertKey;
+    private final boolean hasUpsertKey;
+    private final RowType keyType;
+    private final String[] primaryKeyNames;
+
+    // Buffers incoming changelog records (INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE) keyed by
+    // their timestamp. Watermarks act as compaction barriers: when a watermark arrives, we know
+    // that UPDATE_BEFORE and its corresponding UPDATE_AFTER have both been received and can be
+    // compacted together. This solves the out-of-order problem where a later UPDATE_AFTER may
+    // arrive before the UPDATE_BEFORE of a previous change.
+    private transient MapState<Long, List<RowData>> buffer;
+
+    // Stores the last emitted value for the current primary key. Used to detect duplicates
+    // and determine the correct RowKind (INSERT vs UPDATE_AFTER) on subsequent compactions.
+    private transient ValueState<RowData> currentValue;
+    private transient RecordEqualiser equaliser;
+    private transient RecordEqualiser upsertKeyEqualiser;
+    private transient TimestampedCollector<RowData> collector;
+    private transient boolean isOrderedStateBackend;
+
+    // Reused ProjectedRowData for comparing upsertKey if hasUpsertKey.
+    private transient ProjectedRowData upsertKeyProjectedRow1;
+    private transient ProjectedRowData upsertKeyProjectedRow2;
+
+    // Field getters for formatting the primary key in error messages.
+    private transient RowData.FieldGetter[] keyFieldGetters;
+
+    // Tracks the current watermark. Used to detect in-flight records after restore.
+    private transient long currentWatermark = Long.MIN_VALUE;
+
+    private transient InternalTimerService<VoidNamespace> timerService;
+
+    // Tracks the checkpoint ID this operator was restored from (empty if fresh start)
+    private transient Long restoredCheckpointId;
+
+    // Per-key state tracking the checkpoint ID for which consolidation was done
+    private transient ValueState<Long> consolidatedCheckpointId;
+
+    public WatermarkCompactingSinkMaterializer(
+            InsertConflictStrategy conflictStrategy,
+            TypeSerializer<RowData> serializer,
+            GeneratedRecordEqualiser generatedRecordEqualiser,
+            @Nullable GeneratedRecordEqualiser generatedUpsertKeyEqualiser,
+            @Nullable int[] inputUpsertKey,
+            RowType keyType,
+            String[] primaryKeyNames) {
+        validateConflictStrategy(conflictStrategy);
+        this.conflictStrategy = conflictStrategy;
+        this.serializer = serializer;
+        this.generatedRecordEqualiser = generatedRecordEqualiser;
+        this.generatedUpsertKeyEqualiser = generatedUpsertKeyEqualiser;
+        this.inputUpsertKey = inputUpsertKey;
+        this.hasUpsertKey = inputUpsertKey != null && inputUpsertKey.length > 0;
+        this.keyType = keyType;
+        this.primaryKeyNames = primaryKeyNames;
+    }
+
+    private static void validateConflictStrategy(InsertConflictStrategy strategy) {
+        Preconditions.checkArgument(
+                strategy.getBehavior() == ConflictBehavior.ERROR
+                        || strategy.getBehavior() == ConflictBehavior.NOTHING,
+                "Only ERROR and NOTHING strategies are supported, got: %s",
+                strategy);
+    }
+
+    @Override
+    public void initializeState(StateInitializationContext context) throws Exception {
+        super.initializeState(context);
+
+        // Initialize state descriptors and handles
+        this.buffer =
+                context.getKeyedStateStore()
+                        .getMapState(
+                                new MapStateDescriptor<>(
+                                        "watermark-buffer",
+                                        SortedLongSerializer.INSTANCE,
+                                        new ListSerializer<>(serializer)));
+
+        ValueStateDescriptor<RowData> currentValueDescriptor =
+                new ValueStateDescriptor<>("current-value", serializer);
+        this.currentValue = context.getKeyedStateStore().getState(currentValueDescriptor);
+
+        // Descriptor for per-key consolidation tracking
+        ValueStateDescriptor<Long> consolidatedCheckpointIdDescriptor =
+                new ValueStateDescriptor<>("consolidated-checkpoint-id", Long.class);
+        this.consolidatedCheckpointId =
+                context.getKeyedStateStore().getState(consolidatedCheckpointIdDescriptor);
+
+        this.restoredCheckpointId =
+                context.getRestoredCheckpointId().isEmpty()
+                        ? null
+                        : context.getRestoredCheckpointId().getAsLong();
+    }
+
+    private void consolidateBufferToMinValue() throws Exception {
+        List<RowData> consolidated = new ArrayList<>();
+
+        if (isOrderedStateBackend) {
+            // RocksDB/ForSt: entries are already sorted by timestamp
+            Iterator<Map.Entry<Long, List<RowData>>> iterator = buffer.entries().iterator();
+            while (iterator.hasNext()) {
+                consolidated.addAll(iterator.next().getValue());
+                iterator.remove();
+            }
+        } else {
+            // Other backends: collect, sort by timestamp, then consolidate
+            List<Map.Entry<Long, List<RowData>>> entries = new ArrayList<>();
+            Iterator<Map.Entry<Long, List<RowData>>> iterator = buffer.entries().iterator();
+            while (iterator.hasNext()) {
+                entries.add(iterator.next());
+                iterator.remove();
+            }
+
+            entries.sort(Map.Entry.comparingByKey());
+
+            for (Map.Entry<Long, List<RowData>> entry : entries) {
+                consolidated.addAll(entry.getValue());
+            }
+        }
+
+        if (!consolidated.isEmpty()) {
+            buffer.put(Long.MIN_VALUE, consolidated);
+        }
+    }
+
+    private void maybeConsolidateAfterRestore() throws Exception {
+        if (restoredCheckpointId == null) {
+            return; // Fresh start, no restore
+        }
+
+        Long storedId = this.consolidatedCheckpointId.value();
+
+        if (storedId != null && storedId.equals(restoredCheckpointId)) {
+            return; // Already consolidated for this restore
+        }
+
+        consolidateBufferToMinValue();
+        this.consolidatedCheckpointId.update(restoredCheckpointId);
+        timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, Long.MIN_VALUE);
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+        initializeEqualisers();
+        initializeKeyFieldGetters();
+        detectOrderedStateBackend();
+        this.collector = new TimestampedCollector<>(output);
+
+        this.timerService =
+                getInternalTimerService(
+                        "compaction-timers", VoidNamespaceSerializer.INSTANCE, this);
+    }
+
+    private void initializeKeyFieldGetters() {
+        this.keyFieldGetters = new RowData.FieldGetter[primaryKeyNames.length];
+        for (int i = 0; i < primaryKeyNames.length; i++) {
+            LogicalType fieldType = keyType.getTypeAt(i);
+            keyFieldGetters[i] = RowData.createFieldGetter(fieldType, i);
+        }
+    }
+
+    private void initializeEqualisers() {
+        if (hasUpsertKey) {
+            this.upsertKeyEqualiser =
+                    generatedUpsertKeyEqualiser.newInstance(
+                            getRuntimeContext().getUserCodeClassLoader());
+            upsertKeyProjectedRow1 = ProjectedRowData.from(inputUpsertKey);
+            upsertKeyProjectedRow2 = ProjectedRowData.from(inputUpsertKey);
+        }
+        this.equaliser =
+                generatedRecordEqualiser.newInstance(getRuntimeContext().getUserCodeClassLoader());
+    }
+
+    private void detectOrderedStateBackend() {
+        KeyedStateBackend<?> keyedStateBackend = getKeyedStateBackend();
+        String backendType =
+                keyedStateBackend != null ? keyedStateBackend.getBackendTypeIdentifier() : "";
+        this.isOrderedStateBackend = ORDERED_STATE_BACKENDS.contains(backendType);
+
+        if (isOrderedStateBackend) {
+            LOG.info("Using ordered state backend optimization for {} backend", backendType);
+        }
+    }
+
+    @Override
+    public void processElement(StreamRecord<RowData> element) throws Exception {
+        RowData row = element.getValue();
+        long assignedTimestamp = element.getTimestamp();
+
+        maybeConsolidateAfterRestore();
+
+        // If we haven't received any watermark yet (still at MIN_VALUE after restore)
+        // and the timestamp is beyond MIN_VALUE, it's from in-flight data that was
+        // checkpointed before restore. Assign to MIN_VALUE.
+        if (currentWatermark == Long.MIN_VALUE && assignedTimestamp > Long.MIN_VALUE) {
+            assignedTimestamp = Long.MIN_VALUE;
+        }
+
+        bufferRecord(assignedTimestamp, row);
+    }
+
+    private void bufferRecord(long timestamp, RowData row) throws Exception {
+        List<RowData> records = buffer.get(timestamp);
+        if (records == null) {
+            records = new ArrayList<>();
+        }
+        switch (row.getRowKind()) {
+            case INSERT:
+            case UPDATE_AFTER:
+                // Try to cancel out a pending retraction; if none, just append
+                if (!tryCancelRetraction(records, row)) {
+                    records.add(row);
+                }
+                break;
+            case UPDATE_BEFORE:
+            case DELETE:
+                // Try to cancel out an existing addition; if none, keep for cross-bucket
+                if (!tryCancelAddition(records, row)) {
+                    records.add(row);
+                }
+                break;
+        }
+        buffer.put(timestamp, records);
+
+        timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, timestamp);
+    }
+
+    @Override
+    public void processWatermark(Watermark mark) throws Exception {
+        this.currentWatermark = mark.getTimestamp();
+        super.processWatermark(mark);
+    }
+
+    @Override
+    public void onEventTime(InternalTimer<RowData, VoidNamespace> timer) throws Exception {
+        maybeConsolidateAfterRestore();
+        compactAndEmit(currentWatermark);
+    }
+
+    @Override
+    public void onProcessingTime(InternalTimer<RowData, VoidNamespace> timer) throws Exception {
+        throw new UnsupportedOperationException("Processing-time timers are not supported");
+    }
+
+    private void compactAndEmit(long newWatermark) throws Exception {
+        RowData previousValue = currentValue.value();
+        List<RowData> pendingRecords = collectPendingRecords(previousValue, newWatermark);
+
+        if (pendingRecords.size() > 1) {
+            if (conflictStrategy.getBehavior() == ConflictBehavior.ERROR) {
+                RowData key = (RowData) getKeyedStateBackend().getCurrentKey();
+                throw new TableRuntimeException(
+                        "Primary key constraint violation: multiple distinct records with "
+                                + "the same primary key detected. Conflicting key: ["
+                                + formatKey(key)
+                                + "]. Use ON CONFLICT DO NOTHING to keep the first record.");
+            } else if (previousValue == null) {
+                final RowData newValue = pendingRecords.get(0);
+                emit(newValue, INSERT);
+                currentValue.update(newValue);
+            }
+        } else if (pendingRecords.isEmpty()) {
+            if (previousValue != null) {
+                emit(previousValue, DELETE);
+                currentValue.clear();
+            }

Review Comment:
   Isn't this a bit risky? Can you reliably distinguish an empty `pendingRecords` that means there was a deletion from one that means nothing has changed? 🤔 I mean, if a timer fires without any records having been processed for the given key, this code would delete the previously emitted value.
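
   One way out might be to check whether the buffer actually held records for this key before treating an empty result as a retraction, along these lines (rough sketch, untested):
   ```java
   // Only interpret an empty compaction result as a deletion if records were
   // actually consumed from the buffer for this key.
   boolean hadBufferedRecords = !buffer.isEmpty();
   List<RowData> pendingRecords = collectPendingRecords(previousValue, newWatermark);
   if (pendingRecords.isEmpty()) {
       if (hadBufferedRecords && previousValue != null) {
           emit(previousValue, DELETE);
           currentValue.clear();
       }
   }
   ```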



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/WatermarkCompactingSinkMaterializer.java:
##########
@@ -0,0 +1,537 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.sink;
+
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.api.InsertConflictStrategy;
+import org.apache.flink.table.api.InsertConflictStrategy.ConflictBehavior;
+import org.apache.flink.table.api.TableRuntimeException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.utils.ProjectedRowData;
+import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
+import org.apache.flink.table.runtime.generated.RecordEqualiser;
+import org.apache.flink.table.runtime.operators.TableStreamOperator;
+import org.apache.flink.table.runtime.typeutils.InternalSerializers;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.types.RowKind.DELETE;
+import static org.apache.flink.types.RowKind.INSERT;
+import static org.apache.flink.types.RowKind.UPDATE_AFTER;
+
+/**
+ * A sink materializer that buffers records and compacts them on watermark progression.
+ *
+ * <p>This operator implements the watermark-based compaction algorithm from FLIP-558 for handling
+ * changelog disorder when the upsert key differs from the sink's primary key.
+ */
+public class WatermarkCompactingSinkMaterializer extends TableStreamOperator<RowData>
+        implements OneInputStreamOperator<RowData, RowData>, Triggerable<RowData, VoidNamespace> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(WatermarkCompactingSinkMaterializer.class);
+
+    private static final String STATE_CLEARED_WARN_MSG =
+            "The state is cleared because of state TTL. This will lead to incorrect results. "
+                    + "You can increase the state TTL to avoid this.";
+    private static final Set<String> ORDERED_STATE_BACKENDS = Set.of("rocksdb", "forst");
+
+    private final InsertConflictStrategy conflictStrategy;
+    private final TypeSerializer<RowData> serializer;
+    private final GeneratedRecordEqualiser generatedRecordEqualiser;
+    private final GeneratedRecordEqualiser generatedUpsertKeyEqualiser;
+    private final int[] inputUpsertKey;
+    private final boolean hasUpsertKey;
+    private final RowType keyType;
+    private final String[] primaryKeyNames;
+
+    // Buffers incoming changelog records (INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE) keyed by
+    // their timestamp. Watermarks act as compaction barriers: when a watermark arrives, we know
+    // that UPDATE_BEFORE and its corresponding UPDATE_AFTER have both been received and can be
+    // compacted together. This solves the out-of-order problem where a later UPDATE_AFTER may
+    // arrive before the UPDATE_BEFORE of a previous change.
+    private transient MapState<Long, List<RowData>> buffer;
+
+    // Stores the last emitted value for the current primary key. Used to detect duplicates
+    // and determine the correct RowKind (INSERT vs UPDATE_AFTER) on subsequent compactions.
+    private transient ValueState<RowData> currentValue;
+    private transient RecordEqualiser equaliser;
+    private transient RecordEqualiser upsertKeyEqualiser;
+    private transient TimestampedCollector<RowData> collector;
+    private transient boolean isOrderedStateBackend;
+
+    // Reused ProjectedRowData for comparing upsertKey if hasUpsertKey.
+    private transient ProjectedRowData upsertKeyProjectedRow1;
+    private transient ProjectedRowData upsertKeyProjectedRow2;
+
+    // Field getters for formatting the primary key in error messages.
+    private transient RowData.FieldGetter[] keyFieldGetters;
+
+    // Tracks the current watermark. Used to detect in-flight records after restore.
+    private transient long currentWatermark = Long.MIN_VALUE;
+
+    private transient InternalTimerService<VoidNamespace> timerService;
+
+    // Tracks the checkpoint ID this operator was restored from (empty if fresh start)
+    private transient Long restoredCheckpointId;
+
+    // Per-key state tracking the checkpoint ID for which consolidation was done
+    private transient ValueState<Long> consolidatedCheckpointId;
+
+    public WatermarkCompactingSinkMaterializer(
+            InsertConflictStrategy conflictStrategy,
+            TypeSerializer<RowData> serializer,
+            GeneratedRecordEqualiser generatedRecordEqualiser,
+            @Nullable GeneratedRecordEqualiser generatedUpsertKeyEqualiser,
+            @Nullable int[] inputUpsertKey,
+            RowType keyType,
+            String[] primaryKeyNames) {
+        validateConflictStrategy(conflictStrategy);
+        this.conflictStrategy = conflictStrategy;
+        this.serializer = serializer;
+        this.generatedRecordEqualiser = generatedRecordEqualiser;
+        this.generatedUpsertKeyEqualiser = generatedUpsertKeyEqualiser;
+        this.inputUpsertKey = inputUpsertKey;
+        this.hasUpsertKey = inputUpsertKey != null && inputUpsertKey.length > 0;
+        this.keyType = keyType;
+        this.primaryKeyNames = primaryKeyNames;
+    }
+
+    private static void validateConflictStrategy(InsertConflictStrategy strategy) {
+        Preconditions.checkArgument(
+                strategy.getBehavior() == ConflictBehavior.ERROR
+                        || strategy.getBehavior() == ConflictBehavior.NOTHING,
+                "Only ERROR and NOTHING strategies are supported, got: %s",
+                strategy);
+    }
+
+    @Override
+    public void initializeState(StateInitializationContext context) throws Exception {
+        super.initializeState(context);
+
+        // Initialize state descriptors and handles
+        this.buffer =
+                context.getKeyedStateStore()
+                        .getMapState(
+                                new MapStateDescriptor<>(
+                                        "watermark-buffer",
+                                        SortedLongSerializer.INSTANCE,
+                                        new ListSerializer<>(serializer)));
+
+        ValueStateDescriptor<RowData> currentValueDescriptor =
+                new ValueStateDescriptor<>("current-value", serializer);
+        this.currentValue = context.getKeyedStateStore().getState(currentValueDescriptor);
+
+        // Descriptor for per-key consolidation tracking
+        ValueStateDescriptor<Long> consolidatedCheckpointIdDescriptor =
+                new ValueStateDescriptor<>("consolidated-checkpoint-id", Long.class);
+        this.consolidatedCheckpointId =
+                context.getKeyedStateStore().getState(consolidatedCheckpointIdDescriptor);
+
+        this.restoredCheckpointId =
+                context.getRestoredCheckpointId().isEmpty()
+                        ? null
+                        : context.getRestoredCheckpointId().getAsLong();
+    }
+
+    private void consolidateBufferToMinValue() throws Exception {
+        List<RowData> consolidated = new ArrayList<>();
+
+        if (isOrderedStateBackend) {
+            // RocksDB/ForSt: entries are already sorted by timestamp
+            Iterator<Map.Entry<Long, List<RowData>>> iterator = buffer.entries().iterator();
+            while (iterator.hasNext()) {
+                consolidated.addAll(iterator.next().getValue());
+                iterator.remove();
+            }
+        } else {
+            // Other backends: collect, sort by timestamp, then consolidate
+            List<Map.Entry<Long, List<RowData>>> entries = new ArrayList<>();
+            Iterator<Map.Entry<Long, List<RowData>>> iterator = buffer.entries().iterator();
+            while (iterator.hasNext()) {
+                entries.add(iterator.next());
+                iterator.remove();
+            }
+
+            entries.sort(Map.Entry.comparingByKey());
+
+            for (Map.Entry<Long, List<RowData>> entry : entries) {
+                consolidated.addAll(entry.getValue());
+            }
+        }
+
+        if (!consolidated.isEmpty()) {
+            buffer.put(Long.MIN_VALUE, consolidated);
+        }
+    }
+
+    private void maybeConsolidateAfterRestore() throws Exception {
+        if (restoredCheckpointId == null) {
+            return; // Fresh start, no restore
+        }
+
+        Long storedId = this.consolidatedCheckpointId.value();
+
+        if (storedId != null && storedId.equals(restoredCheckpointId)) {
+            return; // Already consolidated for this restore
+        }
+
+        consolidateBufferToMinValue();
+        this.consolidatedCheckpointId.update(restoredCheckpointId);
+        timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, Long.MIN_VALUE);
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+        initializeEqualisers();
+        initializeKeyFieldGetters();
+        detectOrderedStateBackend();
+        this.collector = new TimestampedCollector<>(output);
+
+        this.timerService =
+                getInternalTimerService(
+                        "compaction-timers", VoidNamespaceSerializer.INSTANCE, this);
+    }
+
+    private void initializeKeyFieldGetters() {
+        this.keyFieldGetters = new RowData.FieldGetter[primaryKeyNames.length];
+        for (int i = 0; i < primaryKeyNames.length; i++) {
+            LogicalType fieldType = keyType.getTypeAt(i);
+            keyFieldGetters[i] = RowData.createFieldGetter(fieldType, i);
+        }
+    }
+
+    private void initializeEqualisers() {
+        if (hasUpsertKey) {
+            this.upsertKeyEqualiser =
+                    generatedUpsertKeyEqualiser.newInstance(
+                            getRuntimeContext().getUserCodeClassLoader());
+            upsertKeyProjectedRow1 = ProjectedRowData.from(inputUpsertKey);
+            upsertKeyProjectedRow2 = ProjectedRowData.from(inputUpsertKey);
+        }
+        this.equaliser =
+                generatedRecordEqualiser.newInstance(getRuntimeContext().getUserCodeClassLoader());
+    }
+
+    private void detectOrderedStateBackend() {
+        KeyedStateBackend<?> keyedStateBackend = getKeyedStateBackend();
+        String backendType =
+                keyedStateBackend != null ? keyedStateBackend.getBackendTypeIdentifier() : "";
+        this.isOrderedStateBackend = ORDERED_STATE_BACKENDS.contains(backendType);
+
+        if (isOrderedStateBackend) {
+            LOG.info("Using ordered state backend optimization for {} backend", backendType);
+        }
+    }
+
+    @Override
+    public void processElement(StreamRecord<RowData> element) throws Exception {
+        RowData row = element.getValue();
+        long assignedTimestamp = element.getTimestamp();
+
+        maybeConsolidateAfterRestore();
+
+        // If we haven't received any watermark yet (still at MIN_VALUE after restore)
+        // and the timestamp is beyond MIN_VALUE, it's from in-flight data that was
+        // checkpointed before restore. Assign to MIN_VALUE.
+        if (currentWatermark == Long.MIN_VALUE && assignedTimestamp > Long.MIN_VALUE) {
+            assignedTimestamp = Long.MIN_VALUE;
+        }
+
+        bufferRecord(assignedTimestamp, row);
+    }
+
+    private void bufferRecord(long timestamp, RowData row) throws Exception {
+        List<RowData> records = buffer.get(timestamp);
+        if (records == null) {
+            records = new ArrayList<>();
+        }
+        switch (row.getRowKind()) {
+            case INSERT:
+            case UPDATE_AFTER:
+                // Try to cancel out a pending retraction; if none, just append
+                if (!tryCancelRetraction(records, row)) {
+                    records.add(row);
+                }
+                break;
+            case UPDATE_BEFORE:
+            case DELETE:
+                // Try to cancel out an existing addition; if none, keep for cross-bucket
+                if (!tryCancelAddition(records, row)) {
+                    records.add(row);
+                }
+                break;
+        }
+        buffer.put(timestamp, records);
+
+        timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, timestamp);
+    }
+
+    @Override
+    public void processWatermark(Watermark mark) throws Exception {
+        this.currentWatermark = mark.getTimestamp();
+        super.processWatermark(mark);
+    }
+
+    @Override
+    public void onEventTime(InternalTimer<RowData, VoidNamespace> timer) throws Exception {
+        maybeConsolidateAfterRestore();
+        compactAndEmit(currentWatermark);
+    }
+
+    @Override
+    public void onProcessingTime(InternalTimer<RowData, VoidNamespace> timer) throws Exception {
+        throw new UnsupportedOperationException("Processing-time timers are not supported");
+    }
+
+    private void compactAndEmit(long newWatermark) throws Exception {
+        RowData previousValue = currentValue.value();
+        List<RowData> pendingRecords = collectPendingRecords(previousValue, newWatermark);
+
+        if (pendingRecords.size() > 1) {
+            if (conflictStrategy.getBehavior() == ConflictBehavior.ERROR) {
+                RowData key = (RowData) getKeyedStateBackend().getCurrentKey();
+                throw new TableRuntimeException(
+                        "Primary key constraint violation: multiple distinct records with "
+                                + "the same primary key detected. Conflicting key: ["
+                                + formatKey(key)
+                                + "]. Use ON CONFLICT DO NOTHING to keep the first record.");
+            } else if (previousValue == null) {
+                final RowData newValue = pendingRecords.get(0);
+                emit(newValue, INSERT);
+                currentValue.update(newValue);
+            }
+        } else if (pendingRecords.isEmpty()) {
+            if (previousValue != null) {
+                emit(previousValue, DELETE);
+                currentValue.clear();
+            }
+        } else {
+            final RowData newValue = pendingRecords.get(0);
+            if (previousValue == null) {
+                emit(newValue, INSERT);
+                currentValue.update(newValue);
+            } else if (!recordEquals(previousValue, newValue)) {
+                emit(newValue, UPDATE_AFTER);
+                currentValue.update(newValue);
+            }
+        }
+    }
+
+    private List<RowData> collectPendingRecords(RowData previousValue, long newWatermark)

Review Comment:
   nit: add a Javadoc explaining what it does? Especially since the name is a bit misleading: it doesn't mention that the state is also cleaned up here. So maybe also rename the method and parameter to `collectAndCleanPendingRecords(RowData previousValue, long upToWatermark)`?
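
   Something like this as a starting point (just a sketch):
   ```java
   /**
    * Collects all records buffered with timestamps up to {@code upToWatermark} and
    * REMOVES the consumed entries from the buffer state, so this both reads and
    * cleans up state.
    */
   private List<RowData> collectAndCleanPendingRecords(@Nullable RowData previousValue, long upToWatermark)
   ```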



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/WatermarkCompactingSinkMaterializer.java:
##########
@@ -0,0 +1,537 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.sink;
+
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.api.InsertConflictStrategy;
+import org.apache.flink.table.api.InsertConflictStrategy.ConflictBehavior;
+import org.apache.flink.table.api.TableRuntimeException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.utils.ProjectedRowData;
+import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
+import org.apache.flink.table.runtime.generated.RecordEqualiser;
+import org.apache.flink.table.runtime.operators.TableStreamOperator;
+import org.apache.flink.table.runtime.typeutils.InternalSerializers;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.types.RowKind.DELETE;
+import static org.apache.flink.types.RowKind.INSERT;
+import static org.apache.flink.types.RowKind.UPDATE_AFTER;
+
+/**
+ * A sink materializer that buffers records and compacts them on watermark progression.
+ *
+ * <p>This operator implements the watermark-based compaction algorithm from FLIP-558 for handling
+ * changelog disorder when the upsert key differs from the sink's primary key.
+ */
+public class WatermarkCompactingSinkMaterializer extends TableStreamOperator<RowData>
+        implements OneInputStreamOperator<RowData, RowData>, Triggerable<RowData, VoidNamespace> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(WatermarkCompactingSinkMaterializer.class);
+
+    private static final String STATE_CLEARED_WARN_MSG =
+            "The state is cleared because of state TTL. This will lead to incorrect results. "
+                    + "You can increase the state TTL to avoid this.";
+    private static final Set<String> ORDERED_STATE_BACKENDS = Set.of("rocksdb", "forst");
+
+    private final InsertConflictStrategy conflictStrategy;
+    private final TypeSerializer<RowData> serializer;
+    private final GeneratedRecordEqualiser generatedRecordEqualiser;
+    private final GeneratedRecordEqualiser generatedUpsertKeyEqualiser;
+    private final int[] inputUpsertKey;
+    private final boolean hasUpsertKey;
+    private final RowType keyType;
+    private final String[] primaryKeyNames;
+
+    // Buffers incoming changelog records (INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE) keyed by
+    // their timestamp. Watermarks act as compaction barriers: when a watermark arrives, we know
+    // that UPDATE_BEFORE and its corresponding UPDATE_AFTER have both been received and can be
+    // compacted together. This solves the out-of-order problem where a later UPDATE_AFTER may
+    // arrive before the UPDATE_BEFORE of a previous change.
+    private transient MapState<Long, List<RowData>> buffer;
+
+    // Stores the last emitted value for the current primary key. Used to detect duplicates
+    // and determine the correct RowKind (INSERT vs UPDATE_AFTER) on subsequent compactions.
+    private transient ValueState<RowData> currentValue;
+    private transient RecordEqualiser equaliser;
+    private transient RecordEqualiser upsertKeyEqualiser;
+    private transient TimestampedCollector<RowData> collector;
+    private transient boolean isOrderedStateBackend;
+
+    // Reused ProjectedRowData for comparing upsertKey if hasUpsertKey.
+    private transient ProjectedRowData upsertKeyProjectedRow1;
+    private transient ProjectedRowData upsertKeyProjectedRow2;
+
+    // Field getters for formatting the primary key in error messages.
+    private transient RowData.FieldGetter[] keyFieldGetters;
+
+    // Tracks the current watermark. Used to detect in-flight records after 
restore.
+    private transient long currentWatermark = Long.MIN_VALUE;
+
+    private transient InternalTimerService<VoidNamespace> timerService;
+
+    // Tracks the checkpoint ID this operator was restored from (empty if 
fresh start)
+    private transient Long restoredCheckpointId;
+
+    // Per-key state tracking the checkpoint ID for which consolidation was 
done
+    private transient ValueState<Long> consolidatedCheckpointId;
+
+    public WatermarkCompactingSinkMaterializer(
+            InsertConflictStrategy conflictStrategy,
+            TypeSerializer<RowData> serializer,
+            GeneratedRecordEqualiser generatedRecordEqualiser,
+            @Nullable GeneratedRecordEqualiser generatedUpsertKeyEqualiser,
+            @Nullable int[] inputUpsertKey,
+            RowType keyType,
+            String[] primaryKeyNames) {
+        validateConflictStrategy(conflictStrategy);
+        this.conflictStrategy = conflictStrategy;
+        this.serializer = serializer;
+        this.generatedRecordEqualiser = generatedRecordEqualiser;
+        this.generatedUpsertKeyEqualiser = generatedUpsertKeyEqualiser;
+        this.inputUpsertKey = inputUpsertKey;
+        this.hasUpsertKey = inputUpsertKey != null && inputUpsertKey.length > 
0;
+        this.keyType = keyType;
+        this.primaryKeyNames = primaryKeyNames;
+    }
+
+    private static void validateConflictStrategy(InsertConflictStrategy 
strategy) {
+        Preconditions.checkArgument(
+                strategy.getBehavior() == ConflictBehavior.ERROR
+                        || strategy.getBehavior() == ConflictBehavior.NOTHING,
+                "Only ERROR and NOTHING strategies are supported, got: %s",
+                strategy);
+    }
+
+    @Override
+    public void initializeState(StateInitializationContext context) throws 
Exception {
+        super.initializeState(context);
+
+        // Initialize state descriptors and handles
+        this.buffer =
+                context.getKeyedStateStore()
+                        .getMapState(
+                                new MapStateDescriptor<>(
+                                        "watermark-buffer",
+                                        SortedLongSerializer.INSTANCE,
+                                        new ListSerializer<>(serializer)));
+
+        ValueStateDescriptor<RowData> currentValueDescriptor =
+                new ValueStateDescriptor<>("current-value", serializer);
+        this.currentValue = 
context.getKeyedStateStore().getState(currentValueDescriptor);
+
+        // Descriptor for per-key consolidation tracking
+        ValueStateDescriptor<Long> consolidatedCheckpointIdDescriptor =
+                new ValueStateDescriptor<>("consolidated-checkpoint-id", 
Long.class);
+        this.consolidatedCheckpointId =
+                
context.getKeyedStateStore().getState(consolidatedCheckpointIdDescriptor);
+
+        this.restoredCheckpointId =
+                context.getRestoredCheckpointId().isEmpty()
+                        ? null
+                        : context.getRestoredCheckpointId().getAsLong();
+    }
+
+    private void consolidateBufferToMinValue() throws Exception {
+        List<RowData> consolidated = new ArrayList<>();
+
+        if (isOrderedStateBackend) {
+            // RocksDB/ForSt: entries are already sorted by timestamp
+            Iterator<Map.Entry<Long, List<RowData>>> iterator = 
buffer.entries().iterator();
+            while (iterator.hasNext()) {
+                consolidated.addAll(iterator.next().getValue());
+                iterator.remove();
+            }
+        } else {
+            // Other backends: collect, sort by timestamp, then consolidate
+            List<Map.Entry<Long, List<RowData>>> entries = new ArrayList<>();
+            Iterator<Map.Entry<Long, List<RowData>>> iterator = 
buffer.entries().iterator();
+            while (iterator.hasNext()) {
+                entries.add(iterator.next());
+                iterator.remove();
+            }
+
+            entries.sort(Map.Entry.comparingByKey());
+
+            for (Map.Entry<Long, List<RowData>> entry : entries) {
+                consolidated.addAll(entry.getValue());
+            }
+        }
+
+        if (!consolidated.isEmpty()) {
+            buffer.put(Long.MIN_VALUE, consolidated);
+        }
+    }
+
+    private void maybeConsolidateAfterRestore() throws Exception {
+        if (restoredCheckpointId == null) {
+            return; // Fresh start, no restore
+        }
+
+        Long storedId = this.consolidatedCheckpointId.value();
+
+        if (storedId != null && storedId.equals(restoredCheckpointId)) {
+            return; // Already consolidated for this restore
+        }
+
+        consolidateBufferToMinValue();
+        this.consolidatedCheckpointId.update(restoredCheckpointId);
+        timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, 
Long.MIN_VALUE);
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+        initializeEqualisers();
+        initializeKeyFieldGetters();
+        detectOrderedStateBackend();
+        this.collector = new TimestampedCollector<>(output);
+
+        this.timerService =
+                getInternalTimerService(
+                        "compaction-timers", VoidNamespaceSerializer.INSTANCE, 
this);
+    }
+
+    private void initializeKeyFieldGetters() {
+        this.keyFieldGetters = new RowData.FieldGetter[primaryKeyNames.length];
+        for (int i = 0; i < primaryKeyNames.length; i++) {
+            LogicalType fieldType = keyType.getTypeAt(i);
+            keyFieldGetters[i] = RowData.createFieldGetter(fieldType, i);
+        }
+    }
+
+    private void initializeEqualisers() {
+        if (hasUpsertKey) {
+            this.upsertKeyEqualiser =
+                    generatedUpsertKeyEqualiser.newInstance(
+                            getRuntimeContext().getUserCodeClassLoader());
+            upsertKeyProjectedRow1 = ProjectedRowData.from(inputUpsertKey);
+            upsertKeyProjectedRow2 = ProjectedRowData.from(inputUpsertKey);
+        }
+        this.equaliser =
+                
generatedRecordEqualiser.newInstance(getRuntimeContext().getUserCodeClassLoader());
+    }
+
+    private void detectOrderedStateBackend() {
+        KeyedStateBackend<?> keyedStateBackend = getKeyedStateBackend();
+        String backendType =
+                keyedStateBackend != null ? 
keyedStateBackend.getBackendTypeIdentifier() : "";
+        this.isOrderedStateBackend = 
ORDERED_STATE_BACKENDS.contains(backendType);
+
+        if (isOrderedStateBackend) {
+            LOG.info("Using ordered state backend optimization for {} 
backend", backendType);
+        }
+    }
+
+    @Override
+    public void processElement(StreamRecord<RowData> element) throws Exception 
{
+        RowData row = element.getValue();
+        long assignedTimestamp = element.getTimestamp();
+
+        maybeConsolidateAfterRestore();
+
+        // If we haven't received any watermark yet (still at MIN_VALUE after 
restore)
+        // and the timestamp is beyond MIN_VALUE, it's from in-flight data 
that was
+        // checkpointed before restore. Assign to MIN_VALUE.
+        if (currentWatermark == Long.MIN_VALUE && assignedTimestamp > 
Long.MIN_VALUE) {
+            assignedTimestamp = Long.MIN_VALUE;
+        }
+
+        bufferRecord(assignedTimestamp, row);
+    }
+
+    private void bufferRecord(long timestamp, RowData row) throws Exception {
+        List<RowData> records = buffer.get(timestamp);
+        if (records == null) {
+            records = new ArrayList<>();
+        }
+        switch (row.getRowKind()) {
+            case INSERT:
+            case UPDATE_AFTER:
+                // Try to cancel out a pending retraction; if none, just append
+                if (!tryCancelRetraction(records, row)) {
+                    records.add(row);
+                }
+                break;
+            case UPDATE_BEFORE:
+            case DELETE:
+                // Try to cancel out an existing addition; if none, keep for 
cross-bucket
+                if (!tryCancelAddition(records, row)) {
+                    records.add(row);
+                }
+                break;
+        }
+        buffer.put(timestamp, records);
+
+        timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, timestamp);
+    }
+
+    @Override
+    public void processWatermark(Watermark mark) throws Exception {
+        this.currentWatermark = mark.getTimestamp();
+        super.processWatermark(mark);
+    }
+
+    @Override
+    public void onEventTime(InternalTimer<RowData, VoidNamespace> timer) 
throws Exception {
+        maybeConsolidateAfterRestore();
+        compactAndEmit(currentWatermark);
+    }

Review Comment:
   Can you make sure this operator is interruptible? That means doing two
things:
   - checking whether it is fine for the firing of timers during a
`processWatermark` call to be interrupted. With non-interruptible operators,
the `operator.processWatermark(x)` call is atomic with firing all timers for
`x` via `onEventTime(x)`. With interruptible operators, `processWatermark`
can be invoked, some timers might (or might not) be fired, the firing is then
interrupted, and before it continues, checkpoints can happen and other
mailbox actions can run, including upstream chained operators. So you should
expect more `processElement` calls to arrive while the watermark has not yet
been fully processed. I know of no case where this actually breaks, but
there is one yellow flag: your `processWatermark` persists
`this.currentWatermark = mark.getTimestamp();`, and with interruptible timers
that `currentWatermark` might not be fully processed by the time
`processWatermark` returns. That is, some timers may still need to fire, and
downstream operators may not yet have received that same watermark.
   - if the above is fine, you should probably just return `true` from
`org.apache.flink.streaming.api.operators.AbstractStreamOperator#useInterruptibleTimers`.
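
   If the semantics above are acceptable, the opt-in would be roughly (a
sketch, not tested):

   ```java
   @Override
   protected boolean useInterruptibleTimers() {
       // Opt in to interruptible timer firing: processWatermark() may return
       // before all timers for that watermark have fired, and more
       // processElement() calls may run in between.
       return true;
   }
   ```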



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/WatermarkCompactingSinkMaterializer.java:
##########
@@ -0,0 +1,537 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.sink;
+
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.api.InsertConflictStrategy;
+import org.apache.flink.table.api.InsertConflictStrategy.ConflictBehavior;
+import org.apache.flink.table.api.TableRuntimeException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.utils.ProjectedRowData;
+import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
+import org.apache.flink.table.runtime.generated.RecordEqualiser;
+import org.apache.flink.table.runtime.operators.TableStreamOperator;
+import org.apache.flink.table.runtime.typeutils.InternalSerializers;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.types.RowKind.DELETE;
+import static org.apache.flink.types.RowKind.INSERT;
+import static org.apache.flink.types.RowKind.UPDATE_AFTER;
+
+/**
+ * A sink materializer that buffers records and compacts them on watermark 
progression.
+ *
+ * <p>This operator implements the watermark-based compaction algorithm from 
FLIP-558 for handling
+ * changelog disorder when the upsert key differs from the sink's primary key.
+ */
+public class WatermarkCompactingSinkMaterializer extends 
TableStreamOperator<RowData>
+        implements OneInputStreamOperator<RowData, RowData>, 
Triggerable<RowData, VoidNamespace> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(WatermarkCompactingSinkMaterializer.class);
+
+    private static final String STATE_CLEARED_WARN_MSG =
+            "The state is cleared because of state TTL. This will lead to 
incorrect results. "
+                    + "You can increase the state TTL to avoid this.";
+    private static final Set<String> ORDERED_STATE_BACKENDS = 
Set.of("rocksdb", "forst");
+
+    private final InsertConflictStrategy conflictStrategy;
+    private final TypeSerializer<RowData> serializer;
+    private final GeneratedRecordEqualiser generatedRecordEqualiser;
+    private final GeneratedRecordEqualiser generatedUpsertKeyEqualiser;
+    private final int[] inputUpsertKey;
+    private final boolean hasUpsertKey;
+    private final RowType keyType;
+    private final String[] primaryKeyNames;
+
+    // Buffers incoming changelog records (INSERT, UPDATE_BEFORE, 
UPDATE_AFTER, DELETE) keyed by
+    // their timestamp. Watermarks act as compaction barriers: when a 
watermark arrives, we know
+    // that UPDATE_BEFORE and its corresponding UPDATE_AFTER have both been 
received and can be
+    // compacted together. This solves the out-of-order problem where a later 
UPDATE_AFTER may
+    // arrive before the UPDATE_BEFORE of a previous change.
+    private transient MapState<Long, List<RowData>> buffer;
+
+    // Stores the last emitted value for the current primary key. Used to 
detect duplicates
+    // and determine the correct RowKind (INSERT vs UPDATE_AFTER) on 
subsequent compactions.
+    private transient ValueState<RowData> currentValue;
+    private transient RecordEqualiser equaliser;
+    private transient RecordEqualiser upsertKeyEqualiser;
+    private transient TimestampedCollector<RowData> collector;
+    private transient boolean isOrderedStateBackend;
+
+    // Reused ProjectedRowData for comparing upsertKey if hasUpsertKey.
+    private transient ProjectedRowData upsertKeyProjectedRow1;
+    private transient ProjectedRowData upsertKeyProjectedRow2;
+
+    // Field getters for formatting the primary key in error messages.
+    private transient RowData.FieldGetter[] keyFieldGetters;
+
+    // Tracks the current watermark. Used to detect in-flight records after 
restore.
+    private transient long currentWatermark = Long.MIN_VALUE;
+
+    private transient InternalTimerService<VoidNamespace> timerService;
+
+    // Tracks the checkpoint ID this operator was restored from (empty if 
fresh start)
+    private transient Long restoredCheckpointId;
+
+    // Per-key state tracking the checkpoint ID for which consolidation was 
done
+    private transient ValueState<Long> consolidatedCheckpointId;
+
+    public WatermarkCompactingSinkMaterializer(
+            InsertConflictStrategy conflictStrategy,
+            TypeSerializer<RowData> serializer,
+            GeneratedRecordEqualiser generatedRecordEqualiser,
+            @Nullable GeneratedRecordEqualiser generatedUpsertKeyEqualiser,
+            @Nullable int[] inputUpsertKey,
+            RowType keyType,
+            String[] primaryKeyNames) {
+        validateConflictStrategy(conflictStrategy);
+        this.conflictStrategy = conflictStrategy;
+        this.serializer = serializer;
+        this.generatedRecordEqualiser = generatedRecordEqualiser;
+        this.generatedUpsertKeyEqualiser = generatedUpsertKeyEqualiser;
+        this.inputUpsertKey = inputUpsertKey;
+        this.hasUpsertKey = inputUpsertKey != null && inputUpsertKey.length > 
0;
+        this.keyType = keyType;
+        this.primaryKeyNames = primaryKeyNames;
+    }
+
+    private static void validateConflictStrategy(InsertConflictStrategy 
strategy) {
+        Preconditions.checkArgument(
+                strategy.getBehavior() == ConflictBehavior.ERROR
+                        || strategy.getBehavior() == ConflictBehavior.NOTHING,
+                "Only ERROR and NOTHING strategies are supported, got: %s",
+                strategy);
+    }
+
+    @Override
+    public void initializeState(StateInitializationContext context) throws 
Exception {
+        super.initializeState(context);
+
+        // Initialize state descriptors and handles
+        this.buffer =
+                context.getKeyedStateStore()
+                        .getMapState(
+                                new MapStateDescriptor<>(
+                                        "watermark-buffer",
+                                        SortedLongSerializer.INSTANCE,
+                                        new ListSerializer<>(serializer)));
+
+        ValueStateDescriptor<RowData> currentValueDescriptor =
+                new ValueStateDescriptor<>("current-value", serializer);
+        this.currentValue = 
context.getKeyedStateStore().getState(currentValueDescriptor);
+
+        // Descriptor for per-key consolidation tracking
+        ValueStateDescriptor<Long> consolidatedCheckpointIdDescriptor =
+                new ValueStateDescriptor<>("consolidated-checkpoint-id", 
Long.class);
+        this.consolidatedCheckpointId =
+                
context.getKeyedStateStore().getState(consolidatedCheckpointIdDescriptor);
+
+        this.restoredCheckpointId =
+                context.getRestoredCheckpointId().isEmpty()
+                        ? null
+                        : context.getRestoredCheckpointId().getAsLong();
+    }
+
+    private void consolidateBufferToMinValue() throws Exception {
+        List<RowData> consolidated = new ArrayList<>();
+
+        if (isOrderedStateBackend) {
+            // RocksDB/ForSt: entries are already sorted by timestamp
+            Iterator<Map.Entry<Long, List<RowData>>> iterator = 
buffer.entries().iterator();
+            while (iterator.hasNext()) {
+                consolidated.addAll(iterator.next().getValue());
+                iterator.remove();
+            }
+        } else {
+            // Other backends: collect, sort by timestamp, then consolidate
+            List<Map.Entry<Long, List<RowData>>> entries = new ArrayList<>();
+            Iterator<Map.Entry<Long, List<RowData>>> iterator = 
buffer.entries().iterator();
+            while (iterator.hasNext()) {
+                entries.add(iterator.next());
+                iterator.remove();
+            }
+
+            entries.sort(Map.Entry.comparingByKey());
+
+            for (Map.Entry<Long, List<RowData>> entry : entries) {
+                consolidated.addAll(entry.getValue());
+            }
+        }
+
+        if (!consolidated.isEmpty()) {
+            buffer.put(Long.MIN_VALUE, consolidated);
+        }
+    }
+
+    private void maybeConsolidateAfterRestore() throws Exception {
+        if (restoredCheckpointId == null) {
+            return; // Fresh start, no restore
+        }
+
+        Long storedId = this.consolidatedCheckpointId.value();
+
+        if (storedId != null && storedId.equals(restoredCheckpointId)) {
+            return; // Already consolidated for this restore
+        }
+
+        consolidateBufferToMinValue();
+        this.consolidatedCheckpointId.update(restoredCheckpointId);
+        timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, 
Long.MIN_VALUE);
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+        initializeEqualisers();
+        initializeKeyFieldGetters();
+        detectOrderedStateBackend();
+        this.collector = new TimestampedCollector<>(output);
+
+        this.timerService =
+                getInternalTimerService(
+                        "compaction-timers", VoidNamespaceSerializer.INSTANCE, 
this);
+    }
+
+    private void initializeKeyFieldGetters() {
+        this.keyFieldGetters = new RowData.FieldGetter[primaryKeyNames.length];
+        for (int i = 0; i < primaryKeyNames.length; i++) {
+            LogicalType fieldType = keyType.getTypeAt(i);
+            keyFieldGetters[i] = RowData.createFieldGetter(fieldType, i);
+        }
+    }
+
+    private void initializeEqualisers() {
+        if (hasUpsertKey) {
+            this.upsertKeyEqualiser =
+                    generatedUpsertKeyEqualiser.newInstance(
+                            getRuntimeContext().getUserCodeClassLoader());
+            upsertKeyProjectedRow1 = ProjectedRowData.from(inputUpsertKey);
+            upsertKeyProjectedRow2 = ProjectedRowData.from(inputUpsertKey);
+        }
+        this.equaliser =
+                
generatedRecordEqualiser.newInstance(getRuntimeContext().getUserCodeClassLoader());
+    }
+
+    private void detectOrderedStateBackend() {
+        KeyedStateBackend<?> keyedStateBackend = getKeyedStateBackend();
+        String backendType =
+                keyedStateBackend != null ? 
keyedStateBackend.getBackendTypeIdentifier() : "";
+        this.isOrderedStateBackend = 
ORDERED_STATE_BACKENDS.contains(backendType);
+
+        if (isOrderedStateBackend) {
+            LOG.info("Using ordered state backend optimization for {} 
backend", backendType);
+        }
+    }
+
+    @Override
+    public void processElement(StreamRecord<RowData> element) throws Exception 
{
+        RowData row = element.getValue();
+        long assignedTimestamp = element.getTimestamp();
+
+        maybeConsolidateAfterRestore();
+
+        // If we haven't received any watermark yet (still at MIN_VALUE after 
restore)
+        // and the timestamp is beyond MIN_VALUE, it's from in-flight data 
that was
+        // checkpointed before restore. Assign to MIN_VALUE.
+        if (currentWatermark == Long.MIN_VALUE && assignedTimestamp > 
Long.MIN_VALUE) {
+            assignedTimestamp = Long.MIN_VALUE;
+        }
+
+        bufferRecord(assignedTimestamp, row);
+    }
+
+    private void bufferRecord(long timestamp, RowData row) throws Exception {
+        List<RowData> records = buffer.get(timestamp);
+        if (records == null) {
+            records = new ArrayList<>();
+        }
+        switch (row.getRowKind()) {
+            case INSERT:
+            case UPDATE_AFTER:
+                // Try to cancel out a pending retraction; if none, just append
+                if (!tryCancelRetraction(records, row)) {
+                    records.add(row);
+                }
+                break;
+            case UPDATE_BEFORE:
+            case DELETE:
+                // Try to cancel out an existing addition; if none, keep for 
cross-bucket
+                if (!tryCancelAddition(records, row)) {
+                    records.add(row);
+                }
+                break;
+        }
+        buffer.put(timestamp, records);
+
+        timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, timestamp);
+    }
+
+    @Override
+    public void processWatermark(Watermark mark) throws Exception {
+        this.currentWatermark = mark.getTimestamp();
+        super.processWatermark(mark);
+    }
+
+    @Override
+    public void onEventTime(InternalTimer<RowData, VoidNamespace> timer) 
throws Exception {
+        maybeConsolidateAfterRestore();
+        compactAndEmit(currentWatermark);
+    }
+
+    @Override
+    public void onProcessingTime(InternalTimer<RowData, VoidNamespace> timer) 
throws Exception {
+        throw new UnsupportedOperationException("Processing-time timers are 
not supported");
+    }
+
+    private void compactAndEmit(long newWatermark) throws Exception {
+        RowData previousValue = currentValue.value();
+        List<RowData> pendingRecords = collectPendingRecords(previousValue, 
newWatermark);
+
+        if (pendingRecords.size() > 1) {
+            if (conflictStrategy.getBehavior() == ConflictBehavior.ERROR) {
+                RowData key = (RowData) getKeyedStateBackend().getCurrentKey();
+                throw new TableRuntimeException(
+                        "Primary key constraint violation: multiple distinct 
records with "
+                                + "the same primary key detected. Conflicting 
key: ["
+                                + formatKey(key)
+                                + "]. Use ON CONFLICT DO NOTHING to keep the 
first record.");
+            } else if (previousValue == null) {
+                final RowData newValue = pendingRecords.get(0);
+                emit(newValue, INSERT);
+                currentValue.update(newValue);
+            }
+        } else if (pendingRecords.isEmpty()) {
+            if (previousValue != null) {
+                emit(previousValue, DELETE);
+                currentValue.clear();
+            }
+        } else {
+            final RowData newValue = pendingRecords.get(0);
+            if (previousValue == null) {
+                emit(newValue, INSERT);
+                currentValue.update(newValue);
+            } else if (!recordEquals(previousValue, newValue)) {
+                emit(newValue, UPDATE_AFTER);
+                currentValue.update(newValue);
+            }
+        }
+    }
+
+    private List<RowData> collectPendingRecords(RowData previousValue, long 
newWatermark)
+            throws Exception {
+        List<RowData> records = new ArrayList<>();
+        if (previousValue != null) {
+            records.add(previousValue);
+        }
+        Iterator<Map.Entry<Long, List<RowData>>> iterator = 
buffer.entries().iterator();
+
+        while (iterator.hasNext()) {
+            Map.Entry<Long, List<RowData>> entry = iterator.next();
+            if (entry.getKey() <= newWatermark) {
+                for (RowData pendingRecord : entry.getValue()) {
+                    switch (pendingRecord.getRowKind()) {
+                        case INSERT:
+                        case UPDATE_AFTER:
+                            addRow(records, pendingRecord);
+                            break;
+
+                        case UPDATE_BEFORE:
+                        case DELETE:
+                            retractRow(records, pendingRecord);
+                            break;
+                    }
+                }
+                iterator.remove();
+            } else if (isOrderedStateBackend) {
+                break;
+            }
+        }
+        return records;
+    }
+
+    private void addRow(List<RowData> values, RowData add) {
+        if (hasUpsertKey) {
+            int index = findFirst(values, add);
+            if (index == -1) {
+                values.add(add);
+            } else {
+                values.set(index, add);
+            }
+        } else {
+            values.add(add);
+        }
+    }
+
+    private void retractRow(List<RowData> values, RowData retract) {
+        final int index = findFirst(values, retract);
+        if (index == -1) {
+            LOG.info(STATE_CLEARED_WARN_MSG);
+        } else {
+            // Remove first found row
+            values.remove(index);
+        }
+    }

Review Comment:
   Shouldn't this handle the upsert key as well?
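
   For illustration only: `findFirst` isn't visible in this hunk, so this
sketch assumes it compares whole records. `findFirstByUpsertKey` is a
hypothetical helper that mirrors the projection/equaliser usage elsewhere in
this class:

   ```java
   private void retractRow(List<RowData> values, RowData retract) {
       // Hypothetical variant: when an upsert key exists, match the
       // retraction against the buffered row with the same upsert key,
       // mirroring the replacement logic in addRow().
       final int index =
               hasUpsertKey
                       ? findFirstByUpsertKey(values, retract)
                       : findFirst(values, retract);
       if (index == -1) {
           // Kept as in the PR; warn level may be intended given the
           // constant's name.
           LOG.info(STATE_CLEARED_WARN_MSG);
       } else {
           values.remove(index);
       }
   }

   // Hypothetical helper: linear scan comparing only the projected
   // upsert-key fields via the generated upsertKeyEqualiser.
   private int findFirstByUpsertKey(List<RowData> values, RowData target) {
       upsertKeyProjectedRow1.replaceRow(target);
       for (int i = 0; i < values.size(); i++) {
           upsertKeyProjectedRow2.replaceRow(values.get(i));
           if (upsertKeyEqualiser.equals(
                   upsertKeyProjectedRow1, upsertKeyProjectedRow2)) {
               return i;
           }
       }
       return -1;
   }
   ```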



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

