Repository: incubator-beam
Updated Branches:
  refs/heads/master efd1f95b6 -> 4755c5a78


BoundedReader: add getSplitPoints{Consumed,Remaining}

And implement and test it for common sources

OffsetBasedReader: test limited parallelism signals

AvroSource: rewrite to support remaining parallelism
*) Make the start of a block match Avro's definition: the first byte after the previous sync marker. This enables detecting the last block in the file.
*) This change enables us to unify currentOffset and currentBlockOffset, as all records are emitted at the start of the block that contains them.
*) Simplify block header reading to have fewer object allocations and buffers using a direct reader and a (allocated once only) CountingInputStream to measure the size of that header.
*) Add tests for consumed and remaining parallelism
*) Let BlockBasedSource detect the end of the file in remaining parallelism.
*) Verify in more places that the correct number of bytes is read from the input Avro file.

CompressedSource: add tests of parallelism and progress
*) empty file
*) non-empty compressed file
*) non-empty not-compressed file

TextIO: implement and test parallelism
*) empty file
*) non-empty file

CountingSource: test limited parallelism

CompressedSource: implement currentOffset based on bytes decompressed
*) This is not a very good offset because it is an upper bound, but it is likely better than not reporting any progress at all.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/32a6cde4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/32a6cde4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/32a6cde4

Branch: refs/heads/master
Commit: 32a6cde4e43726849713a7183c66aa28f43b0868
Parents: efd1f95
Author: Dan Halperin <dhalp...@google.com>
Authored: Tue May 3 17:53:48 2016 -0700
Committer: Dan Halperin <dhalp...@google.com>
Committed: Fri May 20 16:42:00 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/AvroSource.java   | 166 ++++++++++++-------
 .../apache/beam/sdk/io/BlockBasedSource.java      |  26 +--
 .../org/apache/beam/sdk/io/BoundedSource.java     | 145 +++++++++++++++-
 .../apache/beam/sdk/io/CompressedSource.java      | 132 +++++++++++++--
 .../org/apache/beam/sdk/io/CountingSource.java    |   5 +
 .../org/apache/beam/sdk/io/DatastoreIO.java       |  13 ++
 .../org/apache/beam/sdk/io/FileBasedSource.java   |   2 +-
 .../apache/beam/sdk/io/OffsetBasedSource.java     |  49 +++++-
 .../java/org/apache/beam/sdk/io/TextIO.java       |  20 ++-
 .../beam/sdk/io/range/OffsetRangeTracker.java     | 109 ++++++++++--
 .../org/apache/beam/sdk/io/AvroSourceTest.java    |  86 +++++++++-
 .../beam/sdk/io/CompressedSourceTest.java         | 107 +++++++++++-
 .../apache/beam/sdk/io/CountingSourceTest.java    |  30 ++++
 .../apache/beam/sdk/io/FileBasedSourceTest.java   |   2 +-
 .../beam/sdk/io/OffsetBasedSourceTest.java        |  71 +++++++-
 .../java/org/apache/beam/sdk/io/TextIOTest.java   | 114 ++++++++++++-
 .../sdk/io/range/OffsetRangeTrackerTest.java      |   1 -
 .../apache/beam/sdk/io/hdfs/HDFSFileSource.java   |  12 ++
 18 files changed, 969 insertions(+), 121 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/32a6cde4/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
index ef8e427..255199f
100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.io; +import static com.google.common.base.Preconditions.checkState; + import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.options.PipelineOptions; @@ -40,18 +42,24 @@ import org.apache.avro.reflect.ReflectDatumReader; import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream; import org.apache.commons.compress.compressors.snappy.SnappyCompressorInputStream; import org.apache.commons.compress.compressors.xz.XZCompressorInputStream; +import org.apache.commons.compress.utils.CountingInputStream; import java.io.ByteArrayInputStream; +import java.io.EOFException; import java.io.IOException; import java.io.InputStream; import java.io.PushbackInputStream; import java.nio.ByteBuffer; import java.nio.channels.Channels; import java.nio.channels.ReadableByteChannel; +import java.nio.channels.SeekableByteChannel; +import java.util.Arrays; import java.util.Collection; import java.util.zip.Inflater; import java.util.zip.InflaterInputStream; +import javax.annotation.concurrent.GuardedBy; + // CHECKSTYLE.OFF: JavadocStyle /** * A {@link FileBasedSource} for reading Avro files. @@ -439,10 +447,6 @@ public class AvroSource<T> extends BlockBasedSource<T> { * the total number of records in the block and the block's size in bytes, followed by the * block's (optionally-encoded) records. Each block is terminated by a 16-bit sync marker. * - * <p>Here, we consider the sync marker that precedes a block to be its offset, as this allows - * a reader that begins reading at that offset to detect the sync marker and the beginning of - * the block. - * * @param <T> The type of records contained in the block. */ @Experimental(Experimental.Kind.SOURCE_SINK) @@ -450,24 +454,25 @@ public class AvroSource<T> extends BlockBasedSource<T> { // The current block. private AvroBlock<T> currentBlock; - // Offset of the block. + // A lock used to synchronize block offsets for getRemainingParallelism + private final Object progressLock = new Object(); + + // Offset of the current block. + @GuardedBy("progressLock") private long currentBlockOffset = 0; // Size of the current block. + @GuardedBy("progressLock") private long currentBlockSizeBytes = 0; - // Current offset within the stream. - private long currentOffset = 0; - // Stream used to read from the underlying file. - // A pushback stream is used to restore bytes buffered during seeking/decoding. + // A pushback stream is used to restore bytes buffered during seeking. private PushbackInputStream stream; + // Counts the number of bytes read. Used only to tell how many bytes are taken up in + // a block's variable-length header. + private CountingInputStream countStream; - // Small buffer for reading encoded values from the stream. - // The maximum size of an encoded long is 10 bytes, and this buffer will be used to read two. - private final byte[] readBuffer = new byte[20]; - - // Decoder to decode binary-encoded values from the buffer. + // Caches the Avro DirectBinaryDecoder used to decode binary-encoded values from the buffer. 
private BinaryDecoder decoder; /** @@ -482,51 +487,67 @@ public class AvroSource<T> extends BlockBasedSource<T> { return (AvroSource<T>) super.getCurrentSource(); } + // Precondition: the stream is positioned after the sync marker in the current (about to be + // previous) block. currentBlockSize equals the size of the current block, or zero if this + // reader was just started. + // + // Postcondition: same as above, but for the new current (formerly next) block. @Override public boolean readNextBlock() throws IOException { - // The next block in the file is after the first sync marker that can be read starting from - // the current offset. First, we seek past the next sync marker, if it exists. After a sync - // marker is the start of a block. A block begins with the number of records contained in - // the block, encoded as a long, followed by the size of the block in bytes, encoded as a - // long. The currentOffset after this method should be last byte after this block, and the - // currentBlockOffset should be the start of the sync marker before this block. - - // Seek to the next sync marker, if one exists. - currentOffset += advancePastNextSyncMarker(stream, getCurrentSource().getSyncMarker()); - - // The offset of the current block includes its preceding sync marker. - currentBlockOffset = currentOffset - getCurrentSource().getSyncMarker().length; - - // Read a small buffer to parse the block header. - // We cannot use a BinaryDecoder to do this directly from the stream because a BinaryDecoder - // internally buffers data and we only want to read as many bytes from the stream as the size - // of the header. Though BinaryDecoder#InputStream returns an input stream that is aware of - // its internal buffering, we would have to re-wrap this input stream to seek for the next - // block in the file. - int read = stream.read(readBuffer); - // We reached the last sync marker in the file. - if (read <= 0) { + long startOfNextBlock = currentBlockOffset + currentBlockSizeBytes; + + // Before reading the variable-sized block header, record the current number of bytes read. + long preHeaderCount = countStream.getBytesRead(); + decoder = DecoderFactory.get().directBinaryDecoder(countStream, decoder); + long numRecords; + try { + numRecords = decoder.readLong(); + } catch (EOFException e) { + // Expected for the last block, at which the start position is the EOF. The way to detect + // stream ending is to try reading from it. return false; } - decoder = DecoderFactory.get().binaryDecoder(readBuffer, decoder); - long numRecords = decoder.readLong(); long blockSize = decoder.readLong(); - // The decoder buffers data internally, but since we know the size of the stream the - // decoder has constructed from the readBuffer, the number of bytes available in the - // input stream is equal to the number of unconsumed bytes. - int headerSize = readBuffer.length - decoder.inputStream().available(); - stream.unread(readBuffer, headerSize, read - headerSize); + // Mark header size as the change in the number of bytes read. + long headerSize = countStream.getBytesRead() - preHeaderCount; // Create the current block by reading blockSize bytes. Block sizes permitted by the Avro // specification are [32, 2^30], so this narrowing is ok. 
byte[] data = new byte[(int) blockSize]; - stream.read(data); + int read = stream.read(data); + checkState(blockSize == read, "Only %s/%s bytes in the block were read", read, blockSize); currentBlock = new AvroBlock<>(data, numRecords, getCurrentSource()); - currentBlockSizeBytes = blockSize; - // Update current offset with the number of bytes we read to get the next block. - currentOffset += headerSize + blockSize; + // Read the end of this block, which MUST be a sync marker for correctness. + byte[] syncMarker = getCurrentSource().getSyncMarker(); + byte[] readSyncMarker = new byte[syncMarker.length]; + long syncMarkerOffset = startOfNextBlock + headerSize + blockSize; + long bytesRead = stream.read(readSyncMarker); + checkState( + bytesRead == syncMarker.length, + "When trying to read a sync marker at position %s, only able to read %s/%s bytes", + syncMarkerOffset, + bytesRead, + syncMarker.length); + if (!Arrays.equals(syncMarker, readSyncMarker)) { + throw new IllegalStateException( + String.format( + "Expected the bytes [%d,%d) in file %s to be a sync marker, but found %s", + syncMarkerOffset, + syncMarkerOffset + syncMarker.length, + getCurrentSource().getFileOrPatternSpec(), + Arrays.toString(readSyncMarker) + )); + } + + // Atomically update both the position and offset of the new block. + synchronized (progressLock) { + currentBlockOffset = startOfNextBlock; + // Total block size includes the header, block content, and trailing sync marker. + currentBlockSizeBytes = headerSize + blockSize + syncMarker.length; + } + return true; } @@ -537,32 +558,65 @@ public class AvroSource<T> extends BlockBasedSource<T> { @Override public long getCurrentBlockOffset() { - return currentBlockOffset; + synchronized (progressLock) { + return currentBlockOffset; + } } @Override public long getCurrentBlockSize() { - return currentBlockSizeBytes; + synchronized (progressLock) { + return currentBlockSizeBytes; + } + } + + @Override + public long getSplitPointsRemaining() { + if (isDone()) { + return 0; + } + synchronized (progressLock) { + if (currentBlockOffset + currentBlockSizeBytes >= getCurrentSource().getEndOffset()) { + // This block is known to be the last block in the range. + return 1; + } + } + return super.getSplitPointsRemaining(); } /** * Creates a {@link PushbackInputStream} that has a large enough pushback buffer to be able - * to push back the syncBuffer and the readBuffer. + * to push back the syncBuffer. */ private PushbackInputStream createStream(ReadableByteChannel channel) { return new PushbackInputStream( Channels.newInputStream(channel), - getCurrentSource().getSyncMarker().length + readBuffer.length); + getCurrentSource().getSyncMarker().length); } - /** - * Starts reading from the provided channel. Assumes that the channel is already seeked to - * the source's start offset. - */ + // Postcondition: the stream is positioned at the beginning of the first block after the start + // of the current source, and currentBlockOffset is that position. Additionally, + // currentBlockSizeBytes will be set to 0 indicating that the previous block was empty. @Override protected void startReading(ReadableByteChannel channel) throws IOException { + long startOffset = getCurrentSource().getStartOffset(); + byte[] syncMarker = getCurrentSource().getSyncMarker(); + long syncMarkerLength = syncMarker.length; + + if (startOffset != 0) { + // Rewind order to find the sync marker ending the previous block. 
+ long position = Math.max(0, startOffset - syncMarkerLength); + ((SeekableByteChannel) channel).position(position); + startOffset = position; + } + + // Satisfy the post condition. stream = createStream(channel); - currentOffset = getCurrentSource().getStartOffset(); + countStream = new CountingInputStream(stream); + synchronized (progressLock) { + currentBlockOffset = startOffset + advancePastNextSyncMarker(stream, syncMarker); + currentBlockSizeBytes = 0; + } } /** http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/32a6cde4/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BlockBasedSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BlockBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BlockBasedSource.java index 31ef055..997c77a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BlockBasedSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BlockBasedSource.java @@ -206,28 +206,32 @@ public abstract class BlockBasedSource<T> extends FileBasedSource<T> { } @Override + @Nullable public Double getFractionConsumed() { - if (getCurrentSource().getEndOffset() == Long.MAX_VALUE) { - return null; - } - Block<T> currentBlock = getCurrentBlock(); - if (currentBlock == null) { - // There is no current block (i.e., the read has not yet begun). + if (!isStarted()) { return 0.0; } + if (isDone()) { + return 1.0; + } + FileBasedSource<T> source = getCurrentSource(); + if (source.getEndOffset() == Long.MAX_VALUE) { + // Unknown end offset, so we cannot tell. + return null; + } + long currentBlockOffset = getCurrentBlockOffset(); - long startOffset = getCurrentSource().getStartOffset(); - long endOffset = getCurrentSource().getEndOffset(); + long startOffset = source.getStartOffset(); + long endOffset = source.getEndOffset(); double fractionAtBlockStart = ((double) (currentBlockOffset - startOffset)) / (endOffset - startOffset); double fractionAtBlockEnd = ((double) (currentBlockOffset + getCurrentBlockSize() - startOffset) / (endOffset - startOffset)); + double blockFraction = getCurrentBlock().getFractionOfBlockConsumed(); return Math.min( 1.0, - fractionAtBlockStart - + currentBlock.getFractionOfBlockConsumed() - * (fractionAtBlockEnd - fractionAtBlockStart)); + fractionAtBlockStart + blockFraction * (fractionAtBlockEnd - fractionAtBlockStart)); } @Override http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/32a6cde4/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedSource.java index 8f7d3fd..394afa4 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedSource.java @@ -18,6 +18,8 @@ package org.apache.beam.sdk.io; import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.io.range.OffsetRangeTracker; +import org.apache.beam.sdk.io.range.RangeTracker; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -27,6 +29,8 @@ import java.io.IOException; import java.util.List; import java.util.NoSuchElementException; +import javax.annotation.Nullable; + /** * A {@link Source} that reads a finite amount of input and, because of that, 
supports * some additional operations. @@ -37,9 +41,16 @@ import java.util.NoSuchElementException; * <li>Size estimation: {@link #getEstimatedSizeBytes}; * <li>Telling whether or not this source produces key/value pairs in sorted order: * {@link #producesSortedKeys}; - * <li>The reader ({@link BoundedReader}) supports progress estimation - * ({@link BoundedReader#getFractionConsumed}) and dynamic splitting - * ({@link BoundedReader#splitAtFraction}). + * <li>The accompanying {@link BoundedReader reader} has additional functionality to enable runners + * to dynamically adapt based on runtime conditions. + * <ul> + * <li>Progress estimation ({@link BoundedReader#getFractionConsumed}) + * <li>Tracking of parallelism, to determine whether the current source can be split + * ({@link BoundedReader#getSplitPointsConsumed()} and + * {@link BoundedReader#getSplitPointsRemaining()}). + * <li>Dynamic splitting of the current source ({@link BoundedReader#splitAtFraction}). + * </ul> + * </li> * </ul> * * <p>To use this class for supporting your custom input type, derive your class @@ -82,14 +93,14 @@ public abstract class BoundedSource<T> extends Source<T> { * * <h3>Thread safety</h3> * All methods will be run from the same thread except {@link #splitAtFraction}, - * {@link #getFractionConsumed} and {@link #getCurrentSource}, which can be called concurrently + * {@link #getFractionConsumed}, {@link #getCurrentSource}, {@link #getSplitPointsConsumed()}, + * and {@link #getSplitPointsRemaining()}, all of which can be called concurrently * from a different thread. There will not be multiple concurrent calls to - * {@link #splitAtFraction} but there can be for {@link #getFractionConsumed} if - * {@link #splitAtFraction} is implemented. + * {@link #splitAtFraction}. * - * <p>If the source does not implement {@link #splitAtFraction}, you do not need to worry about - * thread safety. If implemented, it must be safe to call {@link #splitAtFraction} and - * {@link #getFractionConsumed} concurrently with other methods. + * <p>It must be safe to call {@link #splitAtFraction}, {@link #getFractionConsumed}, + * {@link #getCurrentSource}, {@link #getSplitPointsConsumed()}, and + * {@link #getSplitPointsRemaining()} concurrently with other methods. * * <p>Additionally, a successful {@link #splitAtFraction} call must, by definition, cause * {@link #getCurrentSource} to start returning a different value. @@ -129,11 +140,126 @@ public abstract class BoundedSource<T> extends Source<T> { * methods (including itself), and it is therefore critical for it to be implemented * in a thread-safe way. */ + @Nullable public Double getFractionConsumed() { return null; } /** + * A constant to use as the return value for {@link #getSplitPointsConsumed()} or + * {@link #getSplitPointsRemaining()} when the exact value is unknown. + */ + public static final long SPLIT_POINTS_UNKNOWN = -1; + + /** + * Returns the total amount of parallelism in the consumed (returned and processed) range of + * this reader's current {@link BoundedSource} (as would be returned by + * {@link #getCurrentSource}). This corresponds to all split point records (see + * {@link RangeTracker}) returned by this reader, <em>excluding</em> the last split point + * returned if the reader is not finished. + * + * <p>Consider the following examples: (1) An input that can be read in parallel down to the + * individual records, such as {@link CountingSource#upTo}, is called "perfectly splittable". 
+ * (2) a "block-compressed" file format such as {@link AvroIO}, in which a block of records has + * to be read as a whole, but different blocks can be read in parallel. (3) An "unsplittable" + * input such as a cursor in a database. + * + * <ul> + * <li>Any {@link BoundedReader reader} that is unstarted (aka, has never had a call to + * {@link #start}) has a consumed parallelism of 0. This condition holds independent of whether + * the input is splittable. + * <li>Any {@link BoundedReader reader} that has only returned its first element (aka, + * has never had a call to {@link #advance}) has a consumed parallelism of 0: the first element + * is the current element and is still being processed. This condition holds independent of + * whether the input is splittable. + * <li>For an empty reader (in which the call to {@link #start} returned false), the + * consumed parallelism is 0. This condition holds independent of whether the input is + * splittable. + * <li>For a non-empty, finished reader (in which the call to {@link #start} returned true and + * a call to {@link #advance} has returned false), the value returned must be at least 1 + * and should equal the total parallelism in the source. + * <li>For example (1): After returning record #30 (starting at 1) out of 50 in a perfectly + * splittable 50-record input, this value should be 29. When finished, the consumed parallelism + * should be 50. + * <li>For example (2): In a block-compressed value consisting of 5 blocks, the value should + * stay at 0 until the first record of the second block is returned; stay at 1 until the first + * record of the third block is returned, etc. Only once the end-of-file is reached then the + * fifth block has been consumed and the value should stay at 5. + * <li>For example (3): For any non-empty unsplittable input, the consumed parallelism is 0 + * until the reader is finished (because the last call to {@link #advance} returned false, at + * which point it becomes 1. + * </ul> + * + * <p>A reader that is implemented using a {@link RangeTracker} is encouraged to use the + * range tracker's ability to count split points to implement this method. See + * {@link OffsetBasedSource.OffsetBasedReader} and {@link OffsetRangeTracker} for an example. + * + * <p>Defaults to {@link #SPLIT_POINTS_UNKNOWN}. Any value less than 0 will be interpreted + * as unknown. + * + * <h3>Thread safety</h3> + * See the javadoc on {@link BoundedReader} for information about thread safety. + * + * @see #getSplitPointsRemaining() + */ + public long getSplitPointsConsumed() { + return SPLIT_POINTS_UNKNOWN; + } + + /** + * Returns the total amount of parallelism in the unprocessed part of this reader's current + * {@link BoundedSource} (as would be returned by {@link #getCurrentSource}). This corresponds + * to all unprocessed split point records (see {@link RangeTracker}), including the last + * split point returned, in the remainder part of the source. + * + * <p>This function should be implemented only <strong>in addition to + * {@link #getSplitPointsConsumed()}</strong> and only if <em>an exact value can be + * returned</em>. + * + * <p>Consider the following examples: (1) An input that can be read in parallel down to the + * individual records, such as {@link CountingSource#upTo}, is called "perfectly splittable". + * (2) a "block-compressed" file format such as {@link AvroIO}, in which a block of records has + * to be read as a whole, but different blocks can be read in parallel. 
(3) An "unsplittable" + * input such as a cursor in a database. + * + * <p>Assume for examples (1) and (2) that the number of records or blocks remaining is known: + * + * <ul> + * <li>Any {@link BoundedReader reader} for which the last call to {@link #start} or + * {@link #advance} has returned true should should not return 0, because this reader itself + * represents parallelism at least 1. This condition holds independent of whether the input is + * splittable. + * <li>A finished reader (for which {@link #start} or {@link #advance}) has returned false + * should return a value of 0. This condition holds independent of whether the input is + * splittable. + * <li>For example 1: After returning record #30 (starting at 1) out of 50 in a perfectly + * splittable 50-record input, this value should be 21 (20 remaining + 1 current) if the total + * number of records is known. + * <li>For example 2: After returning a record in block 3 in a block-compressed file + * consisting of 5 blocks, this value should be 3 (since blocks 4 and 5 can be processed in + * parallel by new readers produced via dynamic work rebalancing, while the current reader + * continues processing block 3) if the total number of blocks is known. + * <li>For example (3): a reader for any non-empty unsplittable input, should return 1 until + * it is finished, at which point it should return 0. + * <li>For any reader: After returning the last split point in a file (e.g., the last record + * in example (1), the first record in the last block for example (2), or the first record in + * the file for example (3), this value should be 1: apart from the current task, no additional + * remainder can be split off. + * </ul> + * + * <p>Defaults to {@link #SPLIT_POINTS_UNKNOWN}. Any value less than 0 will be interpreted as + * unknown. + * + * <h3>Thread safety</h3> + * See the javadoc on {@link BoundedReader} for information about thread safety. + * + * @see #getSplitPointsConsumed() + */ + public long getSplitPointsRemaining() { + return SPLIT_POINTS_UNKNOWN; + } + + /** * Returns a {@code Source} describing the same input that this {@code Reader} currently reads * (including items already read). * @@ -263,6 +389,7 @@ public abstract class BoundedSource<T> extends Source<T> { * * <p>By default, returns null to indicate that splitting is not possible. */ + @Nullable public BoundedSource<T> splitAtFraction(double fraction) { return null; } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/32a6cde4/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java index 5cb0684..8bccf5f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java @@ -32,11 +32,14 @@ import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream; import java.io.IOException; import java.io.PushbackInputStream; import java.io.Serializable; +import java.nio.ByteBuffer; import java.nio.channels.Channels; import java.nio.channels.ReadableByteChannel; import java.util.NoSuchElementException; import java.util.zip.GZIPInputStream; +import javax.annotation.concurrent.GuardedBy; + /** * A Source that reads from compressed files. 
A {@code CompressedSources} wraps a delegate * {@link FileBasedSource} that is able to read the decompressed file format. @@ -361,7 +364,12 @@ public class CompressedSource<T> extends FileBasedSource<T> { private final FileBasedReader<T> readerDelegate; private final CompressedSource<T> source; + private final boolean splittable; + private final Object progressLock = new Object(); + @GuardedBy("progressLock") private int numRecordsRead; + @GuardedBy("progressLock") + private CountingChannel channel; /** * Create a {@code CompressedReader} from a {@code CompressedSource} and delegate reader. @@ -369,6 +377,13 @@ public class CompressedSource<T> extends FileBasedSource<T> { public CompressedReader(CompressedSource<T> source, FileBasedReader<T> readerDelegate) { super(source); this.source = source; + boolean splittable; + try { + splittable = source.isSplittable(); + } catch (Exception e) { + throw new RuntimeException("Unable to tell whether source " + source + " is splittable", e); + } + this.splittable = splittable; this.readerDelegate = readerDelegate; } @@ -380,18 +395,78 @@ public class CompressedSource<T> extends FileBasedSource<T> { return readerDelegate.getCurrent(); } + @Override + public final long getSplitPointsConsumed() { + if (splittable) { + return readerDelegate.getSplitPointsConsumed(); + } else { + synchronized (progressLock) { + return (isDone() && numRecordsRead > 0) ? 1 : 0; + } + } + } + + @Override + public final long getSplitPointsRemaining() { + if (splittable) { + return readerDelegate.getSplitPointsRemaining(); + } else { + return isDone() ? 0 : 1; + } + } + /** * Returns true only for the first record; compressed sources cannot be split. */ @Override protected final boolean isAtSplitPoint() { - // We have to return true for the first record, but not for the state before reading it, - // and not for the state after reading any other record. Hence == rather than >= or <=. - // This is required because FileBasedReader is intended for readers that can read a range - // of offsets in a file and where the range can be split in parts. CompressedReader, - // however, is a degenerate case because it cannot be split, but it has to satisfy the - // semantics of offsets and split points anyway. - return numRecordsRead == 1; + if (splittable) { + return readerDelegate.isAtSplitPoint(); + } else { + // We have to return true for the first record, but not for the state before reading it, + // and not for the state after reading any other record. Hence == rather than >= or <=. + // This is required because FileBasedReader is intended for readers that can read a range + // of offsets in a file and where the range can be split in parts. CompressedReader, + // however, is a degenerate case because it cannot be split, but it has to satisfy the + // semantics of offsets and split points anyway. + synchronized (progressLock) { + return numRecordsRead == 1; + } + } + } + + private static class CountingChannel implements ReadableByteChannel { + long count; + private final ReadableByteChannel inner; + + public CountingChannel(ReadableByteChannel inner, long count) { + this.inner = inner; + this.count = count; + } + + public long getCount() { + return count; + } + + @Override + public int read(ByteBuffer dst) throws IOException { + int bytes = inner.read(dst); + if (bytes > 0) { + // Avoid the -1 from EOF. 
+ count += bytes; + } + return bytes; + } + + @Override + public boolean isOpen() { + return inner.isOpen(); + } + + @Override + public void close() throws IOException { + inner.close(); + } } /** @@ -400,6 +475,16 @@ public class CompressedSource<T> extends FileBasedSource<T> { */ @Override protected final void startReading(ReadableByteChannel channel) throws IOException { + if (splittable) { + // No-op. We will always delegate to the inner reader, so this.channel and this.progressLock + // will never be used. + } else { + synchronized (progressLock) { + this.channel = new CountingChannel(channel, getCurrentSource().getStartOffset()); + channel = this.channel; + } + } + if (source.getChannelFactory() instanceof FileNameBasedDecompressingChannelFactory) { FileNameBasedDecompressingChannelFactory channelFactory = (FileNameBasedDecompressingChannelFactory) source.getChannelFactory(); @@ -420,16 +505,37 @@ public class CompressedSource<T> extends FileBasedSource<T> { if (!readerDelegate.readNextRecord()) { return false; } - ++numRecordsRead; + synchronized (progressLock) { + ++numRecordsRead; + } return true; } - /** - * Returns the delegate reader's current offset in the decompressed input. - */ + // Splittable: simply delegates to the inner reader. + // + // Unsplittable: returns the offset in the input stream that has been read by the input. + // these positions are likely to be coarse-grained (in the event of buffering) and + // over-estimates (because they reflect the number of bytes read to produce an element, not its + // start) but both of these provide better data than e.g., reporting the start of the file. @Override - protected final long getCurrentOffset() { - return readerDelegate.getCurrentOffset(); + protected final long getCurrentOffset() throws NoSuchElementException { + if (splittable) { + return readerDelegate.getCurrentOffset(); + } else { + synchronized (progressLock) { + if (numRecordsRead <= 1) { + // Since the first record is at a split point, it should start at the beginning of the + // file. This avoids the bad case where the decompressor read the entire file, which + // would cause the file to be treated as empty when returning channel.getCount() as it + // is outside the valid range. 
+ return 0; + } + if (channel == null) { + throw new NoSuchElementException(); + } + return channel.getCount(); + } + } } } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/32a6cde4/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java index b28e866..403d22e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java @@ -210,6 +210,11 @@ public class CountingSource { } @Override + public synchronized long getSplitPointsRemaining() { + return Math.max(0, getCurrentSource().getEndOffset() - current); + } + + @Override public synchronized BoundedCountingSource getCurrentSource() { return (BoundedCountingSource) super.getCurrentSource(); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/32a6cde4/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DatastoreIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DatastoreIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DatastoreIO.java index cc8e923..137c6cd 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DatastoreIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DatastoreIO.java @@ -865,6 +865,8 @@ public class DatastoreIO { */ private int userLimit; + private volatile boolean done = false; + private Entity currentEntity; /** @@ -885,6 +887,16 @@ public class DatastoreIO { } @Override + public final long getSplitPointsConsumed() { + return done ? 1 : 0; + } + + @Override + public final long getSplitPointsRemaining() { + return done ? 
0 : 1; + } + + @Override public boolean start() throws IOException { return advance(); } @@ -901,6 +913,7 @@ public class DatastoreIO { if (entities == null || !entities.hasNext()) { currentEntity = null; + done = true; return false; } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/32a6cde4/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java index 96aeda5..f000f6a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java @@ -489,7 +489,7 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> { } @Override - public FileBasedSource<T> getCurrentSource() { + public synchronized FileBasedSource<T> getCurrentSource() { return (FileBasedSource<T>) super.getCurrentSource(); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/32a6cde4/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java index 9ee89a2..2f62acd 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java @@ -180,7 +180,7 @@ public abstract class OffsetBasedSource<T> extends BoundedSource<T> { * * <p>As an example in which {@link OffsetBasedSource} is used to implement a file source, suppose * that this source was constructed with an {@code endOffset} of {@link Long#MAX_VALUE} to - * indicate that a file should be read to the end. Then {@link #getMaxEndOffset} should determine + * indicate that a file should be read to the end. Then this function should determine * the actual, exact size of the file in bytes and return it. */ public abstract long getMaxEndOffset(PipelineOptions options) throws Exception; @@ -230,9 +230,22 @@ public abstract class OffsetBasedSource<T> extends BoundedSource<T> { */ public abstract static class OffsetBasedReader<T> extends BoundedReader<T> { private static final Logger LOG = LoggerFactory.getLogger(OffsetBasedReader.class); - private OffsetBasedSource<T> source; + /** + * Returns true if the last call to {@link #start} or {@link #advance} returned false. + */ + public final boolean isDone() { + return rangeTracker.isDone(); + } + + /** + * Returns true if there has been a call to {@link #start}. + */ + public final boolean isStarted() { + return rangeTracker.isStarted(); + } + /** The {@link OffsetRangeTracker} managing the range and current position of the source. 
*/ private final OffsetRangeTracker rangeTracker; @@ -266,12 +279,14 @@ public abstract class OffsetBasedSource<T> extends BoundedSource<T> { @Override public final boolean start() throws IOException { - return startImpl() && rangeTracker.tryReturnRecordAt(isAtSplitPoint(), getCurrentOffset()); + return startImpl() && rangeTracker.tryReturnRecordAt(isAtSplitPoint(), getCurrentOffset()) + || rangeTracker.markDone(); } @Override public final boolean advance() throws IOException { - return advanceImpl() && rangeTracker.tryReturnRecordAt(isAtSplitPoint(), getCurrentOffset()); + return advanceImpl() && rangeTracker.tryReturnRecordAt(isAtSplitPoint(), getCurrentOffset()) + || rangeTracker.markDone(); } /** @@ -315,6 +330,32 @@ public abstract class OffsetBasedSource<T> extends BoundedSource<T> { } @Override + public long getSplitPointsConsumed() { + return rangeTracker.getSplitPointsProcessed(); + } + + @Override + public long getSplitPointsRemaining() { + if (isDone()) { + return 0; + } else if (!isStarted()) { + // Note that even if the current source does not allow splitting, we don't know that + // it's non-empty so we return UNKNOWN instead of 1. + return BoundedReader.SPLIT_POINTS_UNKNOWN; + } else if (!getCurrentSource().allowsDynamicSplitting()) { + // Started (so non-empty) and unsplittable, so only the current task. + return 1; + } else if (getCurrentOffset() >= rangeTracker.getStopPosition() - 1) { + // If this is true, the next element is outside the range. Note that even getCurrentOffset() + // might be larger than the stop position when the current record is not a split point. + return 1; + } else { + // Use the default. + return super.getSplitPointsRemaining(); + } + } + + @Override public final synchronized OffsetBasedSource<T> splitAtFraction(double fraction) { if (!getCurrentSource().allowsDynamicSplitting()) { return null; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/32a6cde4/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java index 79eeb08..13cb45e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java @@ -817,9 +817,10 @@ public class TextIO { private ByteString buffer; private int startOfSeparatorInBuffer; private int endOfSeparatorInBuffer; - private long startOfNextRecord; - private boolean eof; - private boolean elementIsPresent; + private long startOfRecord; + private volatile long startOfNextRecord; + private volatile boolean eof; + private volatile boolean elementIsPresent; private T currentValue; private ReadableByteChannel inChannel; @@ -834,7 +835,15 @@ public class TextIO { if (!elementIsPresent) { throw new NoSuchElementException(); } - return startOfNextRecord; + return startOfRecord; + } + + @Override + public long getSplitPointsRemaining() { + if (isStarted() && startOfNextRecord >= getCurrentSource().getEndOffset()) { + return isDone() ? 
0 : 1; + } + return super.getSplitPointsRemaining(); } @Override @@ -912,7 +921,7 @@ public class TextIO { @Override protected boolean readNextRecord() throws IOException { - startOfNextRecord += endOfSeparatorInBuffer; + startOfRecord = startOfNextRecord; findSeparatorBounds(); // If we have reached EOF file and consumed all of the buffer then we know @@ -923,6 +932,7 @@ public class TextIO { } decodeCurrentElement(); + startOfNextRecord = startOfRecord + endOfSeparatorInBuffer; return true; } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/32a6cde4/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/OffsetRangeTracker.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/OffsetRangeTracker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/OffsetRangeTracker.java index ea1cf14..76790af 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/OffsetRangeTracker.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/OffsetRangeTracker.java @@ -17,6 +17,10 @@ */ package org.apache.beam.sdk.io.range; +import static com.google.common.base.Preconditions.checkState; + +import org.apache.beam.sdk.io.BoundedSource.BoundedReader; + import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; @@ -32,6 +36,8 @@ public class OffsetRangeTracker implements RangeTracker<Long> { private long stopOffset; private long lastRecordStart = -1L; private long offsetOfLastSplitPoint = -1L; + private long splitPointsSeen = 0L; + private boolean done = false; /** * Offset corresponding to infinity. This can only be used as the upper-bound of a range, and @@ -49,6 +55,15 @@ public class OffsetRangeTracker implements RangeTracker<Long> { this.stopOffset = stopOffset; } + public synchronized boolean isStarted() { + // done => started: handles the case when the reader was empty. 
+ return (offsetOfLastSplitPoint != -1) || done; + } + + public synchronized boolean isDone() { + return done; + } + @Override public synchronized Long getStartPosition() { return startOffset; @@ -65,10 +80,18 @@ public class OffsetRangeTracker implements RangeTracker<Long> { } public synchronized boolean tryReturnRecordAt(boolean isAtSplitPoint, long recordStart) { - if (lastRecordStart == -1 && !isAtSplitPoint) { + if (!isStarted() && !isAtSplitPoint) { throw new IllegalStateException( String.format("The first record [starting at %d] must be at a split point", recordStart)); } + if (recordStart < startOffset) { + throw new IllegalStateException( + String.format( + "Trying to return record [starting at %d] which is before the start offset [%d]", + recordStart, + startOffset)); + + } if (recordStart < lastRecordStart) { throw new IllegalStateException( String.format( @@ -77,8 +100,11 @@ public class OffsetRangeTracker implements RangeTracker<Long> { recordStart, lastRecordStart)); } + + lastRecordStart = recordStart; + if (isAtSplitPoint) { - if (offsetOfLastSplitPoint != -1L && recordStart == offsetOfLastSplitPoint) { + if (recordStart == offsetOfLastSplitPoint) { throw new IllegalStateException( String.format( "Record at a split point has same offset as the previous split point: " @@ -86,12 +112,13 @@ public class OffsetRangeTracker implements RangeTracker<Long> { offsetOfLastSplitPoint, recordStart)); } if (recordStart >= stopOffset) { + done = true; return false; } offsetOfLastSplitPoint = recordStart; + ++splitPointsSeen; } - lastRecordStart = recordStart; return true; } @@ -105,7 +132,7 @@ public class OffsetRangeTracker implements RangeTracker<Long> { LOG.debug("Refusing to split {} at {}: stop position unspecified", this, splitOffset); return false; } - if (lastRecordStart == -1) { + if (!isStarted()) { LOG.debug("Refusing to split {} at {}: unstarted", this, splitOffset); return false; } @@ -143,17 +170,72 @@ public class OffsetRangeTracker implements RangeTracker<Long> { @Override public synchronized double getFractionConsumed() { - if (stopOffset == OFFSET_INFINITY) { + if (!isStarted()) { return 0.0; - } - if (lastRecordStart == -1) { + } else if (isDone()) { + return 1.0; + } else if (stopOffset == OFFSET_INFINITY) { return 0.0; + } else if (lastRecordStart >= stopOffset) { + return 1.0; + } else { + // E.g., when reading [3, 6) and lastRecordStart is 4, that means we consumed 3,4 of 3,4,5 + // which is (4 - 3 + 1) / (6 - 3) = 67%. + // Also, clamp to at most 1.0 because the last consumed position can extend past the + // stop position. + return Math.min(1.0, 1.0 * (lastRecordStart - startOffset + 1) / (stopOffset - startOffset)); } - // E.g., when reading [3, 6) and lastRecordStart is 4, that means we consumed 3,4 of 3,4,5 - // which is (4 - 3 + 1) / (6 - 3) = 67%. - // Also, clamp to at most 1.0 because the last consumed position can extend past the - // stop position. - return Math.min(1.0, 1.0 * (lastRecordStart - startOffset + 1) / (stopOffset - startOffset)); + } + + /** + * Returns the total number of split points that have been processed. + * + * <p>A split point at a particular offset has been seen if there has been a corresponding call + * to {@link #tryReturnRecordAt(boolean, long)} with {@code isAtSplitPoint} true. It has been + * processed if there has been a <em>subsequent</em> call to + * {@link #tryReturnRecordAt(boolean, long)} with {@code isAtSplitPoint} true and at a larger + * offset. 
+ * + * <p>Note that for correctness when implementing {@link BoundedReader#getSplitPointsConsumed()}, + * if a reader finishes before {@link #tryReturnRecordAt(boolean, long)} returns false, + * the reader should add an additional call to {@link #markDone()}. This will indicate that + * processing for the last seen split point has been finished. + * + * @see org.apache.beam.sdk.io.OffsetBasedSource for a {@link BoundedReader} + * implemented using {@link OffsetRangeTracker}. + */ + public synchronized long getSplitPointsProcessed() { + if (!isStarted()) { + return 0; + } else if (isDone()) { + return splitPointsSeen; + } else { + // There is a current split point, and it has not finished processing. + checkState( + splitPointsSeen > 0, + "A started rangeTracker should have seen > 0 split points (is %s)", + splitPointsSeen); + return splitPointsSeen - 1; + } + } + + + /** + * Marks this range tracker as being done. Specifically, this will mark the current split point, + * if one exists, as being finished. + * + * <p>Always returns false, so that it can be used in an implementation of + * {@link BoundedReader#start()} or {@link BoundedReader#advance()} as follows: + * + * <pre> {@code + * public boolean start() { + * return startImpl() && rangeTracker.tryReturnRecordAt(isAtSplitPoint, position) + * || rangeTracker.markDone(); + * }} </pre> + */ + public synchronized boolean markDone() { + done = true; + return false; } @Override @@ -177,7 +259,10 @@ public class OffsetRangeTracker implements RangeTracker<Long> { @VisibleForTesting OffsetRangeTracker copy() { OffsetRangeTracker res = new OffsetRangeTracker(startOffset, stopOffset); + res.offsetOfLastSplitPoint = this.offsetOfLastSplitPoint; res.lastRecordStart = this.lastRecordStart; + res.done = this.done; + res.splitPointsSeen = this.splitPointsSeen; return res; } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/32a6cde4/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java index 20c21bc..13f8e7f 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java @@ -18,9 +18,9 @@ package org.apache.beam.sdk.io; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; - import static org.hamcrest.Matchers.containsInAnyOrder; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; @@ -28,6 +28,8 @@ import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.DefaultCoder; import org.apache.beam.sdk.io.AvroSource.AvroReader; import org.apache.beam.sdk.io.AvroSource.AvroReader.Seeker; +import org.apache.beam.sdk.io.BlockBasedSource.BlockBasedReader; +import org.apache.beam.sdk.io.BoundedSource.BoundedReader; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.SourceTestUtils; @@ -44,6 +46,7 @@ import org.apache.avro.io.DatumWriter; import org.apache.avro.reflect.AvroDefault; import org.apache.avro.reflect.Nullable; import org.apache.avro.reflect.ReflectData; +import org.hamcrest.Matchers; import org.junit.Rule; import org.junit.Test; 
import org.junit.rules.ExpectedException; @@ -57,6 +60,7 @@ import java.io.FileOutputStream; import java.io.IOException; import java.io.PushbackInputStream; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.NoSuchElementException; import java.util.Objects; @@ -198,6 +202,86 @@ public class AvroSourceTest { } @Test + public void testProgress() throws Exception { + // 5 records, 2 per block. + List<FixedRecord> records = createFixedRecords(5); + String filename = generateTestFile("tmp.avro", records, SyncBehavior.SYNC_REGULAR, 2, + AvroCoder.of(FixedRecord.class), DataFileConstants.NULL_CODEC); + + AvroSource<FixedRecord> source = AvroSource.from(filename).withSchema(FixedRecord.class); + try (BoundedSource.BoundedReader<FixedRecord> readerOrig = source.createReader(null)) { + assertThat(readerOrig, Matchers.instanceOf(BlockBasedReader.class)); + BlockBasedReader<FixedRecord> reader = (BlockBasedReader<FixedRecord>) readerOrig; + + // Before starting + assertEquals(0.0, reader.getFractionConsumed(), 1e-6); + assertEquals(0, reader.getSplitPointsConsumed()); + assertEquals(BoundedReader.SPLIT_POINTS_UNKNOWN, reader.getSplitPointsRemaining()); + + // First 2 records are in the same block. + assertTrue(reader.start()); + assertTrue(reader.isAtSplitPoint()); + assertEquals(0, reader.getSplitPointsConsumed()); + assertEquals(BoundedReader.SPLIT_POINTS_UNKNOWN, reader.getSplitPointsRemaining()); + // continued + assertTrue(reader.advance()); + assertFalse(reader.isAtSplitPoint()); + assertEquals(0, reader.getSplitPointsConsumed()); + assertEquals(BoundedReader.SPLIT_POINTS_UNKNOWN, reader.getSplitPointsRemaining()); + + // Second block -> parallelism consumed becomes 1. + assertTrue(reader.advance()); + assertTrue(reader.isAtSplitPoint()); + assertEquals(1, reader.getSplitPointsConsumed()); + assertEquals(BoundedReader.SPLIT_POINTS_UNKNOWN, reader.getSplitPointsRemaining()); + // continued + assertTrue(reader.advance()); + assertFalse(reader.isAtSplitPoint()); + assertEquals(1, reader.getSplitPointsConsumed()); + assertEquals(BoundedReader.SPLIT_POINTS_UNKNOWN, reader.getSplitPointsRemaining()); + + // Third and final block -> parallelism consumed becomes 2, remaining becomes 1. + assertTrue(reader.advance()); + assertTrue(reader.isAtSplitPoint()); + assertEquals(2, reader.getSplitPointsConsumed()); + assertEquals(1, reader.getSplitPointsRemaining()); + + // Done + assertFalse(reader.advance()); + assertEquals(3, reader.getSplitPointsConsumed()); + assertEquals(0, reader.getSplitPointsRemaining()); + assertEquals(1.0, reader.getFractionConsumed(), 1e-6); + } + } + + @Test + public void testProgressEmptySource() throws Exception { + // 0 records, 20 per block. 
+ List<FixedRecord> records = Collections.emptyList(); + String filename = generateTestFile("tmp.avro", records, SyncBehavior.SYNC_REGULAR, 2, + AvroCoder.of(FixedRecord.class), DataFileConstants.NULL_CODEC); + + AvroSource<FixedRecord> source = AvroSource.from(filename).withSchema(FixedRecord.class); + try (BoundedSource.BoundedReader<FixedRecord> readerOrig = source.createReader(null)) { + assertThat(readerOrig, Matchers.instanceOf(BlockBasedReader.class)); + BlockBasedReader<FixedRecord> reader = (BlockBasedReader<FixedRecord>) readerOrig; + + // before starting + assertEquals(0.0, reader.getFractionConsumed(), 1e-6); + assertEquals(0, reader.getSplitPointsConsumed()); + assertEquals(BoundedReader.SPLIT_POINTS_UNKNOWN, reader.getSplitPointsRemaining()); + + // confirm empty + assertFalse(reader.start()); + + // after reading empty source + assertEquals(0, reader.getSplitPointsConsumed()); + assertEquals(0, reader.getSplitPointsRemaining()); + assertEquals(1.0, reader.getFractionConsumed(), 1e-6); + } + } + + @Test public void testGetCurrentFromUnstartedReader() throws Exception { List<FixedRecord> records = createFixedRecords(DEFAULT_RECORD_COUNT); String filename = generateTestFile("tmp.avro", records, SyncBehavior.SYNC_DEFAULT, 1000, http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/32a6cde4/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java index 542e734..7161c1d 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java @@ -19,9 +19,10 @@ package org.apache.beam.sdk.io; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFrom; - import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.not; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; @@ -29,8 +30,11 @@ import static org.junit.Assert.assertTrue; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.io.BoundedSource.BoundedReader; +import org.apache.beam.sdk.io.CompressedSource.CompressedReader; import org.apache.beam.sdk.io.CompressedSource.CompressionMode; import org.apache.beam.sdk.io.CompressedSource.DecompressingChannelFactory; +import org.apache.beam.sdk.io.FileBasedSource.FileBasedReader; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.PAssert; @@ -462,11 +466,12 @@ public class CompressedSourceTest { private static class ByteReader extends FileBasedReader<Byte> { ByteBuffer buff = ByteBuffer.allocate(1); Byte current; - long offset = -1; + long offset; ReadableByteChannel channel; public ByteReader(ByteSource source) { super(source); + offset = source.getStartOffset() - 1; } @Override @@ -501,4 +506,102 @@ public class CompressedSourceTest { } } } + + @Test + public void testEmptyGzipProgress() throws 
IOException { + File tmpFile = tmpFolder.newFile("empty.gz"); + String filename = tmpFile.toPath().toString(); + writeFile(tmpFile, new byte[0], CompressionMode.GZIP); + + PipelineOptions options = PipelineOptionsFactory.create(); + CompressedSource<Byte> source = CompressedSource.from(new ByteSource(filename, 1)); + try (BoundedReader<Byte> readerOrig = source.createReader(options)) { + assertThat(readerOrig, instanceOf(CompressedReader.class)); + CompressedReader<Byte> reader = (CompressedReader<Byte>) readerOrig; + // before starting + assertEquals(0.0, reader.getFractionConsumed(), 1e-6); + assertEquals(0, reader.getSplitPointsConsumed()); + assertEquals(1, reader.getSplitPointsRemaining()); + + // confirm empty + assertFalse(reader.start()); + + // after reading empty source + assertEquals(1.0, reader.getFractionConsumed(), 1e-6); + assertEquals(0, reader.getSplitPointsConsumed()); + assertEquals(0, reader.getSplitPointsRemaining()); + } + } + + @Test + public void testGzipProgress() throws IOException { + int numRecords = 3; + File tmpFile = tmpFolder.newFile("nonempty.gz"); + String filename = tmpFile.toPath().toString(); + writeFile(tmpFile, new byte[numRecords], CompressionMode.GZIP); + + PipelineOptions options = PipelineOptionsFactory.create(); + CompressedSource<Byte> source = CompressedSource.from(new ByteSource(filename, 1)); + try (BoundedReader<Byte> readerOrig = source.createReader(options)) { + assertThat(readerOrig, instanceOf(CompressedReader.class)); + CompressedReader<Byte> reader = (CompressedReader<Byte>) readerOrig; + // before starting + assertEquals(0.0, reader.getFractionConsumed(), 1e-6); + assertEquals(0, reader.getSplitPointsConsumed()); + assertEquals(1, reader.getSplitPointsRemaining()); + + // confirm has three records + for (int i = 0; i < numRecords; ++i) { + if (i == 0) { + assertTrue(reader.start()); + } else { + assertTrue(reader.advance()); + } + assertEquals(0, reader.getSplitPointsConsumed()); + assertEquals(1, reader.getSplitPointsRemaining()); + } + assertFalse(reader.advance()); + + // after reading empty source + assertEquals(1.0, reader.getFractionConsumed(), 1e-6); + assertEquals(1, reader.getSplitPointsConsumed()); + assertEquals(0, reader.getSplitPointsRemaining()); + } + } + + @Test + public void testSplittableProgress() throws IOException { + File tmpFile = tmpFolder.newFile("nonempty.txt"); + String filename = tmpFile.toPath().toString(); + Files.write(new byte[2], tmpFile); + + PipelineOptions options = PipelineOptionsFactory.create(); + CompressedSource<Byte> source = CompressedSource.from(new ByteSource(filename, 1)); + try (BoundedReader<Byte> readerOrig = source.createReader(options)) { + assertThat(readerOrig, not(instanceOf(CompressedReader.class))); + assertThat(readerOrig, instanceOf(FileBasedReader.class)); + FileBasedReader<Byte> reader = (FileBasedReader<Byte>) readerOrig; + + // Check preconditions before starting + assertEquals(0.0, reader.getFractionConsumed(), 1e-6); + assertEquals(0, reader.getSplitPointsConsumed()); + assertEquals(BoundedReader.SPLIT_POINTS_UNKNOWN, reader.getSplitPointsRemaining()); + + // First record: none consumed, unknown remaining. + assertTrue(reader.start()); + assertEquals(0, reader.getSplitPointsConsumed()); + assertEquals(BoundedReader.SPLIT_POINTS_UNKNOWN, reader.getSplitPointsRemaining()); + + // Second record: 1 consumed, know that we're on the last record. 
+      assertTrue(reader.advance());
+      assertEquals(1, reader.getSplitPointsConsumed());
+      assertEquals(1, reader.getSplitPointsRemaining());
+
+      // Confirm end of input and check post-conditions
+      assertFalse(reader.advance());
+      assertEquals(1.0, reader.getFractionConsumed(), 1e-6);
+      assertEquals(2, reader.getSplitPointsConsumed());
+      assertEquals(0, reader.getSplitPointsRemaining());
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/32a6cde4/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java
index a261fb2..bf68d41 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java
@@ -24,9 +24,11 @@ import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
 import org.apache.beam.sdk.io.CountingSource.CounterMark;
 import org.apache.beam.sdk.io.CountingSource.UnboundedCountingSource;
 import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -49,6 +51,7 @@ import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
+import java.io.IOException;
 import java.util.List;
 
 /**
@@ -116,6 +119,33 @@ public class CountingSourceTest {
   }
 
   @Test
+  public void testProgress() throws IOException {
+    final int numRecords = 5;
+    @SuppressWarnings("deprecation") // testing CountingSource
+    BoundedSource<Long> source = CountingSource.upTo(numRecords);
+    try (BoundedReader<Long> reader = source.createReader(PipelineOptionsFactory.create())) {
+      // Check preconditions before starting. Note that CountingReader can always give an accurate
+      // remaining parallelism.
+      assertEquals(0.0, reader.getFractionConsumed(), 1e-6);
+      assertEquals(0, reader.getSplitPointsConsumed());
+      assertEquals(numRecords, reader.getSplitPointsRemaining());
+
+      assertTrue(reader.start());
+      int i = 0;
+      do {
+        assertEquals(i, reader.getSplitPointsConsumed());
+        assertEquals(numRecords - i, reader.getSplitPointsRemaining());
+        ++i;
+      } while (reader.advance());
+
+      assertEquals(numRecords, i); // exactly numRecords calls to advance()
+      assertEquals(1.0, reader.getFractionConsumed(), 1e-6);
+      assertEquals(numRecords, reader.getSplitPointsConsumed());
+      assertEquals(0, reader.getSplitPointsRemaining());
+    }
+  }
+
+  @Test
   @Category(RunnableOnService.class)
   public void testUnboundedSource() {
     Pipeline p = TestPipeline.create();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/32a6cde4/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
index bedbc99..1f16d39 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
@@ -446,7 +446,7 @@ public class FileBasedSourceTest {
       assertTrue(fractionConsumed > lastFractionConsumed);
       lastFractionConsumed = fractionConsumed;
     }
-    assertTrue(reader.getFractionConsumed() < 1.0);
+    assertEquals(1.0, reader.getFractionConsumed(), 1e-6);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/32a6cde4/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java
index e9b61aa..66abd33 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java
@@ -19,7 +19,6 @@ package org.apache.beam.sdk.io;
 
 import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionExhaustive;
 import static org.apache.beam.sdk.testing.SourceTestUtils.readFromSource;
-
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -28,6 +27,8 @@ import static org.junit.Assert.assertTrue;
 
 import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
+import org.apache.beam.sdk.io.OffsetBasedSource.OffsetBasedReader;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 
@@ -86,13 +87,12 @@ public class OffsetBasedSourceTest {
     }
 
     @Override
-    public BoundedReader<Integer> createReader(PipelineOptions options) throws IOException {
+    public OffsetBasedReader<Integer> createReader(PipelineOptions options) throws IOException {
      return new CoarseRangeReader(this);
    }
  }

-  private static class CoarseRangeReader
-      extends OffsetBasedSource.OffsetBasedReader<Integer> {
+  private static class CoarseRangeReader extends OffsetBasedReader<Integer> {
     private long current = -1;
     private long granularity;
 
@@ -239,6 +239,69 @@ public class OffsetBasedSourceTest {
   }
 
   @Test
+  public void testProgress() throws IOException {
+    PipelineOptions options = PipelineOptionsFactory.create();
+    CoarseRangeSource source = new CoarseRangeSource(13, 17, 1, 2);
+    try (OffsetBasedReader<Integer> reader = source.createReader(options)) {
+      // Unstarted reader
+      assertEquals(0.0, reader.getFractionConsumed(), 1e-6);
+      assertEquals(0, reader.getSplitPointsConsumed());
+      assertEquals(BoundedReader.SPLIT_POINTS_UNKNOWN, reader.getSplitPointsRemaining());
+
+      // Start and produce the element 14 since granularity is 2.
+      assertTrue(reader.start());
+      assertTrue(reader.isAtSplitPoint());
+      assertEquals(14, reader.getCurrent().intValue());
+      assertEquals(0, reader.getSplitPointsConsumed());
+      assertEquals(BoundedReader.SPLIT_POINTS_UNKNOWN, reader.getSplitPointsRemaining());
+      // Advance and produce the element 15, not a split point.
+      assertTrue(reader.advance());
+      assertEquals(15, reader.getCurrent().intValue());
+      assertEquals(0, reader.getSplitPointsConsumed());
+      assertEquals(BoundedReader.SPLIT_POINTS_UNKNOWN, reader.getSplitPointsRemaining());
+
+      // Advance and produce the element 16, a split point. Since the next offset (17) is
+      // outside the range [13, 17), remaining parallelism should become 1 from UNKNOWN.
+      assertTrue(reader.advance());
+      assertTrue(reader.isAtSplitPoint());
+      assertEquals(16, reader.getCurrent().intValue());
+      assertEquals(1, reader.getSplitPointsConsumed());
+      assertEquals(1, reader.getSplitPointsRemaining()); // The next offset is outside the range.
+      // Advance and produce the element 17, not a split point.
+      assertTrue(reader.advance());
+      assertEquals(17, reader.getCurrent().intValue());
+      assertEquals(1, reader.getSplitPointsConsumed());
+      assertEquals(1, reader.getSplitPointsRemaining());
+
+      // Advance and reach the end of the reader.
+      assertFalse(reader.advance());
+      assertEquals(1.0, reader.getFractionConsumed(), 1e-6);
+      assertEquals(2, reader.getSplitPointsConsumed());
+      assertEquals(0, reader.getSplitPointsRemaining());
+    }
+  }
+
+  @Test
+  public void testProgressEmptySource() throws IOException {
+    PipelineOptions options = PipelineOptionsFactory.create();
+    CoarseRangeSource source = new CoarseRangeSource(13, 17, 1, 100);
+    try (OffsetBasedReader<Integer> reader = source.createReader(options)) {
+      // before starting
+      assertEquals(0.0, reader.getFractionConsumed(), 1e-6);
+      assertEquals(0, reader.getSplitPointsConsumed());
+      assertEquals(BoundedReader.SPLIT_POINTS_UNKNOWN, reader.getSplitPointsRemaining());
+
+      // confirm empty
+      assertFalse(reader.start());
+
+      // after reading empty source
+      assertEquals(1.0, reader.getFractionConsumed(), 1e-6);
+      assertEquals(0, reader.getSplitPointsConsumed());
+      assertEquals(0, reader.getSplitPointsRemaining());
+    }
+  }
+
+  @Test
   public void testSplitAtFraction() throws IOException {
     PipelineOptions options = PipelineOptionsFactory.create();
     CoarseRangeSource source = new CoarseRangeSource(13, 35, 1, 10);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/32a6cde4/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
index 4d6d8dd..53a2a89 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
@@ -22,10 +22,10 @@ import static org.apache.beam.sdk.TestUtils.LINES_ARRAY;
 import static org.apache.beam.sdk.TestUtils.NO_INTS_ARRAY;
 import static org.apache.beam.sdk.TestUtils.NO_LINES_ARRAY;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
-
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
@@ -34,6 +34,7 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.TextualIntegerCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
 import org.apache.beam.sdk.io.TextIO.CompressionType;
 import org.apache.beam.sdk.io.TextIO.TextSource;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -423,6 +424,117 @@ public class TextIOTest {
   }
 
   @Test
+  public void testProgressEmptyFile() throws IOException {
+    try (BoundedReader<String> reader =
+        prepareSource(new byte[0]).createReader(PipelineOptionsFactory.create())) {
+      // Check preconditions before starting.
+      assertEquals(0.0, reader.getFractionConsumed(), 1e-6);
+      assertEquals(0, reader.getSplitPointsConsumed());
+      assertEquals(BoundedReader.SPLIT_POINTS_UNKNOWN, reader.getSplitPointsRemaining());
+
+      // Assert empty
+      assertFalse(reader.start());
+
+      // Check postconditions after finishing
+      assertEquals(1.0, reader.getFractionConsumed(), 1e-6);
+      assertEquals(0, reader.getSplitPointsConsumed());
+      assertEquals(0, reader.getSplitPointsRemaining());
+    }
+  }
+
+  @Test
+  public void testProgressTextFile() throws IOException {
+    String file = "line1\nline2\nline3";
+    try (BoundedReader<String> reader =
+        prepareSource(file.getBytes()).createReader(PipelineOptionsFactory.create())) {
+      // Check preconditions before starting
+      assertEquals(0.0, reader.getFractionConsumed(), 1e-6);
+      assertEquals(0, reader.getSplitPointsConsumed());
+      assertEquals(BoundedReader.SPLIT_POINTS_UNKNOWN, reader.getSplitPointsRemaining());
+
+      // Line 1
+      assertTrue(reader.start());
+      assertEquals(0, reader.getSplitPointsConsumed());
+      assertEquals(BoundedReader.SPLIT_POINTS_UNKNOWN, reader.getSplitPointsRemaining());
+
+      // Line 2
+      assertTrue(reader.advance());
+      assertEquals(1, reader.getSplitPointsConsumed());
+      assertEquals(BoundedReader.SPLIT_POINTS_UNKNOWN, reader.getSplitPointsRemaining());
+
+      // Line 3
+      assertTrue(reader.advance());
+      assertEquals(2, reader.getSplitPointsConsumed());
+      assertEquals(1, reader.getSplitPointsRemaining());
+
+      // Check postconditions after finishing
+      assertFalse(reader.advance());
+      assertEquals(1.0, reader.getFractionConsumed(), 1e-6);
+      assertEquals(3, reader.getSplitPointsConsumed());
+      assertEquals(0, reader.getSplitPointsRemaining());
+    }
+  }
+
+  @Test
+  public void testProgressAfterSplitting() throws IOException {
+    String file = "line1\nline2\nline3";
+    BoundedSource source = prepareSource(file.getBytes());
+    BoundedSource remainder;
+
+    // Create the remainder, verifying properties pre- and post-splitting.
+    try (BoundedReader<String> readerOrig = source.createReader(PipelineOptionsFactory.create())) {
+      // Preconditions.
+      assertEquals(0.0, readerOrig.getFractionConsumed(), 1e-6);
+      assertEquals(0, readerOrig.getSplitPointsConsumed());
+      assertEquals(BoundedReader.SPLIT_POINTS_UNKNOWN, readerOrig.getSplitPointsRemaining());
+
+      // First record, before splitting.
+      assertTrue(readerOrig.start());
+      assertEquals(0, readerOrig.getSplitPointsConsumed());
+      assertEquals(BoundedReader.SPLIT_POINTS_UNKNOWN, readerOrig.getSplitPointsRemaining());
+
+      // Split. 0.1 is in line1, so should now be able to detect last record.
+      remainder = readerOrig.splitAtFraction(0.1);
+      System.err.println(readerOrig.getCurrentSource());
+      assertNotNull(remainder);
+
+      // First record, after splitting.
+      assertEquals(0, readerOrig.getSplitPointsConsumed());
+      assertEquals(1, readerOrig.getSplitPointsRemaining());
+
+      // Finish and postconditions.
+      assertFalse(readerOrig.advance());
+      assertEquals(1.0, readerOrig.getFractionConsumed(), 1e-6);
+      assertEquals(1, readerOrig.getSplitPointsConsumed());
+      assertEquals(0, readerOrig.getSplitPointsRemaining());
+    }
+
+    // Check the properties of the remainder.
+    try (BoundedReader<String> reader = remainder.createReader(PipelineOptionsFactory.create())) {
+      // Preconditions.
+      assertEquals(0.0, reader.getFractionConsumed(), 1e-6);
+      assertEquals(0, reader.getSplitPointsConsumed());
+      assertEquals(BoundedReader.SPLIT_POINTS_UNKNOWN, reader.getSplitPointsRemaining());
+
+      // First record should be line 2.
+      assertTrue(reader.start());
+      assertEquals(0, reader.getSplitPointsConsumed());
+      assertEquals(BoundedReader.SPLIT_POINTS_UNKNOWN, reader.getSplitPointsRemaining());
+
+      // Second record is line 3
+      assertTrue(reader.advance());
+      assertEquals(1, reader.getSplitPointsConsumed());
+      assertEquals(1, reader.getSplitPointsRemaining());
+
+      // Check postconditions after finishing
+      assertFalse(reader.advance());
+      assertEquals(1.0, reader.getFractionConsumed(), 1e-6);
+      assertEquals(2, reader.getSplitPointsConsumed());
+      assertEquals(0, reader.getSplitPointsRemaining());
+    }
+  }
+
+  @Test
   public void testReadEmptyLines() throws Exception {
     runTestReadWithData("\n\n\n".getBytes(StandardCharsets.UTF_8),
         ImmutableList.of("", "", ""));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/32a6cde4/sdks/java/core/src/test/java/org/apache/beam/sdk/io/range/OffsetRangeTrackerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/range/OffsetRangeTrackerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/range/OffsetRangeTrackerTest.java
index 3de04f7..edd4c4f 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/range/OffsetRangeTrackerTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/range/OffsetRangeTrackerTest.java
@@ -104,7 +104,6 @@ public class OffsetRangeTrackerTest {
     assertFalse(tracker.tryReturnRecordAt(true, 150));
     assertFalse(tracker.tryReturnRecordAt(true, 151));
     // Should accept non-splitpoint records starting after stop offset.
-    assertTrue(tracker.tryReturnRecordAt(false, 135));
     assertTrue(tracker.tryReturnRecordAt(false, 152));
     assertTrue(tracker.tryReturnRecordAt(false, 160));
     assertFalse(tracker.tryReturnRecordAt(true, 171));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/32a6cde4/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java
index ab537eb..41a271c 100644
--- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java
+++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSource.java
@@ -282,6 +282,7 @@ public class HDFSFileSource<K, V> extends BoundedSource<KV<K, V>> {
     private Configuration conf;
     private RecordReader<K, V> currentReader;
     private KV<K, V> currentPair;
+    private volatile boolean done = false;
 
     /**
      * Create a {@code HDFSFileReader} based on a file or a file pattern specification.
@@ -356,6 +357,7 @@ public class HDFSFileSource<K, V> extends BoundedSource<KV<K, V>> {
         }
         // either no next split or all readers were empty
         currentPair = null;
+        done = true;
         return false;
       }
     } catch (InterruptedException e) {
@@ -433,6 +435,16 @@ public class HDFSFileSource<K, V> extends BoundedSource<KV<K, V>> {
     }
 
     @Override
+    public final long getSplitPointsRemaining() {
+      if (done) {
+        return 0;
+      }
+      // This source does not currently support dynamic work rebalancing, so remaining
+      // parallelism is always 1.
+      return 1;
+    }
+
+    @Override
     public BoundedSource<KV<K, V>> splitAtFraction(double fraction) {
       // Not yet supported. To implement this, the sizes of the splits should be used to
       // calculate the remaining splits that constitute the given fraction, then a