zhangjun0x01 commented on a change in pull request #1624:
URL: https://github.com/apache/iceberg/pull/1624#discussion_r508237727
##########
File path: spark/src/main/java/org/apache/iceberg/actions/RewriteDataFilesAction.java
##########
@@ -112,170 +54,35 @@ protected RewriteDataFilesAction self() {
@Override
protected Table table() {
- return table;
- }
-
- /**
- * Pass a PartitionSpec id to specify which PartitionSpec should be used in DataFile rewrite
- *
- * @param specId PartitionSpec id to rewrite
- * @return this for method chaining
- */
- public RewriteDataFilesAction outputSpecId(int specId) {
- Preconditions.checkArgument(table.specs().containsKey(specId), "Invalid spec id %d", specId);
- this.spec = table.specs().get(specId);
- return this;
+ return getTable();
}
- /**
- * Specify the target rewrite data file size in bytes
- *
- * @param targetSize size in bytes of rewrite data file
- * @return this for method chaining
- */
- public RewriteDataFilesAction targetSizeInBytes(long targetSize) {
- Preconditions.checkArgument(targetSize > 0L, "Invalid target rewrite data file size in bytes %d",
- targetSize);
- this.targetSizeInBytes = targetSize;
- return this;
- }
-
- /**
- * Specify the number of "bins" considered when trying to pack the next file split into a task.
- * Increasing this usually makes tasks a bit more even by considering more ways to pack file regions into a single
- * task with extra planning cost.
- * <p>
- * This configuration can reorder the incoming file regions, to preserve order for lower/upper bounds in file
- * metadata, user can use a lookback of 1.
- *
- * @param lookback number of "bins" considered when trying to pack the next file split into a task.
- * @return this for method chaining
- */
- public RewriteDataFilesAction splitLookback(int lookback) {
- Preconditions.checkArgument(lookback > 0L, "Invalid split lookback %d", lookback);
- this.splitLookback = lookback;
- return this;
- }
-
- /**
- * Specify the minimum file size to count to pack into one "bin". If the read file size is smaller than this specified
- * threshold, Iceberg will use this value to do count.
- * <p>
- * this configuration controls the number of files to compact for each task, small value would lead to a
- * high compaction, the default value is 4MB.
- *
- * @param openFileCost minimum file size to count to pack into one "bin".
- * @return this for method chaining
- */
- public RewriteDataFilesAction splitOpenFileCost(long openFileCost) {
- Preconditions.checkArgument(openFileCost > 0L, "Invalid split openFileCost %d", openFileCost);
- this.splitOpenFileCost = openFileCost;
- return this;
- }
-
- /**
- * Pass a row Expression to filter DataFiles to be rewritten. Note that all files that may contain data matching the
- * filter may be rewritten.
- *
- * @param expr Expression to filter out DataFiles
- * @return this for method chaining
- */
- public RewriteDataFilesAction filter(Expression expr) {
- this.filter = Expressions.and(filter, expr);
- return this;
- }
@Override
public RewriteDataFilesActionResult execute() {
- CloseableIterable<FileScanTask> fileScanTasks = null;
- try {
- fileScanTasks = table.newScan()
- .caseSensitive(caseSensitive)
- .ignoreResiduals()
- .filter(filter)
- .planFiles();
- } finally {
- try {
- if (fileScanTasks != null) {
- fileScanTasks.close();
- }
- } catch (IOException ioe) {
- LOG.warn("Failed to close task iterable", ioe);
- }
- }
-
- Map<StructLikeWrapper, Collection<FileScanTask>> groupedTasks = groupTasksByPartition(fileScanTasks.iterator());
- Map<StructLikeWrapper, Collection<FileScanTask>> filteredGroupedTasks = groupedTasks.entrySet().stream()
- .filter(kv -> kv.getValue().size() > 1)
- .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+ Map<StructLikeWrapper, Collection<FileScanTask>> filteredGroupedTasks = getFilteredGroupedTasks();
// Nothing to rewrite if there's only one DataFile in each partition.
if (filteredGroupedTasks.isEmpty()) {
return RewriteDataFilesActionResult.empty();
}
// Split and combine tasks under each partition
- List<CombinedScanTask> combinedScanTasks = filteredGroupedTasks.values().stream()
- .map(scanTasks -> {
- CloseableIterable<FileScanTask> splitTasks = TableScanUtil.splitFiles(
- CloseableIterable.withNoopClose(scanTasks), targetSizeInBytes);
- return TableScanUtil.planTasks(splitTasks, targetSizeInBytes, splitLookback, splitOpenFileCost);
- })
- .flatMap(Streams::stream)
- .collect(Collectors.toList());
+ List<CombinedScanTask> combinedScanTasks = getCombinedScanTasks(filteredGroupedTasks);
Review comment:
I did not extract this into a method, because the following `getCurrentDataFiles` also uses `filteredGroupedTasks`. If it were extracted into a method, the `groupTasksByPartition` method would be executed twice.
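The concern above can be sketched in a simplified form (a hypothetical example, not Iceberg's actual API: `GroupOnceSketch` and its helper methods are stand-ins). Computing the grouping once and passing the resulting map to both consumers ensures the expensive grouping step runs a single time:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class GroupOnceSketch {
    // Counts invocations so we can verify the grouping runs only once.
    static int groupingCalls = 0;

    // Stand-in for groupTasksByPartition: groups task names by partition key,
    // where the key is the path segment before the first '/'.
    static Map<String, List<String>> groupTasksByPartition(List<String> tasks) {
        groupingCalls++;
        return tasks.stream().collect(Collectors.groupingBy(t -> t.split("/")[0]));
    }

    // First consumer of the grouping (stand-in for task planning).
    static int planCombinedTasks(Map<String, List<String>> grouped) {
        return grouped.size();
    }

    // Second consumer of the same grouping (stand-in for getCurrentDataFiles).
    static int countCurrentDataFiles(Map<String, List<String>> grouped) {
        return grouped.values().stream().mapToInt(List::size).sum();
    }

    public static void main(String[] args) {
        List<String> tasks = List.of("p1/a", "p1/b", "p2/c");
        // Group once, then share the map between both consumers.
        Map<String, List<String>> grouped = groupTasksByPartition(tasks);
        int partitions = planCombinedTasks(grouped);
        int files = countCurrentDataFiles(grouped);
        System.out.println(partitions + " " + files + " " + groupingCalls);
    }
}
```

Had each consumer been extracted into a method that recomputes the grouping internally, `groupingCalls` would end up at 2; sharing the precomputed map keeps it at 1, which is the trade-off the comment describes.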
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]