mxm commented on code in PR #15996: URL: https://github.com/apache/iceberg/pull/15996#discussion_r3155891896
########## flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/EqualityConvertCommitter.java: ########## @@ -0,0 +1,336 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.maintenance.operator; + +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import org.apache.flink.annotation.Internal; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.RowDelta; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.flink.maintenance.api.Trigger; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.util.ContentFileUtil; 
+import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Commits data files and DVs to the main branch. Receives {@link DVMergeResult}s from parallel + * {@link EqualityConvertDVMerger} instances (input 1) and an {@link EqualityConvertPlanResult} from + * the planner (input 2). Assembles the final file lists and commits using a {@link RowDelta} + * operation once the plan result and done-timestamp watermark have both arrived. + * + * <p>Watermarks are absorbed while a cycle is active. + * + * <p>Emits a {@link Trigger} after each cycle (commit, no-op, or error) so the downstream {@link + * TaskResultAggregator} can track task completion. This is the sole source of Trigger records for + * the Aggregator. + * + * <p>On restart the planner re-derives its position from {@link + * #COMMITTED_STAGING_SNAPSHOT_PROPERTY} on main and replays any in-process cycle. + */ +@Internal +public class EqualityConvertCommitter extends AbstractStreamOperator<Trigger> + implements TwoInputStreamOperator<DVMergeResult, EqualityConvertPlanResult, Trigger> { + + private static final Logger LOG = LoggerFactory.getLogger(EqualityConvertCommitter.class); + + static final String COMMITTED_STAGING_SNAPSHOT_PROPERTY = "equality-convert-staging-snapshot"; + + private static final String ADDED_DV_NUM_METRIC = "addedDvNum"; + private static final String COMMIT_DURATION_MS_METRIC = "commitDurationMs"; + + private final String tableName; + private final String taskName; + private final int taskIndex; + private final TableLoader tableLoader; + private final String targetBranch; + + private transient Table table; + private transient List<DVMergeResult> bufferedResults; + private transient EqualityConvertPlanResult planResult; + private transient Watermark pendingMark; + + private transient Counter errorCounter; + private transient Counter addedDataFileNumCounter; + private transient Counter addedDataFileSizeCounter; + private transient Counter addedDvNumCounter; + private transient 
Counter commitDurationMsCounter; + + public EqualityConvertCommitter( + String tableName, + String taskName, + int taskIndex, + TableLoader tableLoader, + String targetBranch) { + this.tableName = tableName; + this.taskName = taskName; + this.taskIndex = taskIndex; + this.tableLoader = tableLoader; + this.targetBranch = targetBranch; + } + + @Override + public void open() throws Exception { + super.open(); + if (!tableLoader.isOpen()) { + tableLoader.open(); + } + + this.table = tableLoader.loadTable(); + this.bufferedResults = Lists.newArrayList(); + + MetricGroup taskMetricGroup = + TableMaintenanceMetrics.groupFor(getRuntimeContext(), tableName, taskName, taskIndex); + this.errorCounter = taskMetricGroup.counter(TableMaintenanceMetrics.ERROR_COUNTER); + this.addedDataFileNumCounter = + taskMetricGroup.counter(TableMaintenanceMetrics.ADDED_DATA_FILE_NUM_METRIC); + this.addedDataFileSizeCounter = + taskMetricGroup.counter(TableMaintenanceMetrics.ADDED_DATA_FILE_SIZE_METRIC); + this.addedDvNumCounter = taskMetricGroup.counter(ADDED_DV_NUM_METRIC); + this.commitDurationMsCounter = taskMetricGroup.counter(COMMIT_DURATION_MS_METRIC); + } + + @Override + public void processElement1(StreamRecord<DVMergeResult> record) { + bufferedResults.add(record.getValue()); + } + + @Override + public void processElement2(StreamRecord<EqualityConvertPlanResult> record) throws Exception { + planResult = record.getValue(); + tryCommitAndForward(); + } + + @Override + public void processWatermark(Watermark mark) throws Exception { + pendingMark = mark; + tryCommitAndForward(); + + // Forward watermarks when no active cycle to prevent stalling downstream. 
+ if (planResult == null && pendingMark != null) { + Watermark toForward = pendingMark; + pendingMark = null; + super.processWatermark(toForward); + } + } + + @Override + public void close() throws Exception { + super.close(); + tableLoader.close(); + } + + private void tryCommitAndForward() throws Exception { + if (planResult == null || pendingMark == null) { + return; + } + + if (pendingMark.getTimestamp() < planResult.doneTimestamp()) { + return; + } + + try { + commitIfNeeded(); + } catch (Exception e) { + LOG.error( + "Failed to commit equality convert result for table {} task {}[{}]", + tableName, + taskName, + taskIndex, + e); + output.collect(TaskResultAggregator.ERROR_STREAM, new StreamRecord<>(e)); + errorCounter.inc(); + } + + // Emit Trigger for the Aggregator (even on error or no-op). + output.collect(new StreamRecord<>(Trigger.create(planResult.triggerTimestamp(), taskIndex))); + + Watermark mark = pendingMark; + bufferedResults.clear(); + planResult = null; + pendingMark = null; + + super.processWatermark(mark); Review Comment: Not needed, I applied your suggestion. ########## flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/EqualityConvertCommitter.java: ########## @@ -0,0 +1,336 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.maintenance.operator; + +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import org.apache.flink.annotation.Internal; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.RowDelta; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.flink.maintenance.api.Trigger; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.util.ContentFileUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Commits data files and DVs to the main branch. Receives {@link DVMergeResult}s from parallel + * {@link EqualityConvertDVMerger} instances (input 1) and an {@link EqualityConvertPlanResult} from + * the planner (input 2). Assembles the final file lists and commits using a {@link RowDelta} + * operation once the plan result and done-timestamp watermark have both arrived. + * + * <p>Watermarks are absorbed while a cycle is active. + * + * <p>Emits a {@link Trigger} after each cycle (commit, no-op, or error) so the downstream {@link + * TaskResultAggregator} can track task completion. This is the sole source of Trigger records for + * the Aggregator. 
+ * + * <p>On restart the planner re-derives its position from {@link + * #COMMITTED_STAGING_SNAPSHOT_PROPERTY} on main and replays any in-process cycle. + */ +@Internal +public class EqualityConvertCommitter extends AbstractStreamOperator<Trigger> + implements TwoInputStreamOperator<DVMergeResult, EqualityConvertPlanResult, Trigger> { + + private static final Logger LOG = LoggerFactory.getLogger(EqualityConvertCommitter.class); + + static final String COMMITTED_STAGING_SNAPSHOT_PROPERTY = "equality-convert-staging-snapshot"; + + private static final String ADDED_DV_NUM_METRIC = "addedDvNum"; + private static final String COMMIT_DURATION_MS_METRIC = "commitDurationMs"; + + private final String tableName; + private final String taskName; + private final int taskIndex; + private final TableLoader tableLoader; + private final String targetBranch; + + private transient Table table; + private transient List<DVMergeResult> bufferedResults; + private transient EqualityConvertPlanResult planResult; + private transient Watermark pendingMark; + + private transient Counter errorCounter; + private transient Counter addedDataFileNumCounter; + private transient Counter addedDataFileSizeCounter; + private transient Counter addedDvNumCounter; + private transient Counter commitDurationMsCounter; + + public EqualityConvertCommitter( + String tableName, + String taskName, + int taskIndex, + TableLoader tableLoader, + String targetBranch) { + this.tableName = tableName; + this.taskName = taskName; + this.taskIndex = taskIndex; + this.tableLoader = tableLoader; + this.targetBranch = targetBranch; + } + + @Override + public void open() throws Exception { + super.open(); + if (!tableLoader.isOpen()) { + tableLoader.open(); + } + + this.table = tableLoader.loadTable(); + this.bufferedResults = Lists.newArrayList(); + + MetricGroup taskMetricGroup = + TableMaintenanceMetrics.groupFor(getRuntimeContext(), tableName, taskName, taskIndex); + this.errorCounter = 
taskMetricGroup.counter(TableMaintenanceMetrics.ERROR_COUNTER); + this.addedDataFileNumCounter = + taskMetricGroup.counter(TableMaintenanceMetrics.ADDED_DATA_FILE_NUM_METRIC); + this.addedDataFileSizeCounter = + taskMetricGroup.counter(TableMaintenanceMetrics.ADDED_DATA_FILE_SIZE_METRIC); + this.addedDvNumCounter = taskMetricGroup.counter(ADDED_DV_NUM_METRIC); + this.commitDurationMsCounter = taskMetricGroup.counter(COMMIT_DURATION_MS_METRIC); + } + + @Override + public void processElement1(StreamRecord<DVMergeResult> record) { + bufferedResults.add(record.getValue()); + } + + @Override + public void processElement2(StreamRecord<EqualityConvertPlanResult> record) throws Exception { + planResult = record.getValue(); + tryCommitAndForward(); + } + + @Override + public void processWatermark(Watermark mark) throws Exception { + pendingMark = mark; + tryCommitAndForward(); + + // Forward watermarks when no active cycle to prevent stalling downstream. + if (planResult == null && pendingMark != null) { + Watermark toForward = pendingMark; + pendingMark = null; + super.processWatermark(toForward); + } + } + + @Override + public void close() throws Exception { + super.close(); + tableLoader.close(); + } + + private void tryCommitAndForward() throws Exception { + if (planResult == null || pendingMark == null) { + return; + } Review Comment: The planResult is only sent after emitting all watermarks for the previous phases. That's why it can be null during watermark handling. I've simplified the code. ########## flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/EqualityConvertDVMerger.java: ########## @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.maintenance.operator; + +import java.util.List; +import java.util.Map; +import org.apache.flink.annotation.Internal; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Table; +import org.apache.iceberg.data.BaseDeleteLoader; +import org.apache.iceberg.data.DeleteLoader; +import org.apache.iceberg.deletes.BaseDVFileWriter; +import org.apache.iceberg.deletes.PositionDeleteIndex; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.io.DeleteWriteResult; +import org.apache.iceberg.io.OutputFileFactory; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Parallel DV writer that receives {@link DVMergeCommand}s from the {@link + * EqualityConvertDVResolver} and writes merged deletion vector files. 
Each instance processes a + * subset of data files (distributed via rebalance) and writes its own Puffin file. + */ +@Internal +public class EqualityConvertDVMerger extends AbstractStreamOperator<DVMergeResult> + implements OneInputStreamOperator<DVMergeCommand, DVMergeResult> { + + private static final Logger LOG = LoggerFactory.getLogger(EqualityConvertDVMerger.class); + + private final String tableName; + private final String taskName; + private final int taskIndex; + private final TableLoader tableLoader; + + private transient Table table; + private transient OutputFileFactory fileFactory; + private transient List<DVMergeCommand> bufferedCommands; + private transient DeleteLoader deleteLoader; + + public EqualityConvertDVMerger( + String tableName, String taskName, int taskIndex, TableLoader tableLoader) { + this.tableName = tableName; + this.taskName = taskName; + this.taskIndex = taskIndex; + this.tableLoader = tableLoader; + } + + @Override + public void open() throws Exception { + super.open(); + if (!tableLoader.isOpen()) { + tableLoader.open(); + } + + table = tableLoader.loadTable(); + fileFactory = + OutputFileFactory.builderFor(table, taskIndex, 0L).format(FileFormat.PUFFIN).build(); + deleteLoader = new BaseDeleteLoader(deleteFile -> table.io().newInputFile(deleteFile)); + bufferedCommands = Lists.newArrayList(); + } + + @Override + public void processElement(StreamRecord<DVMergeCommand> record) { + bufferedCommands.add(record.getValue()); Review Comment: I'm doing that now. ########## flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ConvertEqualityDeletes.java: ########## @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.maintenance.api; + +import java.io.IOException; +import java.io.UncheckedIOException; +import org.apache.flink.annotation.Experimental; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.iceberg.HasTableOperations; +import org.apache.iceberg.SnapshotRef; +import org.apache.iceberg.Table; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.flink.maintenance.operator.DVMergeCommand; +import org.apache.iceberg.flink.maintenance.operator.DVMergeResult; +import org.apache.iceberg.flink.maintenance.operator.DVPosition; +import org.apache.iceberg.flink.maintenance.operator.EqualityConvertCommitter; +import org.apache.iceberg.flink.maintenance.operator.EqualityConvertDVMerger; +import org.apache.iceberg.flink.maintenance.operator.EqualityConvertDVResolver; +import org.apache.iceberg.flink.maintenance.operator.EqualityConvertPlanResult; +import org.apache.iceberg.flink.maintenance.operator.EqualityConvertPlanner; +import org.apache.iceberg.flink.maintenance.operator.EqualityConvertReader; +import org.apache.iceberg.flink.maintenance.operator.EqualityConvertWorker; +import org.apache.iceberg.flink.maintenance.operator.IndexCommand; +import 
org.apache.iceberg.flink.maintenance.operator.ReadCommand; +import org.apache.iceberg.flink.maintenance.operator.SerializedEqualityValues; +import org.apache.iceberg.flink.maintenance.operator.TaskResultAggregator; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +/** + * Creates the equality delete to DV conversion data stream. Runs a single iteration of the + * conversion for every {@link Trigger} event. + * + * <p>The pipeline reads equality delete files from a staging branch, converts them to deletion + * vectors (DVs) using a primary key index stored in Flink state, and commits the data files and DVs + * to the main branch. + * + * <p>The conversion is split into parallel stages: + * + * <ol> + * <li>Planner (p=1): scans staging branch, emits file-level ReadCommands with phase timestamps + * <li>Reader (p=N): reads files, emits row-level IndexCommands + * <li>Worker (p=N): maintains PK index shards, resolves equality deletes to DV positions + * <li>DVResolver (p=1): groups positions by data file, resolves partition info and existing DVs + * <li>DVMerger (p=N): writes merged deletion vector files + * <li>Committer (p=1): commits data files and DVs to main branch + * </ol> + * + * <p>Mutual exclusion with concurrent maintenance tasks (e.g. compaction) is enforced by the Flink + * maintenance framework lock. 
+ */ +@Experimental +public class ConvertEqualityDeletes { + static final String PLANNER_TASK_NAME = "EqConvert Planner"; + static final String READER_TASK_NAME = "EqConvert Reader"; + static final String WORKER_TASK_NAME = "EqConvert Worker"; + static final String DV_RESOLVER_TASK_NAME = "EqConvert DVResolver"; + static final String DV_MERGER_TASK_NAME = "EqConvert DVMerger"; + static final String COMMIT_TASK_NAME = "EqConvert Commit"; + static final String AGGREGATOR_TASK_NAME = "EqConvert Aggregator"; + + private ConvertEqualityDeletes() {} + + public static Builder builder() { + return new Builder(); + } + + public static class Builder extends MaintenanceTaskBuilder<Builder> { + private String stagingBranch; + private String targetBranch = SnapshotRef.MAIN_BRANCH; + + @Override + String maintenanceTaskName() { + return "ConvertEqualityDeletes"; + } + + /** Sets the staging branch name that holds the equality delete files and data files. */ + public Builder stagingBranch(String newStagingBranch) { + this.stagingBranch = newStagingBranch; + return this; + } + + /** + * Sets the target branch where converted data files and DVs are committed. Defaults to the main + * branch. 
+ */ + public Builder targetBranch(String newTargetBranch) { + this.targetBranch = newTargetBranch; + return this; + } + + @Override + DataStream<TaskResult> append(DataStream<Trigger> trigger) { + Preconditions.checkNotNull(stagingBranch, "stagingBranch must be set"); + validateFormatVersion(); + + // Planner (p=1): emits ReadCommands with phase timestamps and watermarks + SingleOutputStreamOperator<ReadCommand> planned = + trigger + .transform( + operatorName(PLANNER_TASK_NAME), + TypeInformation.of(ReadCommand.class), + new EqualityConvertPlanner( + tableName(), taskName(), index(), tableLoader(), stagingBranch, targetBranch)) + .uid(PLANNER_TASK_NAME + uidSuffix()) + .slotSharingGroup(slotSharingGroup()) + .forceNonParallel(); + + // Reader (p=N): reads files, emits IndexCommands + SingleOutputStreamOperator<IndexCommand> indexed = + planned + .rebalance() + .process(new EqualityConvertReader(tableLoader())) + .name(operatorName(READER_TASK_NAME)) + .uid(READER_TASK_NAME + uidSuffix()) + .slotSharingGroup(slotSharingGroup()) + .setParallelism(parallelism()); + + // Worker (p=N): keyed by full PK, phase-aware buffering + SingleOutputStreamOperator<DVPosition> dvPositions = + indexed + .keyBy(IndexCommand::key, TypeInformation.of(SerializedEqualityValues.class)) + .process(new EqualityConvertWorker()) + .name(operatorName(WORKER_TASK_NAME)) + .uid(WORKER_TASK_NAME + uidSuffix()) + .slotSharingGroup(slotSharingGroup()) + .setParallelism(parallelism()); + + // DVPositions from the Reader (converted V2 positional delete files) bypass the Worker. 
+ DataStream<DVPosition> v2PositionalDeletes = + indexed.getSideOutput(EqualityConvertReader.DV_POSITION_STREAM); + DataStream<DVPosition> allDvPositions = dvPositions.union(v2PositionalDeletes); + + // Metadata side output from planner + DataStream<EqualityConvertPlanResult> metadata = + planned.getSideOutput(EqualityConvertPlanner.METADATA_STREAM); + + // DVResolver (p=1): groups positions, collects partition/DV info + SingleOutputStreamOperator<DVMergeCommand> resolved = + allDvPositions + .connect(metadata) + .transform( + operatorName(DV_RESOLVER_TASK_NAME), + TypeInformation.of(DVMergeCommand.class), + new EqualityConvertDVResolver( + tableName(), taskName(), index(), tableLoader(), targetBranch)) + .uid(DV_RESOLVER_TASK_NAME + uidSuffix()) + .slotSharingGroup(slotSharingGroup()) + .forceNonParallel(); + + // DVMerger (p=N): writes merged DV files + SingleOutputStreamOperator<DVMergeResult> merged = + resolved + .rebalance() + .transform( + operatorName(DV_MERGER_TASK_NAME), + TypeInformation.of(DVMergeResult.class), + new EqualityConvertDVMerger(tableName(), taskName(), index(), tableLoader())) + .uid(DV_MERGER_TASK_NAME + uidSuffix()) + .slotSharingGroup(slotSharingGroup()) + .setParallelism(parallelism()); + + // Upstream errors become abort signals so a partial read never commits. + DataStream<DVMergeResult> upstreamAborts = + indexed + .getSideOutput(TaskResultAggregator.ERROR_STREAM) + .union(dvPositions.getSideOutput(TaskResultAggregator.ERROR_STREAM)) + .union(resolved.getSideOutput(TaskResultAggregator.ERROR_STREAM)) + .map(e -> DVMergeResult.abort()) + .returns(TypeInformation.of(DVMergeResult.class)) + .name(operatorName("UpstreamAbort")) + .uid("upstream-abort" + uidSuffix()) + .slotSharingGroup(slotSharingGroup()) + .forceNonParallel(); + + // Committer (p=1): commits data files + DVs to main. 
+ SingleOutputStreamOperator<Trigger> committed = + merged + .union(upstreamAborts) + .connect(metadata) + .transform( + operatorName(COMMIT_TASK_NAME), + TypeInformation.of(Trigger.class), + new EqualityConvertCommitter( + tableName(), taskName(), index(), tableLoader(), targetBranch)) + .uid(COMMIT_TASK_NAME + uidSuffix()) + .slotSharingGroup(slotSharingGroup()) + .forceNonParallel(); + + // Aggregator (p=1): collects errors and emits TaskResult. + return committed + .connect( + planned + .getSideOutput(TaskResultAggregator.ERROR_STREAM) + .union(indexed.getSideOutput(TaskResultAggregator.ERROR_STREAM)) + .union(dvPositions.getSideOutput(TaskResultAggregator.ERROR_STREAM)) + .union(resolved.getSideOutput(TaskResultAggregator.ERROR_STREAM)) + .union(merged.getSideOutput(TaskResultAggregator.ERROR_STREAM)) + .union(committed.getSideOutput(TaskResultAggregator.ERROR_STREAM))) + .transform( + operatorName(AGGREGATOR_TASK_NAME), + TypeInformation.of(TaskResult.class), + new TaskResultAggregator(tableName(), taskName(), index())) + .uid(AGGREGATOR_TASK_NAME + uidSuffix()) + .slotSharingGroup(slotSharingGroup()) + .forceNonParallel(); + } + + private void validateFormatVersion() { + try (TableLoader loader = tableLoader().clone()) { + if (!loader.isOpen()) { + loader.open(); + } + + Table table = loader.loadTable(); + Preconditions.checkState( + table instanceof HasTableOperations, + "Table must implement HasTableOperations for format version check"); + int formatVersion = ((HasTableOperations) table).operations().current().formatVersion(); + Preconditions.checkArgument( + formatVersion >= 3, + "ConvertEqualityDeletes requires table format version >= 3 (DVs), " + + "but table '%s' is version %s", + tableName(), + formatVersion); + } catch (IOException e) { + throw new UncheckedIOException("Failed to validate table format version", e); + } + } Review Comment: Thanks! Added. 
########## flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/EqualityConvertDVMerger.java: ########## @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.maintenance.operator; + +import java.util.List; +import java.util.Map; +import org.apache.flink.annotation.Internal; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Table; +import org.apache.iceberg.data.BaseDeleteLoader; +import org.apache.iceberg.data.DeleteLoader; +import org.apache.iceberg.deletes.BaseDVFileWriter; +import org.apache.iceberg.deletes.PositionDeleteIndex; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.io.DeleteWriteResult; +import org.apache.iceberg.io.OutputFileFactory; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import 
org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Parallel DV writer that receives {@link DVMergeCommand}s from the {@link + * EqualityConvertDVResolver} and writes merged deletion vector files. Each instance processes a + * subset of data files (distributed via rebalance) and writes its own Puffin file. + */ +@Internal +public class EqualityConvertDVMerger extends AbstractStreamOperator<DVMergeResult> + implements OneInputStreamOperator<DVMergeCommand, DVMergeResult> { + + private static final Logger LOG = LoggerFactory.getLogger(EqualityConvertDVMerger.class); + + private final String tableName; + private final String taskName; + private final int taskIndex; + private final TableLoader tableLoader; + + private transient Table table; + private transient OutputFileFactory fileFactory; + private transient List<DVMergeCommand> bufferedCommands; + private transient DeleteLoader deleteLoader; + + public EqualityConvertDVMerger( + String tableName, String taskName, int taskIndex, TableLoader tableLoader) { + this.tableName = tableName; + this.taskName = taskName; + this.taskIndex = taskIndex; + this.tableLoader = tableLoader; + } + + @Override + public void open() throws Exception { + super.open(); + if (!tableLoader.isOpen()) { + tableLoader.open(); + } + + table = tableLoader.loadTable(); + fileFactory = + OutputFileFactory.builderFor(table, taskIndex, 0L).format(FileFormat.PUFFIN).build(); + deleteLoader = new BaseDeleteLoader(deleteFile -> table.io().newInputFile(deleteFile)); + bufferedCommands = Lists.newArrayList(); + } + + @Override + public void processElement(StreamRecord<DVMergeCommand> record) { + bufferedCommands.add(record.getValue()); + } + + @Override + public void processWatermark(Watermark mark) throws Exception { + if (!bufferedCommands.isEmpty()) { + if 
(bufferedCommands.stream().anyMatch(DVMergeCommand::isAbort)) { + output.collect(new StreamRecord<>(DVMergeResult.abort())); + } else { + try { + Map<String, DeleteFile> existingDVs = Maps.newHashMap(); + for (DVMergeCommand cmd : bufferedCommands) { + if (cmd.existingDV() != null) { + existingDVs.put(cmd.dataFilePath(), cmd.existingDV()); + } + } Review Comment: Changed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
