aokolnychyi commented on code in PR #10200:
URL: https://github.com/apache/iceberg/pull/10200#discussion_r1580291026


##########
core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java:
##########
@@ -109,18 +112,34 @@ protected abstract class BaseEqualityDeltaWriter implements Closeable {
     private final StructProjection structProjection;
     private RollingFileWriter dataWriter;
     private RollingEqDeleteWriter eqDeleteWriter;
-    private SortedPosDeleteWriter<T> posDeleteWriter;
+    private FileWriter<PositionDelete<T>, DeleteWriteResult> posDeleteWriter;
     private Map<StructLike, PathOffset> insertedRowMap;
 
     protected BaseEqualityDeltaWriter(StructLike partition, Schema schema, Schema deleteSchema) {
+      this(partition, schema, deleteSchema, DeleteGranularity.PARTITION);
+    }
+
+    protected BaseEqualityDeltaWriter(
+        StructLike partition,
+        Schema schema,
+        Schema deleteSchema,
+        DeleteGranularity deleteGranularity) {
       Preconditions.checkNotNull(schema, "Iceberg table schema cannot be null.");
       Preconditions.checkNotNull(deleteSchema, "Equality-delete schema cannot be null.");
       this.structProjection = StructProjection.create(schema, deleteSchema);
 
       this.dataWriter = new RollingFileWriter(partition);
       this.eqDeleteWriter = new RollingEqDeleteWriter(partition);
       this.posDeleteWriter =
-          new SortedPosDeleteWriter<>(appenderFactory, fileFactory, format, partition);
+          new SortingPositionOnlyDeleteWriter<>(
+              () ->
+                  appenderFactory.newPosDeleteWriter(
+                      partition == null

Review Comment:
   If I am not mistaken, this line should check `spec.isUnpartitioned() || 
partition == null` to avoid weird issues we discovered in #7685. We also have 
to pass `spec` with `partition` below in `newOutputFile`.
   
   I'd also consider moving this logic into a separate method for better 
readability. Up to you, though.
   
   ```java
   private EncryptedOutputFile newOutputFile(StructLike partition) {
     if (spec.isUnpartitioned() || partition == null) {
       return fileFactory.newOutputFile();
     } else {
       return fileFactory.newOutputFile(spec, partition);
     }
   }
   ```



##########
core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java:
##########
@@ -140,12 +159,18 @@ public void write(T row) throws IOException {
       PathOffset previous = insertedRowMap.put(copiedKey, pathOffset);
       if (previous != null) {
        // TODO attach the previous row if has a positional-delete row schema in appender factory.
-        posDeleteWriter.delete(previous.path, previous.rowOffset, null);
+        writePosDelete(previous);
       }
 
       dataWriter.write(row);
     }
 
+    private void writePosDelete(PathOffset pathOffset) {
+      PositionDelete<T> delete = PositionDelete.create();

Review Comment:
   We usually avoid creating an extra `PositionDelete` object for every delete, 
if possible. Instead, we initialize one object and reuse it. Take a look at 
`BasePositionDeltaWriter`.
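   
   The reuse pattern described here can be sketched in plain Java: allocate one mutable delete holder when the writer is constructed and repopulate it per record, instead of creating a new object on every call. `ReusableDelete` and `EqualityDeltaWriterSketch` below are hypothetical stand-ins for illustration, not Iceberg's actual `PositionDelete` or writer classes.
   
   ```java
   // Hypothetical stand-in for a mutable position-delete record; the real
   // org.apache.iceberg.deletes.PositionDelete follows a similar mutable style.
   final class ReusableDelete {
     private CharSequence path;
     private long pos;
   
     // Repopulates this instance in place and returns it for chaining.
     ReusableDelete set(CharSequence path, long pos) {
       this.path = path;
       this.pos = pos;
       return this;
     }
   
     CharSequence path() {
       return path;
     }
   
     long pos() {
       return pos;
     }
   }
   
   class EqualityDeltaWriterSketch {
     // Initialized once per writer and reused for every positional delete,
     // avoiding one allocation per deleted row.
     private final ReusableDelete posDelete = new ReusableDelete();
   
     // Stand-in for writePosDelete(PathOffset): refills the shared instance
     // and hands it off (the downstream writer is stubbed as a String here).
     String writePosDelete(CharSequence path, long rowOffset) {
       ReusableDelete d = posDelete.set(path, rowOffset);
       return d.path() + ":" + d.pos();
     }
   }
   ```
   
   The same holder is safe to reuse here because the downstream writer consumes each delete before the next `set` call overwrites it.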



##########
core/src/main/java/org/apache/iceberg/deletes/SortingPositionOnlyDeleteWriter.java:
##########
@@ -118,6 +118,10 @@ private DeleteWriteResult writeFileDeletes() throws IOException {
 
   @SuppressWarnings("CollectionUndefinedEquality")
   private DeleteWriteResult writeDeletes(Collection<CharSequence> paths) throws IOException {
+    if (paths.size() == 0) {

Review Comment:
   Shall we use `paths.isEmpty()`?



##########
flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java:
##########
@@ -56,7 +58,8 @@ abstract class BaseDeltaTaskWriter extends BaseTaskWriter<RowData> {
       Schema schema,
       RowType flinkSchema,
       List<Integer> equalityFieldIds,
-      boolean upsert) {
+      boolean upsert,
+      DeleteGranularity deleteGranularity) {

Review Comment:
   Are we worried about the increased metadata size?



##########
core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java:
##########
@@ -109,18 +112,34 @@ protected abstract class BaseEqualityDeltaWriter implements Closeable {
     private final StructProjection structProjection;
     private RollingFileWriter dataWriter;
     private RollingEqDeleteWriter eqDeleteWriter;
-    private SortedPosDeleteWriter<T> posDeleteWriter;
+    private FileWriter<PositionDelete<T>, DeleteWriteResult> posDeleteWriter;
     private Map<StructLike, PathOffset> insertedRowMap;
 
     protected BaseEqualityDeltaWriter(StructLike partition, Schema schema, Schema deleteSchema) {
+      this(partition, schema, deleteSchema, DeleteGranularity.PARTITION);
+    }
+
+    protected BaseEqualityDeltaWriter(
+        StructLike partition,
+        Schema schema,
+        Schema deleteSchema,
+        DeleteGranularity deleteGranularity) {

Review Comment:
   I am not convinced we should make it configurable when writing equality 
deltas. Our original plan was to always generate file-scoped position deletes 
when writing Flink CDC. Take a look at 
[this](https://github.com/apache/iceberg/pull/9384#discussion_r1439722314) 
discussion.
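   
   The direction this points to (fix the granularity inside the equality-delta path rather than thread a new constructor argument through) can be sketched as follows; the enum and writer class are simplified stand-ins, not Iceberg's real `DeleteGranularity` or delta writer.
   
   ```java
   // Simplified stand-in for org.apache.iceberg.deletes.DeleteGranularity.
   enum Granularity {
     PARTITION,
     FILE
   }
   
   class CdcDeltaWriterSketch {
     // Hard-wired per the discussion: the Flink CDC / equality-delta path
     // always produces file-scoped position deletes, with no caller-facing
     // knob to change it.
     private static final Granularity POS_DELETE_GRANULARITY = Granularity.FILE;
   
     Granularity posDeleteGranularity() {
       return POS_DELETE_GRANULARITY;
     }
   }
   ```
   
   Dropping the constructor parameter entirely keeps the public surface unchanged and avoids a config that every CDC user would have to set.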



##########
flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java:
##########
@@ -167,6 +168,18 @@ public DistributionMode distributionMode() {
     return DistributionMode.fromName(modeName);
   }
 
+  public DeleteGranularity deleteGranularity() {
+    String modeName =
+        confParser
+            .stringConf()
+            .option(FlinkWriteOptions.DELETE_GRANULARITY.key())
+            .flinkConfig(FlinkWriteOptions.DELETE_GRANULARITY)
+            .tableProperty(TableProperties.DELETE_GRANULARITY)
+            .defaultValue(TableProperties.DELETE_GRANULARITY_DEFAULT)

Review Comment:
   The way we generate position deletes in Flink CDC causes a lot of issues. I 
think we should switch now and not even offer the configuration. If we think 
that everyone who runs Flink CDC would have to set this config, then I don't 
think this config should exist.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

