aokolnychyi commented on code in PR #2276:
URL: https://github.com/apache/iceberg/pull/2276#discussion_r1014217373
##########
core/src/main/java/org/apache/iceberg/util/TableScanUtil.java:
##########
@@ -71,6 +78,57 @@ public static CloseableIterable<FileScanTask> splitFiles(
return CloseableIterable.combine(splitTasks, tasks);
}
+ public static <T extends FileScanTask> Iterable<CombinedScanTask> planTasks(
+ List<T> splitFiles,
+ long splitSize,
+ int lookback,
+ long openFileCost,
+ StructProjection projection) {
Review Comment:
Why not have it as `StructType`? I believe we just need the type of the key
we can't combine across. When we consume this from Spark, we will compute an
intersection of all partition specs in `Partitioning` as a `StructType`, similar
to what we already do in `Partitioning$partitionType` (which computes a union of
all partition types).
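To illustrate the intersection idea in isolation: the sketch below computes a common "grouping key" from two specs, with plain string field names standing in for Iceberg's partition fields. All names here are illustrative, not Iceberg's API.

```java
import java.util.ArrayList;
import java.util.List;

// Hedged sketch: derive the grouping-key type as the intersection of field
// names across two specs, by analogy with Partitioning.partitionType (which
// unions them). Real partition fields carry ids and types, not just names.
public class SpecIntersectionSketch {
  static List<String> intersect(List<String> specA, List<String> specB) {
    List<String> common = new ArrayList<>();
    for (String field : specA) {
      // Keep only fields present in both specs, preserving specA's order.
      if (specB.contains(field)) {
        common.add(field);
      }
    }
    return common;
  }

  public static void main(String[] args) {
    // Only "day" is shared, so only "day" can be used as a grouping key.
    System.out.println(intersect(List.of("day", "bucket"), List.of("day")));
  }
}
```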
##########
core/src/main/java/org/apache/iceberg/util/TableScanUtil.java:
##########
@@ -71,6 +78,57 @@ public static CloseableIterable<FileScanTask> splitFiles(
return CloseableIterable.combine(splitTasks, tasks);
}
+ public static <T extends FileScanTask> Iterable<CombinedScanTask> planTasks(
Review Comment:
If we accept a list, what about returning a list as well? We access tasks by
index in `SparkBatch`.
##########
core/src/main/java/org/apache/iceberg/util/TableScanUtil.java:
##########
@@ -71,6 +78,57 @@ public static CloseableIterable<FileScanTask> splitFiles(
return CloseableIterable.combine(splitTasks, tasks);
}
+ public static <T extends FileScanTask> Iterable<CombinedScanTask> planTasks(
+ List<T> splitFiles,
+ long splitSize,
+ int lookback,
+ long openFileCost,
+ StructProjection projection) {
+    Preconditions.checkArgument(splitSize > 0, "Invalid split size (negative or 0): %s", splitSize);
+    Preconditions.checkArgument(
+        lookback > 0, "Invalid split planning lookback (negative or 0): %s", lookback);
+    Preconditions.checkArgument(
+        openFileCost >= 0, "Invalid file open cost (negative): %s", openFileCost);
+
+    // Check the size of delete files as well to avoid unbalanced bin-packing
+    Function<FileScanTask, Long> weightFunc =
+        file ->
+            Math.max(
+                file.length() + file.deletes().stream().mapToLong(ContentFile::fileSizeInBytes).sum(),
+                (1 + file.deletes().size()) * openFileCost);
+
+ ListMultimap<StructLikeWrapper, FileScanTask> groupedFiles =
+ Multimaps.newListMultimap(Maps.newHashMap(), Lists::newArrayList);
+
+ for (T task : splitFiles) {
+ PartitionSpec spec = task.spec();
+ StructProjection projectedStruct =
+ StructProjection.create(spec.partitionType(), projection.type());
Review Comment:
I am afraid creating a projection for each task would be expensive. What
about having a map of projections that would be lazily initialized? That way,
we will only create one projection per spec.
```
Map<Integer, StructProjection> projectionsBySpec = Maps.newHashMap();
...
projectionsBySpec.computeIfAbsent(
    spec.specId(),
    specId -> StructProjection.create(spec.partitionType(), ...));
```
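The caching idea can be sketched in isolation. `createProjection` below is a hypothetical stand-in for `StructProjection.create`; the point is that `computeIfAbsent` builds one value per distinct spec id, not one per task.

```java
import java.util.HashMap;
import java.util.Map;

// Self-contained sketch of lazily caching one projection per spec.
public class ProjectionCacheSketch {
  // Hypothetical stand-in for StructProjection.create(spec.partitionType(), ...):
  // it just records that a projection was built for the given spec id.
  static String createProjection(int specId, int[] creationCounter) {
    creationCounter[0]++;
    return "projection-for-spec-" + specId;
  }

  // Returns how many projections were created while "planning" the given tasks.
  static int planWithCache(int[] taskSpecIds) {
    Map<Integer, String> projectionsBySpec = new HashMap<>();
    int[] creations = {0};
    for (int specId : taskSpecIds) {
      // computeIfAbsent only invokes the factory on the first sighting of a spec id.
      projectionsBySpec.computeIfAbsent(specId, id -> createProjection(id, creations));
    }
    return creations[0];
  }

  public static void main(String[] args) {
    // Six tasks over two distinct specs -> only two projections are built.
    System.out.println(planWithCache(new int[] {1, 1, 2, 1, 2, 2}));
  }
}
```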
##########
core/src/main/java/org/apache/iceberg/util/TableScanUtil.java:
##########
@@ -71,6 +78,57 @@ public static CloseableIterable<FileScanTask> splitFiles(
return CloseableIterable.combine(splitTasks, tasks);
}
+ public static <T extends FileScanTask> Iterable<CombinedScanTask> planTasks(
Review Comment:
I would consider adding `PartitionScanTask` and using it as a boundary in
this method. That way, we can make this logic generic and support arbitrary
scan tasks with partition info.
```
public interface PartitionScanTask extends ScanTask {
  PartitionSpec spec();
  StructLike partition();
}
```
```
public static <T extends PartitionScanTask> List<ScanTaskGroup<T>> planTaskGroups(...) {}
```
@rdblue and other reviewers, thoughts on this?
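As a self-contained illustration of why the boundary helps: once tasks expose their partition, grouping logic no longer depends on the concrete task type. All names below are simplified stand-ins for the proposed interfaces, not Iceberg's real API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of the proposed boundary: any task that exposes its partition value
// can be grouped generically, regardless of the concrete task type.
public class PartitionScanTaskSketch {
  interface ScanTask {}

  interface PartitionScanTask extends ScanTask {
    String partition();  // stand-in for StructLike partition()
  }

  record FileTask(String partition) implements PartitionScanTask {}

  // Group tasks so that each group only contains tasks from one partition.
  static <T extends PartitionScanTask> Map<String, List<T>> groupByPartition(List<T> tasks) {
    Map<String, List<T>> groups = new HashMap<>();
    for (T task : tasks) {
      groups.computeIfAbsent(task.partition(), p -> new ArrayList<>()).add(task);
    }
    return groups;
  }

  public static void main(String[] args) {
    List<FileTask> tasks = List.of(new FileTask("a"), new FileTask("b"), new FileTask("a"));
    // Two distinct partitions -> two groups.
    System.out.println(groupByPartition(tasks).size());
  }
}
```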
##########
core/src/main/java/org/apache/iceberg/util/TableScanUtil.java:
##########
@@ -71,6 +78,57 @@ public static CloseableIterable<FileScanTask> splitFiles(
return CloseableIterable.combine(splitTasks, tasks);
}
+ public static <T extends FileScanTask> Iterable<CombinedScanTask> planTasks(
+ List<T> splitFiles,
+ long splitSize,
+ int lookback,
+ long openFileCost,
+ StructProjection projection) {
+    Preconditions.checkArgument(splitSize > 0, "Invalid split size (negative or 0): %s", splitSize);
+    Preconditions.checkArgument(
+        lookback > 0, "Invalid split planning lookback (negative or 0): %s", lookback);
+    Preconditions.checkArgument(
+        openFileCost >= 0, "Invalid file open cost (negative): %s", openFileCost);
+
+    // Check the size of delete files as well to avoid unbalanced bin-packing
+    Function<FileScanTask, Long> weightFunc =
+        file ->
+            Math.max(
+                file.length() + file.deletes().stream().mapToLong(ContentFile::fileSizeInBytes).sum(),
+                (1 + file.deletes().size()) * openFileCost);
+
+ ListMultimap<StructLikeWrapper, FileScanTask> groupedFiles =
+ Multimaps.newListMultimap(Maps.newHashMap(), Lists::newArrayList);
+
+ for (T task : splitFiles) {
+ PartitionSpec spec = task.spec();
+ StructProjection projectedStruct =
+ StructProjection.create(spec.partitionType(), projection.type());
+ Types.StructType projectedPartitionType = projectedStruct.type();
+ StructLikeWrapper wrapper =
+ StructLikeWrapper.forType(projectedPartitionType)
Review Comment:
This invocation is very expensive as `forType` would call this constructor:
```
private StructLikeWrapper(Types.StructType type) {
  this(Comparators.forType(type), JavaHash.forType(type));
}
```
Instead, we should try to call `forType` only once and then use `copyFor`.
```
StructLikeWrapper wrapper = StructLikeWrapper.forType(...);
for (T task : splitFiles) {
  StructProjection projection = ...
  projection.wrap(task.file().partition());
  groupedFiles.put(wrapper.copyFor(projection.copy()), task);
}
```
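The cost pattern can be sketched generically: pay the expensive construction (`forType`) once, then derive a cheap key per task (`copyFor`). `ExpensiveKeyFactory` below is a hypothetical stand-in for `StructLikeWrapper`, not the real class.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of the forType-once / copyFor-per-key pattern. Constructing the
// factory is costly (think Comparators.forType + JavaHash.forType); deriving
// keys from an existing instance is cheap.
public class CopyForSketch {
  static class ExpensiveKeyFactory {
    static final AtomicInteger constructions = new AtomicInteger();

    ExpensiveKeyFactory() {
      constructions.incrementAndGet();  // the expensive setup happens here, once
    }

    String copyFor(String value) {
      return "key:" + value;  // cheap per-task key derivation
    }
  }

  // Returns the number of distinct groups formed for the given partitions.
  static int groupCount(String[] partitions) {
    ExpensiveKeyFactory factory = new ExpensiveKeyFactory();  // built once, outside the loop
    Map<String, Integer> groups = new HashMap<>();
    for (String partition : partitions) {
      groups.merge(factory.copyFor(partition), 1, Integer::sum);  // cheap copy per task
    }
    return groups.size();
  }

  public static void main(String[] args) {
    // Three tasks over two partitions -> two groups, one factory construction.
    System.out.println(groupCount(new String[] {"a", "a", "b"}));
  }
}
```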
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]