rdblue commented on a change in pull request #796: URL: https://github.com/apache/iceberg/pull/796#discussion_r460619473
########## File path: core/src/main/java/org/apache/iceberg/MicroBatches.java ########## @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.CloseableIterator; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.util.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MicroBatches { + private MicroBatches() { + } + + public static class MicroBatch { + private final long snapshotId; + private final int startFileIndex; + private final int endFileIndex; + private final long sizeInBytes; + private final List<FileScanTask> tasks; + private final boolean lastIndexOfSnapshot; + + private MicroBatch(long snapshotId, int startFileIndex, int endFileIndex, long sizeInBytes, + List<FileScanTask> tasks, boolean lastIndexOfSnapshot) { + this.snapshotId = snapshotId; + this.startFileIndex = startFileIndex; + this.endFileIndex = endFileIndex; + this.sizeInBytes = sizeInBytes; + this.tasks = tasks; + this.lastIndexOfSnapshot = lastIndexOfSnapshot; + } + + public long snapshotId() { + return snapshotId; + } + + public int startFileIndex() { + return startFileIndex; + } + + public int endFileIndex() { + return endFileIndex; + } + + public long sizeInBytes() { + return sizeInBytes; + } + + public List<FileScanTask> tasks() { + return tasks; + } + + public boolean lastIndexOfSnapshot() { + return lastIndexOfSnapshot; + } + } + + public static MicroBatchBuilder from(Snapshot snapshot, FileIO io) { + return new MicroBatchBuilder(snapshot, io); + } + + public static class MicroBatchBuilder { + private static final Logger LOG = LoggerFactory.getLogger(MicroBatchBuilder.class); + + private final Snapshot snapshot; + private final FileIO io; + private boolean caseSensitive; + private Map<Integer, PartitionSpec> specsById; + + private MicroBatchBuilder(Snapshot snapshot, FileIO io) { + this.snapshot = snapshot; + this.io = io; + this.caseSensitive = true; + } + + public MicroBatchBuilder caseSensitive(boolean sensitive) { + this.caseSensitive = sensitive; + return this; + } + + public MicroBatchBuilder specsById(Map<Integer, PartitionSpec> specs) { + this.specsById = specs; + return this; + } + + public MicroBatch generate(int startFileIndex, long targetSizeInBytes, boolean isStarting) { + Preconditions.checkArgument(startFileIndex >= 0, "startFileIndex is unexpectedly smaller than 0"); + Preconditions.checkArgument(targetSizeInBytes > 0, "targetSizeInBytes should be larger than 0"); + + List<ManifestFile> manifests = isStarting ? snapshot.dataManifests() : + snapshot.dataManifests().stream().filter(m -> m.snapshotId().equals(snapshot.snapshotId())) + .collect(Collectors.toList()); + + List<Pair<ManifestFile, Integer>> manifestIndexes = indexManifests(manifests); + List<Pair<ManifestFile, Integer>> skippedManifestIndexes = skipManifests(manifestIndexes, startFileIndex); + + return generateMicroBatch(skippedManifestIndexes, startFileIndex, targetSizeInBytes, isStarting); + } + + /** + * Method to index the data files for each manifest. For example, if manifest m1 has 3 data files, manifest + * m2 has 2 data files, manifest m3 has 1 data file, then the index will be (m1, 0), (m2, 3), (m3, 5). + * + * @param manifestFiles List of input manifests used to index. + * @return a list of manifest index with key as manifest file, value as file counts. + */ + private List<Pair<ManifestFile, Integer>> indexManifests(List<ManifestFile> manifestFiles) { + int currentFileIndex = 0; + List<Pair<ManifestFile, Integer>> manifestIndexes = Lists.newArrayList(); + + for (ManifestFile manifest : manifestFiles) { + manifestIndexes.add(Pair.of(manifest, currentFileIndex)); + currentFileIndex += manifest.addedFilesCount() + manifest.existingFilesCount(); + } + + return manifestIndexes; + } + + /** + * Method to skip the manifest file in which the index is smaller than startFileIndex. For example, if the + * index list is : (m1, 0), (m2, 3), (m3, 5), and startFileIndex is 4, then the returned manifest index list is: + * (m2, 3), (m3, 5). + * + * @param indexedManifests List of input manifests. + * @param startFileIndex Index used to skip the processed manifests. + * @return a sub-list of manifest file index which only contains the manifest indexes larger than the + * startFileIndex. + */ + private List<Pair<ManifestFile, Integer>> skipManifests(List<Pair<ManifestFile, Integer>> indexedManifests, + int startFileIndex) { + if (startFileIndex == 0) { + return indexedManifests; + } + + int manifestIndex = 0; + for (Pair<ManifestFile, Integer> manifest : indexedManifests) { + if (manifest.second() > startFileIndex) { + break; + } + + manifestIndex++; + } + + return indexedManifests.subList(manifestIndex - 1, indexedManifests.size()); + } + + /** + * Method to generate MicroBatch of this snapshot based on the indexed manifests, controlled by targetSizeInBytes. + * + * @param indexedManifests A list of indexed manifests to generate MicroBatch + * @param startFileIndex A startFileIndex used to skip processed files. + * @param targetSizeInBytes Used to control the size of MicroBatch, the processed file bytes must be smaller than + * this size. + * @param isStarting Used to check where all the data file should be processed, or only added files. + * @return A MicroBatch. + */ + private MicroBatch generateMicroBatch(List<Pair<ManifestFile, Integer>> indexedManifests, + int startFileIndex, long targetSizeInBytes, boolean isStarting) { + if (indexedManifests.isEmpty()) { + return new MicroBatch(snapshot.snapshotId(), startFileIndex, startFileIndex + 1, 0L, + Collections.emptyList(), true); + } + + long currentSizeInBytes = 0L; + int currentFileIndex = 0; + boolean isLastIndex = false; + List<FileScanTask> tasks = Lists.newArrayList(); + + for (int idx = 0; idx < indexedManifests.size(); idx++) { + currentFileIndex = indexedManifests.get(idx).second(); + + try (CloseableIterable<FileScanTask> taskIterable = open(indexedManifests.get(idx).first(), isStarting); + CloseableIterator<FileScanTask> taskIter = taskIterable.iterator()) { + + while (taskIter.hasNext()) { Review comment: Because you're using try-with-resources for the Iterable, you don't really need to worry about the Iterator that was created from it. The Iterable will close all open Iterators that were created in the block. So you can write this a bit more concisely using `for`. It's fine either way, though. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
