Watermark tests.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9784f204 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9784f204 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9784f204 Branch: refs/heads/master Commit: 9784f204793ab8e8ec3ec84e3c7c8d2ca4ddaf6a Parents: c18f8a2 Author: Sela <ans...@paypal.com> Authored: Sun Feb 12 18:34:30 2017 +0200 Committer: Sela <ans...@paypal.com> Committed: Mon Feb 20 11:30:15 2017 +0200 ---------------------------------------------------------------------- .../beam/runners/spark/ClearWatermarksRule.java | 37 ++++ .../beam/runners/spark/ReuseSparkContext.java | 46 ++++ .../beam/runners/spark/WatermarkTest.java | 212 +++++++++++++++++++ 3 files changed, 295 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/9784f204/runners/spark/src/test/java/org/apache/beam/runners/spark/ClearWatermarksRule.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/ClearWatermarksRule.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/ClearWatermarksRule.java new file mode 100644 index 0000000..4c0c99a --- /dev/null +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/ClearWatermarksRule.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark;
+
+import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
+import org.junit.rules.ExternalResource;
+
+/**
+ * A rule that clears the {@link GlobalWatermarkHolder}.
+ *
+ * <p>{@link GlobalWatermarkHolder} is accessed statically, so its contents outlive a single test.
+ * The holder is cleared before each test, so every test starts from a clean holder. It is cleared
+ * again after each test, so watermarks registered by the test do not leak into test classes that
+ * run later in the same JVM without this rule.
+ */
+public class ClearWatermarksRule extends ExternalResource {
+
+  @Override
+  protected void before() throws Throwable {
+    clearWatermarks();
+  }
+
+  @Override
+  protected void after() {
+    clearWatermarks();
+  }
+
+  /** Clears {@link GlobalWatermarkHolder} via {@link GlobalWatermarkHolder#clear()}. */
+  public void clearWatermarks() {
+    GlobalWatermarkHolder.clear();
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/9784f204/runners/spark/src/test/java/org/apache/beam/runners/spark/ReuseSparkContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/ReuseSparkContext.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/ReuseSparkContext.java
new file mode 100644
index 0000000..027f9fd
--- /dev/null
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/ReuseSparkContext.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark;
+
+import org.apache.beam.runners.spark.translation.SparkContextFactory;
+import org.junit.rules.ExternalResource;
+
+/**
+ * Explicitly set {@link org.apache.spark.SparkContext} to be reused (or not) in tests.
+ *
+ * <p>Before each test, the {@link SparkContextFactory#TEST_REUSE_SPARK_CONTEXT} system property is
+ * set to the requested value. After the test, the property's previous value is restored, or the
+ * property is cleared if it was previously unset. This keeps the JVM-wide setting from leaking
+ * into tests that run later in the same JVM.
+ */
+public class ReuseSparkContext extends ExternalResource {
+
+  private final boolean reuse;
+
+  /** Value the property had before {@link #before()} ran, or {@code null} if it was unset. */
+  private String previousValue;
+
+  private ReuseSparkContext(boolean reuse) {
+    this.reuse = reuse;
+  }
+
+  /** Returns a rule that requests the {@link org.apache.spark.SparkContext} NOT be reused. */
+  public static ReuseSparkContext no() {
+    return new ReuseSparkContext(false);
+  }
+
+  /** Returns a rule that requests the {@link org.apache.spark.SparkContext} be reused. */
+  public static ReuseSparkContext yes() {
+    return new ReuseSparkContext(true);
+  }
+
+  @Override
+  protected void before() throws Throwable {
+    // System.setProperty returns the value it replaced; after() puts it back.
+    previousValue =
+        System.setProperty(SparkContextFactory.TEST_REUSE_SPARK_CONTEXT, Boolean.toString(reuse));
+  }
+
+  @Override
+  protected void after() {
+    if (previousValue == null) {
+      System.clearProperty(SparkContextFactory.TEST_REUSE_SPARK_CONTEXT);
+    } else {
+      System.setProperty(SparkContextFactory.TEST_REUSE_SPARK_CONTEXT, previousValue);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/9784f204/runners/spark/src/test/java/org/apache/beam/runners/spark/WatermarkTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/WatermarkTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/WatermarkTest.java
new file mode 100644
index 0000000..0b56403
--- /dev/null
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/WatermarkTest.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.core.IsEqual.equalTo;
+import static org.junit.Assert.assertThat;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.runners.spark.io.CreateStream;
+import org.apache.beam.runners.spark.translation.SparkContextFactory;
+import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
+import org.apache.beam.runners.spark.util.GlobalWatermarkHolder.SparkWatermarks;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.RegexMatcher;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+/**
+ * A test suite for the propagation of watermarks in the Spark runner.
+ */
+public class WatermarkTest {
+
+  @Rule
+  public ClearWatermarksRule clearWatermarksRule = new ClearWatermarksRule();
+
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  @Rule
+  public ReuseSparkContext reuseContext = ReuseSparkContext.yes();
+
+  /** Matches an instant rendered as {@code yyyy-MM-ddTHH:mm:ss.SSSZ} (the '.' is literal). */
+  private static final String INSTANT_PATTERN =
+      "[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}\\.[0-9]{3}Z";
+
+  /**
+   * Creates a fresh options instance. Each test builds its own instance, so settings made by one
+   * test (e.g. streaming mode and the batch interval in {@link #testInDoFn()}) cannot leak into
+   * another.
+   */
+  private static SparkPipelineOptions createOptions() {
+    return PipelineOptionsFactory.create().as(SparkPipelineOptions.class);
+  }
+
+  @Test
+  public void testLowHighWatermarksAdvance() {
+    JavaSparkContext jsc = SparkContextFactory.getSparkContext(createOptions());
+
+    Instant instant = new Instant(0);
+    // low == high.
+    GlobalWatermarkHolder.add(1,
+        new SparkWatermarks(
+            instant.plus(Duration.millis(5)),
+            instant.plus(Duration.millis(5)),
+            instant));
+    GlobalWatermarkHolder.advance(jsc);
+    // low < high.
+    GlobalWatermarkHolder.add(1,
+        new SparkWatermarks(
+            instant.plus(Duration.millis(10)),
+            instant.plus(Duration.millis(15)),
+            instant.plus(Duration.millis(100))));
+    GlobalWatermarkHolder.advance(jsc);
+
+    // assert watermarks in Broadcast.
+    SparkWatermarks currentWatermarks = GlobalWatermarkHolder.get().getValue().get(1);
+    assertThat(currentWatermarks.getLowWatermark(), equalTo(instant.plus(Duration.millis(10))));
+    assertThat(currentWatermarks.getHighWatermark(), equalTo(instant.plus(Duration.millis(15))));
+    assertThat(currentWatermarks.getSynchronizedProcessingTime(),
+        equalTo(instant.plus(Duration.millis(100))));
+
+    // assert illegal watermark advance.
+    thrown.expect(IllegalStateException.class);
+    // NOTE: this text must match the message thrown by GlobalWatermarkHolder verbatim.
+    thrown.expectMessage(
+        RegexMatcher.matches(
+            "Low watermark " + INSTANT_PATTERN + " cannot be later then high watermark "
+                + INSTANT_PATTERN));
+    // low > high -> not allowed!
+    GlobalWatermarkHolder.add(1,
+        new SparkWatermarks(
+            instant.plus(Duration.millis(25)),
+            instant.plus(Duration.millis(20)),
+            instant.plus(Duration.millis(200))));
+    GlobalWatermarkHolder.advance(jsc);
+  }
+
+  @Test
+  public void testSynchronizedTimeMonotonic() {
+    JavaSparkContext jsc = SparkContextFactory.getSparkContext(createOptions());
+
+    Instant instant = new Instant(0);
+    GlobalWatermarkHolder.add(1,
+        new SparkWatermarks(
+            instant.plus(Duration.millis(5)),
+            instant.plus(Duration.millis(10)),
+            instant));
+    GlobalWatermarkHolder.advance(jsc);
+
+    thrown.expect(IllegalStateException.class);
+    thrown.expectMessage("Synchronized processing time must advance.");
+    // no actual advancement of watermarks - fine by Watermarks
+    // but not by synchronized processing time.
+    GlobalWatermarkHolder.add(1,
+        new SparkWatermarks(
+            instant.plus(Duration.millis(5)),
+            instant.plus(Duration.millis(10)),
+            instant));
+    GlobalWatermarkHolder.advance(jsc);
+  }
+
+  @Test
+  public void testMultiSource() {
+    JavaSparkContext jsc = SparkContextFactory.getSparkContext(createOptions());
+
+    Instant instant = new Instant(0);
+    GlobalWatermarkHolder.add(1,
+        new SparkWatermarks(
+            instant.plus(Duration.millis(5)),
+            instant.plus(Duration.millis(10)),
+            instant));
+    GlobalWatermarkHolder.add(2,
+        new SparkWatermarks(
+            instant.plus(Duration.millis(3)),
+            instant.plus(Duration.millis(6)),
+            instant));
+
+    GlobalWatermarkHolder.advance(jsc);
+
+    // assert watermarks for source 1.
+    SparkWatermarks watermarksForSource1 = GlobalWatermarkHolder.get().getValue().get(1);
+    assertThat(watermarksForSource1.getLowWatermark(), equalTo(instant.plus(Duration.millis(5))));
+    assertThat(watermarksForSource1.getHighWatermark(), equalTo(instant.plus(Duration.millis(10))));
+
+    // assert watermarks for source 2.
+    SparkWatermarks watermarksForSource2 = GlobalWatermarkHolder.get().getValue().get(2);
+    assertThat(watermarksForSource2.getLowWatermark(), equalTo(instant.plus(Duration.millis(3))));
+    assertThat(watermarksForSource2.getHighWatermark(), equalTo(instant.plus(Duration.millis(6))));
+  }
+
+  @Test
+  public void testInDoFn() {
+    SparkPipelineOptions options = createOptions();
+    options.setRunner(SparkRunner.class);
+    options.setStreaming(true);
+    options.setBatchIntervalMillis(500L);
+    // Read the interval only after it has been set. The queued processing time and the expected
+    // output then agree with the interval the pipeline actually runs with.
+    Duration batchInterval = Duration.millis(options.getBatchIntervalMillis());
+
+    // The first batch is empty because watermarks advance onBatchCompleted. Each element observes
+    // the watermarks applied when the preceding batch completed.
+    Iterable<Integer> zeroBatch = Collections.emptyList();
+    Iterable<Integer> firstBatch = Collections.singletonList(1);
+    Iterable<Integer> secondBatch = Collections.singletonList(2);
+
+    // Queue two watermark updates for source 1. The expected output below relies on them being
+    // applied one per completed batch.
+    Instant instant = new Instant(0);
+    GlobalWatermarkHolder.add(1,
+        new SparkWatermarks(
+            instant.plus(Duration.millis(5)),
+            instant.plus(Duration.millis(10)),
+            instant));
+    GlobalWatermarkHolder.add(1,
+        new SparkWatermarks(
+            instant.plus(Duration.millis(10)),
+            instant.plus(Duration.millis(15)),
+            instant.plus(batchInterval)));
+
+    // The collected output lives in a static field; discard anything left from an earlier run.
+    WatermarksDoFn.strings.clear();
+
+    Pipeline p = Pipeline.create(options);
+
+    CreateStream.QueuedValues<Integer> queueStream =
+        CreateStream.fromQueue(Arrays.asList(zeroBatch, firstBatch, secondBatch));
+
+    p.apply(queueStream).setCoder(VarIntCoder.of()).apply(ParDo.of(new WatermarksDoFn(1)));
+
+    p.run().waitUntilFinish(batchInterval.multipliedBy(3));
+
+    // Assert on a snapshot taken under the list's lock. The streaming pipeline may still be
+    // processing when waitUntilFinish returns.
+    List<String> observed = new ArrayList<>(WatermarksDoFn.strings);
+
+    // this is a hacky way to assert but it will do until triggers are supported.
+    assertThat(
+        observed,
+        containsInAnyOrder(
+            "element: 1 lowWatermark: 5 highWatermark: 10 processingTime: 0",
+            "element: 2 lowWatermark: 10 highWatermark: 15 processingTime: "
+                + batchInterval.getMillis()));
+  }
+
+  /**
+   * For every element it processes, records the watermarks of {@code sourceId} that are visible
+   * through {@link GlobalWatermarkHolder} at that moment.
+   */
+  private static class WatermarksDoFn extends DoFn<Integer, String> {
+    /**
+     * Observed watermark descriptions, one per processed element.
+     *
+     * <p>The list is static so the test thread can read what the DoFn wrote; this presumes the DoFn
+     * runs in the test JVM (local Spark master). It is synchronized because elements may be
+     * processed on Spark task threads while the test thread reads it.
+     */
+    static final List<String> strings = Collections.synchronizedList(new ArrayList<String>());
+
+    private final int sourceId;
+
+    private WatermarksDoFn(int sourceId) {
+      this.sourceId = sourceId;
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      if (GlobalWatermarkHolder.get() == null) {
+        // watermark not yet updated.
+        return;
+      }
+      // Look the watermarks up once, so the null check and the output use the same value.
+      SparkWatermarks sparkWatermarks = GlobalWatermarkHolder.get().getValue().get(sourceId);
+      if (sparkWatermarks == null) {
+        // watermark for this source not yet updated.
+        return;
+      }
+      Integer element = c.element();
+      String output =
+          "element: " + element
+              + " lowWatermark: " + sparkWatermarks.getLowWatermark().getMillis()
+              + " highWatermark: " + sparkWatermarks.getHighWatermark().getMillis()
+              + " processingTime: " + sparkWatermarks.getSynchronizedProcessingTime().getMillis();
+      strings.add(output);
+    }
+  }
+
+}