Repository: beam
Updated Branches:
  refs/heads/master daed01a69 -> a1a022d6b


Finalize Checkpoints before resuming from them

This moves checkpoint finalization in the DirectRunner to occur before
the call to createReader instead of between that call and the call to
reader.start().


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0ee34d94
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0ee34d94
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0ee34d94

Branch: refs/heads/master
Commit: 0ee34d9436574adecd276759cdfec62e60ba7b66
Parents: daed01a
Author: Thomas Groh <tg...@google.com>
Authored: Mon Jan 23 10:25:04 2017 -0800
Committer: Dan Halperin <dhalp...@google.com>
Committed: Mon Jan 23 13:05:00 2017 -0800

----------------------------------------------------------------------
 .../runners/direct/UnboundedReadEvaluatorFactory.java  |  9 +++++----
 .../direct/UnboundedReadEvaluatorFactoryTest.java      | 13 ++++++++++++-
 2 files changed, 17 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/0ee34d94/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
index a4aebc9..013e929 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
@@ -164,9 +164,13 @@ class UnboundedReadEvaluatorFactory implements 
TransformEvaluatorFactory {
         throws IOException {
       UnboundedReader<OutputT> existing = shard.getExistingReader();
       if (existing == null) {
+        CheckpointMarkT checkpoint = shard.getCheckpoint();
+        if (checkpoint != null) {
+          checkpoint.finalizeCheckpoint();
+        }
         return shard
             .getSource()
-            .createReader(evaluationContext.getPipelineOptions(), 
shard.getCheckpoint());
+            .createReader(evaluationContext.getPipelineOptions(), checkpoint);
       } else {
         return existing;
       }
@@ -176,9 +180,6 @@ class UnboundedReadEvaluatorFactory implements 
TransformEvaluatorFactory {
         UnboundedReader<OutputT> reader, UnboundedSourceShard<OutputT, 
CheckpointMarkT> shard)
         throws IOException {
       if (shard.getExistingReader() == null) {
-        if (shard.getCheckpoint() != null) {
-          shard.getCheckpoint().finalizeCheckpoint();
-        }
         return reader.start();
       } else {
         return shard.getExistingReader().advance();

http://git-wip-us.apache.org/repos/asf/beam/blob/0ee34d94/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
index 92d668e..987f927 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
@@ -21,6 +21,7 @@ import static 
org.apache.beam.runners.direct.DirectGraphs.getProducer;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.nullValue;
 import static org.junit.Assert.assertThat;
@@ -414,6 +415,9 @@ public class UnboundedReadEvaluatorFactoryTest {
     @Override
     public UnboundedSource.UnboundedReader<T> createReader(
         PipelineOptions options, @Nullable TestCheckpointMark checkpointMark) {
+      if (checkpointMark != null) {
+        assertThat(checkpointMark.isFinalized(), is(true));
+      }
       return new TestUnboundedReader(elems, checkpointMark == null ? -1 : 
checkpointMark.index);
     }
 
@@ -505,13 +509,20 @@ public class UnboundedReadEvaluatorFactoryTest {
 
   private static class TestCheckpointMark implements CheckpointMark {
     final int index;
+    private boolean finalized = false;
 
     private TestCheckpointMark(int index) {
       this.index = index;
     }
 
     @Override
-    public void finalizeCheckpoint() throws IOException {}
+    public void finalizeCheckpoint() throws IOException {
+      finalized = true;
+    }
+
+    boolean isFinalized() {
+      return finalized;
+    }
 
     public static class Coder extends AtomicCoder<TestCheckpointMark> {
       @Override

Reply via email to