+Brian Hulette <bhule...@google.com> I believe you worked on a way to load
data on worker startup?

On Thu, Feb 18, 2021 at 1:00 PM Daniel Collins <dpcoll...@google.com> wrote:

> The getState function should be static, sorry. "synchronized static
> @NotNull MyState getState()"
>
> On Thu, Feb 18, 2021 at 3:41 PM Daniel Collins <dpcoll...@google.com>
> wrote:
>
>> > On every dataflow start, I want to read from CloudSQL and build the
>> > cache
>>
>> If you do this outside of the Dataflow/Beam APIs, you can use a static
>> field so that it runs on every worker start. Is that what you're looking
>> for? For example:
>>
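>> // Annotation packages are an assumption; use whichever your build provides.
>> import javax.annotation.concurrent.GuardedBy;
>> import org.jetbrains.annotations.NotNull;
>> import org.jetbrains.annotations.Nullable;
>>
>> final class StateLoader {
>>   private StateLoader() {}
>>
>>   // static synchronized methods lock on StateLoader.class, not on "this".
>>   @GuardedBy("StateLoader.class")
>>   private static @Nullable MyState state;
>>
>>   // Loads the state once per worker JVM and reuses it on later calls.
>>   // MyState and loadStateFromSql() are placeholders for your own code.
>>   static synchronized @NotNull MyState getState() {
>>     if (state == null) {
>>       state = loadStateFromSql(); // e.g. query CloudSQL and build the cache
>>     }
>>     return state;
>>   }
>> }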
>>
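A minimal, self-contained sketch of an alternative to the synchronized getter, assuming plain Java with no Beam dependency: the initialization-on-demand holder idiom, which relies on the JVM initializing a nested class at most once and doing so thread-safely. CachedState, HolderStateLoader, loadStateFromSql() and HolderDemo are hypothetical names; the loader only counts its calls instead of querying CloudSQL. The demo calls getState() from several threads and checks that every thread got the same instance and the load ran once.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the cache built from CloudSQL.
final class CachedState {
  final String source;

  CachedState(String source) {
    this.source = source;
  }
}

final class HolderStateLoader {
  // Demo only: counts how many times the expensive load actually ran.
  static final AtomicInteger LOAD_COUNT = new AtomicInteger();

  private HolderStateLoader() {}

  // The JVM initializes Holder once, on the first getState() call, and class
  // initialization is thread-safe, so no explicit lock is needed.
  private static final class Holder {
    static final CachedState STATE = loadStateFromSql();
  }

  static CachedState getState() {
    return Holder.STATE;
  }

  // Hypothetical placeholder for the real CloudSQL query.
  private static CachedState loadStateFromSql() {
    LOAD_COUNT.incrementAndGet();
    return new CachedState("cloudsql");
  }
}

final class HolderDemo {
  private HolderDemo() {}

  // Calls getState() from several threads, checks that they all received the
  // same instance, and returns how many times the load ran.
  static int loadsAfterConcurrentCalls(int threads) {
    if (threads < 1) {
      throw new IllegalArgumentException("threads must be >= 1");
    }
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    try {
      List<Future<CachedState>> results = new ArrayList<>();
      for (int i = 0; i < threads; i++) {
        results.add(pool.submit((Callable<CachedState>) HolderStateLoader::getState));
      }
      CachedState first = results.get(0).get();
      for (Future<CachedState> result : results) {
        if (result.get() != first) {
          throw new IllegalStateException("threads saw different instances");
        }
      }
      return HolderStateLoader.LOAD_COUNT.get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } catch (ExecutionException e) {
      throw new IllegalStateException(e);
    } finally {
      pool.shutdown();
    }
  }
}
```

With either version, the stateful DoFn can read the cache by calling the static getter directly inside its @ProcessElement method, so the cache never has to be passed as a side input. One trade-off: if the load throws, the holder class fails to initialize and every later call in that JVM fails as well, whereas the synchronized getter simply retries on the next call. In both cases the data is loaded once per worker JVM, so later CloudSQL changes are not picked up unless a refresh is added.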
>> On Thu, Feb 18, 2021 at 2:50 PM Hemali Sutaria <
>> hsuta...@paloaltonetworks.com> wrote:
>>
>>> Hi,
>>>
>>> I have one question. This is *kind of a blocker for our upcoming
>>> release*. It would be great if you could reply at your earliest
>>> convenience.
>>>
>>> My Dataflow pipeline is stateful: I am using the Beam SDK's stateful
>>> processing API (@StateId, ValueState) and have also implemented an @OnTimer
>>> callback for my stateful transform. On every Dataflow start, I want to read
>>> from CloudSQL and build a cache, so I need to pass the pre-built cache as a
>>> side input to this transform. However, adding a side input to my stateful
>>> transform seems to cause a problem; I think I am hitting BEAM-6855
>>> (https://issues.apache.org/jira/browse/BEAM-6855). Is there a workaround?
>>> Any help would be appreciated.
>>>
>>> Below is my transform definition. I am using Beam SDK 2.23.0 and the
>>> GlobalWindow.
>>>
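>>> import org.apache.beam.sdk.coders.AvroCoder;
>>> import org.apache.beam.sdk.state.StateSpec;
>>> import org.apache.beam.sdk.state.StateSpecs;
>>> import org.apache.beam.sdk.state.TimeDomain;
>>> import org.apache.beam.sdk.state.Timer;
>>> import org.apache.beam.sdk.state.TimerSpec;
>>> import org.apache.beam.sdk.state.TimerSpecs;
>>> import org.apache.beam.sdk.state.ValueState;
>>> import org.apache.beam.sdk.transforms.DoFn;
>>> import org.apache.beam.sdk.values.KV;
>>>
>>> // Declared static: a non-static inner DoFn captures its enclosing instance,
>>> // which Beam would then also have to serialize.
>>> private static class GetLatestState
>>>         extends DoFn<KV<DataTunnelStatusKey, DataTunnelStatus>, DataTunnelStateRelational> {
>>>     @TimerId("tunnelStatusExpiryTimer")
>>>     private final TimerSpec tunnelStatusExpiryTimer =
>>>             TimerSpecs.timer(TimeDomain.EVENT_TIME);
>>>
>>>     @StateId("tunnelStatus")
>>>     private final StateSpec<ValueState<DataTunnelStatus>> tunnelStatusCache =
>>>             StateSpecs.value(AvroCoder.of(DataTunnelStatus.class));
>>>
>>>     @ProcessElement
>>>     public void process(
>>>             @Element KV<DataTunnelStatusKey, DataTunnelStatus> input,
>>>             MultiOutputReceiver out,
>>>             @StateId("tunnelStatus") ValueState<DataTunnelStatus> tunnelStatusCache,
>>>             @TimerId("tunnelStatusExpiryTimer") Timer tunnelStatusExpiryTimer,
>>>             ProcessContext c) {
>>>         // ... (method body and @OnTimer callback not included in the message)
>>>     }
>>> }
>>>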
>>> Thanks,
>>> Hemali Sutaria
>>>
>>>