[ 
https://issues.apache.org/jira/browse/BEAM-2918?focusedWorklogId=158201&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-158201
 ]

ASF GitHub Bot logged work on BEAM-2918:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 24/Oct/18 15:59
            Start Date: 24/Oct/18 15:59
    Worklog Time Spent: 10m 
      Work Description: tweise closed pull request #6726: [BEAM-2918] Add state 
support for streaming in portable FlinkRunner
URL: https://github.com/apache/beam/pull/6726
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
index 2b276f404c7..a3355c67e59 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
@@ -33,6 +33,7 @@
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.TreeMap;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
@@ -52,6 +53,7 @@
 import 
org.apache.beam.runners.flink.translation.utils.FlinkPipelineTranslatorUtils;
 import 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator;
 import 
org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator;
+import 
org.apache.beam.runners.flink.translation.wrappers.streaming.KvToByteBufferKeySelector;
 import 
org.apache.beam.runners.flink.translation.wrappers.streaming.SingletonKeyedWorkItem;
 import 
org.apache.beam.runners.flink.translation.wrappers.streaming.SingletonKeyedWorkItemCoder;
 import 
org.apache.beam.runners.flink.translation.wrappers.streaming.WindowDoFnOperator;
@@ -550,6 +552,29 @@ private void translateStreamingImpulse(
       }
     }
 
+    final Coder<WindowedValue<InputT>> windowedInputCoder =
+        instantiateCoder(inputPCollectionId, components);
+
+    Coder keyCoder = null;
+    KeySelector<WindowedValue<InputT>, ?> keySelector = null;
+    final boolean stateful = stagePayload.getUserStatesCount() > 0;
+    if (stateful) {
+      // Stateful stages are only allowed for KV input
+      Coder valueCoder =
+          ((WindowedValue.FullWindowedValueCoder) 
windowedInputCoder).getValueCoder();
+      if (!(valueCoder instanceof KvCoder)) {
+        throw new IllegalStateException(
+            String.format(
+                Locale.ENGLISH,
+                "The element coder for stateful DoFn '%s' must be KvCoder but 
is: %s",
+                inputPCollectionId,
+                valueCoder.getClass().getSimpleName()));
+      }
+      keyCoder = ((KvCoder) valueCoder).getKeyCoder();
+      keySelector = new KvToByteBufferKeySelector(keyCoder);
+      inputDataStream = inputDataStream.keyBy(keySelector);
+    }
+
     DoFnOperator.MultiOutputOutputManagerFactory<OutputT> outputManagerFactory 
=
         new DoFnOperator.MultiOutputOutputManagerFactory<>(
             mainOutputTag, tagsToOutputTags, tagsToCoders, tagsToIds);
@@ -557,7 +582,7 @@ private void translateStreamingImpulse(
     DoFnOperator<InputT, OutputT> doFnOperator =
         new ExecutableStageDoFnOperator<>(
             transform.getUniqueName(),
-            instantiateCoder(inputPCollectionId, components),
+            windowedInputCoder,
             null,
             Collections.emptyMap(),
             mainOutputTag,
@@ -570,7 +595,9 @@ private void translateStreamingImpulse(
             stagePayload,
             context.getJobInfo(),
             FlinkExecutableStageContext.factory(context.getPipelineOptions()),
-            collectionIdToTupleTag);
+            collectionIdToTupleTag,
+            keyCoder,
+            keySelector);
 
     if (transformedSideInputs.unionTagToView.isEmpty()) {
       outputStream =
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index 899732ae75d..b40df590e49 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -465,7 +465,7 @@ public RawUnionValue map(T o) throws Exception {
       DataStream<WindowedValue<InputT>> inputDataStream = 
context.getInputDataStream(input);
 
       Coder keyCoder = null;
-      KeySelector keySelector = null;
+      KeySelector<WindowedValue<InputT>, ?> keySelector = null;
       boolean stateful = false;
       DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
       if (signature.stateDeclarations().size() > 0 || 
signature.timerDeclarations().size() > 0) {
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
index 2135d5a8285..4081ed2c31e 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
@@ -22,13 +22,22 @@
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterables;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Collection;
+import java.util.EnumMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateKey.TypeCase;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.DoFnRunner;
+import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.runners.core.StateNamespaces;
+import org.apache.beam.runners.core.StateTags;
 import org.apache.beam.runners.core.construction.graph.ExecutableStage;
 import 
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageContext;
 import 
org.apache.beam.runners.flink.translation.functions.FlinkStreamingSideInputHandlerFactory;
@@ -43,6 +52,7 @@
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.fn.data.FnDataReceiver;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.state.BagState;
 import org.apache.beam.sdk.state.TimeDomain;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.join.RawUnionValue;
@@ -52,20 +62,22 @@
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * ExecutableStageDoFnOperator basic functional implementation without side 
inputs and user state.
- * SDK harness interaction code adopted from {@link
- * 
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction}.
 TODO: Evaluate
- * reuse All operators in the non-portable streaming translation are based on 
{@link DoFnOperator}.
- * This implies dependency on {@link DoFnRunner}, which is not required for 
portable pipeline. TODO:
- * Multiple element bundle execution The operator (like old non-portable 
runner) executes every
- * element as separate bundle, which will be even more expensive with SDK 
harness container.
- * Refactor for above should be looked into once streaming side inputs (and 
push back) take shape.
+ * This operator is the streaming equivalent of the {@link
+ * 
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction}.
 It sends all
+ * received elements to the SDK harness and emits the received back elements 
to the downstream
+ * operators. It also takes care of handling side inputs and state.
+ *
+ * <p>TODO Integrate support for timers
+ *
+ * <p>TODO Integrate support for progress updates and metrics
  */
 public class ExecutableStageDoFnOperator<InputT, OutputT> extends 
DoFnOperator<InputT, OutputT> {
 
@@ -100,7 +112,9 @@ public ExecutableStageDoFnOperator(
       RunnerApi.ExecutableStagePayload payload,
       JobInfo jobInfo,
       FlinkExecutableStageContext.Factory contextFactory,
-      Map<String, TupleTag<?>> outputMap) {
+      Map<String, TupleTag<?>> outputMap,
+      Coder keyCoder,
+      KeySelector<WindowedValue<InputT>, ?> keySelector) {
     super(
         new NoOpDoFn(),
         stepName,
@@ -114,8 +128,8 @@ public ExecutableStageDoFnOperator(
         sideInputTagMapping,
         sideInputs,
         options,
-        null /*keyCoder*/,
-        null /* key selector */);
+        keyCoder,
+        keySelector);
     this.payload = payload;
     this.jobInfo = jobInfo;
     this.contextFactory = contextFactory;
@@ -135,14 +149,15 @@ public void open() throws Exception {
     // ownership of the higher level "factories" explicit? Do we care?
     stageContext = contextFactory.get(jobInfo);
 
-    stateRequestHandler = getStateRequestHandler(executableStage);
     stageBundleFactory = stageContext.getStageBundleFactory(executableStage);
+    stateRequestHandler = getStateRequestHandler(executableStage);
     progressHandler = BundleProgressHandler.unsupported();
     outputQueue = new LinkedBlockingQueue<>();
   }
 
   private StateRequestHandler getStateRequestHandler(ExecutableStage 
executableStage) {
 
+    final StateRequestHandler sideInputStateHandler;
     if (executableStage.getSideInputs().size() > 0) {
       checkNotNull(super.sideInputHandler);
       StateRequestHandlers.SideInputHandlerFactory sideInputHandlerFactory =
@@ -150,16 +165,115 @@ private StateRequestHandler 
getStateRequestHandler(ExecutableStage executableSta
               FlinkStreamingSideInputHandlerFactory.forStage(
                   executableStage, sideInputIds, super.sideInputHandler));
       try {
-        return StateRequestHandlers.forSideInputHandlerFactory(
-            ProcessBundleDescriptors.getSideInputs(executableStage), 
sideInputHandlerFactory);
+        sideInputStateHandler =
+            StateRequestHandlers.forSideInputHandlerFactory(
+                ProcessBundleDescriptors.getSideInputs(executableStage), 
sideInputHandlerFactory);
       } catch (IOException e) {
-        throw new RuntimeException(e);
+        throw new RuntimeException("Failed to initialize SideInputHandler", e);
+      }
+    } else {
+      sideInputStateHandler = StateRequestHandler.unsupported();
+    }
+
+    final StateRequestHandler userStateRequestHandler;
+    if (executableStage.getUserStates().size() > 0) {
+      if (keyedStateInternals == null) {
+        throw new IllegalStateException("Input must be keyed when user state 
is used");
       }
+      userStateRequestHandler =
+          StateRequestHandlers.forBagUserStateHandlerFactory(
+              stageBundleFactory.getProcessBundleDescriptor(),
+              new BagUserStateFactory(keyedStateInternals, 
getKeyedStateBackend()));
     } else {
-      return StateRequestHandler.unsupported();
+      userStateRequestHandler = StateRequestHandler.unsupported();
+    }
+
+    EnumMap<TypeCase, StateRequestHandler> handlerMap = new 
EnumMap<>(TypeCase.class);
+    handlerMap.put(TypeCase.MULTIMAP_SIDE_INPUT, sideInputStateHandler);
+    handlerMap.put(TypeCase.BAG_USER_STATE, userStateRequestHandler);
+
+    return StateRequestHandlers.delegateBasedUponType(handlerMap);
+  }
+
+  private static class BagUserStateFactory
+      implements StateRequestHandlers.BagUserStateHandlerFactory {
+
+    private final StateInternals stateInternals;
+    private final KeyedStateBackend<ByteBuffer> keyedStateBackend;
+
+    private BagUserStateFactory(
+        StateInternals stateInternals, KeyedStateBackend<ByteBuffer> 
keyedStateBackend) {
+
+      this.stateInternals = stateInternals;
+      this.keyedStateBackend = keyedStateBackend;
+    }
+
+    @Override
+    public <K, V, W extends BoundedWindow>
+        StateRequestHandlers.BagUserStateHandler<K, V, W> forUserState(
+            String pTransformId,
+            String userStateId,
+            Coder<K> keyCoder,
+            Coder<V> valueCoder,
+            Coder<W> windowCoder) {
+      return new StateRequestHandlers.BagUserStateHandler<K, V, W>() {
+        @Override
+        public Iterable<V> get(K key, W window) {
+          prepareStateBackend(key, keyCoder);
+          StateNamespace namespace = StateNamespaces.window(windowCoder, 
window);
+          BagState<V> bagState =
+              stateInternals.state(namespace, StateTags.bag(userStateId, 
valueCoder));
+          return bagState.read();
+        }
+
+        @Override
+        public void append(K key, W window, Iterator<V> values) {
+          prepareStateBackend(key, keyCoder);
+          StateNamespace namespace = StateNamespaces.window(windowCoder, 
window);
+          BagState<V> bagState =
+              stateInternals.state(namespace, StateTags.bag(userStateId, 
valueCoder));
+          while (values.hasNext()) {
+            bagState.add(values.next());
+          }
+        }
+
+        @Override
+        public void clear(K key, W window) {
+          prepareStateBackend(key, keyCoder);
+          StateNamespace namespace = StateNamespaces.window(windowCoder, 
window);
+          BagState<V> bagState =
+              stateInternals.state(namespace, StateTags.bag(userStateId, 
valueCoder));
+          bagState.clear();
+        }
+
+        private void prepareStateBackend(K key, Coder<K> keyCoder) {
+          ByteArrayOutputStream baos = new ByteArrayOutputStream();
+          try {
+            keyCoder.encode(key, baos);
+          } catch (IOException e) {
+            throw new RuntimeException("Failed to encode key for Flink state 
backend", e);
+          }
+          keyedStateBackend.setCurrentKey(ByteBuffer.wrap(baos.toByteArray()));
+        }
+      };
     }
   }
 
+  @Override
+  public void setKeyContextElement1(StreamRecord record) throws Exception {
+    // Note: This is only relevant when we have a stateful DoFn.
+    // We want to control the key of the state backend ourselves and
+    // we must avoid any concurrent setting of the current active key.
+    // By overwriting this, we also prevent unnecessary serialization
+    // as the key has to be encoded as a byte array.
+  }
+
+  @Override
+  public void setCurrentKey(Object key) {
+    throw new UnsupportedOperationException(
+        "Current key for state backend can only be set by state requests from 
SDK workers.");
+  }
+
   @Override
   public void dispose() throws Exception {
     // Remove the reference to stageContext and make stageContext available 
for garbage collection.
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableStateExecutionTest.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableStateExecutionTest.java
new file mode 100644
index 00000000000..bd133d3c73e
--- /dev/null
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableStateExecutionTest.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.flink;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.Environments;
+import org.apache.beam.runners.core.construction.PipelineTranslation;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.PortablePipelineOptions;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.testing.CrashingRunner;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Impulse;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Tests the State server integration of {@link
+ * 
org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator}.
+ */
+@RunWith(Parameterized.class)
+public class PortableStateExecutionTest implements Serializable {
+
+  @Parameters
+  public static Object[] data() {
+    return new Object[] {true};
+  }
+
+  @Parameter public boolean isStreaming;
+
+  private transient ListeningExecutorService flinkJobExecutor;
+
+  @Before
+  public void setup() {
+    flinkJobExecutor = 
MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
+  }
+
+  @After
+  public void tearDown() {
+    flinkJobExecutor.shutdown();
+  }
+
+  private static final Map<String, Integer> stateValues = new HashMap<>();
+
+  @Before
+  public void before() {
+    stateValues.clear();
+  }
+
+  // Special values which clear / write out state
+  private static final int CLEAR_STATE = -1;
+  private static final int WRITE_STATE_TO_MAP = -2;
+
+  @Test
+  public void testExecution() throws Exception {
+    PipelineOptions options = PipelineOptionsFactory.create();
+    options.setRunner(CrashingRunner.class);
+    options.as(FlinkPipelineOptions.class).setFlinkMaster("[local]");
+    options.as(FlinkPipelineOptions.class).setStreaming(isStreaming);
+    options
+        .as(PortablePipelineOptions.class)
+        .setDefaultEnvironmentType(Environments.ENVIRONMENT_EMBEDDED);
+    Pipeline p = Pipeline.create(options);
+    p.apply(Impulse.create())
+        .apply(
+            ParDo.of(
+                new DoFn<byte[], KV<String, Integer>>() {
+                  @ProcessElement
+                  public void process(ProcessContext ctx) {
+                    // Values == -1 will clear the state
+                    ctx.output(KV.of("clearedState", 1));
+                    ctx.output(KV.of("clearedState", CLEAR_STATE));
+                    // values >= 1 will be added on top of each other
+                    ctx.output(KV.of("bla1", 42));
+                    ctx.output(KV.of("bla", 23));
+                    ctx.output(KV.of("bla2", 64));
+                    ctx.output(KV.of("bla", 1));
+                    ctx.output(KV.of("bla", 1));
+                    // values == -2 will write the state to a map
+                    ctx.output(KV.of("bla", WRITE_STATE_TO_MAP));
+                    ctx.output(KV.of("bla1", WRITE_STATE_TO_MAP));
+                    ctx.output(KV.of("bla2", WRITE_STATE_TO_MAP));
+                    ctx.output(KV.of("clearedState", -2));
+                  }
+                }))
+        .apply(
+            "statefulDoFn",
+            ParDo.of(
+                new DoFn<KV<String, Integer>, String>() {
+                  @StateId("valueState")
+                  private final StateSpec<ValueState<Integer>> valueStateSpec =
+                      StateSpecs.value(VarIntCoder.of());
+
+                  @ProcessElement
+                  public void process(
+                      ProcessContext ctx, @StateId("valueState") 
ValueState<Integer> valueState) {
+                    Integer value = ctx.element().getValue();
+                    if (value == null) {
+                      throw new IllegalStateException();
+                    }
+                    switch (value) {
+                      case CLEAR_STATE:
+                        valueState.clear();
+                        break;
+                      case WRITE_STATE_TO_MAP:
+                        stateValues.put(ctx.element().getKey(), 
valueState.read());
+                        break;
+                      default:
+                        Integer currentState = valueState.read();
+                        if (currentState == null) {
+                          currentState = value;
+                        } else {
+                          currentState += value;
+                        }
+                        valueState.write(currentState);
+                    }
+                  }
+                }));
+
+    RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p);
+
+    FlinkJobInvocation jobInvocation =
+        FlinkJobInvocation.create(
+            "id",
+            "none",
+            flinkJobExecutor,
+            pipelineProto,
+            options.as(FlinkPipelineOptions.class),
+            Collections.emptyList());
+
+    jobInvocation.start();
+    long timeout = System.currentTimeMillis() + 60 * 1000;
+    while (jobInvocation.getState() != Enum.DONE && System.currentTimeMillis() 
< timeout) {
+      Thread.sleep(1000);
+    }
+    assertThat(jobInvocation.getState(), is(Enum.DONE));
+
+    Map<String, Integer> expected = new HashMap<>();
+    expected.put("bla", 25);
+    expected.put("bla1", 42);
+    expected.put("bla2", 64);
+    expected.put("clearedState", null);
+
+    assertThat(stateValues, equalTo(expected));
+  }
+}
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/ExecutableStageDoFnOperatorTest.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/ExecutableStageDoFnOperatorTest.java
index 7888b4dd23f..893073195c3 100644
--- 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/ExecutableStageDoFnOperatorTest.java
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/ExecutableStageDoFnOperatorTest.java
@@ -43,6 +43,7 @@
 import 
org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator;
 import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
 import org.apache.beam.runners.fnexecution.control.OutputReceiverFactory;
+import org.apache.beam.runners.fnexecution.control.ProcessBundleDescriptors;
 import org.apache.beam.runners.fnexecution.control.RemoteBundle;
 import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
 import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
@@ -240,6 +241,12 @@ public void close() throws Exception {
             };
           }
 
+          @Override
+          public ProcessBundleDescriptors.ExecutableProcessBundleDescriptor
+              getProcessBundleDescriptor() {
+            return null;
+          }
+
           @Override
           public void close() {}
         };
@@ -342,7 +349,9 @@ public void testSerialization() {
             stagePayload,
             jobInfo,
             FlinkExecutableStageContext.factory(options),
-            createOutputMap(mainOutput, ImmutableList.of(additionalOutput)));
+            createOutputMap(mainOutput, ImmutableList.of(additionalOutput)),
+            null,
+            null);
 
     ExecutableStageDoFnOperator<Integer, Integer> clone = 
SerializationUtils.clone(operator);
     assertNotNull(clone);
@@ -379,7 +388,9 @@ public void testSerialization() {
             stagePayload,
             jobInfo,
             contextFactory,
-            createOutputMap(mainOutput, additionalOutputs));
+            createOutputMap(mainOutput, additionalOutputs),
+            null,
+            null);
 
     Whitebox.setInternalState(operator, "stateRequestHandler", 
stateRequestHandler);
     return operator;
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunctionTest.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunctionTest.java
index f6e4ba26b35..bbb5302ae3f 100644
--- 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunctionTest.java
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunctionTest.java
@@ -33,6 +33,7 @@
 import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection;
 import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
 import org.apache.beam.runners.fnexecution.control.OutputReceiverFactory;
+import org.apache.beam.runners.fnexecution.control.ProcessBundleDescriptors;
 import org.apache.beam.runners.fnexecution.control.RemoteBundle;
 import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
 import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
@@ -186,6 +187,12 @@ public void close() throws Exception {
             };
           }
 
+          @Override
+          public ProcessBundleDescriptors.ExecutableProcessBundleDescriptor
+              getProcessBundleDescriptor() {
+            return null;
+          }
+
           @Override
           public void close() throws Exception {}
         };
diff --git 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
index 6cbff13c9e5..a4a33756bf5 100644
--- 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
+++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
@@ -248,6 +248,11 @@ public RemoteBundle getBundle(
       return processor.newBundle(outputReceivers.build(), stateRequestHandler, 
progressHandler);
     }
 
+    @Override
+    public ExecutableProcessBundleDescriptor getProcessBundleDescriptor() {
+      return processBundleDescriptor;
+    }
+
     @Override
     public void close() throws Exception {
       // Clear reference to encourage cache eviction. Values are weakly 
referenced.
diff --git 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SingleEnvironmentInstanceJobBundleFactory.java
 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SingleEnvironmentInstanceJobBundleFactory.java
index a6b4fbe10d0..cf15a0cdfe1 100644
--- 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SingleEnvironmentInstanceJobBundleFactory.java
+++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SingleEnvironmentInstanceJobBundleFactory.java
@@ -167,6 +167,11 @@ public RemoteBundle getBundle(
       return processor.newBundle(outputReceivers, stateRequestHandler, 
progressHandler);
     }
 
+    @Override
+    public ExecutableProcessBundleDescriptor getProcessBundleDescriptor() {
+      return descriptor;
+    }
+
     @Override
     public void close() {}
   }
diff --git 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/StageBundleFactory.java
 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/StageBundleFactory.java
index 3b8b8d21b7a..7086e58b69d 100644
--- 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/StageBundleFactory.java
+++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/StageBundleFactory.java
@@ -35,4 +35,6 @@ RemoteBundle getBundle(
       StateRequestHandler stateRequestHandler,
       BundleProgressHandler progressHandler)
       throws Exception;
+
+  ProcessBundleDescriptors.ExecutableProcessBundleDescriptor 
getProcessBundleDescriptor();
 }
diff --git 
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/StateFetchingIteratorsTest.java
 
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/StateFetchingIteratorsTest.java
index 3f3ad2da3e2..fb1748bbad0 100644
--- 
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/StateFetchingIteratorsTest.java
+++ 
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/StateFetchingIteratorsTest.java
@@ -71,7 +71,6 @@ private void testFetch(ByteString... expected) {
       BeamFnStateClient fakeStateClient =
           (requestBuilder, response) -> {
             ByteString continuationToken = 
requestBuilder.getGet().getContinuationToken();
-            StateGetResponse.Builder builder = StateGetResponse.newBuilder();
 
             int requestedPosition = 0; // Default position is 0
             if (!ByteString.EMPTY.equals(continuationToken)) {
diff --git a/sdks/python/apache_beam/runners/portability/flink_runner_test.py 
b/sdks/python/apache_beam/runners/portability/flink_runner_test.py
index b9154073c01..2d3ae54d781 100644
--- a/sdks/python/apache_beam/runners/portability/flink_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/flink_runner_test.py
@@ -107,7 +107,10 @@ def test_no_subtransform_composite(self):
       raise unittest.SkipTest("BEAM-4781")
 
     def test_pardo_state_only(self):
-      raise unittest.SkipTest("BEAM-2918 - User state not yet supported.")
+      if streaming:
+        super(FlinkRunnerTest, self).test_pardo_state_only()
+      else:
+        raise unittest.SkipTest("BEAM-2918 - User state not yet supported.")
 
     def test_pardo_timers(self):
       raise unittest.SkipTest("BEAM-4681 - User timers not yet supported.")


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 158201)
    Time Spent: 11h 50m  (was: 11h 40m)

> Flink support for portable user state
> -------------------------------------
>
>                 Key: BEAM-2918
>                 URL: https://issues.apache.org/jira/browse/BEAM-2918
>             Project: Beam
>          Issue Type: Sub-task
>          Components: runner-flink
>            Reporter: Henning Rohde
>            Assignee: Maximilian Michels
>            Priority: Minor
>              Labels: portability
>          Time Spent: 11h 50m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to