Spark runner: test triggers, panes and watermarks via CreateStream.
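For reviewers, the gist of the change: CreateStream drops the old fromQueue(Iterable) factory in favor of a fluent, test-only builder that enqueues micro-batches and watermark advances explicitly. A minimal usage sketch, condensed from the CreateStreamTest changes in the diff below (the pipeline p, batchDuration, and the element values are illustrative, not part of the patch):

    CreateStream<TimestampedValue<String>> source =
        CreateStream.<TimestampedValue<String>>withBatchInterval(batchDuration)
            .nextBatch(                                           // first micro-batch
                TimestampedValue.of("onTime", new Instant(100)))
            .advanceWatermarkForNextBatch(new Instant(1001L))     // watermark for the following batch
            .nextBatch(                                           // behind the watermark, i.e. late
                TimestampedValue.of("late", new Instant(500)))
            .advanceNextBatchWatermarkToInfinity();               // end-of-time watermark flushes panes

    PCollection<String> values = p
        .apply(source)
        .setCoder(TimestampedValue.TimestampedValueCoder.of(StringUtf8Coder.of()))
        .apply(ParDo.of(new OnlyValue<String>()));                // test DoFn: unwrap, keep timestamps

Because elements and watermarks are queued per micro-batch, test input is guaranteed to span several batches instead of collapsing into the first one, which is what makes triggers and panes observable on Spark's micro-batch model.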
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/96d373fe Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/96d373fe Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/96d373fe Branch: refs/heads/master Commit: 96d373fe954c263414e7f22ab08979f25cc83188 Parents: 9f14350 Author: Sela <ans...@paypal.com> Authored: Sat Feb 18 22:05:19 2017 +0200 Committer: Sela <ans...@paypal.com> Committed: Wed Mar 1 00:17:59 2017 +0200 ---------------------------------------------------------------------- .../beam/runners/spark/io/CreateStream.java | 125 ++++-- .../spark/GlobalWatermarkHolderTest.java | 150 ++++++++ .../beam/runners/spark/ReuseSparkContext.java | 46 --- .../runners/spark/ReuseSparkContextRule.java | 46 +++ .../beam/runners/spark/WatermarkTest.java | 231 ----------- .../translation/streaming/CreateStreamTest.java | 385 +++++++++++++++++++ .../ResumeFromCheckpointStreamingTest.java | 296 ++++++++------ 7 files changed, 860 insertions(+), 419 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/96d373fe/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java index 7ebba90..2149372 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java @@ -17,57 +17,124 @@ */ package org.apache.beam.runners.spark.io; -import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkArgument; +import java.util.Arrays; +import java.util.Deque; +import java.util.LinkedList; +import java.util.Queue; +import org.apache.beam.runners.spark.util.GlobalWatermarkHolder.SparkWatermarks; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TimestampedValue; +import org.joda.time.Duration; +import org.joda.time.Instant; + /** - * Create an input stream from Queue. + * Create an input stream from Queue. For SparkRunner tests only. * - * @param <T> stream type + * @param <T> stream type. */ -public final class CreateStream<T> { +public final class CreateStream<T> extends PTransform<PBegin, PCollection<T>> { + + private final Duration batchInterval; + private final Instant initialSystemTime; + private final Queue<Iterable<T>> batches = new LinkedList<>(); + private final Deque<SparkWatermarks> times = new LinkedList<>(); + + private Instant lowWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE; //for test purposes. - private CreateStream() { + private CreateStream(Duration batchInterval, Instant initialSystemTime) { + this.batchInterval = batchInterval; + this.initialSystemTime = initialSystemTime; + } + + /** Set the batch interval for the stream. */ + public static <T> CreateStream<T> withBatchInterval(Duration batchInterval) { + return new CreateStream<>(batchInterval, new Instant(0)); } /** - * Define the input stream to create from queue. 
- * - * @param queuedValues defines the input stream - * @param <T> stream type - * @return the queue that defines the input stream + * Enqueue next micro-batch elements. + * This is backed by a {@link Queue} so stream input order would keep the population order (FIFO). */ - public static <T> QueuedValues<T> fromQueue(Iterable<Iterable<T>> queuedValues) { - return new QueuedValues<>(queuedValues); + @SafeVarargs + public final CreateStream<T> nextBatch(T... batchElements) { + // validate timestamps if timestamped elements. + for (T element: batchElements) { + if (element instanceof TimestampedValue) { + TimestampedValue timestampedValue = (TimestampedValue) element; + checkArgument( + timestampedValue.getTimestamp().isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE), + "Elements must have timestamps before %s. Got: %s", + BoundedWindow.TIMESTAMP_MAX_VALUE, + timestampedValue.getTimestamp()); + } + } + batches.offer(Arrays.asList(batchElements)); + return this; + } + + /** Set the initial synchronized processing time. */ + public CreateStream<T> initialSystemTimeAt(Instant initialSystemTime) { + return new CreateStream<>(batchInterval, initialSystemTime); } /** - * {@link PTransform} for queueing values. + * Advances the watermark in the next batch. */ - public static final class QueuedValues<T> extends PTransform<PBegin, PCollection<T>> { + public CreateStream<T> advanceWatermarkForNextBatch(Instant newWatermark) { + checkArgument( + !newWatermark.isBefore(lowWatermark), "The watermark is not allowed to decrease!"); + checkArgument( + newWatermark.isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE), + "The Watermark cannot progress beyond the maximum. Got: %s. Maximum: %s", + newWatermark, + BoundedWindow.TIMESTAMP_MAX_VALUE); + return advance(newWatermark); + } - private final Iterable<Iterable<T>> queuedValues; + /** + * Advances the watermark in the next batch to the end-of-time. + */ + public CreateStream<T> advanceNextBatchWatermarkToInfinity() { + return advance(BoundedWindow.TIMESTAMP_MAX_VALUE); + } - QueuedValues(Iterable<Iterable<T>> queuedValues) { - checkNotNull( - queuedValues, "need to set the queuedValues of an Create.QueuedValues transform"); - this.queuedValues = queuedValues; - } + private CreateStream<T> advance(Instant newWatermark) { + // advance the system time. + Instant currentSynchronizedProcessingTime = times.peekLast() == null ? initialSystemTime + : times.peekLast().getSynchronizedProcessingTime(); + Instant nextSynchronizedProcessingTime = currentSynchronizedProcessingTime.plus(batchInterval); + checkArgument( + nextSynchronizedProcessingTime.isAfter(currentSynchronizedProcessingTime), + "Synchronized processing time must always advance."); + times.offer(new SparkWatermarks(lowWatermark, newWatermark, nextSynchronizedProcessingTime)); + lowWatermark = newWatermark; + return this; + } - public Iterable<Iterable<T>> getQueuedValues() { - return queuedValues; - } + /** Get the underlying queue representing the mock stream of micro-batches. */ + public Queue<Iterable<T>> getBatches() { + return batches; + } - @Override - public PCollection<T> expand(PBegin input) { - // Spark streaming micro batches are bounded by default - return PCollection.createPrimitiveOutputInternal(input.getPipeline(), - WindowingStrategy.globalDefault(), PCollection.IsBounded.UNBOUNDED); - } + /** + * Get times so they can be pushed into the + * {@link org.apache.beam.runners.spark.util.GlobalWatermarkHolder}. 
+ */ + public Queue<SparkWatermarks> getTimes() { + return times; + } + + @Override + public PCollection<T> expand(PBegin input) { + return PCollection.createPrimitiveOutputInternal( + input.getPipeline(), WindowingStrategy.globalDefault(), PCollection.IsBounded.UNBOUNDED); } } http://git-wip-us.apache.org/repos/asf/beam/blob/96d373fe/runners/spark/src/test/java/org/apache/beam/runners/spark/GlobalWatermarkHolderTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/GlobalWatermarkHolderTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/GlobalWatermarkHolderTest.java new file mode 100644 index 0000000..c1d2944 --- /dev/null +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/GlobalWatermarkHolderTest.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.spark; + +import static org.hamcrest.core.IsEqual.equalTo; +import static org.junit.Assert.assertThat; + +import org.apache.beam.runners.spark.translation.SparkContextFactory; +import org.apache.beam.runners.spark.util.GlobalWatermarkHolder; +import org.apache.beam.runners.spark.util.GlobalWatermarkHolder.SparkWatermarks; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.RegexMatcher; +import org.apache.spark.api.java.JavaSparkContext; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + + +/** + * A test suite for the propagation of watermarks in the Spark runner. + */ +public class WatermarkTest { + + @Rule + public ClearWatermarksRule clearWatermarksRule = new ClearWatermarksRule(); + + @Rule + public ExpectedException thrown = ExpectedException.none(); + + @Rule + public ReuseSparkContextRule reuseContext = ReuseSparkContextRule.yes(); + + private static final SparkPipelineOptions options = + PipelineOptionsFactory.create().as(SparkPipelineOptions.class); + + private static final String INSTANT_PATTERN = + "[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}.[0-9]{3}Z"; + + @Test + public void testLowHighWatermarksAdvance() { + JavaSparkContext jsc = SparkContextFactory.getSparkContext(options); + + Instant instant = new Instant(0); + // low == high. + GlobalWatermarkHolder.add(1, + new SparkWatermarks( + instant.plus(Duration.millis(5)), + instant.plus(Duration.millis(5)), + instant)); + GlobalWatermarkHolder.advance(jsc); + // low < high. 
+ GlobalWatermarkHolder.add(1, + new SparkWatermarks( + instant.plus(Duration.millis(10)), + instant.plus(Duration.millis(15)), + instant.plus(Duration.millis(100)))); + GlobalWatermarkHolder.advance(jsc); + + // assert watermarks in Broadcast. + SparkWatermarks currentWatermarks = GlobalWatermarkHolder.get().getValue().get(1); + assertThat(currentWatermarks.getLowWatermark(), equalTo(instant.plus(Duration.millis(10)))); + assertThat(currentWatermarks.getHighWatermark(), equalTo(instant.plus(Duration.millis(15)))); + assertThat(currentWatermarks.getSynchronizedProcessingTime(), + equalTo(instant.plus(Duration.millis(100)))); + + // assert illegal watermark advance. + thrown.expect(IllegalStateException.class); + thrown.expectMessage( + RegexMatcher.matches( + "Low watermark " + INSTANT_PATTERN + " cannot be later then high watermark " + + INSTANT_PATTERN)); + // low > high -> not allowed! + GlobalWatermarkHolder.add(1, + new SparkWatermarks( + instant.plus(Duration.millis(25)), + instant.plus(Duration.millis(20)), + instant.plus(Duration.millis(200)))); + GlobalWatermarkHolder.advance(jsc); + } + + @Test + public void testSynchronizedTimeMonotonic() { + JavaSparkContext jsc = SparkContextFactory.getSparkContext(options); + + Instant instant = new Instant(0); + GlobalWatermarkHolder.add(1, + new SparkWatermarks( + instant.plus(Duration.millis(5)), + instant.plus(Duration.millis(10)), + instant)); + GlobalWatermarkHolder.advance(jsc); + + thrown.expect(IllegalStateException.class); + thrown.expectMessage("Synchronized processing time must advance."); + // no actual advancement of watermarks - fine by Watermarks + // but not by synchronized processing time. + GlobalWatermarkHolder.add(1, + new SparkWatermarks( + instant.plus(Duration.millis(5)), + instant.plus(Duration.millis(10)), + instant)); + GlobalWatermarkHolder.advance(jsc); + } + + @Test + public void testMultiSource() { + JavaSparkContext jsc = SparkContextFactory.getSparkContext(options); + + Instant instant = new Instant(0); + GlobalWatermarkHolder.add(1, + new SparkWatermarks( + instant.plus(Duration.millis(5)), + instant.plus(Duration.millis(10)), + instant)); + GlobalWatermarkHolder.add(2, + new SparkWatermarks( + instant.plus(Duration.millis(3)), + instant.plus(Duration.millis(6)), + instant)); + + GlobalWatermarkHolder.advance(jsc); + + // assert watermarks for source 1. + SparkWatermarks watermarksForSource1 = GlobalWatermarkHolder.get().getValue().get(1); + assertThat(watermarksForSource1.getLowWatermark(), equalTo(instant.plus(Duration.millis(5)))); + assertThat(watermarksForSource1.getHighWatermark(), equalTo(instant.plus(Duration.millis(10)))); + + // assert watermarks for source 2. 
+ SparkWatermarks watermarksForSource2 = GlobalWatermarkHolder.get().getValue().get(2); + assertThat(watermarksForSource2.getLowWatermark(), equalTo(instant.plus(Duration.millis(3)))); + assertThat(watermarksForSource2.getHighWatermark(), equalTo(instant.plus(Duration.millis(6)))); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/96d373fe/runners/spark/src/test/java/org/apache/beam/runners/spark/ReuseSparkContext.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/ReuseSparkContext.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/ReuseSparkContext.java deleted file mode 100644 index 027f9fd..0000000 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/ReuseSparkContext.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.spark; - -import org.apache.beam.runners.spark.translation.SparkContextFactory; -import org.junit.rules.ExternalResource; - -/** - * Explicitly set {@link org.apache.spark.SparkContext} to be reused (or not) in tests. - */ -public class ReuseSparkContext extends ExternalResource { - - private final boolean reuse; - - private ReuseSparkContext(boolean reuse) { - this.reuse = reuse; - } - - public static ReuseSparkContext no() { - return new ReuseSparkContext(false); - } - - public static ReuseSparkContext yes() { - return new ReuseSparkContext(true); - } - - @Override - protected void before() throws Throwable { - System.setProperty(SparkContextFactory.TEST_REUSE_SPARK_CONTEXT, Boolean.toString(reuse)); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/96d373fe/runners/spark/src/test/java/org/apache/beam/runners/spark/ReuseSparkContextRule.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/ReuseSparkContextRule.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/ReuseSparkContextRule.java new file mode 100644 index 0000000..027f9fd --- /dev/null +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/ReuseSparkContextRule.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.spark; + +import org.apache.beam.runners.spark.translation.SparkContextFactory; +import org.junit.rules.ExternalResource; + +/** + * Explicitly set {@link org.apache.spark.SparkContext} to be reused (or not) in tests. + */ +public class ReuseSparkContext extends ExternalResource { + + private final boolean reuse; + + private ReuseSparkContext(boolean reuse) { + this.reuse = reuse; + } + + public static ReuseSparkContext no() { + return new ReuseSparkContext(false); + } + + public static ReuseSparkContext yes() { + return new ReuseSparkContext(true); + } + + @Override + protected void before() throws Throwable { + System.setProperty(SparkContextFactory.TEST_REUSE_SPARK_CONTEXT, Boolean.toString(reuse)); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/96d373fe/runners/spark/src/test/java/org/apache/beam/runners/spark/WatermarkTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/WatermarkTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/WatermarkTest.java deleted file mode 100644 index 0a0abf9..0000000 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/WatermarkTest.java +++ /dev/null @@ -1,231 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.runners.spark; - -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.hamcrest.core.IsEqual.equalTo; -import static org.junit.Assert.assertThat; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import org.apache.beam.runners.spark.io.CreateStream; -import org.apache.beam.runners.spark.translation.SparkContextFactory; -import org.apache.beam.runners.spark.util.GlobalWatermarkHolder; -import org.apache.beam.runners.spark.util.GlobalWatermarkHolder.SparkWatermarks; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.coders.VarIntCoder; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.testing.RegexMatcher; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.spark.api.java.JavaSparkContext; -import org.joda.time.Duration; -import org.joda.time.Instant; -import org.junit.Ignore; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; - - -/** - * A test suite for the propagation of watermarks in the Spark runner. - */ -public class WatermarkTest { - - @Rule - public ClearWatermarksRule clearWatermarksRule = new ClearWatermarksRule(); - - @Rule - public ExpectedException thrown = ExpectedException.none(); - - @Rule - public ReuseSparkContext reuseContext = ReuseSparkContext.yes(); - - private static final SparkPipelineOptions options = - PipelineOptionsFactory.create().as(SparkPipelineOptions.class); - - private static final String INSTANT_PATTERN = - "[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}.[0-9]{3}Z"; - - @Test - public void testLowHighWatermarksAdvance() { - JavaSparkContext jsc = SparkContextFactory.getSparkContext(options); - - Instant instant = new Instant(0); - // low == high. - GlobalWatermarkHolder.add(1, - new SparkWatermarks( - instant.plus(Duration.millis(5)), - instant.plus(Duration.millis(5)), - instant)); - GlobalWatermarkHolder.advance(jsc); - // low < high. - GlobalWatermarkHolder.add(1, - new SparkWatermarks( - instant.plus(Duration.millis(10)), - instant.plus(Duration.millis(15)), - instant.plus(Duration.millis(100)))); - GlobalWatermarkHolder.advance(jsc); - - // assert watermarks in Broadcast. - SparkWatermarks currentWatermarks = GlobalWatermarkHolder.get().getValue().get(1); - assertThat(currentWatermarks.getLowWatermark(), equalTo(instant.plus(Duration.millis(10)))); - assertThat(currentWatermarks.getHighWatermark(), equalTo(instant.plus(Duration.millis(15)))); - assertThat(currentWatermarks.getSynchronizedProcessingTime(), - equalTo(instant.plus(Duration.millis(100)))); - - // assert illegal watermark advance. - thrown.expect(IllegalStateException.class); - thrown.expectMessage( - RegexMatcher.matches( - "Low watermark " + INSTANT_PATTERN + " cannot be later then high watermark " - + INSTANT_PATTERN)); - // low > high -> not allowed! 
- GlobalWatermarkHolder.add(1, - new SparkWatermarks( - instant.plus(Duration.millis(25)), - instant.plus(Duration.millis(20)), - instant.plus(Duration.millis(200)))); - GlobalWatermarkHolder.advance(jsc); - } - - @Test - public void testSynchronizedTimeMonotonic() { - JavaSparkContext jsc = SparkContextFactory.getSparkContext(options); - - Instant instant = new Instant(0); - GlobalWatermarkHolder.add(1, - new SparkWatermarks( - instant.plus(Duration.millis(5)), - instant.plus(Duration.millis(10)), - instant)); - GlobalWatermarkHolder.advance(jsc); - - thrown.expect(IllegalStateException.class); - thrown.expectMessage("Synchronized processing time must advance."); - // no actual advancement of watermarks - fine by Watermarks - // but not by synchronized processing time. - GlobalWatermarkHolder.add(1, - new SparkWatermarks( - instant.plus(Duration.millis(5)), - instant.plus(Duration.millis(10)), - instant)); - GlobalWatermarkHolder.advance(jsc); - } - - @Test - public void testMultiSource() { - JavaSparkContext jsc = SparkContextFactory.getSparkContext(options); - - Instant instant = new Instant(0); - GlobalWatermarkHolder.add(1, - new SparkWatermarks( - instant.plus(Duration.millis(5)), - instant.plus(Duration.millis(10)), - instant)); - GlobalWatermarkHolder.add(2, - new SparkWatermarks( - instant.plus(Duration.millis(3)), - instant.plus(Duration.millis(6)), - instant)); - - GlobalWatermarkHolder.advance(jsc); - - // assert watermarks for source 1. - SparkWatermarks watermarksForSource1 = GlobalWatermarkHolder.get().getValue().get(1); - assertThat(watermarksForSource1.getLowWatermark(), equalTo(instant.plus(Duration.millis(5)))); - assertThat(watermarksForSource1.getHighWatermark(), equalTo(instant.plus(Duration.millis(10)))); - - // assert watermarks for source 2. - SparkWatermarks watermarksForSource2 = GlobalWatermarkHolder.get().getValue().get(2); - assertThat(watermarksForSource2.getLowWatermark(), equalTo(instant.plus(Duration.millis(3)))); - assertThat(watermarksForSource2.getHighWatermark(), equalTo(instant.plus(Duration.millis(6)))); - } - - @Test - @Ignore( - "BEAM-1526 - This test is flaky, and is expected to be fixed in " - + "https://github.com/apache/beam/pull/2050") - public void testInDoFn() { - // because watermark advances onBatchCompleted. - Iterable<Integer> zeroBatch = Collections.emptyList(); - Iterable<Integer> firstBatch = Collections.singletonList(1); - Iterable<Integer> secondBatch = Collections.singletonList(2); - - Instant instant = new Instant(0); - GlobalWatermarkHolder.add(1, - new SparkWatermarks( - instant.plus(Duration.millis(5)), - instant.plus(Duration.millis(10)), - instant)); - GlobalWatermarkHolder.add(1, - new SparkWatermarks( - instant.plus(Duration.millis(10)), - instant.plus(Duration.millis(15)), - instant.plus(options.getBatchIntervalMillis()))); - - options.setRunner(SparkRunner.class); - options.setStreaming(true); - options.setBatchIntervalMillis(500L); - Pipeline p = Pipeline.create(options); - - CreateStream.QueuedValues<Integer> queueStream = - CreateStream.fromQueue(Arrays.asList(zeroBatch, firstBatch, secondBatch)); - - p.apply(queueStream).setCoder(VarIntCoder.of()).apply(ParDo.of(new WatermarksDoFn(1))); - - p.run().waitUntilFinish(Duration.millis(options.getBatchIntervalMillis()).multipliedBy(3)); - - // this is a hacky way to assert but it will do until triggers are supported. 
- assertThat( - WatermarksDoFn.strings, - containsInAnyOrder( - "element: 1 lowWatermark: 5 highWatermark: 10 processingTime: 0", - "element: 2 lowWatermark: 10 highWatermark: 15 processingTime: 1000")); - } - - private static class WatermarksDoFn extends DoFn<Integer, String> { - private final int sourceId; - - static List<String> strings = new ArrayList<>(); - - private WatermarksDoFn(int sourceId) { - this.sourceId = sourceId; - } - - @ProcessElement - public void processElement(ProcessContext c) { - if (GlobalWatermarkHolder.get() == null - || GlobalWatermarkHolder.get().getValue().get(sourceId) == null) { - // watermark not yet updated. - return; - } - SparkWatermarks sparkWatermarks = GlobalWatermarkHolder.get().getValue().get(sourceId); - Integer element = c.element(); - String output = - "element: " + element - + " lowWatermark: " + sparkWatermarks.getLowWatermark().getMillis() - + " highWatermark: " + sparkWatermarks.getHighWatermark().getMillis() - + " processingTime: " + sparkWatermarks.getSynchronizedProcessingTime().getMillis(); - strings.add(output); - } - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/96d373fe/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java new file mode 100644 index 0000000..0cb33ab --- /dev/null +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java @@ -0,0 +1,385 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.spark.translation.streaming; + +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.junit.Assert.assertThat; + +import java.io.IOException; +import java.io.Serializable; +import org.apache.beam.runners.spark.ReuseSparkContextRule; +import org.apache.beam.runners.spark.SparkPipelineOptions; +import org.apache.beam.runners.spark.io.CreateStream; +import org.apache.beam.runners.spark.translation.streaming.utils.SparkTestPipelineOptionsForStreaming; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.transforms.Count; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.Sum; +import org.apache.beam.sdk.transforms.Values; +import org.apache.beam.sdk.transforms.WithKeys; +import org.apache.beam.sdk.transforms.windowing.AfterPane; +import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; +import org.apache.beam.sdk.transforms.windowing.AfterWatermark; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.DefaultTrigger; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.Never; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TimestampedValue; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.rules.TemporaryFolder; +import org.junit.rules.TestName; + + +/** + * A test suite to test Spark runner implementation of triggers and panes. + * + * <p>Since Spark is a micro-batch engine, and will process any test-sized input + * within the same (first) batch, it is important to make sure inputs are ingested across + * micro-batches using {@link org.apache.spark.streaming.dstream.QueueInputDStream}. 
+ */ +public class CreateStreamTest implements Serializable { + + @Rule + public transient TemporaryFolder checkpointParentDir = new TemporaryFolder(); + @Rule + public transient SparkTestPipelineOptionsForStreaming commonOptions = + new SparkTestPipelineOptionsForStreaming(); + @Rule + public transient ReuseSparkContextRule noContextResue = ReuseSparkContextRule.no(); + @Rule + public transient TestName testName = new TestName(); + @Rule + public transient ExpectedException thrown = ExpectedException.none(); + + @Test + public void testLateDataAccumulating() throws IOException { + SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(checkpointParentDir); + Pipeline p = Pipeline.create(options); + options.setJobName(testName.getMethodName()); + Duration batchDuration = Duration.millis(options.getBatchIntervalMillis()); + + Instant instant = new Instant(0); + CreateStream<TimestampedValue<Integer>> source = + CreateStream.<TimestampedValue<Integer>>withBatchInterval(batchDuration) + .nextBatch() + .advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(6))) + .nextBatch( + TimestampedValue.of(1, instant), + TimestampedValue.of(2, instant), + TimestampedValue.of(3, instant)) + .advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(20))) + // These elements are late but within the allowed lateness + .nextBatch( + TimestampedValue.of(4, instant), + TimestampedValue.of(5, instant)) + // These elements are droppably late + .advanceNextBatchWatermarkToInfinity() + .nextBatch( + TimestampedValue.of(-1, instant), + TimestampedValue.of(-2, instant), + TimestampedValue.of(-3, instant)); + + PCollection<Integer> windowed = p + .apply(source).setCoder(TimestampedValue.TimestampedValueCoder.of(VarIntCoder.of())) + .apply(ParDo.of(new OnlyValue<Integer>())) + .apply(Window.<Integer>into(FixedWindows.of(Duration.standardMinutes(5))).triggering( + AfterWatermark.pastEndOfWindow() + .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane() + .plusDelayOf(Duration.standardMinutes(2))) + .withLateFirings(AfterPane.elementCountAtLeast(1))) + .accumulatingFiredPanes() + .withAllowedLateness(Duration.standardMinutes(5), Window.ClosingBehavior.FIRE_ALWAYS)); + PCollection<Integer> triggered = windowed.apply(WithKeys.<Integer, Integer>of(1)) + .apply(GroupByKey.<Integer, Integer>create()) + .apply(Values.<Iterable<Integer>>create()) + .apply(Flatten.<Integer>iterables()); + PCollection<Long> count = windowed.apply(Count.<Integer>globally().withoutDefaults()); + PCollection<Integer> sum = windowed.apply(Sum.integersGlobally().withoutDefaults()); + + IntervalWindow window = new IntervalWindow(instant, instant.plus(Duration.standardMinutes(5L))); + PAssert.that(triggered) + .inFinalPane(window) + .containsInAnyOrder(1, 2, 3, 4, 5); + PAssert.that(triggered) + .inOnTimePane(window) + .containsInAnyOrder(1, 2, 3); + PAssert.that(count) + .inWindow(window) + .satisfies(new SerializableFunction<Iterable<Long>, Void>() { + @Override + public Void apply(Iterable<Long> input) { + for (Long count : input) { + assertThat(count, allOf(greaterThanOrEqualTo(3L), lessThanOrEqualTo(5L))); + } + return null; + } + }); + PAssert.that(sum) + .inWindow(window) + .satisfies(new SerializableFunction<Iterable<Integer>, Void>() { + @Override + public Void apply(Iterable<Integer> input) { + for (Integer sum : input) { + assertThat(sum, allOf(greaterThanOrEqualTo(6), lessThanOrEqualTo(15))); + } + return null; + } + }); + + p.run(); + } + +// @Test +// @Category({NeedsRunner.class, 
UsesTestStream.class}) +// public void testProcessingTimeTrigger() { +// TestStream<Long> source = TestStream.create(VarLongCoder.of()) +// .addElements(TimestampedValue.of(1L, new Instant(1000L)), +// TimestampedValue.of(2L, new Instant(2000L))) +// .advanceProcessingTime(Duration.standardMinutes(12)) +// .addElements(TimestampedValue.of(3L, new Instant(3000L))) +// .advanceProcessingTime(Duration.standardMinutes(6)) +// .advanceWatermarkToInfinity(); +// +// PCollection<Long> sum = p.apply(source) +// .apply(Window.<Long>triggering(AfterWatermark.pastEndOfWindow() +// .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane() +// .plusDelayOf(Duration.standardMinutes(5)))).accumulatingFiredPanes() +// .withAllowedLateness(Duration.ZERO)) +// .apply(Sum.longsGlobally()); +// +// PAssert.that(sum).inEarlyGlobalWindowPanes().containsInAnyOrder(3L, 6L); +// +// p.run(); +// } + + @Test + public void testDiscardingMode() throws IOException { + SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(checkpointParentDir); + Pipeline p = Pipeline.create(options); + options.setJobName(testName.getMethodName()); + Duration batchDuration = Duration.millis(options.getBatchIntervalMillis()); + + CreateStream<TimestampedValue<String>> source = + CreateStream.<TimestampedValue<String>>withBatchInterval(batchDuration) + .nextBatch( + TimestampedValue.of("firstPane", new Instant(100)), + TimestampedValue.of("alsoFirstPane", new Instant(200))) + .advanceWatermarkForNextBatch(new Instant(1001L)) + .nextBatch( + TimestampedValue.of("onTimePane", new Instant(500))) + .advanceNextBatchWatermarkToInfinity() + .nextBatch( + TimestampedValue.of("finalLatePane", new Instant(750)), + TimestampedValue.of("alsoFinalLatePane", new Instant(250))); + + FixedWindows windowFn = FixedWindows.of(Duration.millis(1000L)); + Duration allowedLateness = Duration.millis(5000L); + PCollection<String> values = + p.apply(source).setCoder(TimestampedValue.TimestampedValueCoder.of(StringUtf8Coder.of())) + .apply(ParDo.of(new OnlyValue<String>())) + .apply( + Window.<String>into(windowFn) + .triggering( + AfterWatermark.pastEndOfWindow() + .withEarlyFirings(AfterPane.elementCountAtLeast(2)) + .withLateFirings(Never.ever())) + .discardingFiredPanes() + .withAllowedLateness(allowedLateness)) + .apply(WithKeys.<Integer, String>of(1)) + .apply(GroupByKey.<Integer, String>create()) + .apply(Values.<Iterable<String>>create()) + .apply(Flatten.<String>iterables()); + + IntervalWindow window = windowFn.assignWindow(new Instant(100)); + PAssert.that(values) + .inWindow(window) + .containsInAnyOrder( + "firstPane", "alsoFirstPane", "onTimePane", "finalLatePane", "alsoFinalLatePane"); + PAssert.that(values) + .inCombinedNonLatePanes(window) + .containsInAnyOrder("firstPane", "alsoFirstPane", "onTimePane"); + PAssert.that(values).inOnTimePane(window).containsInAnyOrder("onTimePane"); + PAssert.that(values) + .inFinalPane(window) + .containsInAnyOrder("finalLatePane", "alsoFinalLatePane"); + + p.run(); + } + + @Test + public void testFirstElementLate() throws IOException { + SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(checkpointParentDir); + Pipeline p = Pipeline.create(options); + options.setJobName(testName.getMethodName()); + Duration batchDuration = Duration.millis(options.getBatchIntervalMillis()); + + Instant lateElementTimestamp = new Instant(-1_000_000); + CreateStream<TimestampedValue<String>> source = + CreateStream.<TimestampedValue<String>>withBatchInterval(batchDuration) + .nextBatch() + 
.advanceWatermarkForNextBatch(new Instant(-1_000_000)) + .nextBatch( + TimestampedValue.of("late", lateElementTimestamp), + TimestampedValue.of("onTime", new Instant(100))) + .advanceNextBatchWatermarkToInfinity(); + + FixedWindows windowFn = FixedWindows.of(Duration.millis(1000L)); + Duration allowedLateness = Duration.millis(5000L); + PCollection<String> values = p.apply(source) + .setCoder(TimestampedValue.TimestampedValueCoder.of(StringUtf8Coder.of())) + .apply(ParDo.of(new OnlyValue<String>())) + .apply(Window.<String>into(windowFn).triggering(DefaultTrigger.of()) + .discardingFiredPanes() + .withAllowedLateness(allowedLateness)) + .apply(WithKeys.<Integer, String>of(1)) + .apply(GroupByKey.<Integer, String>create()) + .apply(Values.<Iterable<String>>create()) + .apply(Flatten.<String>iterables()); + + //TODO: empty panes do not emmit anything so Spark won't evaluate an "empty" assertion. +// PAssert.that(values).inWindow(windowFn.assignWindow(lateElementTimestamp)).empty(); + PAssert.that(values) + .inWindow(windowFn.assignWindow(new Instant(100))) + .containsInAnyOrder("onTime"); + + p.run(); + } + + @Test + public void testElementsAtAlmostPositiveInfinity() throws IOException { + SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(checkpointParentDir); + Pipeline p = Pipeline.create(options); + options.setJobName(testName.getMethodName()); + Duration batchDuration = Duration.millis(options.getBatchIntervalMillis()); + + Instant endOfGlobalWindow = GlobalWindow.INSTANCE.maxTimestamp(); + CreateStream<TimestampedValue<String>> source = + CreateStream.<TimestampedValue<String>>withBatchInterval(batchDuration) + .nextBatch( + TimestampedValue.of("foo", endOfGlobalWindow), + TimestampedValue.of("bar", endOfGlobalWindow)) + .advanceNextBatchWatermarkToInfinity(); + + FixedWindows windows = FixedWindows.of(Duration.standardHours(6)); + PCollection<String> windowedValues = p.apply(source) + .setCoder(TimestampedValue.TimestampedValueCoder.of(StringUtf8Coder.of())) + .apply(ParDo.of(new OnlyValue<String>())) + .apply(Window.<String>into(windows)) + .apply(WithKeys.<Integer, String>of(1)) + .apply(GroupByKey.<Integer, String>create()) + .apply(Values.<Iterable<String>>create()) + .apply(Flatten.<String>iterables()); + + PAssert.that(windowedValues) + .inWindow(windows.assignWindow(GlobalWindow.INSTANCE.maxTimestamp())) + .containsInAnyOrder("foo", "bar"); + p.run(); + } + +// @Test +// public void testMultipleStreams() throws IOException { +// SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(checkpointParentDir); +// Pipeline p = Pipeline.create(options); +// options.setJobName(testName.getMethodName()); +// Duration batchDuration = Duration.millis(options.getBatchIntervalMillis()); +// +// CreateStream<String> source = +// CreateStream.<String>withBatchInterval(batchDuration) +// .nextBatch("foo", "bar").advanceWatermarkForNextBatch(new Instant(100)) +// .nextBatch().advanceNextBatchWatermarkToInfinity(); +// +//// CreateStream<Integer> other = +//// CreateStream.<Integer>withBatchInterval(batchDuration) +//// .nextBatch(1, 2, 3, 4) +//// .advanceNextBatchWatermarkToInfinity(); +// +// PCollection<String> createStrings = +// p.apply("CreateStrings", source).setCoder(StringUtf8Coder.of()) +// .apply("WindowStrings", +// Window.<String>triggering(AfterPane.elementCountAtLeast(2)) +// .withAllowedLateness(Duration.ZERO) +// .accumulatingFiredPanes()); +// PAssert.that(createStrings).containsInAnyOrder("foo", "bar"); +//// PCollection<Integer> createInts = +//// 
p.apply("CreateInts", other).setCoder(VarIntCoder.of()) +//// .apply("WindowInts", +//// Window.<Integer>triggering(AfterPane.elementCountAtLeast(4)) +//// .withAllowedLateness(Duration.ZERO) +//// .accumulatingFiredPanes()); +//// PAssert.that(createInts).containsInAnyOrder(1, 2, 3, 4); +// +// p.run(); +// } + + @Test + public void testElementAtPositiveInfinityThrows() { + Duration batchDuration = Duration.millis(commonOptions.getOptions().getBatchIntervalMillis()); + CreateStream<TimestampedValue<Integer>> source = + CreateStream.<TimestampedValue<Integer>>withBatchInterval(batchDuration) + .nextBatch(TimestampedValue.of(-1, BoundedWindow.TIMESTAMP_MAX_VALUE.minus(1L))); + thrown.expect(IllegalArgumentException.class); + source.nextBatch(TimestampedValue.of(1, BoundedWindow.TIMESTAMP_MAX_VALUE)); + } + + @Test + public void testAdvanceWatermarkNonMonotonicThrows() { + Duration batchDuration = Duration.millis(commonOptions.getOptions().getBatchIntervalMillis()); + CreateStream<Integer> source = + CreateStream.<Integer>withBatchInterval(batchDuration) + .advanceWatermarkForNextBatch(new Instant(0L)); + thrown.expect(IllegalArgumentException.class); + source.advanceWatermarkForNextBatch(new Instant(-1L)); + } + + @Test + public void testAdvanceWatermarkEqualToPositiveInfinityThrows() { + Duration batchDuration = Duration.millis(commonOptions.getOptions().getBatchIntervalMillis()); + CreateStream<Integer> source = + CreateStream.<Integer>withBatchInterval(batchDuration) + .advanceWatermarkForNextBatch(BoundedWindow.TIMESTAMP_MAX_VALUE.minus(1L)); + thrown.expect(IllegalArgumentException.class); + source.advanceWatermarkForNextBatch(BoundedWindow.TIMESTAMP_MAX_VALUE); + } + + private static class OnlyValue<T> extends DoFn<TimestampedValue<T>, T> { + + OnlyValue() { } + + @ProcessElement + public void onlyValue(ProcessContext c) { + c.outputWithTimestamp(c.element().getValue(), c.element().getTimestamp()); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/96d373fe/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java index 62ee672..e307363 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java @@ -17,60 +17,70 @@ */ package org.apache.beam.runners.spark.translation.streaming; -import static org.apache.beam.sdk.metrics.MetricMatchers.attemptedMetricsResult; +import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertThat; -import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.Uninterruptibles; import java.io.IOException; -import java.util.Arrays; import java.util.Collections; -import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.TimeUnit; +import org.apache.beam.runners.spark.ReuseSparkContextRule; import 
org.apache.beam.runners.spark.SparkPipelineOptions; import org.apache.beam.runners.spark.SparkPipelineResult; -import org.apache.beam.runners.spark.aggregators.ClearAggregatorsRule; +import org.apache.beam.runners.spark.SparkRunner; +import org.apache.beam.runners.spark.aggregators.AccumulatorSingleton; +import org.apache.beam.runners.spark.coders.CoderHelpers; import org.apache.beam.runners.spark.translation.streaming.utils.EmbeddedKafkaCluster; -import org.apache.beam.runners.spark.translation.streaming.utils.PAssertStreaming; -import org.apache.beam.runners.spark.translation.streaming.utils.SparkTestPipelineOptionsForStreaming; +import org.apache.beam.runners.spark.util.GlobalWatermarkHolder; import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.InstantCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.kafka.KafkaIO; -import org.apache.beam.sdk.metrics.Counter; -import org.apache.beam.sdk.metrics.MetricNameFilter; -import org.apache.beam.sdk.metrics.Metrics; -import org.apache.beam.sdk.metrics.MetricsFilter; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.Keys; +import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.Sum; -import org.apache.beam.sdk.transforms.View; +import org.apache.beam.sdk.transforms.Values; +import org.apache.beam.sdk.transforms.WithKeys; +import org.apache.beam.sdk.transforms.windowing.AfterWatermark; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.PDone; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.serialization.StringSerializer; import org.joda.time.Duration; +import org.joda.time.Instant; import org.junit.AfterClass; -import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import org.junit.rules.TestName; /** - * Test pipelines which are resumed from checkpoint. + * Tests DStream recovery from checkpoint. + * + * <p>Runs the pipeline reading from a Kafka backlog with a WM function that will move to infinity + * on a EOF signal. + * After resuming from checkpoint, a single output (guaranteed by the WM) is asserted, along with + * {@link Aggregator}s values that are expected to resume from previous count as well. 
*/ public class ResumeFromCheckpointStreamingTest { private static final EmbeddedKafkaCluster.EmbeddedZookeeper EMBEDDED_ZOOKEEPER = @@ -78,142 +88,158 @@ public class ResumeFromCheckpointStreamingTest { private static final EmbeddedKafkaCluster EMBEDDED_KAFKA_CLUSTER = new EmbeddedKafkaCluster(EMBEDDED_ZOOKEEPER.getConnection(), new Properties()); private static final String TOPIC = "kafka_beam_test_topic"; - private static final Map<String, String> KAFKA_MESSAGES = ImmutableMap.of( - "k1", "v1", "k2", "v2", "k3", "v3", "k4", "v4" - ); - private static final String[] EXPECTED = {"k1,v1", "k2,v2", "k3,v3", "k4,v4"}; - private static final long EXPECTED_AGG_FIRST = 4L; - private static final long EXPECTED_COUNTER_FIRST = 4L; - private static final long EXPECTED_AGG_SECOND = 8L; - private static final long EXPECTED_COUNTER_SECOND = 8L; @Rule - public TemporaryFolder checkpointParentDir = new TemporaryFolder(); - + public TemporaryFolder tmpFolder = new TemporaryFolder(); @Rule - public SparkTestPipelineOptionsForStreaming commonOptions = - new SparkTestPipelineOptionsForStreaming(); - + public ReuseSparkContextRule noContextResue = ReuseSparkContextRule.no(); @Rule - public ClearAggregatorsRule clearAggregatorsRule = new ClearAggregatorsRule(); + public transient TestName testName = new TestName(); @BeforeClass public static void init() throws IOException { EMBEDDED_ZOOKEEPER.startup(); EMBEDDED_KAFKA_CLUSTER.startup(); - /// this test actually requires to NOT reuse the context but rather to stop it and start again - // from the checkpoint with a brand new context. - System.setProperty("beam.spark.test.reuseSparkContext", "false"); } - private static void produce() { + private static void produce(Map<String, Instant> messages) { Properties producerProps = new Properties(); producerProps.putAll(EMBEDDED_KAFKA_CLUSTER.getProps()); producerProps.put("request.required.acks", 1); producerProps.put("bootstrap.servers", EMBEDDED_KAFKA_CLUSTER.getBrokerList()); Serializer<String> stringSerializer = new StringSerializer(); - try (@SuppressWarnings("unchecked") KafkaProducer<String, String> kafkaProducer = - new KafkaProducer(producerProps, stringSerializer, stringSerializer)) { - for (Map.Entry<String, String> en : KAFKA_MESSAGES.entrySet()) { + Serializer<Instant> instantSerializer = new Serializer<Instant>() { + @Override + public void configure(Map<String, ?> configs, boolean isKey) { } + + @Override + public byte[] serialize(String topic, Instant data) { + return CoderHelpers.toByteArray(data, InstantCoder.of()); + } + + @Override + public void close() { } + }; + + try (@SuppressWarnings("unchecked") KafkaProducer<String, Instant> kafkaProducer = + new KafkaProducer(producerProps, stringSerializer, instantSerializer)) { + for (Map.Entry<String, Instant> en : messages.entrySet()) { kafkaProducer.send(new ProducerRecord<>(TOPIC, en.getKey(), en.getValue())); } kafkaProducer.close(); } } - /** - * Tests DStream recovery from checkpoint - recreate the job and continue (from checkpoint). - * <p>Also tests Aggregator values, which should be restored upon recovery from checkpoint.</p> - */ @Test - public void testRun() throws Exception { - Duration batchIntervalDuration = Duration.standardSeconds(5); - SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(checkpointParentDir); - // provide a generous enough batch-interval to have everything fit in one micro-batch. 
- options.setBatchIntervalMillis(batchIntervalDuration.getMillis()); - // provide a very generous read time bound, we rely on num records bound here. - options.setMinReadTimeMillis(batchIntervalDuration.minus(1).getMillis()); - // bound the read on the number of messages - 1 topic of 4 messages. - options.setMaxRecordsPerBatch(4L); - - // checkpoint after first (and only) interval. - options.setCheckpointDurationMillis(options.getBatchIntervalMillis()); - - MetricsFilter metricsFilter = - MetricsFilter.builder() - .addNameFilter(MetricNameFilter.inNamespace(ResumeFromCheckpointStreamingTest.class)) - .build(); + public void testWithResume() throws Exception { + SparkPipelineOptions options = PipelineOptionsFactory.create().as(SparkPipelineOptions.class); + options.setRunner(SparkRunner.class); + options.setCheckpointDir(tmpFolder.newFolder().toString()); + options.setCheckpointDurationMillis(500L); + options.setJobName(testName.getMethodName()); + options.setSparkMaster("local[*]"); + + // write to Kafka + produce(ImmutableMap.of( + "k1", new Instant(100), + "k2", new Instant(200), + "k3", new Instant(300), + "k4", new Instant(400) + )); // first run will read from Kafka backlog - "auto.offset.reset=smallest" SparkPipelineResult res = run(options); + res.waitUntilFinish(Duration.standardSeconds(2)); + // assertions 1: long processedMessages1 = res.getAggregatorValue("processedMessages", Long.class); - assertThat(String.format("Expected %d processed messages count but " - + "found %d", EXPECTED_AGG_FIRST, processedMessages1), processedMessages1, - equalTo(EXPECTED_AGG_FIRST)); - assertThat(res.metrics().queryMetrics(metricsFilter).counters(), - hasItem(attemptedMetricsResult(ResumeFromCheckpointStreamingTest.class.getName(), - "aCounter", "formatKV", EXPECTED_COUNTER_FIRST))); + assertThat( + String.format( + "Expected %d processed messages count but found %d", 4, processedMessages1), + processedMessages1, + equalTo(4L)); + + //--- between executions: + + //- clear state. + AccumulatorSingleton.clear(); + GlobalWatermarkHolder.clear(); + + //- write a bit more. + produce(ImmutableMap.of( + "k5", new Instant(499), + "EOF", new Instant(500) // to be dropped from [0, 500). + )); // recovery should resume from last read offset, and read the second batch of input. res = runAgain(options); + res.waitUntilFinish(Duration.standardSeconds(2)); + // assertions 2: long processedMessages2 = res.getAggregatorValue("processedMessages", Long.class); - assertThat(String.format("Expected %d processed messages count but " - + "found %d", EXPECTED_AGG_SECOND, processedMessages2), processedMessages2, - equalTo(EXPECTED_AGG_SECOND)); - assertThat(res.metrics().queryMetrics(metricsFilter).counters(), - hasItem(attemptedMetricsResult(ResumeFromCheckpointStreamingTest.class.getName(), - "aCounter", "formatKV", EXPECTED_COUNTER_SECOND))); + int successAssertions = res.getAggregatorValue(PAssert.SUCCESS_COUNTER, Integer.class); + assertThat( + String.format("Expected %d processed messages count but found %d", 1, processedMessages2), + processedMessages2, + equalTo(5L)); + res.getAggregatorValue(PAssert.SUCCESS_COUNTER, Integer.class); + assertThat( + String.format( + "Expected %d successful assertions, but found %d.", 1, successAssertions), + successAssertions, + is(1)); + // validate assertion didn't fail. 
+ int failedAssertions = res.getAggregatorValue(PAssert.FAILURE_COUNTER, Integer.class); + assertThat( + String.format("Found %d failed assertions.", failedAssertions), + failedAssertions, + is(0)); + } private SparkPipelineResult runAgain(SparkPipelineOptions options) { - clearAggregatorsRule.clearNamedAggregators(); // sleep before next run. - Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); + Uninterruptibles.sleepUninterruptibly(10, TimeUnit.MILLISECONDS); return run(options); } private static SparkPipelineResult run(SparkPipelineOptions options) { - // write to Kafka - produce(); - Map<String, Object> consumerProps = ImmutableMap.<String, Object>of( - "auto.offset.reset", "earliest" - ); - - KafkaIO.Read<String, String> read = KafkaIO.<String, String>read() + KafkaIO.Read<String, Instant> read = KafkaIO.<String, Instant>read() .withBootstrapServers(EMBEDDED_KAFKA_CLUSTER.getBrokerList()) .withTopics(Collections.singletonList(TOPIC)) .withKeyCoder(StringUtf8Coder.of()) - .withValueCoder(StringUtf8Coder.of()) - .updateConsumerProperties(consumerProps); - - Duration windowDuration = new Duration(options.getBatchIntervalMillis()); + .withValueCoder(InstantCoder.of()) + .updateConsumerProperties(ImmutableMap.<String, Object>of("auto.offset.reset", "earliest")) + .withTimestampFn(new SerializableFunction<KV<String, Instant>, Instant>() { + @Override + public Instant apply(KV<String, Instant> kv) { + return kv.getValue(); + } + }).withWatermarkFn(new SerializableFunction<KV<String, Instant>, Instant>() { + @Override + public Instant apply(KV<String, Instant> kv) { + // at EOF move WM to infinity. + String key = kv.getKey(); + Instant instant = kv.getValue(); + return key.equals("EOF") ? BoundedWindow.TIMESTAMP_MAX_VALUE : instant; + } + }); Pipeline p = Pipeline.create(options); - PCollection<String> expectedCol = - p.apply(Create.of(ImmutableList.copyOf(EXPECTED)).withCoder(StringUtf8Coder.of())); - final PCollectionView<List<String>> expectedView = expectedCol.apply(View.<String>asList()); - - PCollection<String> formattedKV = - p.apply(read.withoutMetadata()) - .apply("formatKV", ParDo.of(new DoFn<KV<String, String>, KV<String, String>>() { - Counter counter = - Metrics.counter(ResumeFromCheckpointStreamingTest.class, "aCounter"); - - @ProcessElement - public void process(ProcessContext c) { - // Check side input is passed correctly also after resuming from checkpoint - Assert.assertEquals(c.sideInput(expectedView), Arrays.asList(EXPECTED)); - counter.inc(); - c.output(c.element()); - } - }).withSideInputs(expectedView)) - .apply(Window.<KV<String, String>>into(FixedWindows.of(windowDuration))) - .apply(ParDo.of(new FormatAsText())); - - // graceful shutdown will make sure first batch (at least) will finish. 
- Duration timeout = Duration.standardSeconds(1L); - return PAssertStreaming.runAndAssertContents(p, formattedKV, EXPECTED, timeout); + PCollection<Iterable<String>> grouped = p + .apply(read.withoutMetadata()) + .apply(Keys.<String>create()) + .apply(ParDo.of(new EOFShallNotPassFn())) + .apply(Window.<String>into(FixedWindows.of(Duration.millis(500))) + .triggering(AfterWatermark.pastEndOfWindow()) + .accumulatingFiredPanes() + .withAllowedLateness(Duration.ZERO)) + .apply(WithKeys.<Integer, String>of(1)) + .apply(GroupByKey.<Integer, String>create()) + .apply(Values.<Iterable<String>>create()); + + grouped.apply(new PAssertWithoutFlatten<>("k1", "k2", "k3", "k4", "k5")); + + return (SparkPipelineResult) p.run(); } @AfterClass @@ -222,16 +248,60 @@ public class ResumeFromCheckpointStreamingTest { EMBEDDED_ZOOKEEPER.shutdown(); } - private static class FormatAsText extends DoFn<KV<String, String>, String> { - + /** A pass-through fn that prevents EOF event from passing. */ + private static class EOFShallNotPassFn extends DoFn<String, String> { private final Aggregator<Long, Long> aggregator = createAggregator("processedMessages", Sum.ofLongs()); @ProcessElement public void process(ProcessContext c) { - aggregator.addValue(1L); - String formatted = c.element().getKey() + "," + c.element().getValue(); - c.output(formatted); + String element = c.element(); + if (!element.equals("EOF")) { + aggregator.addValue(1L); + c.output(c.element()); + } + } + } + + /** + * A custom PAssert that avoids using {@link org.apache.beam.sdk.transforms.Flatten} + * until BEAM-1444 is resolved. + */ + private static class PAssertWithoutFlatten<T> + extends PTransform<PCollection<Iterable<T>>, PDone> { + private final T[] expected; + + private PAssertWithoutFlatten(T... expected) { + this.expected = expected; + } + + @Override + public PDone expand(PCollection<Iterable<T>> input) { + input.apply(ParDo.of(new AssertDoFn<>(expected))); + return PDone.in(input.getPipeline()); + } + + private static class AssertDoFn<T> extends DoFn<Iterable<T>, Void> { + private final Aggregator<Integer, Integer> success = + createAggregator(PAssert.SUCCESS_COUNTER, Sum.ofIntegers()); + private final Aggregator<Integer, Integer> failure = + createAggregator(PAssert.FAILURE_COUNTER, Sum.ofIntegers()); + private final T[] expected; + + AssertDoFn(T[] expected) { + this.expected = expected; + } + + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + try { + assertThat(c.element(), containsInAnyOrder(expected)); + success.addValue(1); + } catch (Throwable t) { + failure.addValue(1); + throw t; + } + } } }
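A closing note on the test hooks: CreateStream exposes getBatches() and getTimes(), and the queued SparkWatermarks are meant to be pushed into the GlobalWatermarkHolder, which the holder tests above drive directly. A rough sketch of that flow, assuming a single source with id 1 (the actual wiring lives in the runner's streaming translation, which this patch does not show):

    // Illustration only -- assumed wiring, not code from this diff.
    JavaSparkContext jsc = SparkContextFactory.getSparkContext(options);
    Queue<Iterable<Integer>> batches = source.getBatches();   // FIFO micro-batch payloads
    Queue<SparkWatermarks> times = source.getTimes();         // one entry per advance*() call

    SparkWatermarks nextTimes = times.poll();                 // watermarks for the coming batch
    if (nextTimes != null) {
      GlobalWatermarkHolder.add(1, nextTimes);                // register under source id 1
    }
    GlobalWatermarkHolder.advance(jsc);                       // publish to executors via Broadcast

Keeping the watermark queue separate from the element queue is what lets the tests advance event time on an empty batch (and feed late data without moving the watermark).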