stevenzwu commented on code in PR #15590:
URL: https://github.com/apache/iceberg/pull/15590#discussion_r2942206797


##########
core/src/main/java/org/apache/iceberg/DataFileAccumulator.java:
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.DataFileSet;
+
+/** Accumulates data files and flushes them to manifests when a count threshold is reached. */
+class DataFileAccumulator {
+
+  private final int flushThreshold;
+  private final BiFunction<Collection<DataFile>, Integer, List<ManifestFile>> writeManifests;
+  private final Function<ManifestFile, ManifestReader<DataFile>> readManifest;
+
+  private final Map<Integer, DataFileSet> pendingBySpec = Maps.newHashMap();
+  private final List<ManifestFile> flushedManifests = Lists.newLinkedList();
+  private final Set<Integer> allSpecIds = Sets.newHashSet();
+  private int pendingCount = 0;
+
+  DataFileAccumulator(
+      int flushThreshold,
+      BiFunction<Collection<DataFile>, Integer, List<ManifestFile>> writeManifests,
+      Function<ManifestFile, ManifestReader<DataFile>> readManifest) {
+    this.flushThreshold = flushThreshold;
+    this.writeManifests = writeManifests;
+    this.readManifest = readManifest;

Review Comment:
   Missing input validation for `flushThreshold`. Because `add()` flushes when 
`pendingCount >= flushThreshold`, a threshold of `0` or any negative value would 
flush on every `add()` call (writing a manifest per file). Consider:
   
   ```java
   Preconditions.checkArgument(
       flushThreshold > 0, "Flush threshold must be positive: %s", flushThreshold);
   ```
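
To make the cost of an unchecked threshold concrete, here is a minimal sketch of the counting logic alone. `ThresholdDemo` is a hypothetical miniature, not the PR's class, and it assumes that `flush()` resets the pending counter (the reset is not visible in the quoted hunk). The checked factory uses a plain `IllegalArgumentException` in place of `Preconditions.checkArgument` so the sketch needs only the JDK.

```java
/** Hypothetical miniature of the accumulator's counting logic; not the PR's class. */
final class ThresholdDemo {
  private final int flushThreshold;
  private int pendingCount = 0;
  private int flushCount = 0;

  private ThresholdDemo(int flushThreshold) {
    this.flushThreshold = flushThreshold;
  }

  /** Unchecked construction, mirroring the constructor as written in the diff. */
  static ThresholdDemo unchecked(int flushThreshold) {
    return new ThresholdDemo(flushThreshold);
  }

  /** Checked construction, equivalent in spirit to the suggested precondition. */
  static ThresholdDemo checked(int flushThreshold) {
    if (flushThreshold <= 0) {
      throw new IllegalArgumentException("Flush threshold must be positive: " + flushThreshold);
    }
    return new ThresholdDemo(flushThreshold);
  }

  /** Same comparison as add() in the diff; the flush itself is only counted. */
  void add() {
    pendingCount++;
    if (pendingCount >= flushThreshold) {
      flushCount++; // stands in for flush()
      pendingCount = 0; // assumption: flush() resets the pending counter
    }
  }

  int flushCount() {
    return flushCount;
  }
}
```

With a threshold of 3, seven adds trigger two flushes; with a threshold of 0, the same seven adds trigger seven.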



##########
core/src/main/java/org/apache/iceberg/DataFileAccumulator.java:
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.DataFileSet;
+
+/** Accumulates data files and flushes them to manifests when a count threshold is reached. */
+class DataFileAccumulator {
+
+  private final int flushThreshold;
+  private final BiFunction<Collection<DataFile>, Integer, List<ManifestFile>> writeManifests;
+  private final Function<ManifestFile, ManifestReader<DataFile>> readManifest;
+
+  private final Map<Integer, DataFileSet> pendingBySpec = Maps.newHashMap();
+  private final List<ManifestFile> flushedManifests = Lists.newLinkedList();
+  private final Set<Integer> allSpecIds = Sets.newHashSet();
+  private int pendingCount = 0;
+
+  DataFileAccumulator(
+      int flushThreshold,
+      BiFunction<Collection<DataFile>, Integer, List<ManifestFile>> writeManifests,

Review Comment:
   Nit: `ArrayList` would be a better fit for `flushedManifests` (currently 
`Lists.newLinkedList()`): the list is only appended to and iterated, and 
`LinkedList` has higher per-element overhead and poor cache locality.



##########
core/src/main/java/org/apache/iceberg/DataFileAccumulator.java:
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.DataFileSet;
+
+/** Accumulates data files and flushes them to manifests when a count threshold is reached. */
+class DataFileAccumulator {
+
+  private final int flushThreshold;
+  private final BiFunction<Collection<DataFile>, Integer, List<ManifestFile>> writeManifests;
+  private final Function<ManifestFile, ManifestReader<DataFile>> readManifest;
+
+  private final Map<Integer, DataFileSet> pendingBySpec = Maps.newHashMap();
+  private final List<ManifestFile> flushedManifests = Lists.newLinkedList();
+  private final Set<Integer> allSpecIds = Sets.newHashSet();
+  private int pendingCount = 0;
+
+  DataFileAccumulator(
+      int flushThreshold,
+      BiFunction<Collection<DataFile>, Integer, List<ManifestFile>> writeManifests,
+      Function<ManifestFile, ManifestReader<DataFile>> readManifest) {
+    this.flushThreshold = flushThreshold;
+    this.writeManifests = writeManifests;
+    this.readManifest = readManifest;
+  }
+
+  /**
+   * Adds a data file. Returns true if the file was new. May flush to manifests if the threshold is
+   * reached.
+   */
+  boolean add(DataFile file, int specId) {
+    DataFileSet files = pendingBySpec.computeIfAbsent(specId, ignored -> DataFileSet.create());
+    if (files.add(file)) {
+      allSpecIds.add(specId);
+      pendingCount++;
+      if (pendingCount >= flushThreshold) {
+        flush();
+      }
+      return true;
+    }
+    return false;
+  }
+
+  boolean hasFiles() {
+    return !pendingBySpec.isEmpty() || !flushedManifests.isEmpty();
+  }
+
+  boolean hasPendingFiles() {
+    return !pendingBySpec.isEmpty();
+  }
+
+  /** All partition spec IDs seen across adds. */
+  Set<Integer> specIds() {
+    return Collections.unmodifiableSet(allSpecIds);
+  }
+
+  Map<Integer, DataFileSet> pendingBySpec() {
+    return pendingBySpec;
+  }
+
+  List<ManifestFile> flushedManifests() {
+    return flushedManifests;
+  }

Review Comment:
   `pendingBySpec()` and `flushedManifests()` return the raw mutable internal 
collections, while `specIds()` correctly returns 
`Collections.unmodifiableSet(...)`. For consistency and to prevent accidental 
mutation from callers in `MergingSnapshotProducer`, consider returning 
unmodifiable views here too — or exposing purpose-specific methods instead of 
raw collection access.
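
A minimal sketch of the unmodifiable-view option, using only JDK collections. `AccessorSketch` is a hypothetical stand-in for the accumulator, and `String` replaces `DataFile` and `ManifestFile` so the example is self-contained.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-in for the accumulator; String replaces DataFile and ManifestFile. */
final class AccessorSketch {
  private final Map<Integer, Set<String>> pendingBySpec = new HashMap<>();
  private final List<String> flushedManifests = new ArrayList<>();

  void addPending(int specId, String file) {
    pendingBySpec.computeIfAbsent(specId, ignored -> new HashSet<>()).add(file);
  }

  void recordFlushed(String manifest) {
    flushedManifests.add(manifest);
  }

  /** Read-only view of the map; callers cannot add or remove spec entries. */
  Map<Integer, Set<String>> pendingBySpec() {
    return Collections.unmodifiableMap(pendingBySpec);
  }

  /** Read-only view of the list; it still reflects later internal appends. */
  List<String> flushedManifests() {
    return Collections.unmodifiableList(flushedManifests);
  }
}
```

Note that `Collections.unmodifiableMap` is shallow: the per-spec sets inside the map stay mutable, which is one argument for the purpose-specific accessor methods mentioned above.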



##########
core/src/main/java/org/apache/iceberg/DataFileAccumulator.java:
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.DataFileSet;
+
+/** Accumulates data files and flushes them to manifests when a count threshold is reached. */
+class DataFileAccumulator {
+
+  private final int flushThreshold;
+  private final BiFunction<Collection<DataFile>, Integer, List<ManifestFile>> writeManifests;
+  private final Function<ManifestFile, ManifestReader<DataFile>> readManifest;
+
+  private final Map<Integer, DataFileSet> pendingBySpec = Maps.newHashMap();
+  private final List<ManifestFile> flushedManifests = Lists.newLinkedList();
+  private final Set<Integer> allSpecIds = Sets.newHashSet();
+  private int pendingCount = 0;
+
+  DataFileAccumulator(
+      int flushThreshold,
+      BiFunction<Collection<DataFile>, Integer, List<ManifestFile>> writeManifests,
+      Function<ManifestFile, ManifestReader<DataFile>> readManifest) {
+    this.flushThreshold = flushThreshold;
+    this.writeManifests = writeManifests;
+    this.readManifest = readManifest;
+  }
+
+  /**
+   * Adds a data file. Returns true if the file was new. May flush to manifests if the threshold is
+   * reached.
+   */
+  boolean add(DataFile file, int specId) {
+    DataFileSet files = pendingBySpec.computeIfAbsent(specId, ignored -> DataFileSet.create());
+    if (files.add(file)) {
+      allSpecIds.add(specId);
+      pendingCount++;
+      if (pendingCount >= flushThreshold) {
+        flush();
+      }
+      return true;
+    }
+    return false;
+  }
+
+  boolean hasFiles() {
+    return !pendingBySpec.isEmpty() || !flushedManifests.isEmpty();
+  }
+
+  boolean hasPendingFiles() {
+    return !pendingBySpec.isEmpty();
+  }
+
+  /** All partition spec IDs seen across adds. */
+  Set<Integer> specIds() {
+    return Collections.unmodifiableSet(allSpecIds);
+  }
+
+  Map<Integer, DataFileSet> pendingBySpec() {
+    return pendingBySpec;
+  }
+
+  List<ManifestFile> flushedManifests() {
+    return flushedManifests;
+  }
+
+  /**
+   * All added data files (flushed + pending). Flushed files are lazily read back from their
+   * manifests to avoid holding them in memory during the add phase.
+   */
+  List<DataFile> allAddedFiles() {
+    ImmutableList.Builder<DataFile> builder = ImmutableList.builder();
+
+    for (ManifestFile manifest : flushedManifests) {
+      try (ManifestReader<DataFile> reader = readManifest.apply(manifest)) {
+        for (DataFile file : reader) {
+          builder.add(file);
+        }
+      } catch (IOException e) {
+        throw new RuntimeIOException(e, "Failed to read manifest file: %s", manifest.path());
+      }
+    }
+
+    pendingBySpec.values().forEach(builder::addAll);
+    return builder.build();
+  }
+
+  void deleteUncommitted(Set<ManifestFile> committed, Consumer<String> deleteFunc) {
+    boolean anyDeleted = false;
+    for (ManifestFile manifest : flushedManifests) {
+      if (!committed.contains(manifest)) {
+        deleteFunc.accept(manifest.path());
+        anyDeleted = true;
+      }

Review Comment:
   On successful commit where all flushed manifests are in the committed set, 
`anyDeleted` is false and `flushedManifests` is never cleared. This is benign 
since the accumulator isn't reused after commit, but it's inconsistent with 
`SnapshotProducer.deleteUncommitted()` which always clears when asked. Simpler 
to always clear:
   
   ```java
   void deleteUncommitted(Set<ManifestFile> committed, Consumer<String> deleteFunc) {
       for (ManifestFile manifest : flushedManifests) {
           if (!committed.contains(manifest)) {
               deleteFunc.accept(manifest.path());
           }
       }
       flushedManifests.clear();
   }
   ```



##########
core/src/main/java/org/apache/iceberg/DataFileAccumulator.java:
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.DataFileSet;
+
+/** Accumulates data files and flushes them to manifests when a count threshold is reached. */
+class DataFileAccumulator {
+
+  private final int flushThreshold;
+  private final BiFunction<Collection<DataFile>, Integer, List<ManifestFile>> writeManifests;
+  private final Function<ManifestFile, ManifestReader<DataFile>> readManifest;
+
+  private final Map<Integer, DataFileSet> pendingBySpec = Maps.newHashMap();
+  private final List<ManifestFile> flushedManifests = Lists.newLinkedList();
+  private final Set<Integer> allSpecIds = Sets.newHashSet();
+  private int pendingCount = 0;
+
+  DataFileAccumulator(
+      int flushThreshold,
+      BiFunction<Collection<DataFile>, Integer, List<ManifestFile>> writeManifests,
+      Function<ManifestFile, ManifestReader<DataFile>> readManifest) {
+    this.flushThreshold = flushThreshold;
+    this.writeManifests = writeManifests;
+    this.readManifest = readManifest;
+  }
+
+  /**
+   * Adds a data file. Returns true if the file was new. May flush to manifests if the threshold is
+   * reached.
+   */
+  boolean add(DataFile file, int specId) {
+    DataFileSet files = pendingBySpec.computeIfAbsent(specId, ignored -> DataFileSet.create());
+    if (files.add(file)) {
+      allSpecIds.add(specId);
+      pendingCount++;
+      if (pendingCount >= flushThreshold) {
+        flush();
+      }
+      return true;
+    }
+    return false;
+  }
+
+  boolean hasFiles() {
+    return !pendingBySpec.isEmpty() || !flushedManifests.isEmpty();
+  }
+
+  boolean hasPendingFiles() {
+    return !pendingBySpec.isEmpty();
+  }
+
+  /** All partition spec IDs seen across adds. */
+  Set<Integer> specIds() {
+    return Collections.unmodifiableSet(allSpecIds);
+  }
+
+  Map<Integer, DataFileSet> pendingBySpec() {
+    return pendingBySpec;
+  }
+
+  List<ManifestFile> flushedManifests() {
+    return flushedManifests;
+  }
+
+  /**
+   * All added data files (flushed + pending). Flushed files are lazily read back from their
+   * manifests to avoid holding them in memory during the add phase.
+   */
+  List<DataFile> allAddedFiles() {
+    ImmutableList.Builder<DataFile> builder = ImmutableList.builder();
+
+    for (ManifestFile manifest : flushedManifests) {
+      try (ManifestReader<DataFile> reader = readManifest.apply(manifest)) {
+        for (DataFile file : reader) {
+          builder.add(file);

Review Comment:
   This reads back every flushed manifest and builds an `ImmutableList` of all 
data files, re-materializing everything into memory. This is called from 
`addedDataFiles()` which `BaseOverwriteFiles.validate()` uses to check added 
files against the overwrite filter. For a 500K-file overwrite, this re-spikes 
memory to the same level as before the optimization.
   
   Could this return a `CloseableIterable<DataFile>` that streams lazily 
through the flushed manifests instead? That would be consistent with the 
project's preference for `CloseableIterable` over eager materialization.
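
The shape of such a streaming read can be sketched with plain JDK types. Everything here is a hypothetical stand-in: `LazyConcat` and its `Source` interface are not Iceberg classes, the handle type `S` plays the role of `ManifestFile`, and `open` plays the role of `readManifest`. An actual change would presumably build on Iceberg's own `CloseableIterable` rather than a hand-rolled iterator.

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Function;

/** Hypothetical sketch: streams elements from several sources, holding one open at a time. */
final class LazyConcat<S, T> implements Iterator<T>, Closeable {
  /** Hypothetical stand-in for a manifest reader: iterable and closeable. */
  interface Source<E> extends Iterable<E>, Closeable {}

  private final Iterator<S> handles;
  private final Function<S, Source<T>> open;
  private Source<T> current = null;
  private Iterator<T> currentItems = Collections.emptyIterator();

  LazyConcat(List<S> handles, Function<S, Source<T>> open) {
    this.handles = handles.iterator();
    this.open = open;
  }

  @Override
  public boolean hasNext() {
    // Advance to the next non-empty source, closing each exhausted one first.
    while (!currentItems.hasNext()) {
      closeCurrent();
      if (!handles.hasNext()) {
        return false;
      }
      current = open.apply(handles.next());
      currentItems = current.iterator();
    }
    return true;
  }

  @Override
  public T next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    return currentItems.next();
  }

  /** Closes the source that is currently open, if any. */
  @Override
  public void close() {
    closeCurrent();
  }

  private void closeCurrent() {
    if (current != null) {
      try {
        current.close();
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
      current = null;
      currentItems = Collections.emptyIterator();
    }
  }
}
```

No source is opened until iteration starts, and at most one is open at any moment, so peak memory is bounded by a single reader's working set rather than by the total number of files.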



##########
core/src/test/java/org/apache/iceberg/TestDataFileAccumulator.java:
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.junit.jupiter.api.Test;
+
+class TestDataFileAccumulator {
+
+  private static final PartitionSpec SPEC = PartitionSpec.unpartitioned();
+
+  private final AtomicInteger flushCount = new AtomicInteger(0);
+
+  private DataFileAccumulator newAccumulator(int threshold) {
+    return new DataFileAccumulator(threshold, this::stubWrite, manifest -> null);
+  }
+
+  private List<ManifestFile> stubWrite(Collection<DataFile> files, int specId) {
+    flushCount.incrementAndGet();

Review Comment:
   The stub reader is `manifest -> null`, so calling `allAddedFiles()` after a 
flush would NPE on the null reader. The lazy read-back path — the most novel 
part of this PR — has no unit test coverage here. Could you add a test that 
verifies `allAddedFiles()` returns the correct files after a flush? The 
integration tests in `TestMergeAppend` exercise this indirectly, but a focused 
unit test would be valuable.



##########
core/src/main/java/org/apache/iceberg/TableProperties.java:
##########
@@ -121,6 +121,10 @@ private TableProperties() {}
   public static final String MANIFEST_MERGE_ENABLED = "commit.manifest-merge.enabled";
   public static final boolean MANIFEST_MERGE_ENABLED_DEFAULT = true;
 
+  public static final String MANIFEST_FLUSH_FILE_COUNT_THRESHOLD =

Review Comment:
   Do we need a new property here? We already have 
`commit.manifest.target-size-bytes`, and the per-entry serialized size is 
estimable from the table schema.
   
   The six per-column stats maps (`column_sizes`, `value_counts`, 
`null_value_counts`, `nan_value_counts`, `lower_bounds`, `upper_bounds`) 
dominate entry size and scale with `min(schema.columns().size(), 
METRICS_MAX_INFERRED_COLUMN_DEFAULTS)`. A rough estimate:
   
   ```
   estimatedEntryBytes ≈ 200 + 80 * min(numColumns, metricsMaxColumns) + 20 * partitionFields
   flushThreshold = targetManifestSizeBytes * N / estimatedEntryBytes
   ```
   
   For a 10-column table this yields ~80K (same ballpark as the 100K default); 
for a 100-column table it yields ~10K, which correctly flushes sooner since 
each entry consumes ~8x more memory. The hardcoded 100K default treats both 
identically.
   
   All the inputs (`schema`, `spec`, `metricsMaxInferredColumnDefaults`, 
`targetManifestSizeBytes`) are already available in the 
`MergingSnapshotProducer` constructor. This would make the behavior self-tuning 
and avoid adding a new configuration knob.
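
The arithmetic behind those figures can be worked through in a few lines. This is a sketch under stated assumptions: an 8 MB value for `commit.manifest.target-size-bytes`, `N = 10` (the comment leaves `N` undefined; 10 is the value that reproduces the quoted numbers), a metrics column cap of 100, and an unpartitioned table. `FlushThresholdEstimate` and its method names are hypothetical.

```java
/** Hypothetical helper that evaluates the rough estimate from the comment above. */
final class FlushThresholdEstimate {
  private FlushThresholdEstimate() {}

  /** estimatedEntryBytes = 200 + 80 * min(numColumns, metricsMaxColumns) + 20 * partitionFields */
  static long estimatedEntryBytes(int numColumns, int metricsMaxColumns, int partitionFields) {
    return 200L + 80L * Math.min(numColumns, metricsMaxColumns) + 20L * partitionFields;
  }

  /** flushThreshold = targetManifestSizeBytes * N / estimatedEntryBytes (integer division) */
  static long flushThreshold(long targetManifestSizeBytes, int n, long estimatedEntryBytes) {
    return targetManifestSizeBytes * n / estimatedEntryBytes;
  }

  public static void main(String[] args) {
    long target = 8L * 1024 * 1024; // assumption: 8 MB target manifest size
    int n = 10; // assumption: N is not defined in the comment
    long narrow = estimatedEntryBytes(10, 100, 0); // 1000 bytes per entry
    long wide = estimatedEntryBytes(100, 100, 0); // 8200 bytes per entry
    System.out.println(flushThreshold(target, n, narrow)); // 83886
    System.out.println(flushThreshold(target, n, wide)); // 10230
  }
}
```

Under these assumptions the 10-column table flushes after roughly 84K files and the 100-column table after roughly 10K, a ratio of 8.2 that matches the per-entry size ratio.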



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
