Repository: beam
Updated Branches:
  refs/heads/master cef31093f -> 99f93eb07


[BEAM-1036] Support for new State API in FlinkRunner (streaming)


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b56f4609
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b56f4609
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b56f4609

Branch: refs/heads/master
Commit: b56f460974a3119804040b08fe1c7b190ff3f356
Parents: cef3109
Author: JingsongLi <lzljs3620...@aliyun.com>
Authored: Tue Jan 31 12:32:26 2017 +0800
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Fri Feb 3 13:36:11 2017 +0100

----------------------------------------------------------------------
 runners/flink/runner/pom.xml                    |   1 -
 .../FlinkStreamingTransformTranslators.java     | 137 ++++++++++++++-----
 .../wrappers/streaming/DoFnOperator.java        |  15 +-
 .../streaming/KvToByteBufferKeySelector.java    |  56 ++++++++
 .../wrappers/streaming/WindowDoFnOperator.java  |  13 +-
 .../beam/runners/flink/PipelineOptionsTest.java |   4 +-
 .../flink/streaming/DoFnOperatorTest.java       |   9 +-
 7 files changed, 182 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
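For context, below is a minimal sketch (not part of this patch) of the kind of
keyed, stateful DoFn that the streaming FlinkRunner can now translate. The
class and state names are illustrative, imports are omitted, and the exact
StateSpec generics differ slightly between SDK versions:

    static class RunningTotalFn extends DoFn<KV<String, Long>, KV<String, Long>> {

      // Per-key (and per-window) state cell holding a running total.
      @StateId("total")
      private final StateSpec<Object, ValueState<Long>> totalSpec =
          StateSpecs.value(VarLongCoder.of());

      @ProcessElement
      public void processElement(ProcessContext c,
          @StateId("total") ValueState<Long> total) {
        Long previous = total.read();
        long updated = (previous == null ? 0L : previous) + c.element().getValue();
        total.write(updated);
        c.output(KV.of(c.element().getKey(), updated));
      }
    }

With this change, such a DoFn runs on the Flink streaming runner by keying the
input stream on the encoded key (see KvToByteBufferKeySelector below) and
backing the state cells with Flink's keyed state.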


http://git-wip-us.apache.org/repos/asf/beam/blob/b56f4609/runners/flink/runner/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/runner/pom.xml b/runners/flink/runner/pom.xml
index fe058b5..207740a 100644
--- a/runners/flink/runner/pom.xml
+++ b/runners/flink/runner/pom.xml
@@ -88,7 +88,6 @@
                    <groups>org.apache.beam.sdk.testing.RunnableOnService</groups>
                   <excludedGroups>
                     org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders,
-                    org.apache.beam.sdk.testing.UsesStatefulParDo,
                     org.apache.beam.sdk.testing.UsesTimersInParDo,
                     org.apache.beam.sdk.testing.UsesSplittableParDo,
                     org.apache.beam.sdk.testing.UsesAttemptedMetrics,

http://git-wip-us.apache.org/repos/asf/beam/blob/b56f4609/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
index 24ef987..1195c82 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
@@ -35,6 +35,7 @@ import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows;
 import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
 import org.apache.beam.runners.flink.translation.types.FlinkCoder;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.KvToByteBufferKeySelector;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.SingletonKeyedWorkItem;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.SingletonKeyedWorkItemCoder;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.WindowDoFnOperator;
@@ -307,19 +308,9 @@ public class FlinkStreamingTransformTranslators {
     }
   }
 
-  private static void rejectStateAndTimers(DoFn<?, ?> doFn) {
+  private static void rejectTimers(DoFn<?, ?> doFn) {
     DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
 
-    if (signature.stateDeclarations().size() > 0) {
-      throw new UnsupportedOperationException(
-          String.format(
-              "Found %s annotations on %s, but %s cannot yet be used with state in the %s.",
-              DoFn.StateId.class.getSimpleName(),
-              doFn.getClass().getName(),
-              DoFn.class.getSimpleName(),
-              FlinkRunner.class.getSimpleName()));
-    }
-
     if (signature.timerDeclarations().size() > 0) {
       throw new UnsupportedOperationException(
           String.format(
@@ -341,7 +332,7 @@ public class FlinkStreamingTransformTranslators {
         FlinkStreamingTranslationContext context) {
 
       DoFn<InputT, OutputT> doFn = transform.getFn();
-      rejectStateAndTimers(doFn);
+      rejectTimers(doFn);
 
       WindowingStrategy<?, ?> windowingStrategy =
           context.getOutput(transform).getWindowingStrategy();
@@ -357,6 +348,20 @@ public class FlinkStreamingTransformTranslators {
       TypeInformation<WindowedValue<InputT>> inputTypeInfo =
           context.getTypeInfo(inputPCollection);
 
+      DataStream<WindowedValue<InputT>> inputDataStream =
+          context.getInputDataStream(context.getInput(transform));
+      Coder keyCoder = null;
+      boolean stateful = false;
+      DoFnSignature signature = DoFnSignatures.getSignature(transform.getFn().getClass());
+      if (signature.stateDeclarations().size() > 0
+          || signature.timerDeclarations().size() > 0) {
+        // Based on the fact that the signature is stateful, DoFnSignatures ensures
+        // that it is also keyed
+        keyCoder = ((KvCoder) inputPCollection.getCoder()).getKeyCoder();
+        inputDataStream = inputDataStream.keyBy(new KvToByteBufferKeySelector(keyCoder));
+        stateful = true;
+      }
+
       if (sideInputs.isEmpty()) {
         DoFnOperator<InputT, OutputT, WindowedValue<OutputT>> doFnOperator =
             new DoFnOperator<>(
@@ -368,10 +373,8 @@ public class FlinkStreamingTransformTranslators {
                 windowingStrategy,
                 new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */
                 Collections.<PCollectionView<?>>emptyList(), /* side inputs */
-                context.getPipelineOptions());
-
-        DataStream<WindowedValue<InputT>> inputDataStream =
-            context.getInputDataStream(context.getInput(transform));
+                context.getPipelineOptions(),
+                keyCoder);
 
         SingleOutputStreamOperator<WindowedValue<OutputT>> outDataStream = inputDataStream
             .transform(transform.getName(), typeInfo, doFnOperator);
@@ -391,17 +394,39 @@ public class FlinkStreamingTransformTranslators {
                 windowingStrategy,
                 transformedSideInputs.f0,
                 sideInputs,
-                context.getPipelineOptions());
-
-        DataStream<WindowedValue<InputT>> inputDataStream =
-            context.getInputDataStream(context.getInput(transform));
-
-        SingleOutputStreamOperator<WindowedValue<OutputT>> outDataStream = inputDataStream
-            .connect(transformedSideInputs.f1.broadcast())
-            .transform(transform.getName(), typeInfo, doFnOperator);
-
+                context.getPipelineOptions(),
+                keyCoder);
+
+        SingleOutputStreamOperator<WindowedValue<OutputT>> outDataStream;
+        if (stateful) {
+          // we have to manually construct the two-input transform because,
+          // normally, we are not allowed to have only one keyed input.
+          KeyedStream keyedStream = (KeyedStream<?, InputT>) inputDataStream;
+          TwoInputTransformation<
+              WindowedValue<KV<?, InputT>>,
+              RawUnionValue,
+              WindowedValue<OutputT>> rawFlinkTransform = new TwoInputTransformation<>(
+              keyedStream.getTransformation(),
+              transformedSideInputs.f1.broadcast().getTransformation(),
+              transform.getName(),
+              (TwoInputStreamOperator) doFnOperator,
+              typeInfo,
+              keyedStream.getParallelism());
+
+          rawFlinkTransform.setStateKeyType(keyedStream.getKeyType());
+          rawFlinkTransform.setStateKeySelectors(keyedStream.getKeySelector(), null);
+
+          outDataStream = new SingleOutputStreamOperator(
+                  keyedStream.getExecutionEnvironment(),
+                  rawFlinkTransform) {}; // we have to cheat around the ctor being protected
+
+          keyedStream.getExecutionEnvironment().addOperator(rawFlinkTransform);
+        } else {
+          outDataStream = inputDataStream
+              .connect(transformedSideInputs.f1.broadcast())
+              .transform(transform.getName(), typeInfo, doFnOperator);
+        }
         context.setOutputDataStream(context.getOutput(transform), outDataStream);
-
       }
     }
   }
@@ -493,7 +518,7 @@ public class FlinkStreamingTransformTranslators {
         FlinkStreamingTranslationContext context) {
 
       DoFn<InputT, OutputT> doFn = transform.getFn();
-      rejectStateAndTimers(doFn);
+      rejectTimers(doFn);
 
       // we assume that the transformation does not change the windowing strategy.
       WindowingStrategy<?, ?> windowingStrategy =
@@ -514,6 +539,20 @@ public class FlinkStreamingTransformTranslators {
       TypeInformation<WindowedValue<InputT>> inputTypeInfo =
           context.getTypeInfo(inputPCollection);
 
+      DataStream<WindowedValue<InputT>> inputDataStream =
+          context.getInputDataStream(context.getInput(transform));
+      Coder keyCoder = null;
+      boolean stateful = false;
+      DoFnSignature signature = DoFnSignatures.getSignature(transform.getFn().getClass());
+      if (signature.stateDeclarations().size() > 0
+          || signature.timerDeclarations().size() > 0) {
+        // Based on the fact that the signature is stateful, DoFnSignatures ensures
+        // that it is also keyed
+        keyCoder = ((KvCoder) inputPCollection.getCoder()).getKeyCoder();
+        inputDataStream = inputDataStream.keyBy(new KvToByteBufferKeySelector(keyCoder));
+        stateful = true;
+      }
+
       if (sideInputs.isEmpty()) {
         DoFnOperator<InputT, OutputT, RawUnionValue> doFnOperator =
             new DoFnOperator<>(
@@ -525,16 +564,14 @@ public class FlinkStreamingTransformTranslators {
                 windowingStrategy,
                 new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */
                 Collections.<PCollectionView<?>>emptyList(), /* side inputs */
-                context.getPipelineOptions());
+                context.getPipelineOptions(),
+                keyCoder);
 
         UnionCoder outputUnionCoder = createUnionCoder(outputs);
 
         CoderTypeInformation<RawUnionValue> outputUnionTypeInformation =
             new CoderTypeInformation<>(outputUnionCoder);
 
-        DataStream<WindowedValue<InputT>> inputDataStream =
-            context.getInputDataStream(context.getInput(transform));
-
         unionOutputStream = inputDataStream
             .transform(transform.getName(), outputUnionTypeInformation, doFnOperator);
 
@@ -552,19 +589,43 @@ public class FlinkStreamingTransformTranslators {
                 windowingStrategy,
                 transformedSideInputs.f0,
                 sideInputs,
-                context.getPipelineOptions());
+                context.getPipelineOptions(),
+                keyCoder);
 
         UnionCoder outputUnionCoder = createUnionCoder(outputs);
 
         CoderTypeInformation<RawUnionValue> outputUnionTypeInformation =
             new CoderTypeInformation<>(outputUnionCoder);
 
-        DataStream<WindowedValue<InputT>> inputDataStream =
-            context.getInputDataStream(context.getInput(transform));
-
-        unionOutputStream = inputDataStream
-            .connect(transformedSideInputs.f1.broadcast())
-            .transform(transform.getName(), outputUnionTypeInformation, doFnOperator);
+        if (stateful) {
+          // we have to manually construct the two-input transform because,
+          // normally, we are not allowed to have only one keyed input.
+          KeyedStream keyedStream = (KeyedStream<?, InputT>) inputDataStream;
+          TwoInputTransformation<
+              WindowedValue<KV<?, InputT>>,
+              RawUnionValue,
+              WindowedValue<OutputT>> rawFlinkTransform = new TwoInputTransformation(
+              keyedStream.getTransformation(),
+              transformedSideInputs.f1.broadcast().getTransformation(),
+              transform.getName(),
+              (TwoInputStreamOperator) doFnOperator,
+              outputUnionTypeInformation,
+              keyedStream.getParallelism());
+
+          rawFlinkTransform.setStateKeyType(keyedStream.getKeyType());
+          rawFlinkTransform.setStateKeySelectors(keyedStream.getKeySelector(), null);
+
+          unionOutputStream = new SingleOutputStreamOperator(
+                  keyedStream.getExecutionEnvironment(),
+                  rawFlinkTransform) {}; // we have to cheat around the ctor being protected
+
+          keyedStream.getExecutionEnvironment().addOperator(rawFlinkTransform);
+
+        } else {
+          unionOutputStream = inputDataStream
+              .connect(transformedSideInputs.f1.broadcast())
+              .transform(transform.getName(), outputUnionTypeInformation, doFnOperator);
+        }
       }
 
       SplitStream<RawUnionValue> splitStream = unionOutputStream
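For readability, here is a condensed restatement of the workaround used in both
translators above when a stateful DoFn also has side inputs (generics elided;
sideInputStream and outputTypeInfo stand in for transformedSideInputs.f1 and the
respective output type information):

    // Key only the main input; the side input stays a plain broadcast stream.
    KeyedStream keyedStream = (KeyedStream) inputDataStream;

    // connect() would require consistent keying of both inputs, so the
    // two-input transformation is wired up manually instead.
    TwoInputTransformation rawFlinkTransform = new TwoInputTransformation(
        keyedStream.getTransformation(),
        sideInputStream.broadcast().getTransformation(),
        transform.getName(),
        (TwoInputStreamOperator) doFnOperator,
        outputTypeInfo,
        keyedStream.getParallelism());

    // Only the first (main) input carries the state key.
    rawFlinkTransform.setStateKeyType(keyedStream.getKeyType());
    rawFlinkTransform.setStateKeySelectors(keyedStream.getKeySelector(), null);

    // SingleOutputStreamOperator's constructor is protected, hence the
    // anonymous subclass; the transformation still has to be registered.
    SingleOutputStreamOperator outDataStream = new SingleOutputStreamOperator(
        keyedStream.getExecutionEnvironment(), rawFlinkTransform) {};
    keyedStream.getExecutionEnvironment().addOperator(rawFlinkTransform);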

http://git-wip-us.apache.org/repos/asf/beam/blob/b56f4609/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index de0264a..a20a34c 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -127,6 +127,10 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
 
   private transient Map<String, KvStateSnapshot<?, ?, ?, ?, ?>> restoredSideInputState;
 
+  protected transient FlinkStateInternals<?> stateInternals;
+
+  private final Coder<?> keyCoder;
+
   public DoFnOperator(
       DoFn<InputT, FnOutputT> doFn,
       TypeInformation<WindowedValue<InputT>> inputType,
@@ -136,7 +140,8 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
       WindowingStrategy<?, ?> windowingStrategy,
       Map<Integer, PCollectionView<?>> sideInputTagMapping,
       Collection<PCollectionView<?>> sideInputs,
-      PipelineOptions options) {
+      PipelineOptions options,
+      Coder<?> keyCoder) {
     this.doFn = doFn;
     this.mainOutputTag = mainOutputTag;
     this.sideOutputTags = sideOutputTags;
@@ -156,6 +161,8 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
         new ListStateDescriptor<>("pushed-back-values", inputType);
 
     setChainingStrategy(ChainingStrategy.ALWAYS);
+
+    this.keyCoder = keyCoder;
   }
 
   protected ExecutionContext.StepContext createStepContext() {
@@ -229,6 +236,10 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
 
     outputManager = outputManagerFactory.create(output);
 
+    if (keyCoder != null) {
+      stateInternals = new FlinkStateInternals<>(getStateBackend(), keyCoder);
+    }
+
     this.doFn = getDoFn();
     doFnInvoker = DoFnInvokers.invokerFor(doFn);
 
@@ -521,7 +532,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
 
     @Override
     public StateInternals<?> stateInternals() {
-      throw new UnsupportedOperationException("Not supported for regular DoFns.");
+      return stateInternals;
     }
 
     @Override
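Taken together, the DoFnOperator hunks change where keyed state lives: the
constructor now accepts an optional key Coder, open() uses it to build
FlinkStateInternals on the Flink state backend, and the step context returns
those internals instead of throwing. A condensed sketch of the flow (restating
the hunks above, not new behavior):

    // Constructor: remember the (possibly null) key coder.
    this.keyCoder = keyCoder;

    // open(): only keyed/stateful operators get state internals.
    if (keyCoder != null) {
      stateInternals = new FlinkStateInternals<>(getStateBackend(), keyCoder);
    }

    // StepContext: hand the internals (or null, for stateless DoFns) to the DoFn.
    @Override
    public StateInternals<?> stateInternals() {
      return stateInternals;
    }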

http://git-wip-us.apache.org/repos/asf/beam/blob/b56f4609/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/KvToByteBufferKeySelector.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/KvToByteBufferKeySelector.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/KvToByteBufferKeySelector.java
new file mode 100644
index 0000000..dce2e68
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/KvToByteBufferKeySelector.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming;
+
+import java.nio.ByteBuffer;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+
+/**
+ * {@link KeySelector} that retrieves a key from a {@link KV}. This will return
+ * the key as encoded by the provided {@link Coder} in a {@link ByteBuffer}. This ensures
+ * that all key comparisons/hashing happen on the encoded form.
+ */
+public class KvToByteBufferKeySelector<K, V>
+    implements KeySelector<WindowedValue<KV<K, V>>, ByteBuffer>,
+    ResultTypeQueryable<ByteBuffer> {
+
+  private final Coder<K> keyCoder;
+
+  public KvToByteBufferKeySelector(Coder<K> keyCoder) {
+    this.keyCoder = keyCoder;
+  }
+
+  @Override
+  public ByteBuffer getKey(WindowedValue<KV<K, V>> value) throws Exception {
+    K key = value.getValue().getKey();
+    byte[] keyBytes = CoderUtils.encodeToByteArray(keyCoder, key);
+    return ByteBuffer.wrap(keyBytes);
+  }
+
+  @Override
+  public TypeInformation<ByteBuffer> getProducedType() {
+    return new GenericTypeInfo<>(ByteBuffer.class);
+  }
+}
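A note on the design, plus a sketch of how the translators above use this
selector (types spelled out here; the translator code uses raw types):

    // Assumes the input PCollection uses a KvCoder, which the translator
    // infers from the stateful DoFnSignature before keying the stream.
    Coder<K> keyCoder = ((KvCoder<K, V>) inputPCollection.getCoder()).getKeyCoder();
    KeyedStream<WindowedValue<KV<K, V>>, ByteBuffer> keyedInput =
        inputDataStream.keyBy(new KvToByteBufferKeySelector<K, V>(keyCoder));

The key is wrapped in a ByteBuffer rather than handed to Flink as a raw byte[]
because byte[] uses identity-based equals()/hashCode(), while ByteBuffer gives
value-based comparisons over the encoded bytes, which Flink's key partitioning
and keyed state lookups rely on.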

http://git-wip-us.apache.org/repos/asf/beam/blob/b56f4609/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
index 74614ad..64eb472 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
@@ -80,7 +80,6 @@ public class WindowDoFnOperator<K, InputT, OutputT>
     extends DoFnOperator<KeyedWorkItem<K, InputT>, KV<K, OutputT>, WindowedValue<KV<K, OutputT>>>
     implements Triggerable {
 
-  private final Coder<K> keyCoder;
   private final TimerInternals.TimerDataCoder timerCoder;
 
   private transient Set<Tuple2<ByteBuffer, TimerInternals.TimerData>> watermarkTimers;
@@ -91,7 +90,6 @@ public class WindowDoFnOperator<K, InputT, OutputT>
   private transient Multiset<Long> processingTimeTimerTimestamps;
   private transient Map<Long, ScheduledFuture<?>> processingTimeTimerFutures;
 
-  private transient FlinkStateInternals<K> stateInternals;
   private transient FlinkTimerInternals timerInternals;
 
   private final SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow> systemReduceFn;
@@ -116,11 +114,11 @@ public class WindowDoFnOperator<K, InputT, OutputT>
         windowingStrategy,
         sideInputTagMapping,
         sideInputs,
-        options);
+        options,
+        keyCoder);
 
     this.systemReduceFn = systemReduceFn;
 
-    this.keyCoder = keyCoder;
     this.timerCoder =
         TimerInternals.TimerDataCoder.of(windowingStrategy.getWindowFn().windowCoder());
   }
@@ -132,7 +130,7 @@ public class WindowDoFnOperator<K, InputT, OutputT>
       public StateInternals<K> stateInternalsForKey(K key) {
         //this will implicitly be keyed by the key of the incoming
         // element or by the key of a firing timer
-        return stateInternals;
+        return (StateInternals<K>) stateInternals;
       }
     };
     TimerInternalsFactory<K> timerInternalsFactory = new TimerInternalsFactory<K>() {
@@ -192,7 +190,6 @@ public class WindowDoFnOperator<K, InputT, OutputT>
     // ScheduledFutures are not checkpointed
     processingTimeTimerFutures = new HashMap<>();
 
-    stateInternals = new FlinkStateInternals<>(getStateBackend(), keyCoder);
     timerInternals = new FlinkTimerInternals();
 
     // call super at the end because this will call getDoFn() which requires stateInternals
@@ -275,7 +272,7 @@ public class WindowDoFnOperator<K, InputT, OutputT>
 
         pushbackDoFnRunner.processElement(WindowedValue.valueInGlobalWindow(
             KeyedWorkItems.<K, InputT>timersWorkItem(
-                stateInternals.getKey(),
+                (K) stateInternals.getKey(),
                 Collections.singletonList(timer.f1))));
 
       } else {
@@ -313,7 +310,7 @@ public class WindowDoFnOperator<K, InputT, OutputT>
 
         pushbackDoFnRunner.processElement(WindowedValue.valueInGlobalWindow(
                 KeyedWorkItems.<K, InputT>timersWorkItem(
-                    stateInternals.getKey(),
+                    (K) stateInternals.getKey(),
                     Collections.singletonList(timer.f1))));
 
       } else {

http://git-wip-us.apache.org/repos/asf/beam/blob/b56f4609/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
index 23bc6a2..d07861c 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
@@ -119,6 +119,7 @@ public class PipelineOptionsTest {
         WindowingStrategy.globalDefault(),
         new HashMap<Integer, PCollectionView<?>>(),
         Collections.<PCollectionView<?>>emptyList(),
+        null,
         null);
 
   }
@@ -138,7 +139,8 @@ public class PipelineOptionsTest {
         WindowingStrategy.globalDefault(),
         new HashMap<Integer, PCollectionView<?>>(),
         Collections.<PCollectionView<?>>emptyList(),
-        options);
+        options,
+        null);
 
     final byte[] serialized = SerializationUtils.serialize(doFnOperator);
 

http://git-wip-us.apache.org/repos/asf/beam/blob/b56f4609/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
index 113802d..3598d10 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
@@ -103,7 +103,8 @@ public class DoFnOperatorTest {
         WindowingStrategy.globalDefault(),
         new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */
         Collections.<PCollectionView<?>>emptyList(), /* side inputs */
-        PipelineOptionsFactory.as(FlinkPipelineOptions.class));
+        PipelineOptionsFactory.as(FlinkPipelineOptions.class),
+        null);
 
     OneInputStreamOperatorTestHarness<WindowedValue<String>, String> testHarness =
         new OneInputStreamOperatorTestHarness<>(doFnOperator);
@@ -147,7 +148,8 @@ public class DoFnOperatorTest {
         WindowingStrategy.globalDefault(),
         new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */
         Collections.<PCollectionView<?>>emptyList(), /* side inputs */
-        PipelineOptionsFactory.as(FlinkPipelineOptions.class));
+        PipelineOptionsFactory.as(FlinkPipelineOptions.class),
+        null);
 
     OneInputStreamOperatorTestHarness<WindowedValue<String>, RawUnionValue> testHarness =
         new OneInputStreamOperatorTestHarness<>(doFnOperator);
@@ -207,7 +209,8 @@ public class DoFnOperatorTest {
         WindowingStrategy.globalDefault(),
         sideInputMapping, /* side-input mapping */
         ImmutableList.<PCollectionView<?>>of(view1, view2), /* side inputs */
-        PipelineOptionsFactory.as(FlinkPipelineOptions.class));
+        PipelineOptionsFactory.as(FlinkPipelineOptions.class),
+        null);
 
     TwoInputStreamOperatorTestHarness<WindowedValue<String>, RawUnionValue, String> testHarness =
         new TwoInputStreamOperatorTestHarness<>(doFnOperator);
