aokolnychyi commented on code in PR #10200:
URL: https://github.com/apache/iceberg/pull/10200#discussion_r1580291026
##########
core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java:
##########
@@ -109,18 +112,34 @@ protected abstract class BaseEqualityDeltaWriter implements Closeable {
   private final StructProjection structProjection;
   private RollingFileWriter dataWriter;
   private RollingEqDeleteWriter eqDeleteWriter;
-  private SortedPosDeleteWriter<T> posDeleteWriter;
+  private FileWriter<PositionDelete<T>, DeleteWriteResult> posDeleteWriter;
   private Map<StructLike, PathOffset> insertedRowMap;

   protected BaseEqualityDeltaWriter(StructLike partition, Schema schema, Schema deleteSchema) {
+    this(partition, schema, deleteSchema, DeleteGranularity.PARTITION);
+  }
+
+  protected BaseEqualityDeltaWriter(
+      StructLike partition,
+      Schema schema,
+      Schema deleteSchema,
+      DeleteGranularity deleteGranularity) {
     Preconditions.checkNotNull(schema, "Iceberg table schema cannot be null.");
     Preconditions.checkNotNull(deleteSchema, "Equality-delete schema cannot be null.");
     this.structProjection = StructProjection.create(schema, deleteSchema);
     this.dataWriter = new RollingFileWriter(partition);
     this.eqDeleteWriter = new RollingEqDeleteWriter(partition);
     this.posDeleteWriter =
-        new SortedPosDeleteWriter<>(appenderFactory, fileFactory, format, partition);
+        new SortingPositionOnlyDeleteWriter<>(
+            () ->
+                appenderFactory.newPosDeleteWriter(
+                    partition == null

Review Comment:
   If I am not mistaken, this line should check `spec.isUnpartitioned() || partition == null` to avoid the weird issues we discovered in #7685. We also have to pass `spec` along with `partition` below in `newOutputFile`. I'd also consider moving this logic into a separate method for better readability. Up to you, though.
   ```
   private EncryptedOutputFile newOutputFile(StructLike partition) {
     if (spec.isUnpartitioned() || partition == null) {
       return fileFactory.newOutputFile();
     } else {
       return fileFactory.newOutputFile(spec, partition);
     }
   }
   ```

##########
core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java:
##########
@@ -140,12 +159,18 @@ public void write(T row) throws IOException {
       PathOffset previous = insertedRowMap.put(copiedKey, pathOffset);
       if (previous != null) {
         // TODO attach the previous row if has a positional-delete row schema in appender factory.
-        posDeleteWriter.delete(previous.path, previous.rowOffset, null);
+        writePosDelete(previous);
       }

       dataWriter.write(row);
     }

+    private void writePosDelete(PathOffset pathOffset) {
+      PositionDelete<T> delete = PositionDelete.create();

Review Comment:
   We usually avoid creating an extra `PositionDelete` object for every delete, if possible. Instead, we initialize one object and reuse it. Take a look at `BasePositionDeltaWriter`.

##########
core/src/main/java/org/apache/iceberg/deletes/SortingPositionOnlyDeleteWriter.java:
##########
@@ -118,6 +118,10 @@ private DeleteWriteResult writeFileDeletes() throws IOException {

   @SuppressWarnings("CollectionUndefinedEquality")
   private DeleteWriteResult writeDeletes(Collection<CharSequence> paths) throws IOException {
+    if (paths.size() == 0) {

Review Comment:
   Shall we use `paths.isEmpty()`?

##########
flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java:
##########
@@ -56,7 +58,8 @@ abstract class BaseDeltaTaskWriter extends BaseTaskWriter<RowData> {
       Schema schema,
       RowType flinkSchema,
       List<Integer> equalityFieldIds,
-      boolean upsert) {
+      boolean upsert,
+      DeleteGranularity deleteGranularity) {

Review Comment:
   Are we worried about the increased metadata size?
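The reuse pattern the reviewer describes (one mutable `PositionDelete` refilled per delete, rather than one allocation per call) can be sketched as below. This is a minimal, self-contained illustration: the inner `PositionDelete` class here is a simplified stand-in for `org.apache.iceberg.deletes.PositionDelete`, not the real API, though the real class does expose a similar `set(path, pos, row)` mutator. Reuse is safe only if the downstream writer consumes the values eagerly rather than holding a reference to the container.

```java
import java.util.ArrayList;
import java.util.List;

public class PositionDeleteReuse {
    // Simplified stand-in for org.apache.iceberg.deletes.PositionDelete,
    // present only to make the sketch self-contained.
    static class PositionDelete<R> {
        private CharSequence path;
        private long pos;
        private R row;

        PositionDelete<R> set(CharSequence newPath, long newPos, R newRow) {
            this.path = newPath;
            this.pos = newPos;
            this.row = newRow;
            return this; // fluent style, mirroring the real class
        }

        CharSequence path() { return path; }
        long pos() { return pos; }
    }

    // One container allocated up front and refilled for every delete,
    // instead of allocating a new PositionDelete inside writePosDelete.
    private final PositionDelete<Object> positionDelete = new PositionDelete<>();
    final List<String> written = new ArrayList<>();

    void writePosDelete(CharSequence path, long offset) {
        PositionDelete<Object> delete = positionDelete.set(path, offset, null);
        // Stand-in for posDeleteWriter.write(delete): consume values eagerly.
        written.add(delete.path() + ":" + delete.pos());
    }

    public static void main(String[] args) {
        PositionDeleteReuse writer = new PositionDeleteReuse();
        writer.writePosDelete("data-file-a.parquet", 3L);
        writer.writePosDelete("data-file-a.parquet", 7L);
        System.out.println(writer.written);
    }
}
```

In the real code path this works because `SortingPositionOnlyDeleteWriter` copies the path and position into its internal buffer on each `write`, so the container can be safely overwritten on the next call.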
##########
core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java:
##########
@@ -109,18 +112,34 @@ protected abstract class BaseEqualityDeltaWriter implements Closeable {
   private final StructProjection structProjection;
   private RollingFileWriter dataWriter;
   private RollingEqDeleteWriter eqDeleteWriter;
-  private SortedPosDeleteWriter<T> posDeleteWriter;
+  private FileWriter<PositionDelete<T>, DeleteWriteResult> posDeleteWriter;
   private Map<StructLike, PathOffset> insertedRowMap;

   protected BaseEqualityDeltaWriter(StructLike partition, Schema schema, Schema deleteSchema) {
+    this(partition, schema, deleteSchema, DeleteGranularity.PARTITION);
+  }
+
+  protected BaseEqualityDeltaWriter(
+      StructLike partition,
+      Schema schema,
+      Schema deleteSchema,
+      DeleteGranularity deleteGranularity) {

Review Comment:
   I am not convinced we should make it configurable when writing equality deltas. Our original plan was to always generate file-scoped position deletes when writing Flink CDC. Take a look at [this](https://github.com/apache/iceberg/pull/9384#discussion_r1439722314) discussion.

##########
flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java:
##########
@@ -167,6 +168,18 @@ public DistributionMode distributionMode() {
     return DistributionMode.fromName(modeName);
   }

+  public DeleteGranularity deleteGranularity() {
+    String modeName =
+        confParser
+            .stringConf()
+            .option(FlinkWriteOptions.DELETE_GRANULARITY.key())
+            .flinkConfig(FlinkWriteOptions.DELETE_GRANULARITY)
+            .tableProperty(TableProperties.DELETE_GRANULARITY)
+            .defaultValue(TableProperties.DELETE_GRANULARITY_DEFAULT)

Review Comment:
   The way we generate position deletes in Flink CDC causes a lot of issues. I think we should switch now and not even offer the configuration. If we think that everyone who runs Flink CDC would have to set this config, then I don't think this config should exist.

--
This is an automated message from the Apache Git Service.
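The `ConfParser` chain in the FlinkWriteConf hunk resolves a value with a fixed precedence: per-job write option first, then Flink cluster config, then table property, then the default. A minimal sketch of that resolution order, under the assumption that a missing layer simply falls through to the next (class and method names here are illustrative, not the real iceberg-flink `ConfParser` API):

```java
import java.util.Map;
import java.util.Optional;

public class LayeredConf {
    private final Map<String, String> writeOptions;    // per-job write options
    private final Map<String, String> flinkConfig;     // cluster-level Flink config
    private final Map<String, String> tableProperties; // table metadata properties

    public LayeredConf(Map<String, String> writeOptions,
                       Map<String, String> flinkConfig,
                       Map<String, String> tableProperties) {
        this.writeOptions = writeOptions;
        this.flinkConfig = flinkConfig;
        this.tableProperties = tableProperties;
    }

    // First non-null value wins:
    // write option > Flink config > table property > default.
    public String resolve(String optionKey, String tablePropertyKey, String defaultValue) {
        return Optional.ofNullable(writeOptions.get(optionKey))
            .or(() -> Optional.ofNullable(flinkConfig.get(optionKey)))
            .or(() -> Optional.ofNullable(tableProperties.get(tablePropertyKey)))
            .orElse(defaultValue);
    }

    public static void main(String[] args) {
        LayeredConf conf = new LayeredConf(
            Map.of(),                                    // no per-job option set
            Map.of(),                                    // no Flink config set
            Map.of("write.delete.granularity", "file")); // table property set
        // Falls through to the table property layer.
        System.out.println(
            conf.resolve("delete-granularity", "write.delete.granularity", "partition"));
    }
}
```

The reviewer's objection is to the top of this chain existing at all for Flink CDC: if every CDC user would have to set the option to the same value, the right move is to hardcode the behavior rather than expose the knob.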
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org