[ 
https://issues.apache.org/jira/browse/BEAM-622?focusedWorklogId=84711&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-84711
 ]

ASF GitHub Bot logged work on BEAM-622:
---------------------------------------

                Author: ASF GitHub Bot
            Created on: 27/Mar/18 04:38
            Start Date: 27/Mar/18 04:38
    Worklog Time Spent: 10m 
      Work Description: aljoscha closed pull request #4937: [BEAM-622] Add 
checkpointing tests for DoFnOperator and WindowDoFnOperator
URL: https://github.com/apache/beam/pull/4937
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, the diff is included
below (GitHub would not otherwise display it):

diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DedupingOperatorTest.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DedupingOperatorTest.java
index 0b376e9ddd1..73be0ef09ce 100644
--- 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DedupingOperatorTest.java
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DedupingOperatorTest.java
@@ -17,13 +17,11 @@
  */
 package org.apache.beam.runners.flink.streaming;
 
+import static 
org.apache.beam.runners.flink.streaming.StreamRecordStripper.stripStreamRecordFromWindowedValue;
 import static org.hamcrest.collection.IsIterableContainingInOrder.contains;
 import static org.junit.Assert.assertThat;
 
-import com.google.common.base.Function;
-import com.google.common.collect.FluentIterable;
 import java.nio.ByteBuffer;
-import javax.annotation.Nullable;
 import 
org.apache.beam.runners.flink.translation.wrappers.streaming.io.DedupingOperator;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.ValueWithRecordId;
@@ -64,7 +62,7 @@ public void testDeduping() throws Exception {
         WindowedValue.valueInGlobalWindow(new ValueWithRecordId<>(key1, 
key1.getBytes()))));
 
     assertThat(
-        this.<String>stripStreamRecordFromWindowedValue(harness.getOutput()),
+        stripStreamRecordFromWindowedValue(harness.getOutput()),
         contains(WindowedValue.valueInGlobalWindow(key1),
             WindowedValue.valueInGlobalWindow(key2)));
 
@@ -86,7 +84,7 @@ public void testDeduping() throws Exception {
         WindowedValue.valueInGlobalWindow(new ValueWithRecordId<>(key3, 
key3.getBytes()))));
 
     assertThat(
-        this.<String>stripStreamRecordFromWindowedValue(harness.getOutput()),
+        stripStreamRecordFromWindowedValue(harness.getOutput()),
         contains(WindowedValue.valueInGlobalWindow(key3)));
 
     harness.close();
@@ -102,26 +100,4 @@ public void testDeduping() throws Exception {
         value -> ByteBuffer.wrap(value.getValue().getId()),
         TypeInformation.of(ByteBuffer.class));
   }
-
-  private <T> Iterable<WindowedValue<T>> stripStreamRecordFromWindowedValue(
-      Iterable<Object> input) {
-
-    return FluentIterable.from(input)
-        .filter(
-            o ->
-                o instanceof StreamRecord && ((StreamRecord) o).getValue() 
instanceof WindowedValue)
-        .transform(
-            new Function<Object, WindowedValue<T>>() {
-              @Nullable
-              @Override
-              @SuppressWarnings({"unchecked", "rawtypes"})
-              public WindowedValue<T> apply(@Nullable Object o) {
-                if (o instanceof StreamRecord
-                    && ((StreamRecord) o).getValue() instanceof WindowedValue) 
{
-                  return (WindowedValue) ((StreamRecord) o).getValue();
-                }
-                throw new RuntimeException("unreachable");
-              }
-            });
-  }
 }
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
index 73a0a08f29c..4d6fca62fb5 100644
--- 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.flink.streaming;
 
+import static 
org.apache.beam.runners.flink.streaming.StreamRecordStripper.stripStreamRecordFromWindowedValue;
 import static org.hamcrest.Matchers.emptyIterable;
 import static org.hamcrest.collection.IsIterableContainingInOrder.contains;
 import static org.junit.Assert.assertEquals;
@@ -28,6 +29,7 @@
 import com.google.common.collect.ImmutableMap;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Optional;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.core.StatefulDoFnRunner;
 import org.apache.beam.runners.flink.FlinkPipelineOptions;
@@ -39,6 +41,7 @@
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.state.StateSpec;
 import org.apache.beam.sdk.state.StateSpecs;
@@ -65,6 +68,7 @@
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
@@ -133,7 +137,7 @@ public void testSingleOutput() throws Exception {
     testHarness.processElement(new 
StreamRecord<>(WindowedValue.valueInGlobalWindow("Hello")));
 
     assertThat(
-        
this.<String>stripStreamRecordFromWindowedValue(testHarness.getOutput()),
+        stripStreamRecordFromWindowedValue(testHarness.getOutput()),
         contains(WindowedValue.valueInGlobalWindow("Hello")));
 
     testHarness.close();
@@ -259,7 +263,6 @@ public void onEventTime(OnTimerContext context) {
             StringUtf8Coder.of(),
             windowingStrategy.getWindowFn().windowCoder());
 
-
     TupleTag<String> outputTag = new TupleTag<>("main-output");
 
     DoFnOperator<Integer, String> doFnOperator =
@@ -293,14 +296,14 @@ public void onEventTime(OnTimerContext context) {
         new StreamRecord<>(WindowedValue.of(13, new Instant(0), window1, 
PaneInfo.NO_FIRING)));
 
     assertThat(
-        
this.<String>stripStreamRecordFromWindowedValue(testHarness.getOutput()),
+        stripStreamRecordFromWindowedValue(testHarness.getOutput()),
         emptyIterable());
 
     // this does not yet fire the timer (in vanilla Flink it would)
     testHarness.processWatermark(timerTimestamp.getMillis());
 
     assertThat(
-        
this.<String>stripStreamRecordFromWindowedValue(testHarness.getOutput()),
+        stripStreamRecordFromWindowedValue(testHarness.getOutput()),
         emptyIterable());
 
     testHarness.getOutput().clear();
@@ -309,7 +312,7 @@ public void onEventTime(OnTimerContext context) {
     testHarness.processWatermark(timerTimestamp.getMillis() + 1);
 
     assertThat(
-        
this.<String>stripStreamRecordFromWindowedValue(testHarness.getOutput()),
+        stripStreamRecordFromWindowedValue(testHarness.getOutput()),
         contains(
             WindowedValue.of(
                 outputMessage, new Instant(timerTimestamp), window1, 
PaneInfo.NO_FIRING)));
@@ -372,7 +375,7 @@ public void processElement(ProcessContext context) {
         new StreamRecord<>(WindowedValue.of(13, new Instant(0), window1, 
PaneInfo.NO_FIRING)));
 
     assertThat(
-        
this.<String>stripStreamRecordFromWindowedValue(testHarness.getOutput()),
+        stripStreamRecordFromWindowedValue(testHarness.getOutput()),
         contains(WindowedValue.of("13", new Instant(0), window1, 
PaneInfo.NO_FIRING)));
 
     testHarness.getOutput().clear();
@@ -384,7 +387,7 @@ public void processElement(ProcessContext context) {
         new StreamRecord<>(WindowedValue.of(17, new Instant(0), window1, 
PaneInfo.NO_FIRING)));
 
     assertThat(
-        
this.<String>stripStreamRecordFromWindowedValue(testHarness.getOutput()),
+        stripStreamRecordFromWindowedValue(testHarness.getOutput()),
         contains(WindowedValue.of("17", new Instant(0), window1, 
PaneInfo.NO_FIRING)));
 
     testHarness.getOutput().clear();
@@ -396,7 +399,7 @@ public void processElement(ProcessContext context) {
         new StreamRecord<>(WindowedValue.of(17, new Instant(0), window1, 
PaneInfo.NO_FIRING)));
 
     assertThat(
-        
this.<String>stripStreamRecordFromWindowedValue(testHarness.getOutput()),
+        stripStreamRecordFromWindowedValue(testHarness.getOutput()),
         emptyIterable());
 
     testHarness.close();
@@ -488,7 +491,7 @@ public void onTimer(OnTimerContext context, 
@StateId(stateId) ValueState<String>
             WindowedValue.of(KV.of("key2", 7), new Instant(3), window1, 
PaneInfo.NO_FIRING)));
 
     assertThat(
-        this.<KV<String, 
Integer>>stripStreamRecordFromWindowedValue(testHarness.getOutput()),
+        stripStreamRecordFromWindowedValue(testHarness.getOutput()),
         contains(
             WindowedValue.of(
                 KV.of("key1", 5 + offset), new Instant(1), window1, 
PaneInfo.NO_FIRING),
@@ -510,7 +513,7 @@ public void onTimer(OnTimerContext context, 
@StateId(stateId) ValueState<String>
             .getMillis() + 1);
 
     assertThat(
-        this.<KV<String, 
Integer>>stripStreamRecordFromWindowedValue(testHarness.getOutput()),
+        stripStreamRecordFromWindowedValue(testHarness.getOutput()),
         contains(
             WindowedValue.of(
                 KV.of("key1", timerOutput), new Instant(9), window1, 
PaneInfo.NO_FIRING),
@@ -603,13 +606,87 @@ public void testSideInputs(boolean keyed) throws 
Exception {
                 valuesInWindow(ImmutableList.of("foo", "bar"), new 
Instant(1000), secondWindow))));
 
     assertThat(
-        
this.<String>stripStreamRecordFromWindowedValue(testHarness.getOutput()),
+        stripStreamRecordFromWindowedValue(testHarness.getOutput()),
         contains(helloElement, worldElement));
 
     testHarness.close();
 
   }
 
+  @Test
+  public void testStateRestore() throws Exception {
+    DoFn<KV<String, Long>, KV<String, Long>> filterElementsEqualToCountFn =
+        new DoFn<KV<String, Long>, KV<String, Long>>() {
+
+          @StateId("counter")
+          private final StateSpec<ValueState<Long>> counterSpec = StateSpecs
+              .value(VarLongCoder.of());
+
+          @ProcessElement
+          public void processElement(ProcessContext context,
+              @StateId("counter") ValueState<Long> count) {
+            long currentCount = Optional.ofNullable(count.read()).orElse(0L);
+            currentCount = currentCount + 1;
+            count.write(currentCount);
+
+            KV<String, Long> currentElement = context.element();
+            if (currentCount == currentElement.getValue()) {
+              context.output(currentElement);
+            }
+          }
+        };
+
+    WindowingStrategy<Object, GlobalWindow> windowingStrategy = 
WindowingStrategy.globalDefault();
+
+    TupleTag<KV<String, Long>> outputTag = new TupleTag<>("main-output");
+
+    FullWindowedValueCoder<KV<String, Long>> kvCoder = 
WindowedValue.getFullCoder(
+        KvCoder.of(StringUtf8Coder.of(), VarLongCoder.of()),
+        windowingStrategy.getWindowFn().windowCoder()
+    );
+
+    CoderTypeInformation<String> keyCoderInfo = new 
CoderTypeInformation<>(StringUtf8Coder.of());
+    KeySelector<WindowedValue<KV<String, Long>>, String> keySelector = e -> 
e.getValue().getKey();
+
+    OneInputStreamOperatorTestHarness<WindowedValue<KV<String, Long>>,
+        WindowedValue<KV<String, Long>>> testHarness = 
createTestHarness(windowingStrategy,
+        filterElementsEqualToCountFn, kvCoder, kvCoder, outputTag, 
keyCoderInfo, keySelector);
+    testHarness.open();
+
+    testHarness
+        .processElement(new 
StreamRecord<>(WindowedValue.valueInGlobalWindow(KV.of("a", 100L))));
+    testHarness
+        .processElement(new 
StreamRecord<>(WindowedValue.valueInGlobalWindow(KV.of("a", 100L))));
+
+    final OperatorStateHandles snapshot = testHarness.snapshot(0, 0);
+    testHarness.close();
+
+    testHarness = createTestHarness(windowingStrategy, 
filterElementsEqualToCountFn, kvCoder,
+        kvCoder, outputTag, keyCoderInfo, keySelector);
+    testHarness.initializeState(snapshot);
+    testHarness.open();
+
+    // after restore: counter = 2
+    testHarness
+        .processElement(new 
StreamRecord<>(WindowedValue.valueInGlobalWindow(KV.of("a", 100L))));
+    testHarness
+        .processElement(new 
StreamRecord<>(WindowedValue.valueInGlobalWindow(KV.of("a", 4L))));
+    testHarness
+        .processElement(new 
StreamRecord<>(WindowedValue.valueInGlobalWindow(KV.of("a", 5L))));
+    testHarness
+        .processElement(new 
StreamRecord<>(WindowedValue.valueInGlobalWindow(KV.of("a", 100L))));
+
+    assertThat(
+        stripStreamRecordFromWindowedValue(testHarness.getOutput()),
+        contains(
+            WindowedValue.valueInGlobalWindow(KV.of("a", 4L)),
+            WindowedValue.valueInGlobalWindow(KV.of("a", 5L))
+        )
+    );
+
+    testHarness.close();
+  }
+
   @Test
   public void testTimersRestore() throws Exception {
     final Instant timerTimestamp = new Instant(1000);
@@ -649,11 +726,15 @@ public void onEventTime(OnTimerContext context) {
 
 
     TupleTag<String> outputTag = new TupleTag<>("main-output");
+
     final CoderTypeSerializer<WindowedValue<String>> outputSerializer = new 
CoderTypeSerializer<>(
         outputCoder);
+    CoderTypeInformation<Integer> keyCoderInfo = new 
CoderTypeInformation<>(VarIntCoder.of());
+    KeySelector<WindowedValue<Integer>, Integer> keySelector = 
WindowedValue::getValue;
 
     OneInputStreamOperatorTestHarness<WindowedValue<Integer>, 
WindowedValue<String>> testHarness =
-        createTestHarness(windowingStrategy, fn, inputCoder, outputCoder, 
outputTag);
+        createTestHarness(windowingStrategy, fn, inputCoder, outputCoder, 
outputTag, keyCoderInfo,
+            keySelector);
 
     testHarness.setup(outputSerializer);
 
@@ -668,14 +749,15 @@ public void onEventTime(OnTimerContext context) {
         new StreamRecord<>(WindowedValue.of(13, new Instant(0), window1, 
PaneInfo.NO_FIRING)));
 
     assertThat(
-        
this.<String>stripStreamRecordFromWindowedValue(testHarness.getOutput()),
+        stripStreamRecordFromWindowedValue(testHarness.getOutput()),
         emptyIterable());
 
     // snapshot and restore
     final OperatorStateHandles snapshot = testHarness.snapshot(0, 0);
     testHarness.close();
 
-    testHarness = createTestHarness(windowingStrategy, fn, inputCoder, 
outputCoder, outputTag);
+    testHarness = createTestHarness(windowingStrategy, fn, inputCoder, 
outputCoder, outputTag,
+        keyCoderInfo, keySelector);
     testHarness.setup(outputSerializer);
     testHarness.initializeState(snapshot);
     testHarness.open();
@@ -684,7 +766,7 @@ public void onEventTime(OnTimerContext context) {
     testHarness.processWatermark(timerTimestamp.getMillis() + 1);
 
     assertThat(
-        
this.<String>stripStreamRecordFromWindowedValue(testHarness.getOutput()),
+        stripStreamRecordFromWindowedValue(testHarness.getOutput()),
         contains(
             WindowedValue.of(
                 outputMessage, new Instant(timerTimestamp), window1, 
PaneInfo.NO_FIRING)));
@@ -692,26 +774,25 @@ public void onEventTime(OnTimerContext context) {
     testHarness.close();
   }
 
-  private OneInputStreamOperatorTestHarness<WindowedValue<Integer>, 
WindowedValue<String>>
-  createTestHarness(WindowingStrategy<Object, IntervalWindow> 
windowingStrategy,
-      DoFn<Integer, String> fn, FullWindowedValueCoder<Integer> inputCoder,
-      FullWindowedValueCoder<String> outputCoder, TupleTag<String> outputTag) 
throws Exception {
-    DoFnOperator<Integer, String> doFnOperator =
-        new DoFnOperator<>(
-            fn,
-            "stepName",
-            inputCoder,
-            outputTag,
-            Collections.emptyList(),
-            new DoFnOperator.MultiOutputOutputManagerFactory<>(outputTag, 
outputCoder),
-            windowingStrategy,
-            new HashMap<>(), /* side-input mapping */
-            Collections.emptyList(), /* side inputs */
-            PipelineOptionsFactory.as(FlinkPipelineOptions.class),
-            VarIntCoder.of() /* key coder */);
-
-    return new KeyedOneInputStreamOperatorTestHarness<>(
-        doFnOperator, WindowedValue::getValue, new 
CoderTypeInformation<>(VarIntCoder.of()));
+  private <K, InT, OutT> OneInputStreamOperatorTestHarness<WindowedValue<InT>, 
WindowedValue<OutT>>
+  createTestHarness(WindowingStrategy<Object, ?> windowingStrategy, DoFn<InT, 
OutT> fn,
+      FullWindowedValueCoder<InT> inputCoder, FullWindowedValueCoder<OutT> 
outputCoder,
+      TupleTag<OutT> outputTag, TypeInformation<K> keyCoderInfo,
+      KeySelector<WindowedValue<InT>, K> keySelector) throws Exception {
+    DoFnOperator<InT, OutT> doFnOperator = new DoFnOperator<>(
+        fn,
+        "stepName",
+        inputCoder,
+        outputTag,
+        Collections.emptyList(),
+        new DoFnOperator.MultiOutputOutputManagerFactory<>(outputTag, 
outputCoder),
+        windowingStrategy,
+        new HashMap<>(), /* side-input mapping */
+        Collections.emptyList(), /* side inputs */
+        PipelineOptionsFactory.as(FlinkPipelineOptions.class),
+        VarIntCoder.of() /* key coder */);
+
+    return new KeyedOneInputStreamOperatorTestHarness<>(doFnOperator, 
keySelector, keyCoderInfo);
   }
 
   /**
@@ -783,7 +864,7 @@ public void finishBundle(FinishBundleContext context) {
     // There is a finishBundle in snapshot()
     // Elements will be buffered as part of finishing a bundle in snapshot()
     assertThat(
-        
this.<String>stripStreamRecordFromWindowedValue(testHarness.getOutput()),
+        stripStreamRecordFromWindowedValue(testHarness.getOutput()),
         contains(
             WindowedValue.valueInGlobalWindow("a"),
             WindowedValue.valueInGlobalWindow("b"),
@@ -821,7 +902,7 @@ public void finishBundle(FinishBundleContext context) {
     newHarness.setProcessingTime(10);
 
     assertThat(
-        
this.<String>stripStreamRecordFromWindowedValue(newHarness.getOutput()),
+        stripStreamRecordFromWindowedValue(newHarness.getOutput()),
         contains(
             WindowedValue.valueInGlobalWindow("finishBundle"),
             WindowedValue.valueInGlobalWindow("d"),
@@ -830,28 +911,6 @@ public void finishBundle(FinishBundleContext context) {
     newHarness.close();
   }
 
-  private <T> Iterable<WindowedValue<T>> stripStreamRecordFromWindowedValue(
-      Iterable<Object> input) {
-
-    return FluentIterable.from(input)
-        .filter(
-            o ->
-                o instanceof StreamRecord && ((StreamRecord) o).getValue() 
instanceof WindowedValue)
-        .transform(
-            new Function<Object, WindowedValue<T>>() {
-              @Nullable
-              @Override
-              @SuppressWarnings({"unchecked", "rawtypes"})
-              public WindowedValue<T> apply(@Nullable Object o) {
-                if (o instanceof StreamRecord
-                    && ((StreamRecord) o).getValue() instanceof WindowedValue) 
{
-                  return (WindowedValue) ((StreamRecord) o).getValue();
-                }
-                throw new RuntimeException("unreachable");
-              }
-            });
-  }
-
   private Iterable<WindowedValue<String>> stripStreamRecord(Iterable<?> input) 
{
     return FluentIterable.from(input)
         .filter(o -> o instanceof StreamRecord)
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/StreamRecordStripper.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/StreamRecordStripper.java
new file mode 100644
index 00000000000..aebb41410fb
--- /dev/null
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/StreamRecordStripper.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.streaming;
+
+import com.google.common.base.Function;
+import com.google.common.collect.FluentIterable;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+class StreamRecordStripper {
+
+  @SuppressWarnings("Guava")
+  static <T> Iterable<WindowedValue<T>> 
stripStreamRecordFromWindowedValue(Iterable<Object> input) {
+    return FluentIterable.from(input)
+        .filter(
+            o ->
+                o instanceof StreamRecord && ((StreamRecord) o).getValue() 
instanceof WindowedValue)
+        .transform(
+            new Function<Object, WindowedValue<T>>() {
+              @Nullable
+              @Override
+              @SuppressWarnings({"unchecked", "rawtypes"})
+              public WindowedValue<T> apply(@Nullable Object o) {
+                if (o instanceof StreamRecord
+                    && ((StreamRecord) o).getValue() instanceof WindowedValue) 
{
+                  return (WindowedValue) ((StreamRecord) o).getValue();
+                }
+                throw new RuntimeException("unreachable");
+              }
+            });
+  }
+
+}
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/WindowDoFnOperatorTest.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/WindowDoFnOperatorTest.java
new file mode 100644
index 00000000000..5a3717dc145
--- /dev/null
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/WindowDoFnOperatorTest.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.streaming;
+
+import static java.util.Collections.emptyList;
+import static java.util.Collections.emptyMap;
+import static 
org.apache.beam.runners.flink.streaming.StreamRecordStripper.stripStreamRecordFromWindowedValue;
+import static org.apache.beam.sdk.transforms.windowing.PaneInfo.NO_FIRING;
+import static org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing.ON_TIME;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.joda.time.Duration.standardMinutes;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.collect.Iterables;
+import java.io.ByteArrayOutputStream;
+import java.nio.ByteBuffer;
+import org.apache.beam.runners.core.KeyedWorkItem;
+import org.apache.beam.runners.core.SystemReduceFn;
+import org.apache.beam.runners.flink.FlinkPipelineOptions;
+import 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.MultiOutputOutputManagerFactory;
+import 
org.apache.beam.runners.flink.translation.wrappers.streaming.SingletonKeyedWorkItem;
+import 
org.apache.beam.runners.flink.translation.wrappers.streaming.SingletonKeyedWorkItemCoder;
+import 
org.apache.beam.runners.flink.translation.wrappers.streaming.WindowDoFnOperator;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.AppliedCombineFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link WindowDoFnOperator}.
+ */
+@RunWith(JUnit4.class)
+public class WindowDoFnOperatorTest {
+
+  @Test
+  public void testRestore() throws Exception {
+    // test harness
+    KeyedOneInputStreamOperatorTestHarness<ByteBuffer, 
WindowedValue<KeyedWorkItem<Long, Long>>,
+        WindowedValue<KV<Long, Long>>> testHarness = 
createTestHarness(getWindowDoFnOperator());
+    testHarness.open();
+
+    // process elements
+    IntervalWindow window = new IntervalWindow(new Instant(0), 
Duration.millis(10_000));
+    testHarness.processWatermark(0L);
+    testHarness.processElement(
+        
Item.builder().key(1L).timestamp(1L).value(100L).window(window).build().toStreamRecord());
+    testHarness.processElement(
+        
Item.builder().key(1L).timestamp(2L).value(20L).window(window).build().toStreamRecord());
+    testHarness.processElement(
+        
Item.builder().key(2L).timestamp(3L).value(77L).window(window).build().toStreamRecord());
+
+    // create snapshot
+    OperatorStateHandles snapshot = testHarness.snapshot(0, 0);
+    testHarness.close();
+
+    // restore from the snapshot
+    testHarness = createTestHarness(getWindowDoFnOperator());
+    testHarness.initializeState(snapshot);
+    testHarness.open();
+
+    // close window
+    testHarness.processWatermark(10_000L);
+
+    Iterable<WindowedValue<KV<Long, Long>>> output = 
stripStreamRecordFromWindowedValue(
+        testHarness.getOutput());
+
+    assertEquals(2, Iterables.size(output));
+    assertThat(output, containsInAnyOrder(
+        WindowedValue.of(KV.of(1L, 120L), new Instant(9_999), window,
+            PaneInfo.createPane(true, true, ON_TIME)),
+        WindowedValue.of(KV.of(2L, 77L), new Instant(9_999), window,
+            PaneInfo.createPane(true, true, ON_TIME))
+        )
+    );
+    // cleanup
+    testHarness.close();
+  }
+
+  private WindowDoFnOperator<Long, Long, Long> getWindowDoFnOperator() {
+    WindowingStrategy<Object, IntervalWindow> windowingStrategy =
+        WindowingStrategy.of(FixedWindows.of(standardMinutes(1)));
+
+    TupleTag<KV<Long, Long>> outputTag = new TupleTag<>("main-output");
+
+    SystemReduceFn<Long, Long, long[], Long, BoundedWindow> reduceFn = 
SystemReduceFn.combining(
+        VarLongCoder.of(),
+        AppliedCombineFn.withInputCoder(
+            Sum.ofLongs(),
+            CoderRegistry.createDefault(),
+            KvCoder.of(VarLongCoder.of(), VarLongCoder.of())
+        )
+    );
+
+    Coder<IntervalWindow> windowCoder = 
windowingStrategy.getWindowFn().windowCoder();
+    SingletonKeyedWorkItemCoder<Long, Long> workItemCoder = 
SingletonKeyedWorkItemCoder
+        .of(VarLongCoder.of(), VarLongCoder.of(), windowCoder);
+    FullWindowedValueCoder<SingletonKeyedWorkItem<Long, Long>> inputCoder = 
WindowedValue
+        .getFullCoder(workItemCoder, windowCoder);
+    FullWindowedValueCoder<KV<Long, Long>> outputCoder = WindowedValue
+        .getFullCoder(KvCoder.of(VarLongCoder.of(), VarLongCoder.of()), 
windowCoder);
+
+    return new WindowDoFnOperator<Long, Long, Long>(
+        reduceFn,
+        "stepName",
+        (Coder) inputCoder,
+        outputTag,
+        emptyList(),
+        new MultiOutputOutputManagerFactory<>(outputTag, outputCoder),
+        windowingStrategy,
+        emptyMap(),
+        emptyList(),
+        PipelineOptionsFactory.as(FlinkPipelineOptions.class),
+        VarLongCoder.of()
+    );
+  }
+
+  private KeyedOneInputStreamOperatorTestHarness<ByteBuffer,
+      WindowedValue<KeyedWorkItem<Long, Long>>, WindowedValue<KV<Long, Long>>> 
createTestHarness(
+      WindowDoFnOperator<Long, Long, Long> windowDoFnOperator) throws 
Exception {
+    return new KeyedOneInputStreamOperatorTestHarness<>(
+        windowDoFnOperator,
+        (KeySelector<WindowedValue<KeyedWorkItem<Long, Long>>, ByteBuffer>) o 
-> {
+          try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+            VarLongCoder.of().encode(o.getValue().key(), baos);
+            return ByteBuffer.wrap(baos.toByteArray());
+          }
+        },
+        new GenericTypeInfo<>(ByteBuffer.class)
+    );
+  }
+
+  private static class Item {
+
+    static ItemBuilder builder() {
+      return new ItemBuilder();
+    }
+
+    private long key;
+    private long value;
+    private long timestamp;
+    private IntervalWindow window;
+
+    StreamRecord<WindowedValue<KeyedWorkItem<Long, Long>>> toStreamRecord() {
+      WindowedValue<Long> item = WindowedValue.of(value, new 
Instant(timestamp), window, NO_FIRING);
+      WindowedValue<KeyedWorkItem<Long, Long>> keyedItem = WindowedValue
+          .of(new SingletonKeyedWorkItem<>(key, item), new Instant(timestamp), 
window, NO_FIRING);
+      return new StreamRecord<>(keyedItem);
+    }
+
+    private static final class ItemBuilder {
+
+      private long key;
+      private long value;
+      private long timestamp;
+      private IntervalWindow window;
+
+      ItemBuilder key(long key) {
+        this.key = key;
+        return this;
+      }
+
+      ItemBuilder value(long value) {
+        this.value = value;
+        return this;
+      }
+
+      ItemBuilder timestamp(long timestamp) {
+        this.timestamp = timestamp;
+        return this;
+      }
+
+      ItemBuilder window(IntervalWindow window) {
+        this.window = window;
+        return this;
+      }
+
+      Item build() {
+        Item item = new Item();
+        item.key = this.key;
+        item.value = this.value;
+        item.window = this.window;
+        item.timestamp = this.timestamp;
+        return item;
+      }
+    }
+
+  }
+
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 84711)
    Time Spent: 1h 20m  (was: 1h 10m)

> Add checkpointing tests for DoFnOperator and WindowDoFnOperator 
> ----------------------------------------------------------------
>
>                 Key: BEAM-622
>                 URL: https://issues.apache.org/jira/browse/BEAM-622
>             Project: Beam
>          Issue Type: Test
>          Components: runner-flink
>    Affects Versions: 0.3.0-incubating
>            Reporter: Maximilian Michels
>            Assignee: Grzegorz Kołakowski
>            Priority: Major
>             Fix For: 2.5.0
>
>          Time Spent: 1h 20m
>  Remaining Estimate: 0h
>
> Tests that verify the correct snapshotting of these two operators are missing. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to