Repository: beam
Updated Branches: refs/heads/master daed01a69 -> a1a022d6b
Finalize Checkpoints before resuming from them. This moves checkpoint finalization in the DirectRunner so that it occurs before the call to createReader, rather than between that call and the call to reader.start(). Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0ee34d94 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0ee34d94 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0ee34d94 Branch: refs/heads/master Commit: 0ee34d9436574adecd276759cdfec62e60ba7b66 Parents: daed01a Author: Thomas Groh <tg...@google.com> Authored: Mon Jan 23 10:25:04 2017 -0800 Committer: Dan Halperin <dhalp...@google.com> Committed: Mon Jan 23 13:05:00 2017 -0800 ---------------------------------------------------------------------- .../runners/direct/UnboundedReadEvaluatorFactory.java | 9 +++++---- .../direct/UnboundedReadEvaluatorFactoryTest.java | 13 ++++++++++++- 2 files changed, 17 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/0ee34d94/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java index a4aebc9..013e929 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java @@ -164,9 +164,13 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory { throws IOException { UnboundedReader<OutputT> existing = shard.getExistingReader(); if (existing == null) { + CheckpointMarkT checkpoint = shard.getCheckpoint(); + if 
(checkpoint != null) { + checkpoint.finalizeCheckpoint(); + } return shard .getSource() - .createReader(evaluationContext.getPipelineOptions(), shard.getCheckpoint()); + .createReader(evaluationContext.getPipelineOptions(), checkpoint); } else { return existing; } @@ -176,9 +180,6 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory { UnboundedReader<OutputT> reader, UnboundedSourceShard<OutputT, CheckpointMarkT> shard) throws IOException { if (shard.getExistingReader() == null) { - if (shard.getCheckpoint() != null) { - shard.getCheckpoint().finalizeCheckpoint(); - } return reader.start(); } else { return shard.getExistingReader().advance(); http://git-wip-us.apache.org/repos/asf/beam/blob/0ee34d94/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java index 92d668e..987f927 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java @@ -21,6 +21,7 @@ import static org.apache.beam.runners.direct.DirectGraphs.getProducer; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.nullValue; import static org.junit.Assert.assertThat; @@ -414,6 +415,9 @@ public class UnboundedReadEvaluatorFactoryTest { @Override public UnboundedSource.UnboundedReader<T> createReader( PipelineOptions options, @Nullable TestCheckpointMark checkpointMark) { + if 
(checkpointMark != null) { + assertThat(checkpointMark.isFinalized(), is(true)); + } return new TestUnboundedReader(elems, checkpointMark == null ? -1 : checkpointMark.index); } @@ -505,13 +509,20 @@ public class UnboundedReadEvaluatorFactoryTest { private static class TestCheckpointMark implements CheckpointMark { final int index; + private boolean finalized = false; private TestCheckpointMark(int index) { this.index = index; } @Override - public void finalizeCheckpoint() throws IOException {} + public void finalizeCheckpoint() throws IOException { + finalized = true; + } + + boolean isFinalized() { + return finalized; + } public static class Coder extends AtomicCoder<TestCheckpointMark> { @Override