Hi Luke, 

This seems like a _much_ better solution than attempting to manage the state / 
buffer internally within the DoFn.  I’ll give this a shot.
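Here is a rough sketch of how `GroupIntoBatches` could be wired into the enrichment step. It assumes the records are `KV<String, String>`, standing in for `KV<String, ExampleRecord>`. `GroupIntoBatches` only batches elements that share the same key and window, so records that each have a unique key would still come out as batches of one. For that reason, the sketch first re-keys them onto a small, fixed number of shards. `NUM_SHARDS`, `shardFor`, `AssignShardFn`, and `batchForEnrichment` are hypothetical names, and the shard count is an arbitrary placeholder.

```kotlin
import org.apache.beam.sdk.coders.KvCoder
import org.apache.beam.sdk.coders.StringUtf8Coder
import org.apache.beam.sdk.coders.VarIntCoder
import org.apache.beam.sdk.transforms.DoFn
import org.apache.beam.sdk.transforms.GroupIntoBatches
import org.apache.beam.sdk.transforms.ParDo
import org.apache.beam.sdk.values.KV
import org.apache.beam.sdk.values.PCollection

// Hypothetical shard count. GroupIntoBatches only batches elements sharing a key
// (and window), so uniquely keyed records are first spread over a few shard keys.
const val NUM_SHARDS = 10

/** Deterministically maps a record key onto one of [numShards] shard keys. */
fun shardFor(key: String?, numShards: Int = NUM_SHARDS): Int =
    Math.floorMod(key?.hashCode() ?: 0, numShards)

/** Wraps each record under its shard key; the original key/value pair is kept intact. */
class AssignShardFn : DoFn<KV<String, String>, KV<Int, KV<String, String>>>() {
    @DoFn.ProcessElement
    fun processElement(
        @DoFn.Element element: KV<String, String>,
        out: DoFn.OutputReceiver<KV<Int, KV<String, String>>>
    ) {
        out.output(KV.of(shardFor(element.key), element))
    }
}

/** Re-keys by shard, then emits groups of up to [batchSize] records per shard. */
fun batchForEnrichment(
    input: PCollection<KV<String, String>>,
    batchSize: Long
): PCollection<KV<Int, Iterable<KV<String, String>>>> =
    input
        .apply("AssignShard", ParDo.of(AssignShardFn()))
        .setCoder(KvCoder.of(VarIntCoder.of(), KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())))
        .apply("BatchPerShard", GroupIntoBatches.ofSize<Int, KV<String, String>>(batchSize))
```

Each emitted `Iterable` holds up to `batchSize` records from a single shard, so it can be enriched with one request. The shard count sets the balance between parallelism and how quickly each batch fills. As far as I understand, a partial batch is flushed when its window expires. In the global window on unbounded input, that means a slow shard could hold records for some time.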

Much appreciated!

Rion



> On Jun 12, 2020, at 5:59 PM, Luke Cwik <[email protected]> wrote:
> 
> 
> Check out GroupIntoBatches.
> 
>> On Fri, Jun 12, 2020 at 3:53 PM Rion Williams <[email protected]> wrote:
>> Hi Luke,
>> 
>> I think that’s likely my mistake. I had forgotten that state is tied to a 
>> given key and window. In this example use case, every element has a 
>> different key (so no two elements share a key-and-window pair and no state 
>> carries over between them). Knowing that, I’m quite sure that’s the issue.
>> 
>> In this scenario I’m experimenting with an external-service enrichment 
>> pattern and was hoping to buffer n records before issuing a single 
>> request to the data store for the appropriate keys (i.e., every n records, 
>> issue one request for all of their keys, enrich, and emit).
>> 
>> I’m just trying to strike a balance between throughput and not issuing more 
>> requests than necessary to the source. Any recommendations? Would a global 
>> window alone address this? I’ll be happy to answer any questions on the 
>> implementation or data side if you have any.
>> 
>> Thanks,
>> 
>> Rion
>> 
>>>> On Jun 12, 2020, at 5:22 PM, Luke Cwik <[email protected]> wrote:
>>>> 
>>> 
>>> Simple question: are you expecting to see prior results under the same 
>>> window and key, which you are not seeing (since state is per key and window)?
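To make the per-key-and-window scoping concrete, here is a plain-Kotlin model (no Beam involved) of what a `ValueState` read would see. `StateKey`, `PerKeyWindowCounter`, and the `"global"` window label are hypothetical names used for illustration. A real runner's state backend is not a simple map like this.

```kotlin
// Hypothetical model of state scoping: one independent cell per (key, window) pair.
data class StateKey(val key: String, val window: String)

class PerKeyWindowCounter {
    private val cells = mutableMapOf<StateKey, Long>()

    /**
     * Mirrors the DoFn body: read (null if this key+window was never written),
     * increment, write back. Returns the value that was read.
     */
    fun process(key: String, window: String): Long? {
        val id = StateKey(key, window)
        val previous = cells[id]
        cells[id] = (previous ?: 0L) + 1
        return previous
    }
}
```

With three distinct keys, every `process` call reads `null`, which matches the symptom in the original question. Repeating a single key in the same window reads `null`, `1`, and then `2`.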
>>> 
>>>> On Fri, Jun 12, 2020 at 3:09 PM Rion Williams <[email protected]> 
>>>> wrote:
>>>> Hi all,
>>>> 
>>>> I've been experimenting with stateful DoFns recently and was trying some 
>>>> buffering approaches when I noticed that my existing state seemed to be 
>>>> ignored in the following DoFn:
>>>> 
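>>>> ```
>>>> import org.apache.beam.sdk.coders.VarLongCoder
>>>> import org.apache.beam.sdk.state.StateSpec
>>>> import org.apache.beam.sdk.state.StateSpecs
>>>> import org.apache.beam.sdk.state.ValueState
>>>> import org.apache.beam.sdk.transforms.DoFn
>>>> import org.apache.beam.sdk.values.KV
>>>> 
>>>> class ExampleStatefulDoFn : DoFn<KV<String, ExampleRecord>, KV<String, ExampleRecord>>() {
>>>>     @StateId("count")
>>>>     private val count: StateSpec<ValueState<Long>> = StateSpecs.value(VarLongCoder.of())
>>>> 
>>>>     @ProcessElement
>>>>     fun processElement(@StateId("count") countState: ValueState<Long>) {
>>>>         // read() returns null the first time this key+window is seen
>>>>         var recordsBuffered = countState.read() ?: 0L
>>>>         recordsBuffered++
>>>> 
>>>>         // Update the state (scoped to the current key and window)
>>>>         countState.write(recordsBuffered)
>>>> 
>>>>         // Do something later
>>>>     }
>>>> }
>>>> ```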
>>>> 
>>>> Every time my DoFn is invoked, the value read from my state is null. Does 
>>>> anything look particularly off with the above definition? I know I've run 
>>>> into issues with Kotlin and typing in the past; however, I tried to be as 
>>>> explicit as possible with all of the type declarations and coders.
>>>> 
>>>> There aren't any errors; it just seems that the state.write() call doesn't 
>>>> persist the updated value beyond the current processElement invocation. 
>>>> This is just reading a set of data using the LocalRunner on my own 
>>>> machine, so there shouldn't be anything specific to any given runner.
>>>> 
>>>> Any ideas would be welcome! 
>>>> 
>>>> Thanks,
>>>> 
>>>> Rion
