Batch pipelines don't use the checkpoint dir, so there is nothing to clean up.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/82d754cf Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/82d754cf Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/82d754cf Branch: refs/heads/master Commit: 82d754cf46abc0ec7f4fe5c9501944c6472484d7 Parents: b88e54a Author: Sela <ans...@paypal.com> Authored: Tue Feb 21 16:27:42 2017 +0200 Committer: Sela <ans...@paypal.com> Committed: Wed Mar 1 00:18:05 2017 +0200 ---------------------------------------------------------------------- .../beam/runners/spark/TestSparkRunner.java | 46 ++++++++++---------- 1 file changed, 23 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/82d754cf/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java index d2b5186..5d71ea5 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java @@ -117,17 +117,17 @@ public final class TestSparkRunner extends PipelineRunner<SparkPipelineResult> { public SparkPipelineResult run(Pipeline pipeline) { SparkPipelineOptions sparkOptions = pipeline.getOptions().as(SparkPipelineOptions.class); SparkPipelineResult result = null; - try { - // clear state of Aggregators, Metrics and Watermarks. - AggregatorsAccumulator.clear(); - SparkMetricsContainer.clear(); - GlobalWatermarkHolder.clear(); + // clear state of Aggregators, Metrics and Watermarks. 
+ AggregatorsAccumulator.clear(); + SparkMetricsContainer.clear(); + GlobalWatermarkHolder.clear(); - TestPipelineOptions testPipelineOptions = pipeline.getOptions().as(TestPipelineOptions.class); - LOG.info("About to run test pipeline " + sparkOptions.getJobName()); + TestPipelineOptions testPipelineOptions = pipeline.getOptions().as(TestPipelineOptions.class); + LOG.info("About to run test pipeline " + sparkOptions.getJobName()); - // if the pipeline was executed in streaming mode, validate aggregators. - if (isForceStreaming) { + // if the pipeline was executed in streaming mode, validate aggregators. + if (isForceStreaming) { + try { result = delegate.run(pipeline); long timeout = sparkOptions.getForcedTimeout(); result.waitUntilFinish(Duration.millis(timeout)); @@ -151,21 +151,21 @@ public final class TestSparkRunner extends PipelineRunner<SparkPipelineResult> { "Successfully asserted pipeline %s with %d successful assertions.", sparkOptions.getJobName(), successAssertions)); - } else { - // for batch test pipelines, run and block until done. - result = delegate.run(pipeline); - result.waitUntilFinish(); - // assert via matchers. - assertThat(result, testPipelineOptions.getOnCreateMatcher()); - assertThat(result, testPipelineOptions.getOnSuccessMatcher()); - } - } finally { - try { - // cleanup checkpoint dir. - FileUtils.deleteDirectory(new File(sparkOptions.getCheckpointDir())); - } catch (IOException e) { - throw new RuntimeException("Failed to clear checkpoint tmp dir.", e); + } finally { + try { + // cleanup checkpoint dir. + FileUtils.deleteDirectory(new File(sparkOptions.getCheckpointDir())); + } catch (IOException e) { + throw new RuntimeException("Failed to clear checkpoint tmp dir.", e); + } } + } else { + // for batch test pipelines, run and block until done. + result = delegate.run(pipeline); + result.waitUntilFinish(); + // assert via matchers. 
+ assertThat(result, testPipelineOptions.getOnCreateMatcher()); + assertThat(result, testPipelineOptions.getOnSuccessMatcher()); } return result; }