[ 
https://issues.apache.org/jira/browse/BEAM-6813?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16805506#comment-16805506
 ] 

Steve Niemitz commented on BEAM-6813:
-------------------------------------

I believe this was using a 4 hour fixed window without any triggering.

> Issues with state + timers in java Direct Runner 
> -------------------------------------------------
>
>                 Key: BEAM-6813
>                 URL: https://issues.apache.org/jira/browse/BEAM-6813
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-direct
>    Affects Versions: 2.11.0
>            Reporter: Steve Niemitz
>            Priority: Major
>
> I was experimenting with a stateful DoFn with timers, and ran into a weird 
> bug where a state cell I was writing to would come back as null when I read 
> it inside a timer callback.
> I've attached the code below [1] (please excuse the scala ;) ).
> After I dug into this a little bit, I found that the state's value was 
> present in the `underlying` table in CopyOnAccessMemoryStateTable [2], but 
> not set in the `stateTable` itself on the instance. [3]   Based on my very 
> rudimentary understanding of how this works in the direct runner, it seems 
> like commit() is not being called on the state table before the timer is 
> firing?
>   
>  [1]
> {code:java}
> private final class AggregatorDoFn[K, V, Acc, Out](
>   combiner: CombineFn[V, Acc, Out],
>   keyCoder: Coder[K],
>   accumulatorCoder: Coder[Acc]
> ) extends DoFn[KV[K, V], KV[K, Out]] {
>   @StateId(KeyId)
>   private final val keySpec = StateSpecs.value(keyCoder)
>   @StateId(AggregationId)
>   private final val stateSpec = StateSpecs.combining(accumulatorCoder, 
> combiner)
>   @StateId("numElements")
>   private final val numElementsSpec = StateSpecs.combining(Sum.ofLongs())
>   @TimerId(FlushTimerId)
>   private final val flushTimerSpec = 
> TimerSpecs.timer(TimeDomain.PROCESSING_TIME)
>   @ProcessElement
>   def processElement(
>     @StateId(KeyId) key: ValueState[K],
>     @StateId(AggregationId) state: CombiningState[V, Acc, Out],
>     @StateId("numElements") numElements: CombiningState[JLong, _, JLong],
>     @TimerId(FlushTimerId) flushTimer: Timer,
>     @Element element: KV[K, V],
>     window: BoundedWindow
>   ): Unit = {
>     key.write(element.getKey)
>     state.add(element.getValue)
>     numElements.add(1L)
>     if (numElements.read() == 1) {
>       flushTimer
>         .offset(Duration.standardSeconds(10))
>         .setRelative()
>     }
>   }
>   @OnTimer(FlushTimerId)
>   def onFlushTimer(
>     @StateId(KeyId) key: ValueState[K],
>     @StateId(AggregationId) state: CombiningState[V, _, Out],
>     @StateId("numElements") numElements: CombiningState[JLong, _, JLong],
>     output: OutputReceiver[KV[K, Out]]
>   ): Unit = {
>     if (numElements.read() > 0) {
>       val k = key.read()
>       output.output(
>         KV.of(k, state.read())
>       )
>     }
>     numElements.clear()
>   }
> }{code}
> [2]
>  [https://imgur.com/a/xvPR5nd]
> [3]
>  [https://imgur.com/a/jznMdaQ]
>   



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to