I added JvmInitializer [1] to do some one-time initialization per JVM
before processing starts. It might be useful here. The intended use case
was quick configuration, but I suppose you could also use it to pull
some data that you can reference later.

[1] https://beam.apache.org/releases/javadoc/2.27.0/org/apache/beam/sdk/harness/JvmInitializer.html
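
A minimal sketch of how the two suggestions in this thread could fit
together, assuming the data is loaded once per worker JVM into a static
holder. It uses only the JDK so that it runs standalone. WorkerCache,
warmUp, and LOAD_COUNT are hypothetical names, not Beam APIs, and the
loader passed to warmUp stands in for the CloudSQL query. In a real
pipeline, warmUp would presumably be called from a JvmInitializer
implementation's beforeProcessing(PipelineOptions) method, and get() from
the DoFn.

```java
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical per-JVM cache holder; not part of the Beam API. */
final class WorkerCache {
  private WorkerCache() {}

  // Counts loader invocations so the once-per-JVM behaviour can be checked.
  static final AtomicInteger LOAD_COUNT = new AtomicInteger();

  // Guarded by the WorkerCache.class monitor (static synchronized methods).
  private static Map<String, String> cache;

  /** Loads the cache if it has not been loaded yet in this JVM. */
  static synchronized void warmUp(Supplier<Map<String, String>> loader) {
    if (cache == null) {
      // Immutable copy, so callers cannot mutate the shared data.
      cache = Map.copyOf(loader.get());
      LOAD_COUNT.incrementAndGet();
    }
  }

  /** Returns the cached data; fails fast if warmUp was never called. */
  static synchronized Map<String, String> get() {
    if (cache == null) {
      throw new IllegalStateException("WorkerCache.warmUp has not run in this JVM");
    }
    return cache;
  }
}
```

Calling warmUp a second time is a no-op, so it should be safe to call it
from both the initializer and the DoFn's setup code. Note that this data
lives per JVM only; unlike a side input, nothing here refreshes it after
startup.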

On Thu, Feb 18, 2021 at 1:03 PM Pablo Estrada <pabl...@google.com> wrote:

> +Brian Hulette <bhule...@google.com> I believe you worked on a way to
> load data on worker startup?
>
> On Thu, Feb 18, 2021 at 1:00 PM Daniel Collins <dpcoll...@google.com>
> wrote:
>
>> The getState function should be static, sorry. "synchronized static
>> @NotNull MyState getState()"
>>
>> On Thu, Feb 18, 2021 at 3:41 PM Daniel Collins <dpcoll...@google.com>
>> wrote:
>>
>>> > On every dataflow start, I want to read from CloudSQL and build the
>>> cache
>>>
>>> If you do this outside of dataflow, you can use a static to do this on
>>> every worker start. Is that what you're looking for? For example:
>>>
>>> final class StateLoader {
>>>   private StateLoader() {}
>>>
>>>   @GuardedBy("this")
>>>   private static @Nullable MyState state;
>>>
>>>   synchronized @NotNull MyState getState() {
>>>     if (state == null) {
>>>       state = LoadStateFromSQL();
>>>     }
>>>     return state;
>>>   }
>>> }
>>>
>>> On Thu, Feb 18, 2021 at 2:50 PM Hemali Sutaria <
>>> hsuta...@paloaltonetworks.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I have one question. This is *kind of a blocker for our upcoming
>>>> release*. It would be great if you could reply at your earliest
>>>> convenience.
>>>>
>>>> My Dataflow pipeline is stateful. I am using the Beam SDK's stateful
>>>> processing (StateId, ValueState), and I have also implemented OnTimer for
>>>> my stateful transform. On every dataflow start, I want to read from
>>>> CloudSQL and build the cache. For that, I need to provide the pre-built
>>>> cache as a side input to my current transform. However, there seems to be
>>>> an issue when I add a side input to my stateful transform. I think I am
>>>> hitting BEAM-6855 (https://issues.apache.org/jira/browse/BEAM-6855).
>>>> Is there any workaround? Any help would be appreciated.
>>>>
>>>> Below is the definition of my transform. I am using Beam SDK 2.23.0 with
>>>> the GlobalWindow.
>>>>
>>>> private class GetLatestState extends DoFn<KV<DataTunnelStatusKey, DataTunnelStatus>, DataTunnelStateRelational> {
>>>>     @TimerId("tunnelStatusExpiryTimer")
>>>>     private final TimerSpec tunnelStatusExpiryTimer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
>>>>
>>>>     @StateId("tunnelStatus")
>>>>     private final StateSpec<ValueState<DataTunnelStatus>> tunnelStatusCache =
>>>>             StateSpecs.value(AvroCoder.of(DataTunnelStatus.class));
>>>>
>>>>     @ProcessElement
>>>>     public void process(@Element KV<DataTunnelStatusKey, DataTunnelStatus> input,
>>>>                         MultiOutputReceiver out,
>>>>                         @StateId("tunnelStatus") ValueState<DataTunnelStatus> tunnelStatusCache,
>>>>                         @TimerId("tunnelStatusExpiryTimer") Timer tunnelStatusExpiryTimer,
>>>>                         ProcessContext c)
>>>>
>>>>
>>>>
>>>> Thanks,
>>>> Hemali Sutaria
>>>>
>>>>
