Update OffsetRangeTracker progress tracking and start offset
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ee0a3bf2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ee0a3bf2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ee0a3bf2 Branch: refs/heads/master Commit: ee0a3bf2acc0213e27d3b3d1353c27b977046577 Parents: 741ef26 Author: Ian Zhou <ianz...@google.com> Authored: Tue Jun 21 17:23:09 2016 -0700 Committer: Dan Halperin <dhalp...@google.com> Committed: Tue Jun 21 22:29:25 2016 -0700 ---------------------------------------------------------------------- .../beam/sdk/io/range/OffsetRangeTracker.java | 13 +-- .../beam/sdk/io/OffsetBasedSourceTest.java | 5 +- .../sdk/io/range/OffsetRangeTrackerTest.java | 91 +++++++++++++++----- .../sdk/io/gcp/bigtable/BigtableIOTest.java | 3 - 4 files changed, 82 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ee0a3bf2/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/OffsetRangeTracker.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/OffsetRangeTracker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/OffsetRangeTracker.java index 76790af..a8d00ee 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/OffsetRangeTracker.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/OffsetRangeTracker.java @@ -32,7 +32,7 @@ import org.slf4j.LoggerFactory; public class OffsetRangeTracker implements RangeTracker<Long> { private static final Logger LOG = LoggerFactory.getLogger(OffsetRangeTracker.class); - private final long startOffset; + private long startOffset; private long stopOffset; private long lastRecordStart = -1L; private long offsetOfLastSplitPoint = -1L; @@ -101,6 +101,9 @@ public class OffsetRangeTracker 
implements RangeTracker<Long> { lastRecordStart)); } + if (lastRecordStart == -1) { + startOffset = recordStart; + } lastRecordStart = recordStart; if (isAtSplitPoint) { @@ -165,7 +168,7 @@ public class OffsetRangeTracker implements RangeTracker<Long> { throw new IllegalArgumentException( "getPositionForFractionConsumed is not applicable to an unbounded range: " + this); } - return (long) Math.ceil(startOffset + fraction * (stopOffset - startOffset)); + return (long) Math.floor(startOffset + fraction * (stopOffset - startOffset)); } @Override @@ -179,11 +182,11 @@ public class OffsetRangeTracker implements RangeTracker<Long> { } else if (lastRecordStart >= stopOffset) { return 1.0; } else { - // E.g., when reading [3, 6) and lastRecordStart is 4, that means we consumed 3,4 of 3,4,5 - // which is (4 - 3 + 1) / (6 - 3) = 67%. + // E.g., when reading [3, 6) and lastRecordStart is 4, that means we consumed 3 of 3,4,5 + // which is (4 - 3) / (6 - 3) = 33%. // Also, clamp to at most 1.0 because the last consumed position can extend past the // stop position. 
- return Math.min(1.0, 1.0 * (lastRecordStart - startOffset + 1) / (stopOffset - startOffset)); + return Math.min(1.0, 1.0 * (lastRecordStart - startOffset) / (stopOffset - startOffset)); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ee0a3bf2/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java index 66abd33..7009023 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java @@ -218,13 +218,14 @@ public class OffsetBasedSourceTest { assertEquals(0.0, reader.getFractionConsumed(), 1e-6); assertTrue(reader.start()); - do { + items.add(reader.getCurrent()); + while (reader.advance()) { Double fraction = reader.getFractionConsumed(); assertNotNull(fraction); assertTrue(fraction.toString(), fraction > 0.0); assertTrue(fraction.toString(), fraction <= 1.0); items.add(reader.getCurrent()); - } while (reader.advance()); + } assertEquals(1.0, reader.getFractionConsumed(), 1e-6); assertEquals(20, items.size()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ee0a3bf2/sdks/java/core/src/test/java/org/apache/beam/sdk/io/range/OffsetRangeTrackerTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/range/OffsetRangeTrackerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/range/OffsetRangeTrackerTest.java index edd4c4f..0c2c639 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/range/OffsetRangeTrackerTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/range/OffsetRangeTrackerTest.java @@ -36,6 +36,17 @@ public class 
OffsetRangeTrackerTest { @Rule public final ExpectedException expected = ExpectedException.none(); @Test + public void testUpdateStartOffset() throws Exception { + OffsetRangeTracker tracker = new OffsetRangeTracker(100, 200); + assertEquals(100, tracker.getStartPosition().longValue()); + // Update start offset to first record returned + assertTrue(tracker.tryReturnRecordAt(true, 150)); + assertEquals(150, tracker.getStartPosition().longValue()); + assertTrue(tracker.tryReturnRecordAt(true, 180)); + assertEquals(150, tracker.getStartPosition().longValue()); + } + + @Test public void testTryReturnRecordSimpleSparse() throws Exception { OffsetRangeTracker tracker = new OffsetRangeTracker(100, 200); assertTrue(tracker.tryReturnRecordAt(true, 110)); @@ -113,28 +124,48 @@ public class OffsetRangeTrackerTest { public void testGetPositionForFractionDense() throws Exception { // Represents positions 3, 4, 5. OffsetRangeTracker tracker = new OffsetRangeTracker(3, 6); - // [3, 3) represents 0.0 of [3, 6) + // [3, 3) represents from [0, 1/3) fraction of [3, 6) assertEquals(3, tracker.getPositionForFractionConsumed(0.0)); - // [3, 4) represents up to 1/3 of [3, 6) - assertEquals(4, tracker.getPositionForFractionConsumed(1.0 / 6)); - assertEquals(4, tracker.getPositionForFractionConsumed(0.333)); - // [3, 5) represents up to 2/3 of [3, 6) - assertEquals(5, tracker.getPositionForFractionConsumed(0.334)); - assertEquals(5, tracker.getPositionForFractionConsumed(0.666)); - // any fraction consumed over 2/3 means the whole [3, 6) has been consumed. 
- assertEquals(6, tracker.getPositionForFractionConsumed(0.667)); + assertEquals(3, tracker.getPositionForFractionConsumed(1.0 / 6)); + assertEquals(3, tracker.getPositionForFractionConsumed(0.333)); + // [3, 4) represents from [1/3, 2/3) fraction of [3, 6) + assertEquals(4, tracker.getPositionForFractionConsumed(0.334)); + assertEquals(4, tracker.getPositionForFractionConsumed(0.666)); + // [3, 5) represents from [2/3, 1) fraction of [3, 6) + assertEquals(5, tracker.getPositionForFractionConsumed(0.667)); + assertEquals(5, tracker.getPositionForFractionConsumed(0.999)); + // The whole [3, 6) is consumed for fraction 1 + assertEquals(6, tracker.getPositionForFractionConsumed(1.0)); + } + + @Test + public void testGetPositionForFractionDenseUpdateStartOffset() throws Exception { + // Represents positions 3, 4, 5. + OffsetRangeTracker tracker = new OffsetRangeTracker(3, 6); + // [3, 3) represents from [0, 1/3) fraction of [3, 6) + assertEquals(3, tracker.getPositionForFractionConsumed(0.333)); + // Update start offset to 4 + assertTrue(tracker.tryReturnRecordAt(true, 4)); + // [4, 4) represents from [0, 1/2) fraction of [4, 6) + assertEquals(4, tracker.getPositionForFractionConsumed(0.0)); + assertEquals(4, tracker.getPositionForFractionConsumed(0.499)); + // [4, 5) represents from [1/2, 1) fraction of [4, 6) + assertEquals(5, tracker.getPositionForFractionConsumed(0.5)); + assertEquals(5, tracker.getPositionForFractionConsumed(0.999)); + // The whole [4, 6) is consumed for fraction 1 + assertEquals(6, tracker.getPositionForFractionConsumed(1.0)); + } + + @Test public void testGetFractionConsumedDense() throws Exception { OffsetRangeTracker tracker = new OffsetRangeTracker(3, 6); - assertEquals(0, tracker.getFractionConsumed(), 1e-6); + assertEquals(0.0, tracker.getFractionConsumed(), 1e-6); assertTrue(tracker.tryReturnRecordAt(true, 3)); - assertEquals(1.0 / 3, tracker.getFractionConsumed(), 1e-6); + assertEquals(0.0, tracker.getFractionConsumed(), 1e-6);
assertTrue(tracker.tryReturnRecordAt(true, 4)); - assertEquals(2.0 / 3, tracker.getFractionConsumed(), 1e-6); + assertEquals(1.0 / 3, tracker.getFractionConsumed(), 1e-6); assertTrue(tracker.tryReturnRecordAt(true, 5)); - assertEquals(1.0, tracker.getFractionConsumed(), 1e-6); + assertEquals(2.0 / 3, tracker.getFractionConsumed(), 1e-6); assertTrue(tracker.tryReturnRecordAt(false /* non-split-point */, 6)); assertEquals(1.0, tracker.getFractionConsumed(), 1e-6); assertTrue(tracker.tryReturnRecordAt(false /* non-split-point */, 7)); @@ -145,14 +176,33 @@ public class OffsetRangeTrackerTest { @Test public void testGetFractionConsumedSparse() throws Exception { OffsetRangeTracker tracker = new OffsetRangeTracker(100, 200); - assertEquals(0, tracker.getFractionConsumed(), 1e-6); + assertEquals(0.0, tracker.getFractionConsumed(), 1e-6); + assertTrue(tracker.tryReturnRecordAt(true, 100)); + assertEquals(0.0, tracker.getFractionConsumed(), 1e-6); assertTrue(tracker.tryReturnRecordAt(true, 110)); - // Consumed positions through 110 = total 11 positions of 100. - assertEquals(0.11, tracker.getFractionConsumed(), 1e-6); + // Consumed positions through 109 = total 10 positions of 100. 
+ assertEquals(0.1, tracker.getFractionConsumed(), 1e-6); + assertTrue(tracker.tryReturnRecordAt(true, 150)); + assertEquals(0.5, tracker.getFractionConsumed(), 1e-6); + assertTrue(tracker.tryReturnRecordAt(true, 195)); + assertEquals(0.95, tracker.getFractionConsumed(), 1e-6); + assertFalse(tracker.tryReturnRecordAt(true, 200)); + assertEquals(1.0, tracker.getFractionConsumed(), 1e-6); + } + + @Test + public void testGetFractionConsumedUpdateStartOffset() throws Exception { + OffsetRangeTracker tracker = new OffsetRangeTracker(100, 200); assertTrue(tracker.tryReturnRecordAt(true, 150)); - assertEquals(0.51, tracker.getFractionConsumed(), 1e-6); + assertEquals(0.0, tracker.getFractionConsumed(), 1e-6); + assertTrue(tracker.tryReturnRecordAt(true, 160)); + assertEquals(0.2, tracker.getFractionConsumed(), 1e-6); + assertTrue(tracker.tryReturnRecordAt(true, 180)); + assertEquals(0.6, tracker.getFractionConsumed(), 1e-6); assertTrue(tracker.tryReturnRecordAt(true, 195)); - assertEquals(0.96, tracker.getFractionConsumed(), 1e-6); + assertEquals(0.9, tracker.getFractionConsumed(), 1e-6); + assertFalse(tracker.tryReturnRecordAt(true, 200)); + assertEquals(1.0, tracker.getFractionConsumed(), 1e-6); } @Test @@ -172,15 +222,16 @@ public class OffsetRangeTrackerTest { @Test public void testTryReturnFirstRecordNotSplitPoint() throws Exception { + OffsetRangeTracker tracker = new OffsetRangeTracker(100, 200); expected.expect(IllegalStateException.class); - new OffsetRangeTracker(100, 200).tryReturnRecordAt(false, 120); + tracker.tryReturnRecordAt(false, 120); } @Test public void testTryReturnRecordNonMonotonic() throws Exception { OffsetRangeTracker tracker = new OffsetRangeTracker(100, 200); - expected.expect(IllegalStateException.class); tracker.tryReturnRecordAt(true, 120); + expected.expect(IllegalStateException.class); tracker.tryReturnRecordAt(true, 110); } } 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ee0a3bf2/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java index 4cb30b4..c09943b 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java @@ -289,9 +289,6 @@ public class BigtableIOTest { /** * Tests dynamic work rebalancing exhaustively. - * - * <p>Because this test runs so slowly, it is disabled by default. Re-run when changing the - * {@link BigtableIO.Read} implementation. */ @Test public void testReadingSplitAtFractionExhaustive() throws Exception {