I filed https://issues.apache.org/jira/browse/BEAM-5930.
On Wed, Oct 31, 2018 at 10:22 AM Lukasz Cwik <lc...@google.com> wrote:

> That looks like a bug in the FnApiDoFnRunner.java
>
> The FnApiStateAccessor is given a callback to get the current element and
> it is not handling the case where the current element is a timer.
>
> callback:
> https://github.com/apache/beam/blob/29c443162a2fe4c89d26336b30aa6e3a3bfbade8/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java#L212
> where the current "element" gets set:
> https://github.com/apache/beam/blob/29c443162a2fe4c89d26336b30aa6e3a3bfbade8/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java#L220
> where the current "timer" gets set:
> https://github.com/apache/beam/blob/29c443162a2fe4c89d26336b30aa6e3a3bfbade8/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java#L237
>
> The easiest fix would be to have the callback return the first non null
> from currentElement/currentTimer but longer term I think we'll want a
> different solution. Alternatively, we could collapse currentElement and
> currentTimer to be currentElementOrTimer which would solve the accessor
> issue.
>
> On Wed, Oct 31, 2018 at 9:50 AM Maximilian Michels <m...@apache.org> wrote:
>
>> Hi,
>>
>> I have a question regarding user state during timer callback in the
>> FnApiDoFnRunner (Java SDK Harness).
>>
>> I've started implementing Timers for the portable Flink Runner. I can
>> register a timer via the timer output collection and fire the timer via the
>> timer input of the SDK Harness. But when I try to access state in the Timer
>> callback, I get the exception below.
>>
>> Is this a bug or if not, how is the timer's key supposed to be set? I
>> assume that it should be set from the timer element which contains the key.
>>
>> Thanks,
>> Max
>>
>>
>> Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction 72: java.util.concurrent.ExecutionException: java.lang.NullPointerException
>>     at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>>     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>>     at org.apache.beam.sdk.fn.data.CompletableFutureInboundDataClient.awaitCompletion(CompletableFutureInboundDataClient.java:49)
>>     at org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.awaitCompletion(BeamFnDataInboundObserver.java:90)
>>     at org.apache.beam.fn.harness.BeamFnDataReadRunner.blockTillReadFinishes(BeamFnDataReadRunner.java:185)
>>     at org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:292)
>>     at org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:161)
>>     at org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:145)
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>     at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.NullPointerException
>>     at org.apache.beam.model.fnexecution.v1.BeamFnApi$StateKey$BagUserState$Builder.setKey(BeamFnApi.java:49694)
>>     at org.apache.beam.fn.harness.state.FnApiStateAccessor.createBagUserStateKey(FnApiStateAccessor.java:451)
>>     at org.apache.beam.fn.harness.state.FnApiStateAccessor.bindBag(FnApiStateAccessor.java:244)
>>     at org.apache.beam.sdk.state.StateSpecs$BagStateSpec.bind(StateSpecs.java:487)
>>     at org.apache.beam.sdk.state.StateSpecs$BagStateSpec.bind(StateSpecs.java:477)
>>     at org.apache.beam.fn.harness.FnApiDoFnRunner$OnTimerContext.state(FnApiDoFnRunner.java:671)
>>     at StateTest$5$OnTimerInvoker$expiry$ZXhwaXJ5.invokeOnTimer(Unknown Source)
>>     at org.apache.beam.sdk.transforms.reflect.ByteBuddyDoFnInvokerFactory$DoFnInvokerBase.invokeOnTimer(ByteBuddyDoFnInvokerFactory.java:187)
>>     at org.apache.beam.fn.harness.FnApiDoFnRunner.processTimer(FnApiDoFnRunner.java:244)
>>     at org.apache.beam.fn.harness.DoFnPTransformRunnerFactory.lambda$createRunnerForPTransform$0(DoFnPTransformRunnerFactory.java:134)
>>     at org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.accept(BeamFnDataInboundObserver.java:81)
>>     at org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.accept(BeamFnDataInboundObserver.java:32)
>>     at org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer$InboundObserver.onNext(BeamFnDataGrpcMultiplexer.java:139)
>>     at org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer$InboundObserver.onNext(BeamFnDataGrpcMultiplexer.java:125)
>>     at org.apache.beam.sdk.fn.stream.ForwardingClientResponseObserver.onNext(ForwardingClientResponseObserver.java:50)
>>     at org.apache.beam.vendor.grpc.v1.io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onMessage(ClientCalls.java:407)
>>     at org.apache.beam.vendor.grpc.v1.io.grpc.ForwardingClientCallListener.onMessage(ForwardingClientCallListener.java:33)
>>     at org.apache.beam.vendor.grpc.v1.io.grpc.ForwardingClientCallListener.onMessage(ForwardingClientCallListener.java:33)
>>     at org.apache.beam.vendor.grpc.v1.io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1MessagesAvailable.runInContext(ClientCallImpl.java:519)
>>     at org.apache.beam.vendor.grpc.v1.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
>>     at org.apache.beam.vendor.grpc.v1.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
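
For reference, the short-term fix suggested in the quoted reply (have the callback return the first non-null of currentElement/currentTimer) could look roughly like the sketch below. This is only an illustration, not the actual FnApiDoFnRunner code: the class name, the String-typed fields, and the processElement/processTimer helpers are hypothetical stand-ins, and it assumes that at most one of the two fields is set at any time.

```java
import java.util.function.Supplier;

public class CurrentElementOrTimerSketch {
  /** Returns the first argument if it is non-null, otherwise the second (which may be null). */
  public static <T> T firstNonNull(T first, T second) {
    return first != null ? first : second;
  }

  // Hypothetical stand-ins for the runner's mutable fields. Plain strings are
  // used here only to keep the sketch self-contained.
  private String currentElement;
  private String currentTimer;

  // The callback a state accessor would be given. Because it falls back to the
  // timer, it no longer yields null while a timer callback is running.
  public final Supplier<String> currentValue =
      () -> firstNonNull(currentElement, currentTimer);

  // Assumption: processing an element clears any previously set timer.
  public void processElement(String element) {
    currentElement = element;
    currentTimer = null;
  }

  // Assumption: processing a timer clears any previously set element.
  public void processTimer(String timer) {
    currentTimer = timer;
    currentElement = null;
  }

  public static void main(String[] args) {
    CurrentElementOrTimerSketch runner = new CurrentElementOrTimerSketch();
    runner.processTimer("timer(key=A)");
    System.out.println(runner.currentValue.get());
    runner.processElement("element(key=B)");
    System.out.println(runner.currentValue.get());
  }
}
```

The alternative mentioned in the same reply, collapsing both fields into a single currentElementOrTimer, would remove the need for the fallback entirely, since the callback would then read one field.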