mxm commented on code in PR #15996:
URL: https://github.com/apache/iceberg/pull/15996#discussion_r3186127247


##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/EqualityConvertPlanner.java:
##########
@@ -0,0 +1,564 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.OutputTag;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.ManifestReader;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotChanges;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.maintenance.api.Trigger;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.ContentFileUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Planner for the equality delete conversion pipeline. For each trigger, it 
picks the oldest
+ * staging snapshot that hasn't been converted yet and emits {@link 
ReadCommand}s describing the
+ * files its downstream readers and workers must process.
+ *
+ * <p>Emissions are grouped into phases separated by watermarks so the 
downstream operators observe
+ * them in a consistent order:
+ *
+ * <ol>
+ *   <li>Main data — read once per new equality-field-set to seed the worker's 
key index.
+ *   <li>Equality deletes from the staging snapshot — resolve against the 
index.
+ *   <li>Positional deletes from the staging snapshot — bypass the worker, go 
straight to DVs.
+ *   <li>New staging data — added to the index for the next cycle.
+ * </ol>
+ *
+ * The planner also detects external commits on main (not produced by the 
converter itself) and
+ * re-emits main data so the worker can rebuild its index before resolving the 
current cycle's
+ * equality deletes.
+ *
+ * <p>An {@link EqualityConvertPlanResult} with the current cycle's metadata 
is emitted via the
+ * {@link #METADATA_STREAM} side output after the read commands.
+ */
+@Internal
+public class EqualityConvertPlanner extends AbstractStreamOperator<ReadCommand>
+    implements OneInputStreamOperator<Trigger, ReadCommand> {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(EqualityConvertPlanner.class);
+
+  public static final OutputTag<EqualityConvertPlanResult> METADATA_STREAM =
+      new OutputTag<>("metadata-stream") {};
+
+  private static final String PROCESSED_EQ_DELETE_FILE_NUM_METRIC = 
"processedEqDeleteFileNum";
+  private static final String PROCESSED_STAGING_SNAPSHOT_NUM_METRIC = 
"processedStagingSnapshotNum";
+  private static final String SKIPPED_NO_OP_CYCLES_METRIC = 
"skippedNoOpCycles";
+  private static final String REINDEX_COUNT_METRIC = "reindexCount";
+
+  private final String tableName;
+  private final String taskName;
+  private final int taskIndex;
+  private final TableLoader tableLoader;
+  private final String stagingBranch;
+  private final String targetBranch;
+
+  // Main snapshot id the worker's index reflects.
+  private transient ListState<Long> indexSnapshotState;
+  // Equality field sets for which main data has been emitted.
+  private transient ListState<List<Integer>> mainIndexEmittedState;
+
+  private transient Table table;
+
+  private transient Long lastMainSnapshotId;
+  private transient Long lastStagingSnapshotId;
+  private transient Long indexSnapshotId;
+  private transient Set<List<Integer>> mainIndexEmittedSet;
+
+  private transient long nextPhaseTs;
+
+  private transient Counter processedEqDeleteFileNumCounter;
+  private transient Counter processedStagingSnapshotNumCounter;
+  private transient Counter skippedNoOpCyclesCounter;
+  private transient Counter reindexCounter;
+
+  public EqualityConvertPlanner(
+      String tableName,
+      String taskName,
+      int taskIndex,
+      TableLoader tableLoader,
+      String stagingBranch,
+      String targetBranch) {
+    this.tableName = tableName;
+    this.taskName = taskName;
+    this.taskIndex = taskIndex;
+    this.tableLoader = tableLoader;
+    this.stagingBranch = stagingBranch;
+    this.targetBranch = targetBranch;
+  }
+
+  @Override
+  public void open() throws Exception {
+    super.open();
+    if (!tableLoader.isOpen()) {
+      tableLoader.open();
+    }
+
+    table = tableLoader.loadTable();
+
+    MetricGroup taskMetricGroup =
+        TableMaintenanceMetrics.groupFor(getRuntimeContext(), tableName, 
taskName, taskIndex);
+    this.processedEqDeleteFileNumCounter =
+        taskMetricGroup.counter(PROCESSED_EQ_DELETE_FILE_NUM_METRIC);
+    this.processedStagingSnapshotNumCounter =
+        taskMetricGroup.counter(PROCESSED_STAGING_SNAPSHOT_NUM_METRIC);
+    this.skippedNoOpCyclesCounter = 
taskMetricGroup.counter(SKIPPED_NO_OP_CYCLES_METRIC);
+    this.reindexCounter = taskMetricGroup.counter(REINDEX_COUNT_METRIC);
+  }
+
+  @Override
+  public void initializeState(StateInitializationContext context) throws 
Exception {
+    super.initializeState(context);
+    mainIndexEmittedState =
+        context
+            .getOperatorStateStore()
+            .getListState(new ListStateDescriptor<>("mainIndexEmitted", 
Types.LIST(Types.INT)));
+    indexSnapshotState =
+        context
+            .getOperatorStateStore()
+            .getListState(new ListStateDescriptor<>("indexSnapshotId", 
Types.LONG));
+
+    indexSnapshotId = getValue(indexSnapshotState);
+
+    mainIndexEmittedSet = Sets.newHashSet();
+    for (List<Integer> fieldSet : mainIndexEmittedState.get()) {
+      mainIndexEmittedSet.add(fieldSet);
+    }
+  }
+
+  @Override
+  public void snapshotState(StateSnapshotContext context) throws Exception {
+    super.snapshotState(context);
+    storeValue(indexSnapshotState, indexSnapshotId);
+
+    mainIndexEmittedState.clear();
+    for (List<Integer> fieldSet : mainIndexEmittedSet) {
+      mainIndexEmittedState.add(Lists.newArrayList(fieldSet));
+    }
+  }
+
+  @Override
+  public void processElement(StreamRecord<Trigger> element) throws Exception {
+    long triggerTs = element.getTimestamp();
+    nextPhaseTs = Math.max(triggerTs, nextPhaseTs + 1);
+
+    try {
+      table.refresh();
+      Snapshot mainSnapshot = table.snapshot(targetBranch);
+      Long currentMainSnapshotId = mainSnapshot != null ? 
mainSnapshot.snapshotId() : null;
+
+      updateState(mainSnapshot);
+
+      Snapshot stagingSnapshot = table.snapshot(stagingBranch);
+      if (stagingSnapshot == null) {
+        LOG.info("No snapshot on staging branch '{}', nothing to convert.", 
stagingBranch);
+        emitNoOpResult(triggerTs, currentMainSnapshotId);
+        return;
+      }
+
+      Snapshot nextSnapshot = findNextSnapshot(stagingSnapshot);
+      if (nextSnapshot == null) {
+        LOG.info("No new staging snapshots to process on branch '{}'.", 
stagingBranch);
+        emitNoOpResult(triggerTs, currentMainSnapshotId);
+        return;
+      }
+
+      emitReadCommandsForSnapshot(nextSnapshot, triggerTs, 
currentMainSnapshotId);
+    } catch (Exception e) {
+      LOG.error(
+          "Error processing equality deletes for table {} task {}[{}]",
+          tableName,
+          taskName,
+          taskIndex,
+          e);
+      output.collect(TaskResultAggregator.ERROR_STREAM, new StreamRecord<>(e));
+      Snapshot mainSnapshot = table.snapshot(targetBranch);
+      Long currentMainSnapshotId = mainSnapshot != null ? 
mainSnapshot.snapshotId() : null;
+      emitNoOpResult(triggerTs, currentMainSnapshotId);
+    }
+  }
+
+  private void updateState(Snapshot mainHead) {
+    Long currentMainSnapshotId = mainHead != null ? mainHead.snapshotId() : 
null;
+
+    int externalCount = 0;
+    Snapshot current = mainHead;
+    while (current != null) {
+      String prop =
+          
current.summary().get(EqualityConvertCommitter.COMMITTED_STAGING_SNAPSHOT_PROPERTY);
+      if (prop != null) {
+        // First property found going back from main head is our most recent 
commit.
+        lastStagingSnapshotId = Long.parseLong(prop);
+        break;
+      }
+
+      externalCount++;
+      current = current.parentId() != null ? 
table.snapshot(current.parentId()) : null;
+    }
+
+    boolean mainChanged =
+        lastMainSnapshotId != null && 
!lastMainSnapshotId.equals(currentMainSnapshotId);
+    if (mainChanged && externalCount > 0 && !mainIndexEmittedSet.isEmpty()) {
+      LOG.info(
+          "Detected {} external commit(s) on branch '{}' since previous cycle, 
reindexing.",
+          externalCount,
+          targetBranch);
+      indexSnapshotId = currentMainSnapshotId;
+      // Full reindex: re-emits all data files from the current main snapshot 
for every equality
+      // field set that was previously indexed. This is triggered when an 
external commit (e.g.
+      // compaction) changes the main branch and the existing index may 
reference stale file paths.
+      emitReindexFromMain(mainHead);
+      reindexCounter.inc();
+    } else if (mainIndexEmittedSet.isEmpty()) {
+      indexSnapshotId = currentMainSnapshotId;
+    }
+
+    lastMainSnapshotId = currentMainSnapshotId;
+  }
+
+  private void emitReadCommandsForSnapshot(
+      Snapshot snapshot, long triggerTs, Long currentMainSnapshotId) {
+    List<DataFile> newDataFiles = Lists.newArrayList();
+    List<DeleteFile> stagingDVFiles = Lists.newArrayList();
+    List<DeleteFile> posDeleteFiles = Lists.newArrayList();
+    Map<List<Integer>, List<DeleteFile>> deletesByFieldIds = Maps.newHashMap();
+    SnapshotChanges changes = 
SnapshotChanges.builderFor(table).snapshot(snapshot).build();
+
+    for (DataFile dataFile : changes.addedDataFiles()) {
+      newDataFiles.add(dataFile.copy());
+    }
+
+    for (DeleteFile deleteFile : changes.addedDeleteFiles()) {
+      if (deleteFile.content() == FileContent.EQUALITY_DELETES) {
+        deletesByFieldIds
+            .computeIfAbsent(deleteFile.equalityFieldIds(), k -> 
Lists.newArrayList())
+            .add(deleteFile.copy());
+      } else if (ContentFileUtil.isDV(deleteFile)) {
+        stagingDVFiles.add(deleteFile.copy());
+      } else {
+        posDeleteFiles.add(deleteFile.copy());
+      }
+    }
+
+    boolean hasEqDeletes = !deletesByFieldIds.isEmpty();
+
+    if (newDataFiles.isEmpty()
+        && !hasEqDeletes
+        && stagingDVFiles.isEmpty()
+        && posDeleteFiles.isEmpty()) {
+      LOG.info("No new files on staging branch '{}' to convert.", 
stagingBranch);
+      emitNoOpResult(triggerTs, currentMainSnapshotId);
+      return;
+    }
+
+    // Emit read commands in phases with ascending timestamps and watermarks 
between phases:
+    //   Phase 0: main data files (index refresh)
+    //   Phase 1: equality delete files (resolution against the index)
+    //   Phase 2: positional delete files (converted directly to DVPositions, 
bypass Worker)
+    //   Phase 3: staging data files (index update for next cycle)
+    emitMainDataPhase(deletesByFieldIds);
+    emitEqDeletePhase(deletesByFieldIds);
+    emitPosDeletePhase(posDeleteFiles);
+    emitSnapshotDataPhase(snapshot, newDataFiles, mainIndexEmittedSet);
+
+    LOG.info(
+        "Emitted read commands for {} new data files from staging branch 
'{}'.",
+        newDataFiles.size(),
+        stagingBranch);
+
+    processedStagingSnapshotNumCounter.inc();
+
+    output.collect(
+        METADATA_STREAM,
+        new StreamRecord<>(
+            new EqualityConvertPlanResult(
+                newDataFiles,
+                stagingDVFiles,
+                snapshot.snapshotId(),
+                currentMainSnapshotId,
+                triggerTs,
+                nextPhaseTs)));
+
+    output.emitWatermark(new Watermark(nextPhaseTs));
+  }
+
+  private void emitMainDataPhase(Map<List<Integer>, List<DeleteFile>> 
deletesByFieldIds) {
+    Snapshot mainSnapshot = table.snapshot(targetBranch);
+
+    for (Map.Entry<List<Integer>, List<DeleteFile>> entry : 
deletesByFieldIds.entrySet()) {
+      List<Integer> fieldIds = entry.getKey();
+      List<Integer> sortedFieldIds = Lists.newArrayList(fieldIds);
+      Collections.sort(sortedFieldIds);
+
+      if (!mainIndexEmittedSet.contains(sortedFieldIds)) {
+        emitMainDataReadCommands(mainSnapshot, fieldIds, nextPhaseTs);
+        mainIndexEmittedSet.add(sortedFieldIds);
+      }
+    }
+
+    advancePhase();

Review Comment:
   Watermarks are used to separate the phases from each other in the worker 
which receives shuffled data.
   
   index main -> resolve eq deletes -> merge pos deletes -> index staging data
   
   Without this, we cannot guarantee that all data has been indexed before we 
resolve equality deletes. We can probably simplify a little bit because we 
don't need watermarks for merging positional deletes. I've updated the PR and 
added some comments.



##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/EqualityConvertPlanner.java:
##########
@@ -0,0 +1,564 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.OutputTag;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.ManifestReader;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotChanges;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.maintenance.api.Trigger;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.ContentFileUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Planner for the equality delete conversion pipeline. For each trigger, it 
picks the oldest
+ * staging snapshot that hasn't been converted yet and emits {@link 
ReadCommand}s describing the
+ * files its downstream readers and workers must process.
+ *
+ * <p>Emissions are grouped into phases separated by watermarks so the 
downstream operators observe
+ * them in a consistent order:
+ *
+ * <ol>
+ *   <li>Main data — read once per new equality-field-set to seed the worker's 
key index.
+ *   <li>Equality deletes from the staging snapshot — resolve against the 
index.
+ *   <li>Positional deletes from the staging snapshot — bypass the worker, go 
straight to DVs.
+ *   <li>New staging data — added to the index for the next cycle.
+ * </ol>
+ *
+ * The planner also detects external commits on main (not produced by the 
converter itself) and
+ * re-emits main data so the worker can rebuild its index before resolving the 
current cycle's
+ * equality deletes.
+ *
+ * <p>An {@link EqualityConvertPlanResult} with the current cycle's metadata 
is emitted via the
+ * {@link #METADATA_STREAM} side output after the read commands.
+ */
+@Internal
+public class EqualityConvertPlanner extends AbstractStreamOperator<ReadCommand>
+    implements OneInputStreamOperator<Trigger, ReadCommand> {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(EqualityConvertPlanner.class);
+
+  public static final OutputTag<EqualityConvertPlanResult> METADATA_STREAM =
+      new OutputTag<>("metadata-stream") {};
+
+  private static final String PROCESSED_EQ_DELETE_FILE_NUM_METRIC = 
"processedEqDeleteFileNum";
+  private static final String PROCESSED_STAGING_SNAPSHOT_NUM_METRIC = 
"processedStagingSnapshotNum";
+  private static final String SKIPPED_NO_OP_CYCLES_METRIC = 
"skippedNoOpCycles";
+  private static final String REINDEX_COUNT_METRIC = "reindexCount";
+
+  private final String tableName;
+  private final String taskName;
+  private final int taskIndex;
+  private final TableLoader tableLoader;
+  private final String stagingBranch;
+  private final String targetBranch;
+
+  // Main snapshot id the worker's index reflects.
+  private transient ListState<Long> indexSnapshotState;
+  // Equality field sets for which main data has been emitted.
+  private transient ListState<List<Integer>> mainIndexEmittedState;
+
+  private transient Table table;
+
+  private transient Long lastMainSnapshotId;
+  private transient Long lastStagingSnapshotId;
+  private transient Long indexSnapshotId;
+  private transient Set<List<Integer>> mainIndexEmittedSet;
+
+  private transient long nextPhaseTs;
+
+  private transient Counter processedEqDeleteFileNumCounter;
+  private transient Counter processedStagingSnapshotNumCounter;
+  private transient Counter skippedNoOpCyclesCounter;
+  private transient Counter reindexCounter;
+
+  public EqualityConvertPlanner(
+      String tableName,
+      String taskName,
+      int taskIndex,
+      TableLoader tableLoader,
+      String stagingBranch,
+      String targetBranch) {
+    this.tableName = tableName;
+    this.taskName = taskName;
+    this.taskIndex = taskIndex;
+    this.tableLoader = tableLoader;
+    this.stagingBranch = stagingBranch;
+    this.targetBranch = targetBranch;
+  }
+
+  @Override
+  public void open() throws Exception {
+    super.open();
+    if (!tableLoader.isOpen()) {
+      tableLoader.open();
+    }
+
+    table = tableLoader.loadTable();
+
+    MetricGroup taskMetricGroup =
+        TableMaintenanceMetrics.groupFor(getRuntimeContext(), tableName, 
taskName, taskIndex);
+    this.processedEqDeleteFileNumCounter =
+        taskMetricGroup.counter(PROCESSED_EQ_DELETE_FILE_NUM_METRIC);
+    this.processedStagingSnapshotNumCounter =
+        taskMetricGroup.counter(PROCESSED_STAGING_SNAPSHOT_NUM_METRIC);
+    this.skippedNoOpCyclesCounter = 
taskMetricGroup.counter(SKIPPED_NO_OP_CYCLES_METRIC);
+    this.reindexCounter = taskMetricGroup.counter(REINDEX_COUNT_METRIC);
+  }
+
+  @Override
+  public void initializeState(StateInitializationContext context) throws 
Exception {
+    super.initializeState(context);
+    mainIndexEmittedState =
+        context
+            .getOperatorStateStore()
+            .getListState(new ListStateDescriptor<>("mainIndexEmitted", 
Types.LIST(Types.INT)));
+    indexSnapshotState =
+        context
+            .getOperatorStateStore()
+            .getListState(new ListStateDescriptor<>("indexSnapshotId", 
Types.LONG));
+
+    indexSnapshotId = getValue(indexSnapshotState);
+
+    mainIndexEmittedSet = Sets.newHashSet();
+    for (List<Integer> fieldSet : mainIndexEmittedState.get()) {
+      mainIndexEmittedSet.add(fieldSet);
+    }
+  }
+
+  @Override
+  public void snapshotState(StateSnapshotContext context) throws Exception {
+    super.snapshotState(context);
+    storeValue(indexSnapshotState, indexSnapshotId);
+
+    mainIndexEmittedState.clear();
+    for (List<Integer> fieldSet : mainIndexEmittedSet) {
+      mainIndexEmittedState.add(Lists.newArrayList(fieldSet));
+    }
+  }
+
+  @Override
+  public void processElement(StreamRecord<Trigger> element) throws Exception {
+    long triggerTs = element.getTimestamp();
+    nextPhaseTs = Math.max(triggerTs, nextPhaseTs + 1);
+
+    try {
+      table.refresh();
+      Snapshot mainSnapshot = table.snapshot(targetBranch);
+      Long currentMainSnapshotId = mainSnapshot != null ? 
mainSnapshot.snapshotId() : null;
+
+      updateState(mainSnapshot);
+
+      Snapshot stagingSnapshot = table.snapshot(stagingBranch);
+      if (stagingSnapshot == null) {
+        LOG.info("No snapshot on staging branch '{}', nothing to convert.", 
stagingBranch);
+        emitNoOpResult(triggerTs, currentMainSnapshotId);
+        return;
+      }
+
+      Snapshot nextSnapshot = findNextSnapshot(stagingSnapshot);
+      if (nextSnapshot == null) {
+        LOG.info("No new staging snapshots to process on branch '{}'.", 
stagingBranch);
+        emitNoOpResult(triggerTs, currentMainSnapshotId);
+        return;
+      }
+
+      emitReadCommandsForSnapshot(nextSnapshot, triggerTs, 
currentMainSnapshotId);
+    } catch (Exception e) {
+      LOG.error(
+          "Error processing equality deletes for table {} task {}[{}]",
+          tableName,
+          taskName,
+          taskIndex,
+          e);
+      output.collect(TaskResultAggregator.ERROR_STREAM, new StreamRecord<>(e));
+      Snapshot mainSnapshot = table.snapshot(targetBranch);
+      Long currentMainSnapshotId = mainSnapshot != null ? 
mainSnapshot.snapshotId() : null;
+      emitNoOpResult(triggerTs, currentMainSnapshotId);
+    }
+  }
+
+  private void updateState(Snapshot mainHead) {
+    Long currentMainSnapshotId = mainHead != null ? mainHead.snapshotId() : 
null;
+
+    int externalCount = 0;
+    Snapshot current = mainHead;
+    while (current != null) {
+      String prop =
+          
current.summary().get(EqualityConvertCommitter.COMMITTED_STAGING_SNAPSHOT_PROPERTY);
+      if (prop != null) {
+        // First property found going back from main head is our most recent 
commit.
+        lastStagingSnapshotId = Long.parseLong(prop);
+        break;
+      }
+
+      externalCount++;
+      current = current.parentId() != null ? 
table.snapshot(current.parentId()) : null;
+    }
+
+    boolean mainChanged =
+        lastMainSnapshotId != null && 
!lastMainSnapshotId.equals(currentMainSnapshotId);
+    if (mainChanged && externalCount > 0 && !mainIndexEmittedSet.isEmpty()) {
+      LOG.info(
+          "Detected {} external commit(s) on branch '{}' since previous cycle, 
reindexing.",
+          externalCount,
+          targetBranch);
+      indexSnapshotId = currentMainSnapshotId;
+      // Full reindex: re-emits all data files from the current main snapshot 
for every equality
+      // field set that was previously indexed. This is triggered when an 
external commit (e.g.
+      // compaction) changes the main branch and the existing index may 
reference stale file paths.
+      emitReindexFromMain(mainHead);
+      reindexCounter.inc();
+    } else if (mainIndexEmittedSet.isEmpty()) {
+      indexSnapshotId = currentMainSnapshotId;
+    }
+
+    lastMainSnapshotId = currentMainSnapshotId;
+  }
+
+  private void emitReadCommandsForSnapshot(
+      Snapshot snapshot, long triggerTs, Long currentMainSnapshotId) {
+    List<DataFile> newDataFiles = Lists.newArrayList();
+    List<DeleteFile> stagingDVFiles = Lists.newArrayList();
+    List<DeleteFile> posDeleteFiles = Lists.newArrayList();
+    Map<List<Integer>, List<DeleteFile>> deletesByFieldIds = Maps.newHashMap();
+    SnapshotChanges changes = 
SnapshotChanges.builderFor(table).snapshot(snapshot).build();
+
+    for (DataFile dataFile : changes.addedDataFiles()) {
+      newDataFiles.add(dataFile.copy());
+    }
+
+    for (DeleteFile deleteFile : changes.addedDeleteFiles()) {
+      if (deleteFile.content() == FileContent.EQUALITY_DELETES) {
+        deletesByFieldIds
+            .computeIfAbsent(deleteFile.equalityFieldIds(), k -> 
Lists.newArrayList())
+            .add(deleteFile.copy());
+      } else if (ContentFileUtil.isDV(deleteFile)) {
+        stagingDVFiles.add(deleteFile.copy());
+      } else {
+        posDeleteFiles.add(deleteFile.copy());
+      }
+    }
+
+    boolean hasEqDeletes = !deletesByFieldIds.isEmpty();
+
+    if (newDataFiles.isEmpty()
+        && !hasEqDeletes
+        && stagingDVFiles.isEmpty()
+        && posDeleteFiles.isEmpty()) {
+      LOG.info("No new files on staging branch '{}' to convert.", 
stagingBranch);
+      emitNoOpResult(triggerTs, currentMainSnapshotId);
+      return;
+    }
+
+    // Emit read commands in phases with ascending timestamps and watermarks 
between phases:
+    //   Phase 0: main data files (index refresh)
+    //   Phase 1: equality delete files (resolution against the index)
+    //   Phase 2: positional delete files (converted directly to DVPositions, 
bypass Worker)
+    //   Phase 3: staging data files (index update for next cycle)
+    emitMainDataPhase(deletesByFieldIds);
+    emitEqDeletePhase(deletesByFieldIds);
+    emitPosDeletePhase(posDeleteFiles);
+    emitSnapshotDataPhase(snapshot, newDataFiles, mainIndexEmittedSet);
+
+    LOG.info(
+        "Emitted read commands for {} new data files from staging branch 
'{}'.",
+        newDataFiles.size(),
+        stagingBranch);
+
+    processedStagingSnapshotNumCounter.inc();
+
+    output.collect(
+        METADATA_STREAM,
+        new StreamRecord<>(
+            new EqualityConvertPlanResult(
+                newDataFiles,
+                stagingDVFiles,
+                snapshot.snapshotId(),
+                currentMainSnapshotId,
+                triggerTs,
+                nextPhaseTs)));
+
+    output.emitWatermark(new Watermark(nextPhaseTs));
+  }
+
+  private void emitMainDataPhase(Map<List<Integer>, List<DeleteFile>> 
deletesByFieldIds) {
+    Snapshot mainSnapshot = table.snapshot(targetBranch);
+
+    for (Map.Entry<List<Integer>, List<DeleteFile>> entry : 
deletesByFieldIds.entrySet()) {
+      List<Integer> fieldIds = entry.getKey();
+      List<Integer> sortedFieldIds = Lists.newArrayList(fieldIds);
+      Collections.sort(sortedFieldIds);
+
+      if (!mainIndexEmittedSet.contains(sortedFieldIds)) {
+        emitMainDataReadCommands(mainSnapshot, fieldIds, nextPhaseTs);
+        mainIndexEmittedSet.add(sortedFieldIds);
+      }
+    }
+
+    advancePhase();
+  }
+
+  private void emitEqDeletePhase(Map<List<Integer>, List<DeleteFile>> 
deletesByFieldIds) {
+    for (Map.Entry<List<Integer>, List<DeleteFile>> entry : 
deletesByFieldIds.entrySet()) {
+      List<Integer> fieldIds = entry.getKey();
+      for (DeleteFile deleteFile : entry.getValue()) {
+        PartitionSpec spec = table.specs().get(deleteFile.specId());
+        output.collect(
+            new StreamRecord<>(
+                ReadCommand.eqDeleteFile(deleteFile, spec, fieldIds, 
indexSnapshotId),
+                nextPhaseTs));
+        processedEqDeleteFileNumCounter.inc();
+      }
+    }
+
+    advancePhase();
+  }
+
+  private void emitPosDeletePhase(List<DeleteFile> posDeleteFiles) {
+    for (DeleteFile deleteFile : posDeleteFiles) {
+      PartitionSpec spec = table.specs().get(deleteFile.specId());
+      output.collect(
+          new StreamRecord<>(
+              ReadCommand.posDeleteFile(deleteFile, spec, indexSnapshotId), 
nextPhaseTs));
+    }
+
+    advancePhase();
+  }
+
+  private void emitSnapshotDataPhase(
+      Snapshot stagingSnapshot,
+      List<DataFile> snapshotDataFiles,
+      Set<List<Integer>> activeFieldSets) {
+    long commitSnapshotId = stagingSnapshot.snapshotId();
+    for (DataFile dataFile : snapshotDataFiles) {
+      PartitionSpec spec = table.specs().get(dataFile.specId());
+      for (List<Integer> fieldIds : activeFieldSets) {
+        output.collect(
+            new StreamRecord<>(
+                ReadCommand.dataFile(dataFile, spec, commitSnapshotId, 
fieldIds, indexSnapshotId),
+                nextPhaseTs));
+      }
+    }
+
+    advancePhase();
+  }
+
+  @Override
+  public void close() throws Exception {
+    super.close();
+    tableLoader.close();
+  }
+
+  private void emitNoOpResult(long triggerTimestamp, Long 
currentMainSnapshotId) {
+    skippedNoOpCyclesCounter.inc();
+    output.collect(
+        METADATA_STREAM,
+        new StreamRecord<>(
+            new EqualityConvertPlanResult(
+                Lists.newArrayList(),
+                Lists.newArrayList(),
+                -1L,
+                currentMainSnapshotId,
+                triggerTimestamp,
+                nextPhaseTs)));
+    output.emitWatermark(new Watermark(nextPhaseTs));
+  }
+
+  private void emitMainDataReadCommands(
+      Snapshot mainSnapshot, List<Integer> fieldIds, long phaseTs) {
+    if (mainSnapshot == null) {
+      return;
+    }
+
+    long commitSnapshotId = mainSnapshot.snapshotId();
+    for (ManifestFile manifest : mainSnapshot.dataManifests(table.io())) {
+      try (ManifestReader<DataFile> reader =
+          ManifestFiles.read(manifest, table.io(), table.specs())) {
+        for (DataFile dataFile : reader) {
+          PartitionSpec spec = table.specs().get(dataFile.specId());
+          output.collect(
+              new StreamRecord<>(
+                  ReadCommand.dataFile(dataFile, spec, commitSnapshotId, 
fieldIds, indexSnapshotId),
+                  phaseTs));
+        }
+      } catch (IOException e) {
+        throw new UncheckedIOException("Failed to read data manifest for main 
index", e);
+      }
+    }
+
+    LOG.info(
+        "Emitted main data read commands for field IDs {} from snapshot {}.",
+        fieldIds,
+        mainSnapshot.snapshotId());
+  }
+
+  private void emitReindexFromMain(Snapshot mainSnapshot) {
+    for (List<Integer> fieldIds : mainIndexEmittedSet) {
+      emitMainDataReadCommands(mainSnapshot, fieldIds, nextPhaseTs);
+    }
+
+    advancePhase();
+    LOG.info(
+        "Immediate reindex emitted for {} field sets from snapshot {}.",
+        mainIndexEmittedSet.size(),
+        mainSnapshot != null ? mainSnapshot.snapshotId() : null);
+  }
+
+  private void advancePhase() {
+    output.emitWatermark(new Watermark(nextPhaseTs));
+    nextPhaseTs++;
+  }
+
+  private static boolean shouldSkip(Table table, Snapshot snapshot) {
+    if (snapshot
+        .summary()
+        
.containsKey(EqualityConvertCommitter.COMMITTED_STAGING_SNAPSHOT_PROPERTY)) {
+      return true;
+    }
+
+    SnapshotChanges changes = 
SnapshotChanges.builderFor(table).snapshot(snapshot).build();
+    return !changes.addedDataFiles().iterator().hasNext()
+        && !changes.addedDeleteFiles().iterator().hasNext()
+        && !changes.removedDataFiles().iterator().hasNext()
+        && !changes.removedDeleteFiles().iterator().hasNext();
+  }
+
+  /**
+   * Returns the oldest staging snapshot that still needs to be processed, or 
{@code null} if there
+   * is nothing new to do. The planner processes snapshots in chronological 
order so the worker's
+   * index is built up incrementally.
+   */
+  private Snapshot findNextSnapshot(Snapshot stagingHead) {
+    // Cold start with staging pointing at main: only the head matters; 
everything older is already
+    // on main and doesn't need converting.
+    if (lastStagingSnapshotId == null && stagingBranch.equals(targetBranch)) {
+      return shouldSkip(table, stagingHead) ? null : stagingHead;
+    }
+
+    Long stopAt =
+        lastStagingSnapshotId != null
+            ? lastStagingSnapshotId
+            : findIntersection(stagingHead, table.snapshot(targetBranch));
+
+    Snapshot oldest = null;
+    Snapshot current = stagingHead;
+    while (current != null) {
+      if (stopAt != null && current.snapshotId() == stopAt) {
+        break;
+      }
+
+      if (!shouldSkip(table, current)) {
+        oldest = current;
+      }
+
+      current = current.parentId() != null ? 
table.snapshot(current.parentId()) : null;
+    }
+
+    return oldest;
+  }
+
+  private Long findIntersection(Snapshot stagingHead, Snapshot mainHead) {

Review Comment:
   Yes, to find the common ancestor.



##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/EqualityConvertPlanner.java:
##########
@@ -0,0 +1,564 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.OutputTag;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.ManifestReader;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotChanges;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.maintenance.api.Trigger;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.ContentFileUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Planner for the equality delete conversion pipeline. For each trigger, it 
picks the oldest
+ * staging snapshot that hasn't been converted yet and emits {@link 
ReadCommand}s describing the
+ * files its downstream readers and workers must process.
+ *
+ * <p>Emissions are grouped into phases separated by watermarks so the 
downstream operators observe
+ * them in a consistent order:
+ *
+ * <ol>
+ *   <li>Main data — read once per new equality-field-set to seed the worker's 
key index.
+ *   <li>Equality deletes from the staging snapshot — resolve against the 
index.
+ *   <li>Positional deletes from the staging snapshot — bypass the worker, go 
straight to DVs.
+ *   <li>New staging data — added to the index for the next cycle.
+ * </ol>
+ *
+ * The planner also detects external commits on main (not produced by the 
converter itself) and
+ * re-emits main data so the worker can rebuild its index before resolving the 
current cycle's
+ * equality deletes.
+ *
+ * <p>An {@link EqualityConvertPlanResult} with the current cycle's metadata 
is emitted via the
+ * {@link #METADATA_STREAM} side output after the read commands.
+ */
+@Internal
+public class EqualityConvertPlanner extends AbstractStreamOperator<ReadCommand>
+    implements OneInputStreamOperator<Trigger, ReadCommand> {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(EqualityConvertPlanner.class);
+
+  public static final OutputTag<EqualityConvertPlanResult> METADATA_STREAM =
+      new OutputTag<>("metadata-stream") {};
+
+  private static final String PROCESSED_EQ_DELETE_FILE_NUM_METRIC = 
"processedEqDeleteFileNum";
+  private static final String PROCESSED_STAGING_SNAPSHOT_NUM_METRIC = 
"processedStagingSnapshotNum";
+  private static final String SKIPPED_NO_OP_CYCLES_METRIC = 
"skippedNoOpCycles";
+  private static final String REINDEX_COUNT_METRIC = "reindexCount";
+
+  private final String tableName;
+  private final String taskName;
+  private final int taskIndex;
+  private final TableLoader tableLoader;
+  private final String stagingBranch;
+  private final String targetBranch;
+
+  // Main snapshot id the worker's index reflects.
+  private transient ListState<Long> indexSnapshotState;
+  // Equality field sets for which main data has been emitted.
+  private transient ListState<List<Integer>> mainIndexEmittedState;
+
+  private transient Table table;
+
+  private transient Long lastMainSnapshotId;
+  private transient Long lastStagingSnapshotId;
+  private transient Long indexSnapshotId;
+  private transient Set<List<Integer>> mainIndexEmittedSet;
+
+  private transient long nextPhaseTs;
+
+  private transient Counter processedEqDeleteFileNumCounter;
+  private transient Counter processedStagingSnapshotNumCounter;
+  private transient Counter skippedNoOpCyclesCounter;
+  private transient Counter reindexCounter;
+
+  public EqualityConvertPlanner(
+      String tableName,
+      String taskName,
+      int taskIndex,
+      TableLoader tableLoader,
+      String stagingBranch,
+      String targetBranch) {
+    this.tableName = tableName;
+    this.taskName = taskName;
+    this.taskIndex = taskIndex;
+    this.tableLoader = tableLoader;
+    this.stagingBranch = stagingBranch;
+    this.targetBranch = targetBranch;
+  }
+
+  @Override
+  public void open() throws Exception {
+    super.open();
+    if (!tableLoader.isOpen()) {
+      tableLoader.open();
+    }
+
+    table = tableLoader.loadTable();
+
+    MetricGroup taskMetricGroup =
+        TableMaintenanceMetrics.groupFor(getRuntimeContext(), tableName, 
taskName, taskIndex);
+    this.processedEqDeleteFileNumCounter =
+        taskMetricGroup.counter(PROCESSED_EQ_DELETE_FILE_NUM_METRIC);
+    this.processedStagingSnapshotNumCounter =
+        taskMetricGroup.counter(PROCESSED_STAGING_SNAPSHOT_NUM_METRIC);
+    this.skippedNoOpCyclesCounter = 
taskMetricGroup.counter(SKIPPED_NO_OP_CYCLES_METRIC);
+    this.reindexCounter = taskMetricGroup.counter(REINDEX_COUNT_METRIC);
+  }
+
+  @Override
+  public void initializeState(StateInitializationContext context) throws 
Exception {
+    super.initializeState(context);
+    mainIndexEmittedState =
+        context
+            .getOperatorStateStore()
+            .getListState(new ListStateDescriptor<>("mainIndexEmitted", 
Types.LIST(Types.INT)));
+    indexSnapshotState =
+        context
+            .getOperatorStateStore()
+            .getListState(new ListStateDescriptor<>("indexSnapshotId", 
Types.LONG));
+
+    indexSnapshotId = getValue(indexSnapshotState);
+
+    mainIndexEmittedSet = Sets.newHashSet();
+    for (List<Integer> fieldSet : mainIndexEmittedState.get()) {
+      mainIndexEmittedSet.add(fieldSet);
+    }
+  }
+
+  @Override
+  public void snapshotState(StateSnapshotContext context) throws Exception {
+    super.snapshotState(context);
+    storeValue(indexSnapshotState, indexSnapshotId);
+
+    mainIndexEmittedState.clear();
+    for (List<Integer> fieldSet : mainIndexEmittedSet) {
+      mainIndexEmittedState.add(Lists.newArrayList(fieldSet));
+    }
+  }
+
+  @Override
+  public void processElement(StreamRecord<Trigger> element) throws Exception {
+    long triggerTs = element.getTimestamp();
+    nextPhaseTs = Math.max(triggerTs, nextPhaseTs + 1);
+
+    try {
+      table.refresh();
+      Snapshot mainSnapshot = table.snapshot(targetBranch);
+      Long currentMainSnapshotId = mainSnapshot != null ? 
mainSnapshot.snapshotId() : null;
+
+      updateState(mainSnapshot);
+
+      Snapshot stagingSnapshot = table.snapshot(stagingBranch);
+      if (stagingSnapshot == null) {
+        LOG.info("No snapshot on staging branch '{}', nothing to convert.", 
stagingBranch);
+        emitNoOpResult(triggerTs, currentMainSnapshotId);
+        return;
+      }
+
+      Snapshot nextSnapshot = findNextSnapshot(stagingSnapshot);
+      if (nextSnapshot == null) {
+        LOG.info("No new staging snapshots to process on branch '{}'.", 
stagingBranch);
+        emitNoOpResult(triggerTs, currentMainSnapshotId);
+        return;
+      }
+
+      emitReadCommandsForSnapshot(nextSnapshot, triggerTs, 
currentMainSnapshotId);
+    } catch (Exception e) {
+      LOG.error(
+          "Error processing equality deletes for table {} task {}[{}]",
+          tableName,
+          taskName,
+          taskIndex,
+          e);
+      output.collect(TaskResultAggregator.ERROR_STREAM, new StreamRecord<>(e));
+      Snapshot mainSnapshot = table.snapshot(targetBranch);
+      Long currentMainSnapshotId = mainSnapshot != null ? 
mainSnapshot.snapshotId() : null;
+      emitNoOpResult(triggerTs, currentMainSnapshotId);
+    }
+  }
+
+  private void updateState(Snapshot mainHead) {
+    Long currentMainSnapshotId = mainHead != null ? mainHead.snapshotId() : 
null;
+
+    int externalCount = 0;
+    Snapshot current = mainHead;
+    while (current != null) {
+      String prop =
+          
current.summary().get(EqualityConvertCommitter.COMMITTED_STAGING_SNAPSHOT_PROPERTY);
+      if (prop != null) {
+        // First property found going back from main head is our most recent 
commit.
+        lastStagingSnapshotId = Long.parseLong(prop);
+        break;
+      }
+
+      externalCount++;
+      current = current.parentId() != null ? 
table.snapshot(current.parentId()) : null;
+    }
+
+    boolean mainChanged =
+        lastMainSnapshotId != null && 
!lastMainSnapshotId.equals(currentMainSnapshotId);
+    if (mainChanged && externalCount > 0 && !mainIndexEmittedSet.isEmpty()) {
+      LOG.info(
+          "Detected {} external commit(s) on branch '{}' since previous cycle, 
reindexing.",
+          externalCount,
+          targetBranch);
+      indexSnapshotId = currentMainSnapshotId;
+      // Full reindex: re-emits all data files from the current main snapshot 
for every equality
+      // field set that was previously indexed. This is triggered when an 
external commit (e.g.
+      // compaction) changes the main branch and the existing index may 
reference stale file paths.
+      emitReindexFromMain(mainHead);
+      reindexCounter.inc();
+    } else if (mainIndexEmittedSet.isEmpty()) {
+      indexSnapshotId = currentMainSnapshotId;
+    }
+
+    lastMainSnapshotId = currentMainSnapshotId;
+  }
+
+  private void emitReadCommandsForSnapshot(
+      Snapshot snapshot, long triggerTs, Long currentMainSnapshotId) {
+    List<DataFile> newDataFiles = Lists.newArrayList();
+    List<DeleteFile> stagingDVFiles = Lists.newArrayList();
+    List<DeleteFile> posDeleteFiles = Lists.newArrayList();
+    Map<List<Integer>, List<DeleteFile>> deletesByFieldIds = Maps.newHashMap();
+    SnapshotChanges changes = 
SnapshotChanges.builderFor(table).snapshot(snapshot).build();
+
+    for (DataFile dataFile : changes.addedDataFiles()) {

Review Comment:
   Agreed. For now, let's throw when we discover any. Updated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to