swapna267 commented on code in PR #15996:
URL: https://github.com/apache/iceberg/pull/15996#discussion_r3143885279


##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/EqualityConvertPlanner.java:
##########
@@ -0,0 +1,677 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.OutputTag;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.ManifestReader;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotChanges;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.maintenance.api.Trigger;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.ContentFileUtil;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Planner for the equality delete conversion pipeline. For each trigger, it 
picks the oldest
+ * staging snapshot that hasn't been converted yet and emits {@link 
ReadCommand}s describing the
+ * files its downstream readers and workers must process.
+ *
+ * <p>Emissions are grouped into phases separated by watermarks so the 
downstream operators observe
+ * them in a consistent order:
+ *
+ * <ol>
+ *   <li>Main data — read once per new equality-field-set to seed the worker's 
key index.
+ *   <li>Equality deletes from the staging snapshot — resolve against the 
index.
+ *   <li>Positional deletes from the staging snapshot — bypass the worker, go 
straight to DVs.
+ *   <li>New staging data — added to the index for the next cycle.
+ * </ol>
+ *
+ * The planner also detects external commits on main (not produced by the 
converter itself) and
+ * re-emits main data so the worker can rebuild its index before resolving the 
current cycle's
+ * equality deletes.
+ *
+ * <p>An {@link EqualityConvertPlanResult} with the current cycle's metadata 
is emitted via the
+ * {@link #METADATA_STREAM} side output after the read commands.
+ */
+@Internal
+public class EqualityConvertPlanner extends AbstractStreamOperator<ReadCommand>
+    implements OneInputStreamOperator<Trigger, ReadCommand> {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(EqualityConvertPlanner.class);
+
+  public static final OutputTag<EqualityConvertPlanResult> METADATA_STREAM =
+      new OutputTag<>("metadata-stream") {};
+
+  private static final String PROCESSED_EQ_DELETE_FILE_NUM_METRIC = 
"processedEqDeleteFileNum";
+  private static final String PROCESSED_STAGING_SNAPSHOT_NUM_METRIC = 
"processedStagingSnapshotNum";
+  private static final String SKIPPED_NO_OP_CYCLES_METRIC = 
"skippedNoOpCycles";
+  private static final String REINDEX_COUNT_METRIC = "reindexCount";
+
+  private final String tableName;
+  private final String taskName;
+  private final int taskIndex;
+  private final TableLoader tableLoader;
+  private final String stagingBranch;
+  private final String targetBranch;
+
+  // Last staging snapshot whose read commands have been emitted AND committed 
to main.
+  private transient ListState<Long> lastStagingSnapshotState;
+  // Main snapshot observed last cycle (for external-change detection).
+  private transient ListState<Long> lastMainSnapshotState;
+  // Equality field sets for which main data has been emitted.
+  private transient ListState<List<Integer>> mainIndexEmittedState;
+  // Main snapshot that the worker's index is tagged with; sent on every 
ReadCommand.
+  private transient ListState<Long> indexSnapshotState;
+  // Largest phase timestamp emitted so far; the next cycle's first phase 
starts after this.
+  private transient ListState<Long> lastDoneTsState;
+  // Staging snapshot emitted last cycle but not yet confirmed committed to 
main.
+  private transient ListState<Long> pendingStagingSnapshotState;
+
+  private transient Table table;
+
+  private transient Long lastStagingSnapshotId;
+  private transient Long pendingStagingSnapshotId;
+
+  private transient Long lastMainSnapshotId;
+  private transient Long indexSnapshotId;
+  private transient Set<List<Integer>> mainIndexEmittedSet;
+
+  private transient long nextPhaseTs;
+  private transient long lastDoneTs;
+
+  private transient Counter processedEqDeleteFileNumCounter;
+  private transient Counter processedStagingSnapshotNumCounter;
+  private transient Counter skippedNoOpCyclesCounter;
+  private transient Counter reindexCounter;
+
+  public EqualityConvertPlanner(
+      String tableName,
+      String taskName,
+      int taskIndex,
+      TableLoader tableLoader,
+      String stagingBranch,
+      String targetBranch) {
+    this.tableName = tableName;
+    this.taskName = taskName;
+    this.taskIndex = taskIndex;
+    this.tableLoader = tableLoader;
+    this.stagingBranch = stagingBranch;
+    this.targetBranch = targetBranch;
+  }
+
+  @Override
+  public void open() throws Exception {
+    super.open();
+    if (!tableLoader.isOpen()) {
+      tableLoader.open();
+    }
+
+    table = tableLoader.loadTable();
+
+    MetricGroup taskMetricGroup =
+        TableMaintenanceMetrics.groupFor(getRuntimeContext(), tableName, 
taskName, taskIndex);
+    this.processedEqDeleteFileNumCounter =
+        taskMetricGroup.counter(PROCESSED_EQ_DELETE_FILE_NUM_METRIC);
+    this.processedStagingSnapshotNumCounter =
+        taskMetricGroup.counter(PROCESSED_STAGING_SNAPSHOT_NUM_METRIC);
+    this.skippedNoOpCyclesCounter = 
taskMetricGroup.counter(SKIPPED_NO_OP_CYCLES_METRIC);
+    this.reindexCounter = taskMetricGroup.counter(REINDEX_COUNT_METRIC);
+  }
+
+  @Override
+  public void initializeState(StateInitializationContext context) throws 
Exception {
+    super.initializeState(context);
+    lastStagingSnapshotState =
+        context
+            .getOperatorStateStore()
+            .getListState(new ListStateDescriptor<>("lastStagingSnapshotId", 
Types.LONG));
+    lastMainSnapshotState =
+        context
+            .getOperatorStateStore()
+            .getListState(new ListStateDescriptor<>("lastMainSnapshotId", 
Types.LONG));
+    mainIndexEmittedState =
+        context
+            .getOperatorStateStore()
+            .getListState(new ListStateDescriptor<>("mainIndexEmitted", 
Types.LIST(Types.INT)));
+    lastDoneTsState =
+        context
+            .getOperatorStateStore()
+            .getListState(new ListStateDescriptor<>("lastDoneTs", Types.LONG));
+    indexSnapshotState =
+        context
+            .getOperatorStateStore()
+            .getListState(new ListStateDescriptor<>("indexSnapshotId", 
Types.LONG));
+    pendingStagingSnapshotState =
+        context
+            .getOperatorStateStore()
+            .getListState(new 
ListStateDescriptor<>("pendingStagingSnapshotId", Types.LONG));
+
+    lastStagingSnapshotId = getValue(lastStagingSnapshotState);
+    pendingStagingSnapshotId = getValue(pendingStagingSnapshotState);
+    lastMainSnapshotId = getValue(lastMainSnapshotState);
+    indexSnapshotId = getValue(indexSnapshotState);
+    Long restoredLastDoneTs = getValue(lastDoneTsState);
+    lastDoneTs = restoredLastDoneTs != null ? restoredLastDoneTs : 0L;
+
+    mainIndexEmittedSet = Sets.newHashSet();
+    for (List<Integer> fieldSet : mainIndexEmittedState.get()) {
+      mainIndexEmittedSet.add(fieldSet);
+    }
+  }
+
+  @Override
+  public void snapshotState(StateSnapshotContext context) throws Exception {
+    super.snapshotState(context);
+    storeValue(lastStagingSnapshotState, lastStagingSnapshotId);
+    storeValue(pendingStagingSnapshotState, pendingStagingSnapshotId);
+    storeValue(lastMainSnapshotState, lastMainSnapshotId);
+    storeValue(indexSnapshotState, indexSnapshotId);
+    storeValue(lastDoneTsState, lastDoneTs);
+
+    mainIndexEmittedState.clear();
+    for (List<Integer> fieldSet : mainIndexEmittedSet) {
+      mainIndexEmittedState.add(Lists.newArrayList(fieldSet));
+    }
+  }
+
+  @Override
+  public void processElement(StreamRecord<Trigger> element) throws Exception {
+    long triggerTs = element.getTimestamp();
+    nextPhaseTs = Math.max(triggerTs, lastDoneTs + 1);
+
+    try {
+      table.refresh();
+      Snapshot mainSnapshot = table.snapshot(targetBranch);
+      Long currentMainSnapshotId = mainSnapshot != null ? 
mainSnapshot.snapshotId() : null;
+
+      checkPendingStagingSnapshot(mainSnapshot);
+      syncIndex(mainSnapshot, currentMainSnapshotId);
+      lastMainSnapshotId = currentMainSnapshotId;
+
+      Snapshot stagingSnapshot = table.snapshot(stagingBranch);
+      if (stagingSnapshot == null) {
+        LOG.info("No snapshot on staging branch '{}', nothing to convert.", 
stagingBranch);
+        emitNoOpResult(triggerTs);
+        return;
+      }
+
+      Snapshot nextSnapshot = findNextSnapshot(stagingSnapshot);
+      if (nextSnapshot == null) {
+        LOG.info("No new staging snapshots to process on branch '{}'.", 
stagingBranch);
+        pendingStagingSnapshotId = stagingSnapshot.snapshotId();
+        emitNoOpResult(triggerTs);
+        return;
+      }
+
+      emitReadCommandsForSnapshot(nextSnapshot, triggerTs);
+    } catch (Exception e) {
+      LOG.error(
+          "Error processing equality deletes for table {} task {}[{}]",
+          tableName,
+          taskName,
+          taskIndex,
+          e);
+      output.collect(TaskResultAggregator.ERROR_STREAM, new StreamRecord<>(e));
+      emitNoOpResult(triggerTs);
+    }
+  }
+
+  private void checkPendingStagingSnapshot(Snapshot mainSnapshot) {
+    if (pendingStagingSnapshotId == null) {
+      return;
+    }
+
+    if (mainSnapshot != null && 
isStagingCommittedOnMain(pendingStagingSnapshotId, mainSnapshot)) {
+      lastStagingSnapshotId = pendingStagingSnapshotId;
+    } else {
+      LOG.warn(
+          "Previous cycle did not commit staging snapshot {} to branch '{}', 
will retry.",
+          pendingStagingSnapshotId,
+          targetBranch);
+    }
+
+    pendingStagingSnapshotId = null;
+  }
+
+  private void syncIndex(Snapshot mainSnapshot, Long currentMainSnapshotId) {
+    boolean needsReindex = handleMainBranchChange(currentMainSnapshotId);
+    if (needsReindex && !mainIndexEmittedSet.isEmpty()) {
+      indexSnapshotId = currentMainSnapshotId;
+      emitReindexFromMain(mainSnapshot);
+      reindexCounter.inc();
+    } else if (mainIndexEmittedSet.isEmpty()) {
+      indexSnapshotId = currentMainSnapshotId;
+    }
+  }
+
+  private boolean handleMainBranchChange(Long currentMainSnapshotId) {
+    if (lastMainSnapshotId == null || Objects.equals(lastMainSnapshotId, 
currentMainSnapshotId)) {
+      return false;
+    }
+
+    if (currentMainSnapshotId == null) {
+      mainIndexEmittedSet.clear();
+      indexSnapshotId = null;
+      LOG.info("Main branch '{}' was deleted, clearing index.", targetBranch);
+      return false;
+    }
+
+    for (Snapshot s :
+        SnapshotUtil.ancestorsBetween(table, currentMainSnapshotId, 
lastMainSnapshotId)) {
+      String commitProp =
+          
s.summary().get(EqualityConvertCommitter.COMMITTED_STAGING_SNAPSHOT_PROPERTY);
+      if (commitProp == null || 
!isStagingSnapshotIndexed(Long.parseLong(commitProp))) {
+        LOG.info(
+            "Main branch '{}' advanced {} -> {} with external or 
ahead-of-cursor commits, will reindex.",
+            targetBranch,
+            lastMainSnapshotId,
+            currentMainSnapshotId);
+        return true;
+      }
+    }
+
+    LOG.info(
+        "Main branch '{}' advanced {} -> {}, only own commits detected.",
+        targetBranch,
+        lastMainSnapshotId,
+        currentMainSnapshotId);
+    return false;
+  }
+
+  private boolean isStagingSnapshotIndexed(long propStagingId) {
+    if (lastStagingSnapshotId == null) {
+      return false;
+    }
+
+    if (propStagingId == lastStagingSnapshotId) {
+      return true;
+    }
+
+    if (table.snapshot(lastStagingSnapshotId) == null) {
+      return false;
+    }
+
+    return SnapshotUtil.isAncestorOf(lastStagingSnapshotId, propStagingId, 
table::snapshot);
+  }
+
+  private boolean isStagingCommittedOnMain(long stagingSnapshotId, Snapshot 
mainHead) {
+    Snapshot current = mainHead;
+    while (current != null) {
+      String prop =
+          
current.summary().get(EqualityConvertCommitter.COMMITTED_STAGING_SNAPSHOT_PROPERTY);
+      if (prop != null && Long.parseLong(prop) == stagingSnapshotId) {
+        return true;
+      }
+
+      if (lastMainSnapshotId != null && current.snapshotId() == 
lastMainSnapshotId) {
+        return false;
+      }
+
+      current = current.parentId() != null ? 
table.snapshot(current.parentId()) : null;
+    }
+
+    return false;
+  }
+
+  private void emitReadCommandsForSnapshot(Snapshot snapshot, long triggerTs) {
+    List<DataFile> newDataFiles = Lists.newArrayList();
+    List<DeleteFile> stagingDVFiles = Lists.newArrayList();
+    List<DeleteFile> posDeleteFiles = Lists.newArrayList();
+    Map<List<Integer>, List<DeleteFile>> deletesByFieldIds = Maps.newHashMap();
+    SnapshotChanges changes = 
SnapshotChanges.builderFor(table).snapshot(snapshot).build();
+
+    for (DataFile dataFile : changes.addedDataFiles()) {
+      newDataFiles.add(dataFile.copy());
+    }
+
+    for (DeleteFile deleteFile : changes.addedDeleteFiles()) {
+      if (deleteFile.content() == FileContent.EQUALITY_DELETES) {
+        deletesByFieldIds
+            .computeIfAbsent(deleteFile.equalityFieldIds(), k -> 
Lists.newArrayList())
+            .add(deleteFile.copy());
+      } else if (ContentFileUtil.isDV(deleteFile)) {
+        stagingDVFiles.add(deleteFile.copy());
+      } else {
+        posDeleteFiles.add(deleteFile.copy());
+      }
+    }
+
+    boolean hasEqDeletes = !deletesByFieldIds.isEmpty();
+
+    if (newDataFiles.isEmpty()
+        && !hasEqDeletes
+        && stagingDVFiles.isEmpty()
+        && posDeleteFiles.isEmpty()) {
+      LOG.info("No new files on staging branch '{}' to convert.", 
stagingBranch);
+      pendingStagingSnapshotId = snapshot.snapshotId();
+      emitNoOpResult(triggerTs);
+      return;
+    }
+
+    // Emit read commands in phases with ascending timestamps and watermarks 
between phases:
+    //   Phase 0: main data files (index refresh)
+    //   Phase 1: equality delete files (resolution against the index)
+    //   Phase 2: positional delete files (converted directly to DVPositions, 
bypass Worker)
+    //   Phase 3: staging data files (index update for next cycle)
+    emitMainDataPhase(deletesByFieldIds);
+    emitEqDeletePhase(deletesByFieldIds);
+    emitPosDeletePhase(posDeleteFiles);
+    emitSnapshotDataPhase(newDataFiles, mainIndexEmittedSet);
+
+    LOG.info(
+        "Emitted read commands for {} new data files from staging branch 
'{}'.",
+        newDataFiles.size(),
+        stagingBranch);
+
+    pendingStagingSnapshotId = snapshot.snapshotId();
+    lastDoneTs = nextPhaseTs;
+    processedStagingSnapshotNumCounter.inc();
+
+    output.collect(
+        METADATA_STREAM,
+        new StreamRecord<>(
+            new EqualityConvertPlanResult(
+                newDataFiles,
+                stagingDVFiles,
+                snapshot.snapshotId(),
+                lastMainSnapshotId,
+                triggerTs,
+                nextPhaseTs)));
+
+    output.emitWatermark(new Watermark(nextPhaseTs));
+  }
+
+  private void emitMainDataPhase(Map<List<Integer>, List<DeleteFile>> 
deletesByFieldIds) {
+    boolean emittedMainData = false;
+    Snapshot mainSnapshot = table.snapshot(targetBranch);
+
+    for (Map.Entry<List<Integer>, List<DeleteFile>> entry : 
deletesByFieldIds.entrySet()) {
+      List<Integer> fieldIds = entry.getKey();
+      List<Integer> sortedFieldIds = Lists.newArrayList(fieldIds);
+      Collections.sort(sortedFieldIds);
+
+      if (!mainIndexEmittedSet.contains(sortedFieldIds)) {
+        emitMainDataReadCommands(mainSnapshot, fieldIds, nextPhaseTs);
+        mainIndexEmittedSet.add(sortedFieldIds);
+        emittedMainData = true;
+      }
+    }
+
+    if (emittedMainData) {
+      advancePhase();
+    }
+  }
+
+  private void emitEqDeletePhase(Map<List<Integer>, List<DeleteFile>> 
deletesByFieldIds) {
+    if (deletesByFieldIds.isEmpty()) {
+      return;
+    }
+
+    for (Map.Entry<List<Integer>, List<DeleteFile>> entry : 
deletesByFieldIds.entrySet()) {
+      List<Integer> fieldIds = entry.getKey();
+      for (DeleteFile deleteFile : entry.getValue()) {
+        output.collect(
+            new StreamRecord<>(
+                ReadCommand.eqDeleteFile(
+                    deleteFile.location(), deleteFile.format(), fieldIds, 
indexSnapshotId),
+                nextPhaseTs));
+        processedEqDeleteFileNumCounter.inc();
+      }
+    }
+
+    advancePhase();
+  }
+
+  private void emitPosDeletePhase(List<DeleteFile> posDeleteFiles) {
+    if (posDeleteFiles.isEmpty()) {
+      return;
+    }
+
+    for (DeleteFile deleteFile : posDeleteFiles) {
+      output.collect(
+          new StreamRecord<>(
+              ReadCommand.posDeleteFile(
+                  deleteFile.location(), deleteFile.format(), indexSnapshotId),
+              nextPhaseTs));
+    }
+
+    advancePhase();
+  }
+
+  private void emitSnapshotDataPhase(
+      List<DataFile> snapshotDataFiles, Set<List<Integer>> activeFieldSets) {
+    if (snapshotDataFiles.isEmpty() && activeFieldSets.isEmpty()) {
+      return;
+    }
+
+    for (DataFile dataFile : snapshotDataFiles) {
+      for (List<Integer> fieldIds : activeFieldSets) {
+        output.collect(
+            new StreamRecord<>(
+                ReadCommand.dataFile(
+                    dataFile.location(), dataFile.format(), fieldIds, 
indexSnapshotId),
+                nextPhaseTs));
+      }
+    }
+
+    advancePhase();
+  }
+
+  @Override
+  public void close() throws Exception {
+    super.close();
+    tableLoader.close();
+  }
+
+  private void emitNoOpResult(long triggerTimestamp) {
+    skippedNoOpCyclesCounter.inc();
+    output.collect(
+        METADATA_STREAM,
+        new StreamRecord<>(
+            new EqualityConvertPlanResult(
+                Lists.newArrayList(),
+                Lists.newArrayList(),
+                -1L,
+                lastMainSnapshotId,
+                triggerTimestamp,
+                nextPhaseTs)));
+    output.emitWatermark(new Watermark(nextPhaseTs));
+    lastDoneTs = nextPhaseTs;
+  }
+
+  private void emitMainDataReadCommands(
+      Snapshot mainSnapshot, List<Integer> fieldIds, long phaseTs) {
+    if (mainSnapshot == null) {
+      return;
+    }
+
+    for (ManifestFile manifest : mainSnapshot.dataManifests(table.io())) {
+      try (ManifestReader<DataFile> reader =
+          ManifestFiles.read(manifest, table.io(), table.specs())) {
+        for (DataFile dataFile : reader) {
+          output.collect(
+              new StreamRecord<>(
+                  ReadCommand.dataFile(
+                      dataFile.location(), dataFile.format(), fieldIds, 
indexSnapshotId),
+                  phaseTs));
+        }
+      } catch (IOException e) {
+        throw new UncheckedIOException("Failed to read data manifest for main 
index", e);
+      }
+    }
+
+    LOG.info(
+        "Emitted main data read commands for field IDs {} from snapshot {}.",
+        fieldIds,
+        mainSnapshot.snapshotId());
+  }
+
+  private void emitReindexFromMain(Snapshot mainSnapshot) {

Review Comment:
   As I understand it, `emitReindexFromMain` will be invoked when a compaction commit happens 
from within the same job. Is my understanding correct? 
   If so, do we want to handle that case differently, since we mostly compact only 
recent data?
   Can we use the added and deleted data file info to update just those entries 
in the index?



##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/EqualityConvertPlanner.java:
##########
@@ -0,0 +1,677 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.OutputTag;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.ManifestReader;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotChanges;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.maintenance.api.Trigger;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.ContentFileUtil;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Planner for the equality delete conversion pipeline. For each trigger, it 
picks the oldest
+ * staging snapshot that hasn't been converted yet and emits {@link 
ReadCommand}s describing the
+ * files its downstream readers and workers must process.
+ *
+ * <p>Emissions are grouped into phases separated by watermarks so the 
downstream operators observe
+ * them in a consistent order:
+ *
+ * <ol>
+ *   <li>Main data — read once per new equality-field-set to seed the worker's 
key index.
+ *   <li>Equality deletes from the staging snapshot — resolve against the 
index.
+ *   <li>Positional deletes from the staging snapshot — bypass the worker, go 
straight to DVs.
+ *   <li>New staging data — added to the index for the next cycle.
+ * </ol>
+ *
+ * The planner also detects external commits on main (not produced by the 
converter itself) and
+ * re-emits main data so the worker can rebuild its index before resolving the 
current cycle's
+ * equality deletes.
+ *
+ * <p>An {@link EqualityConvertPlanResult} with the current cycle's metadata 
is emitted via the
+ * {@link #METADATA_STREAM} side output after the read commands.
+ */
+@Internal
+public class EqualityConvertPlanner extends AbstractStreamOperator<ReadCommand>
+    implements OneInputStreamOperator<Trigger, ReadCommand> {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(EqualityConvertPlanner.class);
+
+  public static final OutputTag<EqualityConvertPlanResult> METADATA_STREAM =
+      new OutputTag<>("metadata-stream") {};
+
+  private static final String PROCESSED_EQ_DELETE_FILE_NUM_METRIC = 
"processedEqDeleteFileNum";
+  private static final String PROCESSED_STAGING_SNAPSHOT_NUM_METRIC = 
"processedStagingSnapshotNum";
+  private static final String SKIPPED_NO_OP_CYCLES_METRIC = 
"skippedNoOpCycles";
+  private static final String REINDEX_COUNT_METRIC = "reindexCount";
+
+  private final String tableName;
+  private final String taskName;
+  private final int taskIndex;
+  private final TableLoader tableLoader;
+  private final String stagingBranch;
+  private final String targetBranch;
+
+  // Last staging snapshot whose read commands have been emitted AND committed 
to main.
+  private transient ListState<Long> lastStagingSnapshotState;
+  // Main snapshot observed last cycle (for external-change detection).
+  private transient ListState<Long> lastMainSnapshotState;
+  // Equality field sets for which main data has been emitted.
+  private transient ListState<List<Integer>> mainIndexEmittedState;
+  // Main snapshot that the worker's index is tagged with; sent on every 
ReadCommand.
+  private transient ListState<Long> indexSnapshotState;
+  // Largest phase timestamp emitted so far; the next cycle's first phase 
starts after this.
+  private transient ListState<Long> lastDoneTsState;
+  // Staging snapshot emitted last cycle but not yet confirmed committed to 
main.
+  private transient ListState<Long> pendingStagingSnapshotState;
+
+  private transient Table table;
+
+  private transient Long lastStagingSnapshotId;
+  private transient Long pendingStagingSnapshotId;
+
+  private transient Long lastMainSnapshotId;
+  private transient Long indexSnapshotId;
+  private transient Set<List<Integer>> mainIndexEmittedSet;
+
+  private transient long nextPhaseTs;
+  private transient long lastDoneTs;
+
+  private transient Counter processedEqDeleteFileNumCounter;
+  private transient Counter processedStagingSnapshotNumCounter;
+  private transient Counter skippedNoOpCyclesCounter;
+  private transient Counter reindexCounter;
+
+  public EqualityConvertPlanner(
+      String tableName,
+      String taskName,
+      int taskIndex,
+      TableLoader tableLoader,
+      String stagingBranch,
+      String targetBranch) {
+    this.tableName = tableName;
+    this.taskName = taskName;
+    this.taskIndex = taskIndex;
+    this.tableLoader = tableLoader;
+    this.stagingBranch = stagingBranch;
+    this.targetBranch = targetBranch;
+  }
+
+  @Override
+  public void open() throws Exception {
+    super.open();
+    if (!tableLoader.isOpen()) {
+      tableLoader.open();
+    }
+
+    table = tableLoader.loadTable();
+
+    MetricGroup taskMetricGroup =
+        TableMaintenanceMetrics.groupFor(getRuntimeContext(), tableName, 
taskName, taskIndex);
+    this.processedEqDeleteFileNumCounter =
+        taskMetricGroup.counter(PROCESSED_EQ_DELETE_FILE_NUM_METRIC);
+    this.processedStagingSnapshotNumCounter =
+        taskMetricGroup.counter(PROCESSED_STAGING_SNAPSHOT_NUM_METRIC);
+    this.skippedNoOpCyclesCounter = 
taskMetricGroup.counter(SKIPPED_NO_OP_CYCLES_METRIC);
+    this.reindexCounter = taskMetricGroup.counter(REINDEX_COUNT_METRIC);
+  }
+
+  @Override
+  public void initializeState(StateInitializationContext context) throws 
Exception {
+    super.initializeState(context);
+    lastStagingSnapshotState =
+        context
+            .getOperatorStateStore()
+            .getListState(new ListStateDescriptor<>("lastStagingSnapshotId", 
Types.LONG));
+    lastMainSnapshotState =
+        context
+            .getOperatorStateStore()
+            .getListState(new ListStateDescriptor<>("lastMainSnapshotId", 
Types.LONG));
+    mainIndexEmittedState =
+        context
+            .getOperatorStateStore()
+            .getListState(new ListStateDescriptor<>("mainIndexEmitted", 
Types.LIST(Types.INT)));
+    lastDoneTsState =
+        context
+            .getOperatorStateStore()
+            .getListState(new ListStateDescriptor<>("lastDoneTs", Types.LONG));
+    indexSnapshotState =
+        context
+            .getOperatorStateStore()
+            .getListState(new ListStateDescriptor<>("indexSnapshotId", 
Types.LONG));
+    pendingStagingSnapshotState =
+        context
+            .getOperatorStateStore()
+            .getListState(new 
ListStateDescriptor<>("pendingStagingSnapshotId", Types.LONG));
+
+    lastStagingSnapshotId = getValue(lastStagingSnapshotState);
+    pendingStagingSnapshotId = getValue(pendingStagingSnapshotState);
+    lastMainSnapshotId = getValue(lastMainSnapshotState);
+    indexSnapshotId = getValue(indexSnapshotState);
+    Long restoredLastDoneTs = getValue(lastDoneTsState);
+    lastDoneTs = restoredLastDoneTs != null ? restoredLastDoneTs : 0L;
+
+    mainIndexEmittedSet = Sets.newHashSet();
+    for (List<Integer> fieldSet : mainIndexEmittedState.get()) {
+      mainIndexEmittedSet.add(fieldSet);
+    }
+  }
+
+  @Override
+  public void snapshotState(StateSnapshotContext context) throws Exception {
+    super.snapshotState(context);
+    storeValue(lastStagingSnapshotState, lastStagingSnapshotId);
+    storeValue(pendingStagingSnapshotState, pendingStagingSnapshotId);
+    storeValue(lastMainSnapshotState, lastMainSnapshotId);
+    storeValue(indexSnapshotState, indexSnapshotId);
+    storeValue(lastDoneTsState, lastDoneTs);
+
+    mainIndexEmittedState.clear();
+    for (List<Integer> fieldSet : mainIndexEmittedSet) {
+      mainIndexEmittedState.add(Lists.newArrayList(fieldSet));
+    }
+  }
+
+  /**
+   * Handles one trigger: reconciles state with the target branch, then either plans the next
+   * staging snapshot or reports a no-op.
+   *
+   * <p>Any exception raised while planning is logged, forwarded to the error side output, and
+   * followed by a no-op result for the trigger.
+   *
+   * @param element the trigger record; only its timestamp is read here
+   */
+  @Override
+  public void processElement(StreamRecord<Trigger> element) throws Exception {
+    long triggerTs = element.getTimestamp();
+    // The phase timestamp must be strictly greater than the last completed one.
+    nextPhaseTs = Math.max(lastDoneTs + 1, triggerTs);
+
+    try {
+      table.refresh();
+
+      Snapshot mainHead = table.snapshot(targetBranch);
+      Long mainHeadId = null;
+      if (mainHead != null) {
+        mainHeadId = mainHead.snapshotId();
+      }
+
+      // Both calls below still compare against the previous value of lastMainSnapshotId, so the
+      // field is only overwritten once they are done.
+      checkPendingStagingSnapshot(mainHead);
+      syncIndex(mainHead, mainHeadId);
+      lastMainSnapshotId = mainHeadId;
+
+      Snapshot stagingHead = table.snapshot(stagingBranch);
+      if (stagingHead == null) {
+        LOG.info("No snapshot on staging branch '{}', nothing to convert.", stagingBranch);
+        emitNoOpResult(triggerTs);
+        return;
+      }
+
+      Snapshot toProcess = findNextSnapshot(stagingHead);
+      if (toProcess != null) {
+        emitReadCommandsForSnapshot(toProcess, triggerTs);
+        return;
+      }
+
+      LOG.info("No new staging snapshots to process on branch '{}'.", stagingBranch);
+      pendingStagingSnapshotId = stagingHead.snapshotId();
+      emitNoOpResult(triggerTs);
+    } catch (Exception e) {
+      LOG.error(
+          "Error processing equality deletes for table {} task {}[{}]",
+          tableName,
+          taskName,
+          taskIndex,
+          e);
+      output.collect(TaskResultAggregator.ERROR_STREAM, new StreamRecord<>(e));
+      emitNoOpResult(triggerTs);
+    }
+  }
+
+  /**
+   * Resolves the staging snapshot recorded as pending by an earlier cycle.
+   *
+   * <p>If the target branch carries a commit for the pending snapshot, that snapshot becomes the
+   * last processed staging snapshot; otherwise a warning is logged. The pending marker is cleared
+   * in both cases.
+   *
+   * @param mainSnapshot current head of the target branch, or {@code null} if it has none
+   */
+  private void checkPendingStagingSnapshot(Snapshot mainSnapshot) {
+    Long pendingId = pendingStagingSnapshotId;
+    if (pendingId == null) {
+      return;
+    }
+
+    boolean committed =
+        mainSnapshot != null && isStagingCommittedOnMain(pendingId, mainSnapshot);
+    if (committed) {
+      lastStagingSnapshotId = pendingId;
+    } else {
+      LOG.warn(
+          "Previous cycle did not commit staging snapshot {} to branch '{}', will retry.",
+          pendingId,
+          targetBranch);
+    }
+
+    pendingStagingSnapshotId = null;
+  }
+
+  /**
+   * Brings {@code indexSnapshotId} in line with the current head of the target branch.
+   *
+   * <p>With no field sets emitted so far, the index snapshot id simply follows the head. With
+   * emitted field sets, it only moves when the branch change requires a reindex, which is then
+   * emitted and counted.
+   *
+   * @param mainSnapshot current head of the target branch, may be {@code null}
+   * @param currentMainSnapshotId id of {@code mainSnapshot}, or {@code null} if there is none
+   */
+  private void syncIndex(Snapshot mainSnapshot, Long currentMainSnapshotId) {
+    // Must run first: it may clear mainIndexEmittedSet, which is inspected right below.
+    boolean needsReindex = handleMainBranchChange(currentMainSnapshotId);
+
+    if (mainIndexEmittedSet.isEmpty()) {
+      indexSnapshotId = currentMainSnapshotId;
+      return;
+    }
+
+    if (needsReindex) {
+      indexSnapshotId = currentMainSnapshotId;
+      emitReindexFromMain(mainSnapshot);
+      reindexCounter.inc();
+    }
+  }
+
+  /**
+   * Decides whether a move of the target branch head requires a reindex.
+   *
+   * <p>Returns {@code false} when there is no previous head, when the head did not move, or when
+   * the branch no longer has a head (in which case the emitted field sets and the index snapshot
+   * id are cleared). Otherwise the snapshots between the previous and the current head are
+   * inspected: one that lacks the committed-staging-snapshot property, or whose property value is
+   * not accepted by {@code isStagingSnapshotIndexed}, makes the method return {@code true}.
+   *
+   * @param currentMainSnapshotId id of the current head of the target branch, may be {@code null}
+   * @return {@code true} if a reindex is needed
+   */
+  private boolean handleMainBranchChange(Long currentMainSnapshotId) {
+    boolean noPreviousHead = lastMainSnapshotId == null;
+    if (noPreviousHead || Objects.equals(lastMainSnapshotId, currentMainSnapshotId)) {
+      return false;
+    }
+
+    if (currentMainSnapshotId == null) {
+      mainIndexEmittedSet.clear();
+      indexSnapshotId = null;
+      LOG.info("Main branch '{}' was deleted, clearing index.", targetBranch);
+      return false;
+    }
+
+    for (Snapshot ancestor :
+        SnapshotUtil.ancestorsBetween(table, currentMainSnapshotId, lastMainSnapshotId)) {
+      String stagingIdProp =
+          ancestor.summary().get(EqualityConvertCommitter.COMMITTED_STAGING_SNAPSHOT_PROPERTY);
+      boolean alreadyIndexed =
+          stagingIdProp != null && isStagingSnapshotIndexed(Long.parseLong(stagingIdProp));
+      if (!alreadyIndexed) {
+        LOG.info(
+            "Main branch '{}' advanced {} -> {} with external or ahead-of-cursor commits, will reindex.",
+            targetBranch,
+            lastMainSnapshotId,
+            currentMainSnapshotId);
+        return true;
+      }
+    }
+
+    LOG.info(
+        "Main branch '{}' advanced {} -> {}, only own commits detected.",
+        targetBranch,
+        lastMainSnapshotId,
+        currentMainSnapshotId);
+    return false;
+  }
+
+  /**
+   * Tells whether the given staging snapshot is the last processed staging snapshot or one of
+   * its ancestors.
+   *
+   * @param propStagingId staging snapshot id read from a commit's summary property
+   * @return {@code false} if no staging snapshot has been processed yet or the last processed
+   *     one can no longer be looked up in the table; otherwise the result of the equality or
+   *     ancestry check
+   */
+  private boolean isStagingSnapshotIndexed(long propStagingId) {
+    if (lastStagingSnapshotId == null) {
+      return false;
+    }
+
+    long lastProcessedId = lastStagingSnapshotId;
+    if (lastProcessedId == propStagingId) {
+      return true;
+    }
+
+    // Only attempt the ancestry walk when its starting snapshot still exists in the table.
+    return table.snapshot(lastProcessedId) != null
+        && SnapshotUtil.isAncestorOf(lastProcessedId, propStagingId, table::snapshot);
+  }
+
+  /**
+   * Walks the target branch from its head towards older snapshots, looking for a commit whose
+   * summary names the given staging snapshot.
+   *
+   * <p>The walk ends without a match once the previously seen head ({@code lastMainSnapshotId})
+   * has been checked, or when a snapshot has no parent or its parent cannot be looked up.
+   *
+   * @param stagingSnapshotId id of the staging snapshot to look for
+   * @param mainHead snapshot at which the walk starts
+   * @return {@code true} if a matching commit was found
+   */
+  private boolean isStagingCommittedOnMain(long stagingSnapshotId, Snapshot mainHead) {
+    for (Snapshot cursor = mainHead; cursor != null; ) {
+      String committedProp =
+          cursor.summary().get(EqualityConvertCommitter.COMMITTED_STAGING_SNAPSHOT_PROPERTY);
+      if (committedProp != null && Long.parseLong(committedProp) == stagingSnapshotId) {
+        return true;
+      }
+
+      boolean reachedPreviousHead =
+          lastMainSnapshotId != null && cursor.snapshotId() == lastMainSnapshotId;
+      if (reachedPreviousHead) {
+        return false;
+      }
+
+      Long parentId = cursor.parentId();
+      cursor = parentId == null ? null : table.snapshot(parentId);
+    }
+
+    return false;
+  }
+
+  /**
+   * Plans the conversion of one staging snapshot.
+   *
+   * <p>The files added by the snapshot are copied and split into data files, equality deletes
+   * (grouped by equality field ids), DVs and remaining delete files, which are handled as
+   * positional deletes. If the snapshot added nothing, it is recorded as pending and a no-op
+   * result is emitted. Otherwise the read commands are emitted phase by phase, followed by the
+   * plan result on the metadata side output and a watermark.
+   *
+   * @param snapshot the staging snapshot to process
+   * @param triggerTs timestamp of the trigger that started this cycle
+   */
+  private void emitReadCommandsForSnapshot(Snapshot snapshot, long triggerTs) {
+    SnapshotChanges changes = SnapshotChanges.builderFor(table).snapshot(snapshot).build();
+
+    List<DataFile> newDataFiles = Lists.newArrayList();
+    for (DataFile added : changes.addedDataFiles()) {
+      newDataFiles.add(added.copy());
+    }
+
+    Map<List<Integer>, List<DeleteFile>> deletesByFieldIds = Maps.newHashMap();
+    List<DeleteFile> stagingDVFiles = Lists.newArrayList();
+    List<DeleteFile> posDeleteFiles = Lists.newArrayList();
+    for (DeleteFile added : changes.addedDeleteFiles()) {
+      List<DeleteFile> bucket;
+      if (added.content() == FileContent.EQUALITY_DELETES) {
+        bucket =
+            deletesByFieldIds.computeIfAbsent(
+                added.equalityFieldIds(), ignored -> Lists.newArrayList());
+      } else if (ContentFileUtil.isDV(added)) {
+        bucket = stagingDVFiles;
+      } else {
+        bucket = posDeleteFiles;
+      }
+
+      bucket.add(added.copy());
+    }
+
+    boolean nothingAdded =
+        newDataFiles.isEmpty()
+            && deletesByFieldIds.isEmpty()
+            && stagingDVFiles.isEmpty()
+            && posDeleteFiles.isEmpty();
+    if (nothingAdded) {
+      LOG.info("No new files on staging branch '{}' to convert.", stagingBranch);
+      pendingStagingSnapshotId = snapshot.snapshotId();
+      emitNoOpResult(triggerTs);
+      return;
+    }
+
+    // Emit read commands in phases with ascending timestamps and watermarks between phases:
+    //   Phase 0: main data files (index refresh)
+    //   Phase 1: equality delete files (resolution against the index)
+    //   Phase 2: positional delete files (converted directly to DVPositions, bypass Worker)
+    //   Phase 3: staging data files (index update for next cycle)
+    emitMainDataPhase(deletesByFieldIds);
+    emitEqDeletePhase(deletesByFieldIds);
+    emitPosDeletePhase(posDeleteFiles);
+    emitSnapshotDataPhase(newDataFiles, mainIndexEmittedSet);
+
+    LOG.info(
+        "Emitted read commands for {} new data files from staging branch '{}'.",
+        newDataFiles.size(),
+        stagingBranch);
+
+    // The snapshot stays pending until a later cycle finds its commit on the target branch.
+    pendingStagingSnapshotId = snapshot.snapshotId();
+    lastDoneTs = nextPhaseTs;
+    processedStagingSnapshotNumCounter.inc();
+
+    EqualityConvertPlanResult planResult =
+        new EqualityConvertPlanResult(
+            newDataFiles,
+            stagingDVFiles,
+            snapshot.snapshotId(),
+            lastMainSnapshotId,
+            triggerTs,
+            nextPhaseTs);
+    output.collect(METADATA_STREAM, new StreamRecord<>(planResult));
+
+    output.emitWatermark(new Watermark(nextPhaseTs));
+  }
+
+  /**
+   * Emits read commands for the target branch's data files, once per equality field set that has
+   * not been emitted before, and advances the phase if any field set was handled.
+   *
+   * <p>Field sets are tracked in {@code mainIndexEmittedSet} as sorted copies, while the read
+   * commands receive the field ids in their original order.
+   *
+   * @param deletesByFieldIds equality delete files keyed by their equality field ids; only the
+   *     keys are used here
+   */
+  private void emitMainDataPhase(Map<List<Integer>, List<DeleteFile>> deletesByFieldIds) {
+    Snapshot mainHead = table.snapshot(targetBranch);
+    int newlyEmittedFieldSets = 0;
+
+    for (List<Integer> fieldIds : deletesByFieldIds.keySet()) {
+      List<Integer> normalizedFieldIds = Lists.newArrayList(fieldIds);
+      Collections.sort(normalizedFieldIds);
+      if (mainIndexEmittedSet.contains(normalizedFieldIds)) {
+        continue;
+      }
+
+      emitMainDataReadCommands(mainHead, fieldIds, nextPhaseTs);
+      mainIndexEmittedSet.add(normalizedFieldIds);
+      newlyEmittedFieldSets++;
+    }
+
+    if (newlyEmittedFieldSets > 0) {
+      advancePhase();
+    }
+  }
+
+  /**
+   * Emits one read command per equality delete file, stamped with the current phase timestamp,
+   * then advances the phase. Does nothing when there are no equality deletes.
+   *
+   * @param deletesByFieldIds equality delete files keyed by their equality field ids
+   */
+  private void emitEqDeletePhase(Map<List<Integer>, List<DeleteFile>> deletesByFieldIds) {
+    if (!deletesByFieldIds.isEmpty()) {
+      for (Map.Entry<List<Integer>, List<DeleteFile>> group : deletesByFieldIds.entrySet()) {
+        List<Integer> equalityFieldIds = group.getKey();
+        for (DeleteFile eqDelete : group.getValue()) {
+          output.collect(
+              new StreamRecord<>(
+                  ReadCommand.eqDeleteFile(
+                      eqDelete.location(), eqDelete.format(), equalityFieldIds, indexSnapshotId),
+                  nextPhaseTs));
+          processedEqDeleteFileNumCounter.inc();
+        }
+      }
+
+      advancePhase();
+    }
+  }
+
+  /**
+   * Emits one read command per positional delete file, stamped with the current phase timestamp,
+   * then advances the phase. Does nothing for an empty list.
+   *
+   * @param posDeleteFiles positional delete files added by the staging snapshot
+   */
+  private void emitPosDeletePhase(List<DeleteFile> posDeleteFiles) {
+    if (!posDeleteFiles.isEmpty()) {
+      for (DeleteFile posDelete : posDeleteFiles) {
+        output.collect(
+            new StreamRecord<>(
+                ReadCommand.posDeleteFile(
+                    posDelete.location(), posDelete.format(), indexSnapshotId),
+                nextPhaseTs));
+      }
+
+      advancePhase();
+    }
+  }
+
+  /**
+   * Emits one data-file read command per (data file, field set) pair, stamped with the current
+   * phase timestamp, then advances the phase.
+   *
+   * @param snapshotDataFiles data files added by the staging snapshot
+   * @param activeFieldSets equality field sets to emit each data file for
+   */
+  private void emitSnapshotDataPhase(
+      List<DataFile> snapshotDataFiles, Set<List<Integer>> activeFieldSets) {
+    // Commands are produced per (data file, field set) pair, so the phase is empty as soon as
+    // either side is empty. Skip it in that case rather than advancing the phase with no records.
+    if (snapshotDataFiles.isEmpty() || activeFieldSets.isEmpty()) {
+      return;
+    }
+
+    for (DataFile dataFile : snapshotDataFiles) {
+      for (List<Integer> fieldIds : activeFieldSets) {
+        output.collect(
+            new StreamRecord<>(
+                ReadCommand.dataFile(
+                    dataFile.location(), dataFile.format(), fieldIds, indexSnapshotId),
+                nextPhaseTs));
+      }
+    }
+
+    advancePhase();
+  }
+
+  /**
+   * Closes the operator and then the table loader.
+   *
+   * @throws Exception if closing the superclass or the table loader fails
+   */
+  @Override
+  public void close() throws Exception {
+    try {
+      super.close();
+    } finally {
+      // Close the table loader even when the superclass close throws.
+      tableLoader.close();
+    }
+  }
+
+  /**
+   * Reports a cycle that had nothing to convert: counts it, emits a plan result with empty file
+   * lists and {@code -1} as the staging snapshot id on the metadata side output, emits a
+   * watermark at the current phase timestamp and records that timestamp as done.
+   *
+   * @param triggerTimestamp timestamp of the trigger that started this cycle
+   */
+  private void emitNoOpResult(long triggerTimestamp) {
+    skippedNoOpCyclesCounter.inc();
+
+    long phaseTs = nextPhaseTs;
+    EqualityConvertPlanResult noOpResult =
+        new EqualityConvertPlanResult(
+            Lists.newArrayList(),
+            Lists.newArrayList(),
+            -1L,
+            lastMainSnapshotId,
+            triggerTimestamp,
+            phaseTs);
+    output.collect(METADATA_STREAM, new StreamRecord<>(noOpResult));
+    output.emitWatermark(new Watermark(phaseTs));
+    lastDoneTs = phaseTs;
+  }
+
+  /**
+   * Emits a data-file read command for every entry in the data manifests of the given
+   * target-branch snapshot.
+   *
+   * @param mainSnapshot snapshot whose data manifests are read; nothing happens if {@code null}
+   * @param fieldIds equality field ids attached to every emitted command
+   * @param phaseTs timestamp assigned to every emitted record
+   * @throws UncheckedIOException if reading a manifest raises an {@link IOException}
+   */
+  private void emitMainDataReadCommands(
+      Snapshot mainSnapshot, List<Integer> fieldIds, long phaseTs) {
+    if (mainSnapshot != null) {
+      for (ManifestFile dataManifest : mainSnapshot.dataManifests(table.io())) {
+        try (ManifestReader<DataFile> entries =
+            ManifestFiles.read(dataManifest, table.io(), table.specs())) {
+          for (DataFile mainDataFile : entries) {
+            output.collect(
+                new StreamRecord<>(
+                    ReadCommand.dataFile(
+                        mainDataFile.location(), mainDataFile.format(), fieldIds, indexSnapshotId),
+                    phaseTs));
+          }
+        } catch (IOException e) {
+          throw new UncheckedIOException("Failed to read data manifest for main index", e);
+        }
+      }
+
+      LOG.info(
+          "Emitted main data read commands for field IDs {} from snapshot {}.",
+          fieldIds,
+          mainSnapshot.snapshotId());
+    }
+  }
+
+  private void emitReindexFromMain(Snapshot mainSnapshot) {

Review Comment:
   As I understand it, emitReindexFromMain will also be invoked when a compaction
   commit happens from within the same job. Is that correct?
   If so, do we want to handle that case differently, since we mostly compact
   only recent data?
   Can we use the added and deleted data file information to update just those
   entries in the index?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to