Repository: incubator-beam
Updated Branches:
  refs/heads/master 8949ec315 -> e0cae9fb6


Move allowsDynamicSplitting to Reader, and set it in CompressedSource.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b7e9a7e8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b7e9a7e8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b7e9a7e8

Branch: refs/heads/master
Commit: b7e9a7e8fa467641dfe1c19fa5d5f63f4b74a6d0
Parents: 8949ec3
Author: Pei He <pe...@google.com>
Authored: Fri Jun 17 18:24:06 2016 -0700
Committer: Dan Halperin <dhalp...@google.com>
Committed: Wed Jun 22 18:07:16 2016 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/io/CompressedSource.java    |  5 +++
 .../apache/beam/sdk/io/OffsetBasedSource.java   | 26 +++++++--------
 .../beam/sdk/io/CompressedSourceTest.java       | 35 ++++++++++++++++++++
 3 files changed, 53 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b7e9a7e8/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
index a5c54b3..75bfc8f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
@@ -396,6 +396,11 @@ public class CompressedSource<T> extends FileBasedSource<T> {
     }
 
     @Override
+    public boolean allowsDynamicSplitting() {
+      return splittable;
+    }
+
+    @Override
     public final long getSplitPointsConsumed() {
       if (splittable) {
         return readerDelegate.getSplitPointsConsumed();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b7e9a7e8/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java
index 2f62acd..295eab9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java
@@ -192,17 +192,6 @@ public abstract class OffsetBasedSource<T> extends BoundedSource<T> {
    */
   public abstract OffsetBasedSource<T> createSourceForSubrange(long start, long end);
 
-  /**
-   * Whether this source should allow dynamic splitting of the offset ranges.
-   *
-   * <p>True by default. Override this to return false if the source cannot
-   * support dynamic splitting correctly. If this returns false,
-   * {@link OffsetBasedSource.OffsetBasedReader#splitAtFraction} will refuse all split requests.
-   */
-  public boolean allowsDynamicSplitting() {
-    return true;
-  }
-
   @Override
   public void populateDisplayData(DisplayData.Builder builder) {
     super.populateDisplayData(builder);
@@ -342,7 +331,7 @@ public abstract class OffsetBasedSource<T> extends BoundedSource<T> {
         // Note that even if the current source does not allow splitting, we don't know that
         // it's non-empty so we return UNKNOWN instead of 1.
         return BoundedReader.SPLIT_POINTS_UNKNOWN;
-      } else if (!getCurrentSource().allowsDynamicSplitting()) {
+      } else if (!allowsDynamicSplitting()) {
         // Started (so non-empty) and unsplittable, so only the current task.
         return 1;
       } else if (getCurrentOffset() >= rangeTracker.getStopPosition() - 1) {
@@ -355,9 +344,20 @@ public abstract class OffsetBasedSource<T> extends BoundedSource<T> {
       }
     }
 
+    /**
+     * Whether this reader should allow dynamic splitting of the offset ranges.
+     *
+     * <p>True by default. Override this to return false if the reader cannot
+     * support dynamic splitting correctly. If this returns false,
+     * {@link OffsetBasedReader#splitAtFraction} will refuse all split requests.
+     */
+    public boolean allowsDynamicSplitting() {
+      return true;
+    }
+
     @Override
     public final synchronized OffsetBasedSource<T> splitAtFraction(double fraction) {
-      if (!getCurrentSource().allowsDynamicSplitting()) {
+      if (!allowsDynamicSplitting()) {
         return null;
       }
       if (rangeTracker.getStopPosition() == Long.MAX_VALUE) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b7e9a7e8/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
index abf1de3..8fbed94 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
@@ -24,6 +24,8 @@ import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
@@ -44,6 +46,8 @@ import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.PCollection;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import com.google.common.io.Files;
 import com.google.common.primitives.Bytes;
 
@@ -633,6 +637,37 @@ public class CompressedSourceTest {
   }
 
   @Test
+  public void testUnsplittable() throws IOException {
+    String baseName = "test-input";
+    File compressedFile = tmpFolder.newFile(baseName + ".gz");
+    byte[] input = generateInput(10000);
+    writeFile(compressedFile, input, CompressionMode.GZIP);
+
+    CompressedSource<Byte> source =
+        CompressedSource.from(new ByteSource(compressedFile.getPath(), 1));
+    List<Byte> expected = Lists.newArrayList();
+    for (byte i : input) {
+      expected.add(i);
+    }
+
+    PipelineOptions options = PipelineOptionsFactory.create();
+    BoundedReader<Byte> reader = source.createReader(options);
+
+    List<Byte> actual = Lists.newArrayList();
+    for (boolean hasNext = reader.start(); hasNext; hasNext = reader.advance()) {
+      actual.add(reader.getCurrent());
+      // checkpoint every 9 elements
+      if (actual.size() % 9 == 0) {
+        Double fractionConsumed = reader.getFractionConsumed();
+        assertNotNull(fractionConsumed);
+        assertNull(reader.splitAtFraction(fractionConsumed));
+      }
+    }
+    assertEquals(expected.size(), actual.size());
+    assertEquals(Sets.newHashSet(expected), Sets.newHashSet(actual));
+  }
+
+  @Test
   public void testSplittableProgress() throws IOException {
     File tmpFile = tmpFolder.newFile("nonempty.txt");
     String filename = tmpFile.toPath().toString();