Hi all,
I've been toying around with stateful DoFns recently and was attempting some
approaches involving buffering when I realized that it seemed that my existing
state was being ignored in the following DoFn:
```
class ExampleStatefulDoFn(): DoFn<KV<String, ExampleRecord>, KV<String,
ExampleRecord>>() {
@StateId("count")
private val count: StateSpec<ValueState<Long>> =
StateSpecs.value(VarLongCoder.of())
@ProcessElement
fun processElement(@StateId("count") countState: ValueState<Long>) {
var recordsBuffered = countState.read() ?: 0
recordsBuffered++
// Update the state
countState.write(recordsBuffered)
// Do something later
}
}
```
It seems that every time my DoFn gets hit, the value of my state is always
null. Does anything look particularly off with the above definition? I know in
the past I've encountered some issues related to Kotlin and typing, however I
tried to be as explicit as possible with all of the type declarations and
coders.
There aren't any errors, it just seems like the state.write() operation doesn't
seem to acknowledge the updated value or persist it outside of the context of
the processElement operation. This is just reading from a set of data using the
LocalRunner on my own machine, so there shouldn't be anything specific to any
given runner.
Any ideas would be welcome!
Thanks,
Rion