Check out GroupIntoBatches.

On Fri, Jun 12, 2020 at 3:53 PM Rion Williams wrote:

> Hi Luke,
>
> I think that's likely my mistake. I had forgotten that state was tied to a
> given key and window. In this example use case, all of the data is keyed
> differently (so no two records share a key and window), so knowing that, I'm
> quite sure it's the issue.
>
> In this scenario I'm experimenting with an external service enrichment
> pattern and was hoping to buffer n records before eventually issuing a
> request to the data store for the appropriate keys (i.e. every n records,
> issue a request for all the keys associated with those, enrich and emit).
>
> Just trying to strike a balance between throughput and not issuing more
> requests than necessary to the source. Any recommendations? Perhaps just a
> global window would address this? I'll be happy to answer any questions I
> can on the implementation / data side if you have any.
>
> Thanks,
>
> Rion
>
> On Jun 12, 2020, at 5:22 PM, Luke Cwik wrote:
>
>> Simple question: are you expecting to see prior results under the same
>> window and key, which you are not seeing (since state is per key and window)?
>>
>> On Fri, Jun 12, 2020 at 3:09 PM Rion Williams wrote:
>>
>>> Hi all,
>>>
>>> I've been toying around with stateful DoFns recently and was attempting
>>> some approaches involving buffering when I realized that my existing
>>> state seemed to be ignored in the following DoFn:
>>>
>>> ```
>>> class ExampleStatefulDoFn() : DoFn<KV<String, ExampleRecord>, KV<String, ExampleRecord>>() {
>>>     @StateId("count")
>>>     private val count: StateSpec<ValueState<Long>> = StateSpecs.value(VarLongCoder.of())
>>>
>>>     @ProcessElement
>>>     fun processElement(@StateId("count") countState: ValueState<Long>) {
>>>         var recordsBuffered = countState.read() ?: 0
>>>         recordsBuffered++
>>>
>>>         // Update the state
>>>         countState.write(recordsBuffered)
>>>
>>>         // Do something later
>>>     }
>>> }
>>> ```
>>>
>>> It seems that every time my DoFn gets hit, the value of my state is
>>> always null. Does anything look particularly off with the above definition?
>>> I know in the past I've encountered some issues related to Kotlin and
>>> typing; however, I tried to be as explicit as possible with all of the type
>>> declarations and coders.
>>>
>>> There aren't any errors; it just seems like the state.write() operation
>>> doesn't acknowledge the updated value or persist it outside of the
>>> context of the processElement operation. This is just reading from a set of
>>> data using the LocalRunner on my own machine, so there shouldn't be
>>> anything specific to any given runner.
>>>
>>> Any ideas would be welcome!
>>>
>>> Thanks,
>>>
>>> Rion
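
A minimal, stdlib-only Kotlin model of Luke's point. This is not Beam code, and `StateKey`, `PerKeyWindowState`, and this `processElement` are hypothetical names. Beam scopes `ValueState` to a (key, window) pair. When every record carries a distinct key, each `read()` lands on a fresh cell and returns null. Routing records onto a shared key lets the count accumulate.

```kotlin
// Hypothetical model (not the Beam API): one independent state cell per (key, window).
data class StateKey(val key: String, val window: String)

class PerKeyWindowState {
    private val cells = mutableMapOf<StateKey, Long>()

    fun read(key: String, window: String): Long? = cells[StateKey(key, window)]

    fun write(key: String, window: String, value: Long) {
        cells[StateKey(key, window)] = value
    }
}

// Mirrors the processElement body from the thread: read, increment, write.
// Returns the value read before the increment (null means a brand-new cell).
fun processElement(state: PerKeyWindowState, key: String, window: String): Long? {
    val before = state.read(key, window)
    state.write(key, window, (before ?: 0L) + 1)
    return before
}

fun main() {
    val window = "GlobalWindow"

    // Every record has a distinct key, so every read hits an empty cell.
    val unique = PerKeyWindowState()
    println((1..5).map { processElement(unique, "record-$it", window) }) // [null, null, null, null, null]

    // Re-keying onto one shared key lets the count build up across elements.
    val shared = PerKeyWindowState()
    println((1..5).map { processElement(shared, "shard-0", window) }) // [null, 1, 2, 3, 4]
}
```

In a real pipeline, the shared-key variant corresponds to re-keying records before the stateful DoFn. Beam processes a given key and window serially, so a small number of keys greater than one keeps some parallelism.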

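A similar stdlib-only sketch models the GroupIntoBatches suggestion for the enrichment use case. It is again not Beam code. `numShards` is a hypothetical tuning parameter, and hashing the record key is only one way to assign shards. Records are re-keyed onto a small, fixed set of shard keys. They are then buffered per shard until `batchSize` elements accumulate, roughly what GroupIntoBatches does per key and window. Each emitted batch would become one request to the external data store.

```kotlin
// Assign a record to one of numShards shard keys (hypothetical scheme).
fun shardKey(recordKey: String, numShards: Int): Int =
    Math.floorMod(recordKey.hashCode(), numShards)

// Buffer values per shard and emit a batch whenever a shard reaches batchSize.
fun <V> batchPerShard(
    records: List<Pair<String, V>>,
    numShards: Int,
    batchSize: Int
): List<Pair<Int, List<V>>> {
    require(numShards > 0 && batchSize > 0) { "numShards and batchSize must be positive" }
    val buffers = mutableMapOf<Int, MutableList<V>>()
    val emitted = mutableListOf<Pair<Int, List<V>>>()
    for ((key, value) in records) {
        val shard = shardKey(key, numShards)
        val buffer = buffers.getOrPut(shard) { mutableListOf() }
        buffer.add(value)
        if (buffer.size == batchSize) {
            emitted.add(shard to buffer.toList())
            buffer.clear()
        }
    }
    // Flush leftovers, loosely mirroring the final (possibly smaller) batch
    // GroupIntoBatches emits when a window ends.
    for ((shard, buffer) in buffers) {
        if (buffer.isNotEmpty()) emitted.add(shard to buffer.toList())
    }
    return emitted
}

fun main() {
    val records = (1..10).map { "record-$it" to "payload-$it" }
    val batches = batchPerShard(records, numShards = 2, batchSize = 3)
    for ((shard, batch) in batches) {
        // Placeholder for one call to the external data store per batch.
        println("shard=$shard lookup for ${batch.size} records")
    }
    println("${batches.size} requests for ${records.size} records")
}
```

In Beam itself, the re-keying step could be a `WithKeys` or `MapElements` transform producing `KV<Int, ExampleRecord>`. It would be followed by `GroupIntoBatches.ofSize<Int, ExampleRecord>(n)`, which outputs `KV<Int, Iterable<ExampleRecord>>`. Check the GroupIntoBatches documentation for your Beam version for further options, such as bounding how long a partial batch is buffered.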