rdblue commented on code in PR #7731:
URL: https://github.com/apache/iceberg/pull/7731#discussion_r1209536460


##########
core/src/main/java/org/apache/iceberg/util/TableScanUtil.java:
##########
@@ -251,4 +322,126 @@ private static void validatePlanningArguments(long 
splitSize, int lookback, long
     Preconditions.checkArgument(lookback > 0, "Split planning lookback must be 
> 0: %s", lookback);
     Preconditions.checkArgument(openFileCost >= 0, "File open cost must be >= 
0: %s", openFileCost);
   }
+
+  private static <T extends ScanTask, G extends ScanTaskGroup<T>>
+      CloseableIterable<G> planTasksInternal(
+          CloseableIterable<T> splitFiles,
+          long splitSize,
+          int lookback,
+          Function<T, Long> weightFunc,
+          Function<List<T>, G> groupFunc) {
+
+    return CloseableIterable.transform(
+        CloseableIterable.combine(
+            new BinPacking.PackingIterable<>(splitFiles, splitSize, lookback, 
weightFunc, true),
+            splitFiles),
+        groupFunc);
+  }
+
+  private static class AdaptiveSplitPlanningIterable<T extends ScanTask, G 
extends ScanTaskGroup<T>>
+      extends CloseableGroup implements CloseableIterable<G> {
+    private final CloseableIterable<T> files;
+    private final int parallelism;
+    private final long splitSize;
+    private final int lookback;
+    private final Function<T, Long> weightFunc;
+    private final BiFunction<CloseableIterable<T>, Long, CloseableIterable<T>> 
splitFunc;
+    private final Function<List<T>, G> groupFunc;
+
+    private Long targetSize = null;
+
+    private AdaptiveSplitPlanningIterable(
+        CloseableIterable<T> files,
+        int parallelism,
+        long splitSize,
+        int lookback,
+        Function<T, Long> weightFunc,
+        BiFunction<CloseableIterable<T>, Long, CloseableIterable<T>> splitFunc,
+        Function<List<T>, G> groupFunc) {
+      this.files = files;
+      this.parallelism = parallelism;
+      this.splitSize = splitSize;
+      this.lookback = lookback;
+      this.weightFunc = weightFunc;
+      this.splitFunc = splitFunc;
+      this.groupFunc = groupFunc;
+    }
+
+    @Override
+    public CloseableIterator<G> iterator() {
+      if (targetSize != null) {
+        // target size is already known so plan with the static target size
+        CloseableIterable<T> splitTasks = splitFunc.apply(files, targetSize);
+        CloseableIterator<G> iter =
+            planTasksInternal(splitTasks, targetSize, lookback, weightFunc, 
groupFunc).iterator();
+        addCloseable(iter);
+        return iter;
+      }
+
+      boolean shouldClose = true;
+      CloseableIterator<T> tasksIter = files.iterator();
+      try {
+        // load tasks until the iterator is exhausted or until the total 
weight is enough to get the
+        // parallelism at the split size passed in.
+        LinkedList<T> readAheadTasks = Lists.newLinkedList();
+        long readToSize = parallelism * splitSize;
+        long totalSize = 0L;
+
+        while (tasksIter.hasNext()) {
+          T task = tasksIter.next();
+          readAheadTasks.addLast(task);
+          totalSize += weightFunc.apply(task);
+
+          if (totalSize > readToSize) {
+            break;
+          }
+        }
+
+        // if total size was reached, then the requested split size is used. 
otherwise, the iterator
+        // was exhausted and the split size will be adjusted to target 
parallelism with a reasonable
+        // minimum.
+        this.targetSize = Math.max(MIN_SPLIT_SIZE, Math.min(totalSize / 
parallelism, splitSize));

Review Comment:
   This works by targeting the split size that was requested, but trying to 
maximize parallelism if there isn't enough data to reach that split size. The 
logic for the requested split size is the same as what we currently use.
   
   The row groups size is still orthogonal to this. We don't need to change the 
split size based on the target row group size. That will happen automatically 
because we track the split offsets and will just not be able to split files any 
further than their existing row groups.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to