[ https://issues.apache.org/jira/browse/BEAM-6813?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16837591#comment-16837591 ]
Jan Lukavský commented on BEAM-6813:
------------------------------------

Probably related to https://issues.apache.org/jira/browse/BEAM-7269.

[~SteveNiemitz] can you please confirm that either of the coders is missing hashCode and/or equals?

> Issues with state + timers in java Direct Runner
> -------------------------------------------------
>
>                 Key: BEAM-6813
>                 URL: https://issues.apache.org/jira/browse/BEAM-6813
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-direct
>    Affects Versions: 2.11.0
>            Reporter: Steve Niemitz
>            Priority: Major
>
> I was experimenting with a stateful DoFn with timers, and ran into a weird
> bug where a state cell I was writing to would come back as null when I read
> it inside a timer callback.
> I've attached the code below [1] (please excuse the scala ;) ).
> After I dug into this a little bit, I found that the state's value was
> present in the `underlying` table in CopyOnAccessMemoryStateTable [2], but
> not set in the `stateTable` itself on the instance. [3] Based on my very
> rudimentary understanding of how this works in the direct runner, it seems
> like commit() is not being called on the state table before the timer is
> firing?
>
> [1]
> {code:java}
> private final class AggregatorDoFn[K, V, Acc, Out](
>   combiner: CombineFn[V, Acc, Out],
>   keyCoder: Coder[K],
>   accumulatorCoder: Coder[Acc]
> ) extends DoFn[KV[K, V], KV[K, Out]] {
>   @StateId(KeyId)
>   private final val keySpec = StateSpecs.value(keyCoder)
>   @StateId(AggregationId)
>   private final val stateSpec = StateSpecs.combining(accumulatorCoder,
>     combiner)
>   @StateId("numElements")
>   private final val numElementsSpec = StateSpecs.combining(Sum.ofLongs())
>   @TimerId(FlushTimerId)
>   private final val flushTimerSpec =
>     TimerSpecs.timer(TimeDomain.PROCESSING_TIME)
>   @ProcessElement
>   def processElement(
>     @StateId(KeyId) key: ValueState[K],
>     @StateId(AggregationId) state: CombiningState[V, Acc, Out],
>     @StateId("numElements") numElements: CombiningState[JLong, _, JLong],
>     @TimerId(FlushTimerId) flushTimer: Timer,
>     @Element element: KV[K, V],
>     window: BoundedWindow
>   ): Unit = {
>     key.write(element.getKey)
>     state.add(element.getValue)
>     numElements.add(1L)
>     if (numElements.read() == 1) {
>       flushTimer
>         .offset(Duration.standardSeconds(10))
>         .setRelative()
>     }
>   }
>   @OnTimer(FlushTimerId)
>   def onFlushTimer(
>     @StateId(KeyId) key: ValueState[K],
>     @StateId(AggregationId) state: CombiningState[V, _, Out],
>     @StateId("numElements") numElements: CombiningState[JLong, _, JLong],
>     output: OutputReceiver[KV[K, Out]]
>   ): Unit = {
>     if (numElements.read() > 0) {
>       val k = key.read()
>       output.output(
>         KV.of(k, state.read())
>       )
>     }
>     numElements.clear()
>   }
> }{code}
> [2]
> [https://imgur.com/a/xvPR5nd]
> [3]
> [https://imgur.com/a/jznMdaQ]
>

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
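
For illustration only, the question in the comment above (whether a coder is missing hashCode and/or equals) can be made concrete with a plain-Java sketch. It assumes, without confirmation from this thread, that a value is stored in a hash-based table under a key that includes the coder, and that the read happens with a different but equivalent coder instance. The classes `IdentityCoder` and `StructuralCoder` and both lookup methods are hypothetical stand-ins; none of them are Beam APIs.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class CoderEqualityDemo {

    // Hypothetical stand-in for a coder that does NOT override equals/hashCode:
    // two instances are equal only if they are the very same object.
    static final class IdentityCoder {
        final String typeName;

        IdentityCoder(String typeName) {
            this.typeName = typeName;
        }
    }

    // Hypothetical stand-in for a coder that defines structural equality.
    static final class StructuralCoder {
        final String typeName;

        StructuralCoder(String typeName) {
            this.typeName = typeName;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof StructuralCoder)) {
                return false;
            }
            return typeName.equals(((StructuralCoder) o).typeName);
        }

        @Override
        public int hashCode() {
            return Objects.hash(typeName);
        }
    }

    // Writes under one coder instance, then reads under a second, equivalent one.
    static String lookupWithIdentityCoder() {
        Map<Object, String> table = new HashMap<>();
        table.put(new IdentityCoder("Long"), "accumulated state");
        return table.get(new IdentityCoder("Long"));
    }

    // Same write/read pattern, but the key type defines equals and hashCode.
    static String lookupWithStructuralCoder() {
        Map<Object, String> table = new HashMap<>();
        table.put(new StructuralCoder("Long"), "accumulated state");
        return table.get(new StructuralCoder("Long"));
    }

    public static void main(String[] args) {
        System.out.println("identity coder lookup:   " + lookupWithIdentityCoder());
        System.out.println("structural coder lookup: " + lookupWithStructuralCoder());
    }
}
```

Under that assumption, the first lookup returns null even though the value was written, which resembles the symptom in the report. Whether this is the actual cause in the direct runner would still need to be confirmed against the coders used in [1].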