[ https://issues.apache.org/jira/browse/BEAM-622?focusedWorklogId=84711&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-84711 ]
ASF GitHub Bot logged work on BEAM-622: --------------------------------------- Author: ASF GitHub Bot Created on: 27/Mar/18 04:38 Start Date: 27/Mar/18 04:38 Worklog Time Spent: 10m Work Description: aljoscha closed pull request #4937: [BEAM-622] Add checkpointing tests for DoFnOperator and WindowDoFnOpe… URL: https://github.com/apache/beam/pull/4937 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DedupingOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DedupingOperatorTest.java index 0b376e9ddd1..73be0ef09ce 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DedupingOperatorTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DedupingOperatorTest.java @@ -17,13 +17,11 @@ */ package org.apache.beam.runners.flink.streaming; +import static org.apache.beam.runners.flink.streaming.StreamRecordStripper.stripStreamRecordFromWindowedValue; import static org.hamcrest.collection.IsIterableContainingInOrder.contains; import static org.junit.Assert.assertThat; -import com.google.common.base.Function; -import com.google.common.collect.FluentIterable; import java.nio.ByteBuffer; -import javax.annotation.Nullable; import org.apache.beam.runners.flink.translation.wrappers.streaming.io.DedupingOperator; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.ValueWithRecordId; @@ -64,7 +62,7 @@ public void testDeduping() throws Exception { WindowedValue.valueInGlobalWindow(new ValueWithRecordId<>(key1, key1.getBytes())))); assertThat( - this.<String>stripStreamRecordFromWindowedValue(harness.getOutput()), + stripStreamRecordFromWindowedValue(harness.getOutput()), 
contains(WindowedValue.valueInGlobalWindow(key1), WindowedValue.valueInGlobalWindow(key2))); @@ -86,7 +84,7 @@ public void testDeduping() throws Exception { WindowedValue.valueInGlobalWindow(new ValueWithRecordId<>(key3, key3.getBytes())))); assertThat( - this.<String>stripStreamRecordFromWindowedValue(harness.getOutput()), + stripStreamRecordFromWindowedValue(harness.getOutput()), contains(WindowedValue.valueInGlobalWindow(key3))); harness.close(); @@ -102,26 +100,4 @@ public void testDeduping() throws Exception { value -> ByteBuffer.wrap(value.getValue().getId()), TypeInformation.of(ByteBuffer.class)); } - - private <T> Iterable<WindowedValue<T>> stripStreamRecordFromWindowedValue( - Iterable<Object> input) { - - return FluentIterable.from(input) - .filter( - o -> - o instanceof StreamRecord && ((StreamRecord) o).getValue() instanceof WindowedValue) - .transform( - new Function<Object, WindowedValue<T>>() { - @Nullable - @Override - @SuppressWarnings({"unchecked", "rawtypes"}) - public WindowedValue<T> apply(@Nullable Object o) { - if (o instanceof StreamRecord - && ((StreamRecord) o).getValue() instanceof WindowedValue) { - return (WindowedValue) ((StreamRecord) o).getValue(); - } - throw new RuntimeException("unreachable"); - } - }); - } } diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java index 73a0a08f29c..4d6fca62fb5 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java @@ -17,6 +17,7 @@ */ package org.apache.beam.runners.flink.streaming; +import static org.apache.beam.runners.flink.streaming.StreamRecordStripper.stripStreamRecordFromWindowedValue; import static org.hamcrest.Matchers.emptyIterable; import static org.hamcrest.collection.IsIterableContainingInOrder.contains; 
import static org.junit.Assert.assertEquals; @@ -28,6 +29,7 @@ import com.google.common.collect.ImmutableMap; import java.util.Collections; import java.util.HashMap; +import java.util.Optional; import javax.annotation.Nullable; import org.apache.beam.runners.core.StatefulDoFnRunner; import org.apache.beam.runners.flink.FlinkPipelineOptions; @@ -39,6 +41,7 @@ import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.state.StateSpec; import org.apache.beam.sdk.state.StateSpecs; @@ -65,6 +68,7 @@ import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.WindowingStrategy; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; @@ -133,7 +137,7 @@ public void testSingleOutput() throws Exception { testHarness.processElement(new StreamRecord<>(WindowedValue.valueInGlobalWindow("Hello"))); assertThat( - this.<String>stripStreamRecordFromWindowedValue(testHarness.getOutput()), + stripStreamRecordFromWindowedValue(testHarness.getOutput()), contains(WindowedValue.valueInGlobalWindow("Hello"))); testHarness.close(); @@ -259,7 +263,6 @@ public void onEventTime(OnTimerContext context) { StringUtf8Coder.of(), windowingStrategy.getWindowFn().windowCoder()); - TupleTag<String> outputTag = new TupleTag<>("main-output"); DoFnOperator<Integer, String> doFnOperator = @@ -293,14 +296,14 @@ public void onEventTime(OnTimerContext context) { new StreamRecord<>(WindowedValue.of(13, new Instant(0), window1, PaneInfo.NO_FIRING))); assertThat( - 
this.<String>stripStreamRecordFromWindowedValue(testHarness.getOutput()), + stripStreamRecordFromWindowedValue(testHarness.getOutput()), emptyIterable()); // this does not yet fire the timer (in vanilla Flink it would) testHarness.processWatermark(timerTimestamp.getMillis()); assertThat( - this.<String>stripStreamRecordFromWindowedValue(testHarness.getOutput()), + stripStreamRecordFromWindowedValue(testHarness.getOutput()), emptyIterable()); testHarness.getOutput().clear(); @@ -309,7 +312,7 @@ public void onEventTime(OnTimerContext context) { testHarness.processWatermark(timerTimestamp.getMillis() + 1); assertThat( - this.<String>stripStreamRecordFromWindowedValue(testHarness.getOutput()), + stripStreamRecordFromWindowedValue(testHarness.getOutput()), contains( WindowedValue.of( outputMessage, new Instant(timerTimestamp), window1, PaneInfo.NO_FIRING))); @@ -372,7 +375,7 @@ public void processElement(ProcessContext context) { new StreamRecord<>(WindowedValue.of(13, new Instant(0), window1, PaneInfo.NO_FIRING))); assertThat( - this.<String>stripStreamRecordFromWindowedValue(testHarness.getOutput()), + stripStreamRecordFromWindowedValue(testHarness.getOutput()), contains(WindowedValue.of("13", new Instant(0), window1, PaneInfo.NO_FIRING))); testHarness.getOutput().clear(); @@ -384,7 +387,7 @@ public void processElement(ProcessContext context) { new StreamRecord<>(WindowedValue.of(17, new Instant(0), window1, PaneInfo.NO_FIRING))); assertThat( - this.<String>stripStreamRecordFromWindowedValue(testHarness.getOutput()), + stripStreamRecordFromWindowedValue(testHarness.getOutput()), contains(WindowedValue.of("17", new Instant(0), window1, PaneInfo.NO_FIRING))); testHarness.getOutput().clear(); @@ -396,7 +399,7 @@ public void processElement(ProcessContext context) { new StreamRecord<>(WindowedValue.of(17, new Instant(0), window1, PaneInfo.NO_FIRING))); assertThat( - this.<String>stripStreamRecordFromWindowedValue(testHarness.getOutput()), + 
stripStreamRecordFromWindowedValue(testHarness.getOutput()), emptyIterable()); testHarness.close(); @@ -488,7 +491,7 @@ public void onTimer(OnTimerContext context, @StateId(stateId) ValueState<String> WindowedValue.of(KV.of("key2", 7), new Instant(3), window1, PaneInfo.NO_FIRING))); assertThat( - this.<KV<String, Integer>>stripStreamRecordFromWindowedValue(testHarness.getOutput()), + stripStreamRecordFromWindowedValue(testHarness.getOutput()), contains( WindowedValue.of( KV.of("key1", 5 + offset), new Instant(1), window1, PaneInfo.NO_FIRING), @@ -510,7 +513,7 @@ public void onTimer(OnTimerContext context, @StateId(stateId) ValueState<String> .getMillis() + 1); assertThat( - this.<KV<String, Integer>>stripStreamRecordFromWindowedValue(testHarness.getOutput()), + stripStreamRecordFromWindowedValue(testHarness.getOutput()), contains( WindowedValue.of( KV.of("key1", timerOutput), new Instant(9), window1, PaneInfo.NO_FIRING), @@ -603,13 +606,87 @@ public void testSideInputs(boolean keyed) throws Exception { valuesInWindow(ImmutableList.of("foo", "bar"), new Instant(1000), secondWindow)))); assertThat( - this.<String>stripStreamRecordFromWindowedValue(testHarness.getOutput()), + stripStreamRecordFromWindowedValue(testHarness.getOutput()), contains(helloElement, worldElement)); testHarness.close(); } + @Test + public void testStateRestore() throws Exception { + DoFn<KV<String, Long>, KV<String, Long>> filterElementsEqualToCountFn = + new DoFn<KV<String, Long>, KV<String, Long>>() { + + @StateId("counter") + private final StateSpec<ValueState<Long>> counterSpec = StateSpecs + .value(VarLongCoder.of()); + + @ProcessElement + public void processElement(ProcessContext context, + @StateId("counter") ValueState<Long> count) { + long currentCount = Optional.ofNullable(count.read()).orElse(0L); + currentCount = currentCount + 1; + count.write(currentCount); + + KV<String, Long> currentElement = context.element(); + if (currentCount == currentElement.getValue()) { + 
context.output(currentElement); + } + } + }; + + WindowingStrategy<Object, GlobalWindow> windowingStrategy = WindowingStrategy.globalDefault(); + + TupleTag<KV<String, Long>> outputTag = new TupleTag<>("main-output"); + + FullWindowedValueCoder<KV<String, Long>> kvCoder = WindowedValue.getFullCoder( + KvCoder.of(StringUtf8Coder.of(), VarLongCoder.of()), + windowingStrategy.getWindowFn().windowCoder() + ); + + CoderTypeInformation<String> keyCoderInfo = new CoderTypeInformation<>(StringUtf8Coder.of()); + KeySelector<WindowedValue<KV<String, Long>>, String> keySelector = e -> e.getValue().getKey(); + + OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Long>>, + WindowedValue<KV<String, Long>>> testHarness = createTestHarness(windowingStrategy, + filterElementsEqualToCountFn, kvCoder, kvCoder, outputTag, keyCoderInfo, keySelector); + testHarness.open(); + + testHarness + .processElement(new StreamRecord<>(WindowedValue.valueInGlobalWindow(KV.of("a", 100L)))); + testHarness + .processElement(new StreamRecord<>(WindowedValue.valueInGlobalWindow(KV.of("a", 100L)))); + + final OperatorStateHandles snapshot = testHarness.snapshot(0, 0); + testHarness.close(); + + testHarness = createTestHarness(windowingStrategy, filterElementsEqualToCountFn, kvCoder, + kvCoder, outputTag, keyCoderInfo, keySelector); + testHarness.initializeState(snapshot); + testHarness.open(); + + // after restore: counter = 2 + testHarness + .processElement(new StreamRecord<>(WindowedValue.valueInGlobalWindow(KV.of("a", 100L)))); + testHarness + .processElement(new StreamRecord<>(WindowedValue.valueInGlobalWindow(KV.of("a", 4L)))); + testHarness + .processElement(new StreamRecord<>(WindowedValue.valueInGlobalWindow(KV.of("a", 5L)))); + testHarness + .processElement(new StreamRecord<>(WindowedValue.valueInGlobalWindow(KV.of("a", 100L)))); + + assertThat( + stripStreamRecordFromWindowedValue(testHarness.getOutput()), + contains( + WindowedValue.valueInGlobalWindow(KV.of("a", 4L)), + 
WindowedValue.valueInGlobalWindow(KV.of("a", 5L)) + ) + ); + + testHarness.close(); + } + @Test public void testTimersRestore() throws Exception { final Instant timerTimestamp = new Instant(1000); @@ -649,11 +726,15 @@ public void onEventTime(OnTimerContext context) { TupleTag<String> outputTag = new TupleTag<>("main-output"); + final CoderTypeSerializer<WindowedValue<String>> outputSerializer = new CoderTypeSerializer<>( outputCoder); + CoderTypeInformation<Integer> keyCoderInfo = new CoderTypeInformation<>(VarIntCoder.of()); + KeySelector<WindowedValue<Integer>, Integer> keySelector = WindowedValue::getValue; OneInputStreamOperatorTestHarness<WindowedValue<Integer>, WindowedValue<String>> testHarness = - createTestHarness(windowingStrategy, fn, inputCoder, outputCoder, outputTag); + createTestHarness(windowingStrategy, fn, inputCoder, outputCoder, outputTag, keyCoderInfo, + keySelector); testHarness.setup(outputSerializer); @@ -668,14 +749,15 @@ public void onEventTime(OnTimerContext context) { new StreamRecord<>(WindowedValue.of(13, new Instant(0), window1, PaneInfo.NO_FIRING))); assertThat( - this.<String>stripStreamRecordFromWindowedValue(testHarness.getOutput()), + stripStreamRecordFromWindowedValue(testHarness.getOutput()), emptyIterable()); // snapshot and restore final OperatorStateHandles snapshot = testHarness.snapshot(0, 0); testHarness.close(); - testHarness = createTestHarness(windowingStrategy, fn, inputCoder, outputCoder, outputTag); + testHarness = createTestHarness(windowingStrategy, fn, inputCoder, outputCoder, outputTag, + keyCoderInfo, keySelector); testHarness.setup(outputSerializer); testHarness.initializeState(snapshot); testHarness.open(); @@ -684,7 +766,7 @@ public void onEventTime(OnTimerContext context) { testHarness.processWatermark(timerTimestamp.getMillis() + 1); assertThat( - this.<String>stripStreamRecordFromWindowedValue(testHarness.getOutput()), + stripStreamRecordFromWindowedValue(testHarness.getOutput()), contains( 
WindowedValue.of( outputMessage, new Instant(timerTimestamp), window1, PaneInfo.NO_FIRING))); @@ -692,26 +774,25 @@ public void onEventTime(OnTimerContext context) { testHarness.close(); } - private OneInputStreamOperatorTestHarness<WindowedValue<Integer>, WindowedValue<String>> - createTestHarness(WindowingStrategy<Object, IntervalWindow> windowingStrategy, - DoFn<Integer, String> fn, FullWindowedValueCoder<Integer> inputCoder, - FullWindowedValueCoder<String> outputCoder, TupleTag<String> outputTag) throws Exception { - DoFnOperator<Integer, String> doFnOperator = - new DoFnOperator<>( - fn, - "stepName", - inputCoder, - outputTag, - Collections.emptyList(), - new DoFnOperator.MultiOutputOutputManagerFactory<>(outputTag, outputCoder), - windowingStrategy, - new HashMap<>(), /* side-input mapping */ - Collections.emptyList(), /* side inputs */ - PipelineOptionsFactory.as(FlinkPipelineOptions.class), - VarIntCoder.of() /* key coder */); - - return new KeyedOneInputStreamOperatorTestHarness<>( - doFnOperator, WindowedValue::getValue, new CoderTypeInformation<>(VarIntCoder.of())); + private <K, InT, OutT> OneInputStreamOperatorTestHarness<WindowedValue<InT>, WindowedValue<OutT>> + createTestHarness(WindowingStrategy<Object, ?> windowingStrategy, DoFn<InT, OutT> fn, + FullWindowedValueCoder<InT> inputCoder, FullWindowedValueCoder<OutT> outputCoder, + TupleTag<OutT> outputTag, TypeInformation<K> keyCoderInfo, + KeySelector<WindowedValue<InT>, K> keySelector) throws Exception { + DoFnOperator<InT, OutT> doFnOperator = new DoFnOperator<>( + fn, + "stepName", + inputCoder, + outputTag, + Collections.emptyList(), + new DoFnOperator.MultiOutputOutputManagerFactory<>(outputTag, outputCoder), + windowingStrategy, + new HashMap<>(), /* side-input mapping */ + Collections.emptyList(), /* side inputs */ + PipelineOptionsFactory.as(FlinkPipelineOptions.class), + VarIntCoder.of() /* key coder */); + + return new KeyedOneInputStreamOperatorTestHarness<>(doFnOperator, keySelector, 
keyCoderInfo); } /** @@ -783,7 +864,7 @@ public void finishBundle(FinishBundleContext context) { // There is a finishBundle in snapshot() // Elements will be buffered as part of finishing a bundle in snapshot() assertThat( - this.<String>stripStreamRecordFromWindowedValue(testHarness.getOutput()), + stripStreamRecordFromWindowedValue(testHarness.getOutput()), contains( WindowedValue.valueInGlobalWindow("a"), WindowedValue.valueInGlobalWindow("b"), @@ -821,7 +902,7 @@ public void finishBundle(FinishBundleContext context) { newHarness.setProcessingTime(10); assertThat( - this.<String>stripStreamRecordFromWindowedValue(newHarness.getOutput()), + stripStreamRecordFromWindowedValue(newHarness.getOutput()), contains( WindowedValue.valueInGlobalWindow("finishBundle"), WindowedValue.valueInGlobalWindow("d"), @@ -830,28 +911,6 @@ public void finishBundle(FinishBundleContext context) { newHarness.close(); } - private <T> Iterable<WindowedValue<T>> stripStreamRecordFromWindowedValue( - Iterable<Object> input) { - - return FluentIterable.from(input) - .filter( - o -> - o instanceof StreamRecord && ((StreamRecord) o).getValue() instanceof WindowedValue) - .transform( - new Function<Object, WindowedValue<T>>() { - @Nullable - @Override - @SuppressWarnings({"unchecked", "rawtypes"}) - public WindowedValue<T> apply(@Nullable Object o) { - if (o instanceof StreamRecord - && ((StreamRecord) o).getValue() instanceof WindowedValue) { - return (WindowedValue) ((StreamRecord) o).getValue(); - } - throw new RuntimeException("unreachable"); - } - }); - } - private Iterable<WindowedValue<String>> stripStreamRecord(Iterable<?> input) { return FluentIterable.from(input) .filter(o -> o instanceof StreamRecord) diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/StreamRecordStripper.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/StreamRecordStripper.java new file mode 100644 index 00000000000..aebb41410fb --- /dev/null +++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/StreamRecordStripper.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.streaming; + +import com.google.common.base.Function; +import com.google.common.collect.FluentIterable; +import javax.annotation.Nullable; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + +class StreamRecordStripper { + + @SuppressWarnings("Guava") + static <T> Iterable<WindowedValue<T>> stripStreamRecordFromWindowedValue(Iterable<Object> input) { + return FluentIterable.from(input) + .filter( + o -> + o instanceof StreamRecord && ((StreamRecord) o).getValue() instanceof WindowedValue) + .transform( + new Function<Object, WindowedValue<T>>() { + @Nullable + @Override + @SuppressWarnings({"unchecked", "rawtypes"}) + public WindowedValue<T> apply(@Nullable Object o) { + if (o instanceof StreamRecord + && ((StreamRecord) o).getValue() instanceof WindowedValue) { + return (WindowedValue) ((StreamRecord) o).getValue(); + } + throw new RuntimeException("unreachable"); + } + }); + } + +} diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/WindowDoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/WindowDoFnOperatorTest.java new file mode 100644 index 00000000000..5a3717dc145 --- /dev/null +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/WindowDoFnOperatorTest.java @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.flink.streaming; + +import static java.util.Collections.emptyList; +import static java.util.Collections.emptyMap; +import static org.apache.beam.runners.flink.streaming.StreamRecordStripper.stripStreamRecordFromWindowedValue; +import static org.apache.beam.sdk.transforms.windowing.PaneInfo.NO_FIRING; +import static org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing.ON_TIME; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.joda.time.Duration.standardMinutes; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; + +import com.google.common.collect.Iterables; +import java.io.ByteArrayOutputStream; +import java.nio.ByteBuffer; +import org.apache.beam.runners.core.KeyedWorkItem; +import org.apache.beam.runners.core.SystemReduceFn; +import org.apache.beam.runners.flink.FlinkPipelineOptions; +import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.MultiOutputOutputManagerFactory; +import org.apache.beam.runners.flink.translation.wrappers.streaming.SingletonKeyedWorkItem; +import org.apache.beam.runners.flink.translation.wrappers.streaming.SingletonKeyedWorkItemCoder; +import org.apache.beam.runners.flink.translation.wrappers.streaming.WindowDoFnOperator; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderRegistry; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.transforms.Sum; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.AppliedCombineFn; +import org.apache.beam.sdk.util.WindowedValue; +import 
org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.WindowingStrategy; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.typeutils.GenericTypeInfo; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; +import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link WindowDoFnOperator}. + */ +@RunWith(JUnit4.class) +public class WindowDoFnOperatorTest { + + @Test + public void testRestore() throws Exception { + // test harness + KeyedOneInputStreamOperatorTestHarness<ByteBuffer, WindowedValue<KeyedWorkItem<Long, Long>>, + WindowedValue<KV<Long, Long>>> testHarness = createTestHarness(getWindowDoFnOperator()); + testHarness.open(); + + // process elements + IntervalWindow window = new IntervalWindow(new Instant(0), Duration.millis(10_000)); + testHarness.processWatermark(0L); + testHarness.processElement( + Item.builder().key(1L).timestamp(1L).value(100L).window(window).build().toStreamRecord()); + testHarness.processElement( + Item.builder().key(1L).timestamp(2L).value(20L).window(window).build().toStreamRecord()); + testHarness.processElement( + Item.builder().key(2L).timestamp(3L).value(77L).window(window).build().toStreamRecord()); + + // create snapshot + OperatorStateHandles snapshot = testHarness.snapshot(0, 0); + testHarness.close(); + + // restore from the snapshot + testHarness = createTestHarness(getWindowDoFnOperator()); + testHarness.initializeState(snapshot); + testHarness.open(); + + // close window + testHarness.processWatermark(10_000L); + + Iterable<WindowedValue<KV<Long, Long>>> output = 
stripStreamRecordFromWindowedValue( + testHarness.getOutput()); + + assertEquals(2, Iterables.size(output)); + assertThat(output, containsInAnyOrder( + WindowedValue.of(KV.of(1L, 120L), new Instant(9_999), window, + PaneInfo.createPane(true, true, ON_TIME)), + WindowedValue.of(KV.of(2L, 77L), new Instant(9_999), window, + PaneInfo.createPane(true, true, ON_TIME)) + ) + ); + // cleanup + testHarness.close(); + } + + private WindowDoFnOperator<Long, Long, Long> getWindowDoFnOperator() { + WindowingStrategy<Object, IntervalWindow> windowingStrategy = + WindowingStrategy.of(FixedWindows.of(standardMinutes(1))); + + TupleTag<KV<Long, Long>> outputTag = new TupleTag<>("main-output"); + + SystemReduceFn<Long, Long, long[], Long, BoundedWindow> reduceFn = SystemReduceFn.combining( + VarLongCoder.of(), + AppliedCombineFn.withInputCoder( + Sum.ofLongs(), + CoderRegistry.createDefault(), + KvCoder.of(VarLongCoder.of(), VarLongCoder.of()) + ) + ); + + Coder<IntervalWindow> windowCoder = windowingStrategy.getWindowFn().windowCoder(); + SingletonKeyedWorkItemCoder<Long, Long> workItemCoder = SingletonKeyedWorkItemCoder + .of(VarLongCoder.of(), VarLongCoder.of(), windowCoder); + FullWindowedValueCoder<SingletonKeyedWorkItem<Long, Long>> inputCoder = WindowedValue + .getFullCoder(workItemCoder, windowCoder); + FullWindowedValueCoder<KV<Long, Long>> outputCoder = WindowedValue + .getFullCoder(KvCoder.of(VarLongCoder.of(), VarLongCoder.of()), windowCoder); + + return new WindowDoFnOperator<Long, Long, Long>( + reduceFn, + "stepName", + (Coder) inputCoder, + outputTag, + emptyList(), + new MultiOutputOutputManagerFactory<>(outputTag, outputCoder), + windowingStrategy, + emptyMap(), + emptyList(), + PipelineOptionsFactory.as(FlinkPipelineOptions.class), + VarLongCoder.of() + ); + } + + private KeyedOneInputStreamOperatorTestHarness<ByteBuffer, + WindowedValue<KeyedWorkItem<Long, Long>>, WindowedValue<KV<Long, Long>>> createTestHarness( + WindowDoFnOperator<Long, Long, Long> 
windowDoFnOperator) throws Exception { + return new KeyedOneInputStreamOperatorTestHarness<>( + windowDoFnOperator, + (KeySelector<WindowedValue<KeyedWorkItem<Long, Long>>, ByteBuffer>) o -> { + try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) { + VarLongCoder.of().encode(o.getValue().key(), baos); + return ByteBuffer.wrap(baos.toByteArray()); + } + }, + new GenericTypeInfo<>(ByteBuffer.class) + ); + } + + private static class Item { + + static ItemBuilder builder() { + return new ItemBuilder(); + } + + private long key; + private long value; + private long timestamp; + private IntervalWindow window; + + StreamRecord<WindowedValue<KeyedWorkItem<Long, Long>>> toStreamRecord() { + WindowedValue<Long> item = WindowedValue.of(value, new Instant(timestamp), window, NO_FIRING); + WindowedValue<KeyedWorkItem<Long, Long>> keyedItem = WindowedValue + .of(new SingletonKeyedWorkItem<>(key, item), new Instant(timestamp), window, NO_FIRING); + return new StreamRecord<>(keyedItem); + } + + private static final class ItemBuilder { + + private long key; + private long value; + private long timestamp; + private IntervalWindow window; + + ItemBuilder key(long key) { + this.key = key; + return this; + } + + ItemBuilder value(long value) { + this.value = value; + return this; + } + + ItemBuilder timestamp(long timestamp) { + this.timestamp = timestamp; + return this; + } + + ItemBuilder window(IntervalWindow window) { + this.window = window; + return this; + } + + Item build() { + Item item = new Item(); + item.key = this.key; + item.value = this.value; + item.window = this.window; + item.timestamp = this.timestamp; + return item; + } + } + + } + +} ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 84711) Time Spent: 1h 20m (was: 1h 10m) > Add checkpointing tests for DoFnOperator and WindowDoFnOperator > ---------------------------------------------------------------- > > Key: BEAM-622 > URL: https://issues.apache.org/jira/browse/BEAM-622 > Project: Beam > Issue Type: Test > Components: runner-flink > Affects Versions: 0.3.0-incubating > Reporter: Maximilian Michels > Assignee: Grzegorz Kołakowski > Priority: Major > Fix For: 2.5.0 > > Time Spent: 1h 20m > Remaining Estimate: 0h > > Tests that verify the correct snapshotting of these two operators are missing. -- This message was sent by Atlassian JIRA (v7.6.3#76005)