Hi Luke,

I think that’s likely my mistake. I had forgotten that state is tied to a given 
key and window. In this example use case, every record has a different key, so 
no two records ever share a key and window (and therefore state). Knowing that, 
I’m fairly sure that’s the issue.

In this scenario I’m experimenting with an external service enrichment pattern 
and was hoping to buffer n records before eventually issuing a request to the 
data store for the appropriate keys (i.e., every n records, issue a request for 
all the keys associated with those, enrich and emit).
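
Roughly what I have in mind, as a plain-Kotlin sketch with no Beam APIs at all. 
KeyedBuffer and shardOf are hypothetical names made up for illustration; the 
buffer only mimics the "one state cell per key" behavior, and a batch size of 3 
is arbitrary:

```kotlin
// Toy stand-in for per-key state: one buffer per key, similar in spirit to
// how a stateful DoFn keeps one state cell per key (and window).
class KeyedBuffer<K, T>(private val batchSize: Int) {
    private val buffers = mutableMapOf<K, MutableList<T>>()

    // Adds a record under `key`. Returns the full batch once `batchSize`
    // records have accumulated for that key, otherwise null.
    fun add(key: K, record: T): List<T>? {
        val buffer = buffers.getOrPut(key) { mutableListOf() }
        buffer.add(record)
        if (buffer.size < batchSize) return null
        buffers.remove(key)
        return buffer
    }
}

// Hypothetical helper: collapse arbitrary record keys onto a small number of
// shard keys so that records actually share a buffer.
fun shardOf(recordKey: String, numShards: Int): Int =
    Math.floorMod(recordKey.hashCode(), numShards)

fun main() {
    val recordKeys = (1..6).map { "record-$it" }

    // Keyed by the unique record key: no buffer ever holds more than one
    // record, so no batch is ever emitted.
    val byRecordKey = KeyedBuffer<String, String>(batchSize = 3)
    println(recordKeys.mapNotNull { byRecordKey.add(it, it) }.size) // 0

    // Keyed by a single shard: every third record completes a batch.
    val byShard = KeyedBuffer<Int, String>(batchSize = 3)
    println(recordKeys.mapNotNull { byShard.add(shardOf(it, 1), it) }.size) // 2
}
```

The first case is what I believe I was hitting: with unique keys, each record 
sees an empty buffer. A count-only flush like this would also strand a partial 
batch, so a real version would presumably need a timer to emit the remainder.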

Just trying to strike the balance between throughput and not issuing more 
requests than necessary to the source. Any recommendations? Perhaps just a 
global window would address this? I’ll be happy to answer any questions I can 
on the implementation / data side if you have any.

Thanks,

Rion

> On Jun 12, 2020, at 5:22 PM, Luke Cwik <[email protected]> wrote:
> 
> 
> Simple question, you are expecting to see prior results under the same window 
> and key which you are not seeing (since state is per key and window)?
> 
>> On Fri, Jun 12, 2020 at 3:09 PM Rion Williams <[email protected]> wrote:
>> Hi all,
>> 
>> I've been toying around with stateful DoFns recently and was attempting some 
>> approaches involving buffering when I realized that it seemed that my 
>> existing state was being ignored in the following DoFn:
>> 
>> ```
>> class ExampleStatefulDoFn(): DoFn<KV<String, ExampleRecord>, KV<String, ExampleRecord>>() {
>>     @StateId("count")
>>     private val count: StateSpec<ValueState<Long>> = StateSpecs.value(VarLongCoder.of())
>> 
>>     @ProcessElement
>>     fun processElement(@StateId("count") countState: ValueState<Long>) {
>>         var recordsBuffered = countState.read() ?: 0
>>         recordsBuffered++
>> 
>>         // Update the state
>>         countState.write(recordsBuffered)
>> 
>>         // Do something later
>>     }
>> }
>> ```
>> 
>> It seems that every time my DoFn gets hit, the value of my state is always 
>> null. Does anything look particularly off with the above definition? I know 
>> in the past I've encountered some issues related to Kotlin and typing, 
>> however I tried to be as explicit as possible with all of the type 
>> declarations and coders.
>> 
>> There aren't any errors; the state.write() operation just doesn't seem to 
>> acknowledge the updated value or persist it outside of the 
>> context of the processElement operation. This is just reading from a set of 
>> data using the LocalRunner on my own machine, so there shouldn't be anything 
>> specific to any given runner.
>> 
>> Any ideas would be welcome! 
>> 
>> Thanks,
>> 
>> Rion
