Repository: beam Updated Branches: refs/heads/master cef31093f -> 99f93eb07
[BEAM-1036] Support for new State API in FlinkRunner (streaming) Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b56f4609 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b56f4609 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b56f4609 Branch: refs/heads/master Commit: b56f460974a3119804040b08fe1c7b190ff3f356 Parents: cef3109 Author: JingsongLi <lzljs3620...@aliyun.com> Authored: Tue Jan 31 12:32:26 2017 +0800 Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com> Committed: Fri Feb 3 13:36:11 2017 +0100 ---------------------------------------------------------------------- runners/flink/runner/pom.xml | 1 - .../FlinkStreamingTransformTranslators.java | 137 ++++++++++++++----- .../wrappers/streaming/DoFnOperator.java | 15 +- .../streaming/KvToByteBufferKeySelector.java | 56 ++++++++ .../wrappers/streaming/WindowDoFnOperator.java | 13 +- .../beam/runners/flink/PipelineOptionsTest.java | 4 +- .../flink/streaming/DoFnOperatorTest.java | 9 +- 7 files changed, 182 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/b56f4609/runners/flink/runner/pom.xml ---------------------------------------------------------------------- diff --git a/runners/flink/runner/pom.xml b/runners/flink/runner/pom.xml index fe058b5..207740a 100644 --- a/runners/flink/runner/pom.xml +++ b/runners/flink/runner/pom.xml @@ -88,7 +88,6 @@ <groups>org.apache.beam.sdk.testing.RunnableOnService</groups> <excludedGroups> org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders, - org.apache.beam.sdk.testing.UsesStatefulParDo, org.apache.beam.sdk.testing.UsesTimersInParDo, org.apache.beam.sdk.testing.UsesSplittableParDo, org.apache.beam.sdk.testing.UsesAttemptedMetrics, 
http://git-wip-us.apache.org/repos/asf/beam/blob/b56f4609/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java index 24ef987..1195c82 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java @@ -35,6 +35,7 @@ import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows; import org.apache.beam.runners.flink.translation.types.CoderTypeInformation; import org.apache.beam.runners.flink.translation.types.FlinkCoder; import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator; +import org.apache.beam.runners.flink.translation.wrappers.streaming.KvToByteBufferKeySelector; import org.apache.beam.runners.flink.translation.wrappers.streaming.SingletonKeyedWorkItem; import org.apache.beam.runners.flink.translation.wrappers.streaming.SingletonKeyedWorkItemCoder; import org.apache.beam.runners.flink.translation.wrappers.streaming.WindowDoFnOperator; @@ -307,19 +308,9 @@ public class FlinkStreamingTransformTranslators { } } - private static void rejectStateAndTimers(DoFn<?, ?> doFn) { + private static void rejectTimers(DoFn<?, ?> doFn) { DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass()); - if (signature.stateDeclarations().size() > 0) { - throw new UnsupportedOperationException( - String.format( - "Found %s annotations on %s, but %s cannot yet be used with state in the %s.", - DoFn.StateId.class.getSimpleName(), - doFn.getClass().getName(), - 
DoFn.class.getSimpleName(), - FlinkRunner.class.getSimpleName())); - } - if (signature.timerDeclarations().size() > 0) { throw new UnsupportedOperationException( String.format( @@ -341,7 +332,7 @@ public class FlinkStreamingTransformTranslators { FlinkStreamingTranslationContext context) { DoFn<InputT, OutputT> doFn = transform.getFn(); - rejectStateAndTimers(doFn); + rejectTimers(doFn); WindowingStrategy<?, ?> windowingStrategy = context.getOutput(transform).getWindowingStrategy(); @@ -357,6 +348,20 @@ public class FlinkStreamingTransformTranslators { TypeInformation<WindowedValue<InputT>> inputTypeInfo = context.getTypeInfo(inputPCollection); + DataStream<WindowedValue<InputT>> inputDataStream = + context.getInputDataStream(context.getInput(transform)); + Coder keyCoder = null; + boolean stateful = false; + DoFnSignature signature = DoFnSignatures.getSignature(transform.getFn().getClass()); + if (signature.stateDeclarations().size() > 0 + || signature.timerDeclarations().size() > 0) { + // Based on the fact that the signature is stateful, DoFnSignatures ensures + // that it is also keyed + keyCoder = ((KvCoder) inputPCollection.getCoder()).getKeyCoder(); + inputDataStream = inputDataStream.keyBy(new KvToByteBufferKeySelector(keyCoder)); + stateful = true; + } + if (sideInputs.isEmpty()) { DoFnOperator<InputT, OutputT, WindowedValue<OutputT>> doFnOperator = new DoFnOperator<>( @@ -368,10 +373,8 @@ public class FlinkStreamingTransformTranslators { windowingStrategy, new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */ Collections.<PCollectionView<?>>emptyList(), /* side inputs */ - context.getPipelineOptions()); - - DataStream<WindowedValue<InputT>> inputDataStream = - context.getInputDataStream(context.getInput(transform)); + context.getPipelineOptions(), + keyCoder); SingleOutputStreamOperator<WindowedValue<OutputT>> outDataStream = inputDataStream .transform(transform.getName(), typeInfo, doFnOperator); @@ -391,17 +394,39 @@ public class 
FlinkStreamingTransformTranslators { windowingStrategy, transformedSideInputs.f0, sideInputs, - context.getPipelineOptions()); - - DataStream<WindowedValue<InputT>> inputDataStream = - context.getInputDataStream(context.getInput(transform)); - - SingleOutputStreamOperator<WindowedValue<OutputT>> outDataStream = inputDataStream - .connect(transformedSideInputs.f1.broadcast()) - .transform(transform.getName(), typeInfo, doFnOperator); - + context.getPipelineOptions(), + keyCoder); + + SingleOutputStreamOperator<WindowedValue<OutputT>> outDataStream; + if (stateful) { + // we have to manually construct the two-input transform because we're not + // allowed to have only one input keyed, normally. + KeyedStream keyedStream = (KeyedStream<?, InputT>) inputDataStream; + TwoInputTransformation< + WindowedValue<KV<?, InputT>>, + RawUnionValue, + WindowedValue<OutputT>> rawFlinkTransform = new TwoInputTransformation<>( + keyedStream.getTransformation(), + transformedSideInputs.f1.broadcast().getTransformation(), + transform.getName(), + (TwoInputStreamOperator) doFnOperator, + typeInfo, + keyedStream.getParallelism()); + + rawFlinkTransform.setStateKeyType(keyedStream.getKeyType()); + rawFlinkTransform.setStateKeySelectors(keyedStream.getKeySelector(), null); + + outDataStream = new SingleOutputStreamOperator( + keyedStream.getExecutionEnvironment(), + rawFlinkTransform) {}; // we have to cheat around the ctor being protected + + keyedStream.getExecutionEnvironment().addOperator(rawFlinkTransform); + } else { + outDataStream = inputDataStream + .connect(transformedSideInputs.f1.broadcast()) + .transform(transform.getName(), typeInfo, doFnOperator); + } context.setOutputDataStream(context.getOutput(transform), outDataStream); - } } } @@ -493,7 +518,7 @@ public class FlinkStreamingTransformTranslators { FlinkStreamingTranslationContext context) { DoFn<InputT, OutputT> doFn = transform.getFn(); - rejectStateAndTimers(doFn); + rejectTimers(doFn); // we assume that the 
transformation does not change the windowing strategy. WindowingStrategy<?, ?> windowingStrategy = @@ -514,6 +539,20 @@ public class FlinkStreamingTransformTranslators { TypeInformation<WindowedValue<InputT>> inputTypeInfo = context.getTypeInfo(inputPCollection); + DataStream<WindowedValue<InputT>> inputDataStream = + context.getInputDataStream(context.getInput(transform)); + Coder keyCoder = null; + boolean stateful = false; + DoFnSignature signature = DoFnSignatures.getSignature(transform.getFn().getClass()); + if (signature.stateDeclarations().size() > 0 + || signature.timerDeclarations().size() > 0) { + // Based on the fact that the signature is stateful, DoFnSignatures ensures + // that it is also keyed + keyCoder = ((KvCoder) inputPCollection.getCoder()).getKeyCoder(); + inputDataStream = inputDataStream.keyBy(new KvToByteBufferKeySelector(keyCoder)); + stateful = true; + } + if (sideInputs.isEmpty()) { DoFnOperator<InputT, OutputT, RawUnionValue> doFnOperator = new DoFnOperator<>( @@ -525,16 +564,14 @@ public class FlinkStreamingTransformTranslators { windowingStrategy, new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */ Collections.<PCollectionView<?>>emptyList(), /* side inputs */ - context.getPipelineOptions()); + context.getPipelineOptions(), + keyCoder); UnionCoder outputUnionCoder = createUnionCoder(outputs); CoderTypeInformation<RawUnionValue> outputUnionTypeInformation = new CoderTypeInformation<>(outputUnionCoder); - DataStream<WindowedValue<InputT>> inputDataStream = - context.getInputDataStream(context.getInput(transform)); - unionOutputStream = inputDataStream .transform(transform.getName(), outputUnionTypeInformation, doFnOperator); @@ -552,19 +589,43 @@ public class FlinkStreamingTransformTranslators { windowingStrategy, transformedSideInputs.f0, sideInputs, - context.getPipelineOptions()); + context.getPipelineOptions(), + keyCoder); UnionCoder outputUnionCoder = createUnionCoder(outputs); CoderTypeInformation<RawUnionValue> 
outputUnionTypeInformation = new CoderTypeInformation<>(outputUnionCoder); - DataStream<WindowedValue<InputT>> inputDataStream = - context.getInputDataStream(context.getInput(transform)); - - unionOutputStream = inputDataStream - .connect(transformedSideInputs.f1.broadcast()) - .transform(transform.getName(), outputUnionTypeInformation, doFnOperator); + if (stateful) { + // we have to manually construct the two-input transform because we're not + // allowed to have only one input keyed, normally. + KeyedStream keyedStream = (KeyedStream<?, InputT>) inputDataStream; + TwoInputTransformation< + WindowedValue<KV<?, InputT>>, + RawUnionValue, + WindowedValue<OutputT>> rawFlinkTransform = new TwoInputTransformation( + keyedStream.getTransformation(), + transformedSideInputs.f1.broadcast().getTransformation(), + transform.getName(), + (TwoInputStreamOperator) doFnOperator, + outputUnionTypeInformation, + keyedStream.getParallelism()); + + rawFlinkTransform.setStateKeyType(keyedStream.getKeyType()); + rawFlinkTransform.setStateKeySelectors(keyedStream.getKeySelector(), null); + + unionOutputStream = new SingleOutputStreamOperator( + keyedStream.getExecutionEnvironment(), + rawFlinkTransform) {}; // we have to cheat around the ctor being protected + + keyedStream.getExecutionEnvironment().addOperator(rawFlinkTransform); + + } else { + unionOutputStream = inputDataStream + .connect(transformedSideInputs.f1.broadcast()) + .transform(transform.getName(), outputUnionTypeInformation, doFnOperator); + } } SplitStream<RawUnionValue> splitStream = unionOutputStream http://git-wip-us.apache.org/repos/asf/beam/blob/b56f4609/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index de0264a..a20a34c 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -127,6 +127,10 @@ public class DoFnOperator<InputT, FnOutputT, OutputT> private transient Map<String, KvStateSnapshot<?, ?, ?, ?, ?>> restoredSideInputState; + protected transient FlinkStateInternals<?> stateInternals; + + private final Coder<?> keyCoder; + public DoFnOperator( DoFn<InputT, FnOutputT> doFn, TypeInformation<WindowedValue<InputT>> inputType, @@ -136,7 +140,8 @@ public class DoFnOperator<InputT, FnOutputT, OutputT> WindowingStrategy<?, ?> windowingStrategy, Map<Integer, PCollectionView<?>> sideInputTagMapping, Collection<PCollectionView<?>> sideInputs, - PipelineOptions options) { + PipelineOptions options, + Coder<?> keyCoder) { this.doFn = doFn; this.mainOutputTag = mainOutputTag; this.sideOutputTags = sideOutputTags; @@ -156,6 +161,8 @@ public class DoFnOperator<InputT, FnOutputT, OutputT> new ListStateDescriptor<>("pushed-back-values", inputType); setChainingStrategy(ChainingStrategy.ALWAYS); + + this.keyCoder = keyCoder; } protected ExecutionContext.StepContext createStepContext() { @@ -229,6 +236,10 @@ public class DoFnOperator<InputT, FnOutputT, OutputT> outputManager = outputManagerFactory.create(output); + if (keyCoder != null) { + stateInternals = new FlinkStateInternals<>(getStateBackend(), keyCoder); + } + this.doFn = getDoFn(); doFnInvoker = DoFnInvokers.invokerFor(doFn); @@ -521,7 +532,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT> @Override public StateInternals<?> stateInternals() { - throw new UnsupportedOperationException("Not supported for regular DoFns."); + return stateInternals; } @Override 
http://git-wip-us.apache.org/repos/asf/beam/blob/b56f4609/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/KvToByteBufferKeySelector.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/KvToByteBufferKeySelector.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/KvToByteBufferKeySelector.java new file mode 100644 index 0000000..dce2e68 --- /dev/null +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/KvToByteBufferKeySelector.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.flink.translation.wrappers.streaming; + +import java.nio.ByteBuffer; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.util.CoderUtils; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.KV; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.typeutils.GenericTypeInfo; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; + +/** + * {@link KeySelector} that retrieves a key from a {@link KV}. This will return + * the key as encoded by the provided {@link Coder} in a {@link ByteBuffer}. This ensures + * that all key comparisons/hashing happen on the encoded form. + */ +public class KvToByteBufferKeySelector<K, V> + implements KeySelector<WindowedValue<KV<K, V>>, ByteBuffer>, + ResultTypeQueryable<ByteBuffer> { + + private final Coder<K> keyCoder; + + public KvToByteBufferKeySelector(Coder<K> keyCoder) { + this.keyCoder = keyCoder; + } + + @Override + public ByteBuffer getKey(WindowedValue<KV<K, V>> value) throws Exception { + K key = value.getValue().getKey(); + byte[] keyBytes = CoderUtils.encodeToByteArray(keyCoder, key); + return ByteBuffer.wrap(keyBytes); + } + + @Override + public TypeInformation<ByteBuffer> getProducedType() { + return new GenericTypeInfo<>(ByteBuffer.class); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/b56f4609/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java index 74614ad..64eb472 100644 --- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java @@ -80,7 +80,6 @@ public class WindowDoFnOperator<K, InputT, OutputT> extends DoFnOperator<KeyedWorkItem<K, InputT>, KV<K, OutputT>, WindowedValue<KV<K, OutputT>>> implements Triggerable { - private final Coder<K> keyCoder; private final TimerInternals.TimerDataCoder timerCoder; private transient Set<Tuple2<ByteBuffer, TimerInternals.TimerData>> watermarkTimers; @@ -91,7 +90,6 @@ public class WindowDoFnOperator<K, InputT, OutputT> private transient Multiset<Long> processingTimeTimerTimestamps; private transient Map<Long, ScheduledFuture<?>> processingTimeTimerFutures; - private transient FlinkStateInternals<K> stateInternals; private transient FlinkTimerInternals timerInternals; private final SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow> systemReduceFn; @@ -116,11 +114,11 @@ public class WindowDoFnOperator<K, InputT, OutputT> windowingStrategy, sideInputTagMapping, sideInputs, - options); + options, + keyCoder); this.systemReduceFn = systemReduceFn; - this.keyCoder = keyCoder; this.timerCoder = TimerInternals.TimerDataCoder.of(windowingStrategy.getWindowFn().windowCoder()); } @@ -132,7 +130,7 @@ public class WindowDoFnOperator<K, InputT, OutputT> public StateInternals<K> stateInternalsForKey(K key) { //this will implicitly be keyed by the key of the incoming // element or by the key of a firing timer - return stateInternals; + return (StateInternals<K>) stateInternals; } }; TimerInternalsFactory<K> timerInternalsFactory = new TimerInternalsFactory<K>() { @@ -192,7 +190,6 @@ public class WindowDoFnOperator<K, InputT, OutputT> // ScheduledFutures are not checkpointed processingTimeTimerFutures = new HashMap<>(); - stateInternals = new FlinkStateInternals<>(getStateBackend(), keyCoder); timerInternals = new 
FlinkTimerInternals(); // call super at the end because this will call getDoFn() which requires stateInternals @@ -275,7 +272,7 @@ public class WindowDoFnOperator<K, InputT, OutputT> pushbackDoFnRunner.processElement(WindowedValue.valueInGlobalWindow( KeyedWorkItems.<K, InputT>timersWorkItem( - stateInternals.getKey(), + (K) stateInternals.getKey(), Collections.singletonList(timer.f1)))); } else { @@ -313,7 +310,7 @@ public class WindowDoFnOperator<K, InputT, OutputT> pushbackDoFnRunner.processElement(WindowedValue.valueInGlobalWindow( KeyedWorkItems.<K, InputT>timersWorkItem( - stateInternals.getKey(), + (K) stateInternals.getKey(), Collections.singletonList(timer.f1)))); } else { http://git-wip-us.apache.org/repos/asf/beam/blob/b56f4609/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java index 23bc6a2..d07861c 100644 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java @@ -119,6 +119,7 @@ public class PipelineOptionsTest { WindowingStrategy.globalDefault(), new HashMap<Integer, PCollectionView<?>>(), Collections.<PCollectionView<?>>emptyList(), + null, null); } @@ -138,7 +139,8 @@ public class PipelineOptionsTest { WindowingStrategy.globalDefault(), new HashMap<Integer, PCollectionView<?>>(), Collections.<PCollectionView<?>>emptyList(), - options); + options, + null); final byte[] serialized = SerializationUtils.serialize(doFnOperator); http://git-wip-us.apache.org/repos/asf/beam/blob/b56f4609/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java 
---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java index 113802d..3598d10 100644 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java @@ -103,7 +103,8 @@ public class DoFnOperatorTest { WindowingStrategy.globalDefault(), new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */ Collections.<PCollectionView<?>>emptyList(), /* side inputs */ - PipelineOptionsFactory.as(FlinkPipelineOptions.class)); + PipelineOptionsFactory.as(FlinkPipelineOptions.class), + null); OneInputStreamOperatorTestHarness<WindowedValue<String>, String> testHarness = new OneInputStreamOperatorTestHarness<>(doFnOperator); @@ -147,7 +148,8 @@ public class DoFnOperatorTest { WindowingStrategy.globalDefault(), new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */ Collections.<PCollectionView<?>>emptyList(), /* side inputs */ - PipelineOptionsFactory.as(FlinkPipelineOptions.class)); + PipelineOptionsFactory.as(FlinkPipelineOptions.class), + null); OneInputStreamOperatorTestHarness<WindowedValue<String>, RawUnionValue> testHarness = new OneInputStreamOperatorTestHarness<>(doFnOperator); @@ -207,7 +209,8 @@ public class DoFnOperatorTest { WindowingStrategy.globalDefault(), sideInputMapping, /* side-input mapping */ ImmutableList.<PCollectionView<?>>of(view1, view2), /* side inputs */ - PipelineOptionsFactory.as(FlinkPipelineOptions.class)); + PipelineOptionsFactory.as(FlinkPipelineOptions.class), + null); TwoInputStreamOperatorTestHarness<WindowedValue<String>, RawUnionValue, String> testHarness = new TwoInputStreamOperatorTestHarness<>(doFnOperator);