Hi Luke,

This seems like a _much_ better solution than attempting to manage the state / buffer internally within the DoFn. I’ll give this a shot.
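
For reference, a rough sketch of how GroupIntoBatches might slot into the enrichment pattern described further down the thread. Batching is per key and window, so records with unique keys would likely need to be re-keyed onto a small set of shard keys before batches can fill. The names `shardFor`, `AssignShardFn`, `batchForEnrichment` and the `ExampleRecord` stub are assumptions made for illustration; none of them come from Beam or from the original pipeline.

```kotlin
import org.apache.beam.sdk.transforms.DoFn
import org.apache.beam.sdk.transforms.GroupIntoBatches
import org.apache.beam.sdk.transforms.ParDo
import org.apache.beam.sdk.values.KV
import org.apache.beam.sdk.values.PCollection
import java.io.Serializable

// Stand-in for the record type in the thread; its real fields are not shown there.
data class ExampleRecord(val id: String) : Serializable

// Hypothetical helper: map an arbitrary record key onto one of `shards` buckets.
fun shardFor(key: String, shards: Int): Int = Math.floorMod(key.hashCode(), shards)

// Hypothetical DoFn: wrap each keyed record under a shard key so that
// many differently keyed records can land in the same batch.
class AssignShardFn(private val shards: Int) :
    DoFn<KV<String, ExampleRecord>, KV<Int, KV<String, ExampleRecord>>>() {

    @ProcessElement
    fun processElement(
        @Element element: KV<String, ExampleRecord>,
        out: DoFn.OutputReceiver<KV<Int, KV<String, ExampleRecord>>>
    ) {
        out.output(KV.of(shardFor(element.key, shards), element))
    }
}

// Emits batches of up to `batchSize` records per shard key (and window).
// A downstream DoFn could then issue one lookup per batch and emit enriched records.
fun batchForEnrichment(
    records: PCollection<KV<String, ExampleRecord>>,
    shards: Int,
    batchSize: Long
): PCollection<KV<Int, Iterable<KV<String, ExampleRecord>>>> =
    records
        .apply("AssignShard", ParDo.of(AssignShardFn(shards)))
        .apply(
            "BatchPerShard",
            GroupIntoBatches.ofSize<Int, KV<String, ExampleRecord>>(batchSize)
        )
```

The shard count trades parallelism against batch fill: fewer shards fill batches sooner, more shards spread the work across more workers. Both values would need tuning against the actual data store.
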
Much appreciated!

Rion

> On Jun 12, 2020, at 5:59 PM, Luke Cwik <[email protected]> wrote:
>
> Check out GroupIntoBatches.
>
>> On Fri, Jun 12, 2020 at 3:53 PM Rion Williams <[email protected]> wrote:
>> Hi Luke,
>>
>> I think that’s likely my mistake. I had forgotten that was tied to a given
>> key-window. In this example use case, all of the data is keyed differently
>> (and thus not associated to a window or a key), so knowing that I’m quite
>> sure it’s the issue.
>>
>> In this scenario I’m experimenting with an external service enrichment
>> pattern and was hoping to buffer n records before eventually issuing a
>> request to the data store for the appropriate keys (i.e. every n records,
>> issue a request for all the keys associated with those, enrich and emit).
>>
>> Just trying to strike the balance between throughput and not issuing more
>> requests than necessary to the source. Any recommendations? Perhaps just a
>> global window would address this? I’ll be happy to answer any questions I
>> can on the implementation / data side if you have any.
>>
>> Thanks,
>>
>> Rion
>>
>>>> On Jun 12, 2020, at 5:22 PM, Luke Cwik <[email protected]> wrote:
>>>>
>>>
>>> Simple question, you are expecting to see prior results under the same
>>> window and key which you are not seeing (since state is per key and window)?
>>>
>>>> On Fri, Jun 12, 2020 at 3:09 PM Rion Williams <[email protected]>
>>>> wrote:
>>>> Hi all,
>>>>
>>>> I've been toying around with stateful DoFns recently and was attempting
>>>> some approaches involving buffering when I realized that it seemed that my
>>>> existing state was being ignored in the following DoFn:
>>>>
>>>> ```
>>>> class ExampleStatefulDoFn(): DoFn<KV<String, ExampleRecord>, KV<String,
>>>> ExampleRecord>>() {
>>>>     @StateId("count")
>>>>     private val count: StateSpec<ValueState<Long>> =
>>>>         StateSpecs.value(VarLongCoder.of())
>>>>
>>>>     @ProcessElement
>>>>     fun processElement(@StateId("count") countState: ValueState<Long>) {
>>>>         var recordsBuffered = countState.read() ?: 0
>>>>         recordsBuffered++
>>>>
>>>>         // Update the state
>>>>         countState.write(recordsBuffered)
>>>>
>>>>         // Do something later
>>>>     }
>>>> }
>>>> ```
>>>>
>>>> It seems that every time my DoFn gets hit, the value of my state is always
>>>> null. Does anything look particularly off with the above definition? I
>>>> know in the past I've encountered some issues related to Kotlin and
>>>> typing, however I tried to be as explicit as possible with all of the type
>>>> declarations and coders.
>>>>
>>>> There aren't any errors, it just seems like the state.write() operation
>>>> doesn't seem to acknowledge the updated value or persist it outside of the
>>>> context of the processElement operation. This is just reading from a set
>>>> of data using the LocalRunner on my own machine, so there shouldn't be
>>>> anything specific to any given runner.
>>>>
>>>> Any ideas would be welcome!
>>>>
>>>> Thanks,
>>>>
>>>> Rion
