Hi Luke,

I think that's likely my mistake. I had forgotten that state is tied to a given key and window. In this example use case every record is keyed differently, so no two records ever share the same key and window. Knowing that, I'm quite sure that's the issue.
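This toy, plain-Kotlin model shows the key-and-window scoping. It does not use Beam's API. `CountStateModel`, `StateKey`, and the string window labels are hypothetical stand-ins for what a runner does internally. Each (key, window) pair gets its own independent counter:

```kotlin
// Toy model (NOT Beam's API): every (key, window) pair owns an independent state cell,
// roughly the scoping a runner applies to a stateful DoFn's ValueState.
data class StateKey(val key: String, val window: String)

class CountStateModel {
    private val cells = mutableMapOf<StateKey, Long>()

    // Mirrors the DoFn body: read (null on first access), increment, write back.
    fun process(key: String, window: String): Long {
        val stateKey = StateKey(key, window)
        val recordsBuffered = (cells[stateKey] ?: 0L) + 1
        cells[stateKey] = recordsBuffered
        return recordsBuffered
    }
}

fun main() {
    // Every record has a distinct key: each call starts from an empty cell.
    val distinct = CountStateModel()
    println(listOf("a", "b", "c").map { distinct.process(it, "global") }) // [1, 1, 1]

    // The same records routed to one shared key: the cell is reused and the count grows.
    val shared = CountStateModel()
    println(listOf("a", "b", "c").map { shared.process("all", "global") }) // [1, 2, 3]
}
```

When every record carries its own key, each read starts from nothing, which matches the always-null reads. Routing records to a shared key lets the count accumulate. A small fixed set of shard keys would likely be a gentler compromise, since it avoids funnelling everything through a single key.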
In this scenario I'm experimenting with an external-service enrichment pattern. I was hoping to buffer n records before issuing a single request to the data store for the appropriate keys (i.e., every n records, issue one request for all of their keys, then enrich and emit). I'm trying to strike a balance between throughput and not sending more requests than necessary to the source. Any recommendations? Would a global window alone address this?

I'll be happy to answer any questions on the implementation or data side if you have any.

Thanks,

Rion

> On Jun 12, 2020, at 5:22 PM, Luke Cwik <[email protected]> wrote:
>
> Simple question: are you expecting to see prior results under the same window
> and key, and not seeing them (since state is per key and window)?
>
>> On Fri, Jun 12, 2020 at 3:09 PM Rion Williams <[email protected]> wrote:
>> Hi all,
>>
>> I've been toying around with stateful DoFns recently and was attempting some
>> approaches involving buffering when I realized that my existing state seemed
>> to be ignored in the following DoFn:
>>
>> ```
>> import org.apache.beam.sdk.coders.VarLongCoder
>> import org.apache.beam.sdk.state.StateSpec
>> import org.apache.beam.sdk.state.StateSpecs
>> import org.apache.beam.sdk.state.ValueState
>> import org.apache.beam.sdk.transforms.DoFn
>> import org.apache.beam.sdk.transforms.DoFn.ProcessElement
>> import org.apache.beam.sdk.transforms.DoFn.StateId
>> import org.apache.beam.sdk.values.KV
>>
>> class ExampleStatefulDoFn() : DoFn<KV<String, ExampleRecord>, KV<String, ExampleRecord>>() {
>>     @StateId("count")
>>     private val count: StateSpec<ValueState<Long>> = StateSpecs.value(VarLongCoder.of())
>>
>>     @ProcessElement
>>     fun processElement(@StateId("count") countState: ValueState<Long>) {
>>         // read() returns null the first time this key and window is seen
>>         var recordsBuffered = countState.read() ?: 0L
>>         recordsBuffered++
>>
>>         // Update the state
>>         countState.write(recordsBuffered)
>>
>>         // Do something later
>>     }
>> }
>> ```
>>
>> It seems that every time my DoFn gets hit, the value of my state is always
>> null. Does anything look particularly off with the above definition? I know
>> I've encountered some issues related to Kotlin and typing in the past, but I
>> tried to be as explicit as possible with all of the type declarations and
>> coders.
>>
>> There aren't any errors; it just seems like the state.write() operation
>> doesn't persist the updated value outside the context of the processElement
>> call. This is just reading from a set of data using the LocalRunner on my own
>> machine, so there shouldn't be anything specific to any given runner.
>>
>> Any ideas would be welcome!
>>
>> Thanks,
>>
>> Rion
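A rough, framework-agnostic sketch of the "buffer n records, look up all their keys in one request, enrich, emit" idea might look like this. `Record`, `Enriched`, `BatchingEnricher`, and the `lookup`/`emit` functions are all hypothetical, and none of this is Beam API:

```kotlin
// Hypothetical record types for illustration only.
data class Record(val id: String, val payload: String)
data class Enriched(val record: Record, val extra: String?)

// Buffers records and issues ONE lookup per batch of `batchSize` records.
class BatchingEnricher(
    private val batchSize: Int,
    // Called once per batch with the batch's distinct keys; returns whatever the store found.
    private val lookup: (Set<String>) -> Map<String, String>,
    private val emit: (Enriched) -> Unit
) {
    private val buffer = mutableListOf<Record>()

    init {
        require(batchSize > 0) { "batchSize must be positive" }
    }

    fun add(record: Record) {
        buffer.add(record)
        if (buffer.size >= batchSize) flush()
    }

    // Also call this when input ends, so a partial batch is not left behind.
    fun flush() {
        if (buffer.isEmpty()) return
        val found = lookup(buffer.map { it.id }.toSet())
        buffer.forEach { emit(Enriched(it, found[it.id])) }
        buffer.clear()
    }
}

fun main() {
    var requests = 0
    val output = mutableListOf<Enriched>()
    val enricher = BatchingEnricher(
        batchSize = 3,
        lookup = { keys -> requests++; keys.associateWith { "info-$it" } },
        emit = { output.add(it) }
    )
    (1..7).forEach { enricher.add(Record("k$it", "p$it")) }
    enricher.flush() // drain the final partial batch of one record
    println("requests=$requests emitted=${output.size}") // requests=3 emitted=7
}
```

In a Beam stateful DoFn, this buffer would live in per-key state (for example a `BagState`), so the key-and-window scoping above still applies. A timer would be needed to flush a partial batch. The Java SDK's `GroupIntoBatches` transform batches elements per key and window, so it may be worth checking before hand-rolling the state.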
