Batch doesn't use checkpoint dir so nothing to clean.

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/82d754cf
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/82d754cf
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/82d754cf

Branch: refs/heads/master
Commit: 82d754cf46abc0ec7f4fe5c9501944c6472484d7
Parents: b88e54a
Author: Sela <ans...@paypal.com>
Authored: Tue Feb 21 16:27:42 2017 +0200
Committer: Sela <ans...@paypal.com>
Committed: Wed Mar 1 00:18:05 2017 +0200

----------------------------------------------------------------------
 .../beam/runners/spark/TestSparkRunner.java     | 46 ++++++++++----------
 1 file changed, 23 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/82d754cf/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
index d2b5186..5d71ea5 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
@@ -117,17 +117,17 @@ public final class TestSparkRunner extends 
PipelineRunner<SparkPipelineResult> {
   public SparkPipelineResult run(Pipeline pipeline) {
     SparkPipelineOptions sparkOptions = 
pipeline.getOptions().as(SparkPipelineOptions.class);
     SparkPipelineResult result = null;
-    try {
-      // clear state of Aggregators, Metrics and Watermarks.
-      AggregatorsAccumulator.clear();
-      SparkMetricsContainer.clear();
-      GlobalWatermarkHolder.clear();
+    // clear state of Aggregators, Metrics and Watermarks.
+    AggregatorsAccumulator.clear();
+    SparkMetricsContainer.clear();
+    GlobalWatermarkHolder.clear();
 
-      TestPipelineOptions testPipelineOptions = 
pipeline.getOptions().as(TestPipelineOptions.class);
-      LOG.info("About to run test pipeline " + sparkOptions.getJobName());
+    TestPipelineOptions testPipelineOptions = 
pipeline.getOptions().as(TestPipelineOptions.class);
+    LOG.info("About to run test pipeline " + sparkOptions.getJobName());
 
-      // if the pipeline was executed in streaming mode, validate aggregators.
-      if (isForceStreaming) {
+    // if the pipeline was executed in streaming mode, validate aggregators.
+    if (isForceStreaming) {
+      try {
         result = delegate.run(pipeline);
         long timeout = sparkOptions.getForcedTimeout();
         result.waitUntilFinish(Duration.millis(timeout));
@@ -151,21 +151,21 @@ public final class TestSparkRunner extends 
PipelineRunner<SparkPipelineResult> {
                 "Successfully asserted pipeline %s with %d successful 
assertions.",
                 sparkOptions.getJobName(),
                 successAssertions));
-      } else {
-        // for batch test pipelines, run and block until done.
-        result = delegate.run(pipeline);
-        result.waitUntilFinish();
-        // assert via matchers.
-        assertThat(result, testPipelineOptions.getOnCreateMatcher());
-        assertThat(result, testPipelineOptions.getOnSuccessMatcher());
-      }
-    } finally {
-      try {
-        // cleanup checkpoint dir.
-        FileUtils.deleteDirectory(new File(sparkOptions.getCheckpointDir()));
-      } catch (IOException e) {
-        throw new RuntimeException("Failed to clear checkpoint tmp dir.", e);
+        } finally {
+        try {
+          // cleanup checkpoint dir.
+          FileUtils.deleteDirectory(new File(sparkOptions.getCheckpointDir()));
+        } catch (IOException e) {
+          throw new RuntimeException("Failed to clear checkpoint tmp dir.", e);
+        }
       }
+    } else {
+      // for batch test pipelines, run and block until done.
+      result = delegate.run(pipeline);
+      result.waitUntilFinish();
+      // assert via matchers.
+      assertThat(result, testPipelineOptions.getOnCreateMatcher());
+      assertThat(result, testPipelineOptions.getOnSuccessMatcher());
     }
     return result;
   }

Reply via email to