Hemali, would this be a reasonable workaround for your problem?

/cc +Kenneth Knowles <k...@google.com> - In case there is an alternative
workaround to BEAM-6855.
/cc +Cosmin Arad <ca...@google.com>

On Thu, Feb 18, 2021 at 1:27 PM Brian Hulette <bhule...@google.com> wrote:

> I added JvmInitializer [1] to do some one-time initialization per JVM
> before processing starts. It might be useful here. The intended use case
> was to perform quick configuration functions, but I suppose you could use
> it to pull some data that you can reference later.
>
> [1]
> https://beam.apache.org/releases/javadoc/2.27.0/org/apache/beam/sdk/harness/JvmInitializer.html
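A rough sketch of how that could look for Hemali's cache, under some assumptions. A class implementing org.apache.beam.sdk.harness.JvmInitializer (registered for java.util.ServiceLoader) would call `WarmCache.warmUp()` from its `beforeProcessing(PipelineOptions)` method, and the stateful DoFn would read `WarmCache.get()` instead of taking a side input. Only the JDK-side cache is shown so that it compiles without Beam. `WarmCache` and `loadFromCloudSql()` are hypothetical names, and the returned rows are dummy data standing in for the real Cloud SQL query.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/**
 * Per-JVM cache that is filled once, before any element is processed.
 *
 * <p>Assumed wiring (not shown): a JvmInitializer implementation whose
 * beforeProcessing(PipelineOptions) calls WarmCache.warmUp().
 */
final class WarmCache {
  // null until warmUp() has run in this JVM; volatile so readers see the published map
  private static volatile Map<String, String> cache;

  private WarmCache() {}

  /** Loads the cache once; later calls are no-ops. */
  static synchronized void warmUp() {
    if (cache == null) {
      cache = Collections.unmodifiableMap(loadFromCloudSql());
    }
  }

  /** Read path for DoFns; fails loudly if warmUp() never ran in this JVM. */
  static Map<String, String> get() {
    Map<String, String> snapshot = cache;
    if (snapshot == null) {
      throw new IllegalStateException("WarmCache.warmUp() has not run in this JVM");
    }
    return snapshot;
  }

  /** Hypothetical stand-in for the real Cloud SQL query (dummy rows). */
  private static Map<String, String> loadFromCloudSql() {
    Map<String, String> rows = new HashMap<>();
    rows.put("tunnel-1", "UP");
    rows.put("tunnel-2", "DOWN");
    return rows;
  }
}
```

Throwing from `get()` rather than lazily loading there is a deliberate choice in this sketch: if the initializer is not actually being invoked on the workers, it shows up immediately instead of silently falling back to a per-call load.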
>
> On Thu, Feb 18, 2021 at 1:03 PM Pablo Estrada <pabl...@google.com> wrote:
>
>> +Brian Hulette <bhule...@google.com> I believe you worked on a way to
>> load data on worker startup?
>>
>> On Thu, Feb 18, 2021 at 1:00 PM Daniel Collins <dpcoll...@google.com>
>> wrote:
>>
>>> The getState function should be static, sorry. "synchronized static
>>> @NotNull MyState getState()"
>>>
>>> On Thu, Feb 18, 2021 at 3:41 PM Daniel Collins <dpcoll...@google.com>
>>> wrote:
>>>
>>>> > On every dataflow start, I want to read from CloudSQL and build the
>>>> cache
>>>>
>>>> If you do this outside of Dataflow, you can use a static field so the load
>>>> happens once on every worker start. Is that what you're looking for? For example:
>>>>
>>>> final class StateLoader {
>>>>   private StateLoader() {}
>>>>
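>>>>   // getState() is static synchronized, so the lock is StateLoader.class.
>>>>   @GuardedBy("StateLoader.class")
>>>>   private static @Nullable MyState state;
>>>>
>>>>   static synchronized @NotNull MyState getState() {
>>>>     if (state == null) {
>>>>       // LoadStateFromSQL() is a placeholder for the actual Cloud SQL read.
>>>>       state = LoadStateFromSQL();
>>>>     }
>>>>     return state;
>>>>   }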
>>>> }
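If taking a lock on every getState() call is a concern, the initialization-on-demand holder idiom gives the same lazy, once-per-JVM load without explicit locking. This is a JDK-only sketch; `LazyStateLoader`, the nested `MyState`, `loadStateFromSql()`, and the `LOADS` counter are hypothetical stand-ins for the types and the Cloud SQL query discussed in this thread.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class LazyStateLoader {
  private LazyStateLoader() {}

  /** Hypothetical stand-in for MyState. */
  static final class MyState {
    final String source;

    MyState(String source) {
      this.source = source;
    }
  }

  /** Counts loads so the once-per-JVM behaviour can be checked. */
  static final AtomicInteger LOADS = new AtomicInteger();

  // The JVM initializes Holder (and so runs loadStateFromSql()) on the first
  // access to Holder.STATE; class initialization is thread-safe and runs once.
  private static final class Holder {
    static final MyState STATE = loadStateFromSql();
  }

  static MyState getState() {
    return Holder.STATE;
  }

  /** Hypothetical stand-in for the Cloud SQL read. */
  private static MyState loadStateFromSql() {
    LOADS.incrementAndGet();
    return new MyState("cloudsql");
  }
}
```

One trade-off: if loadStateFromSql() throws, the holder class fails to initialize and every later getState() call in that JVM fails as well (NoClassDefFoundError), whereas the synchronized null-check version simply retries on the next call.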
>>>>
>>>> On Thu, Feb 18, 2021 at 2:50 PM Hemali Sutaria <
>>>> hsuta...@paloaltonetworks.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I have one question. This is *kind of a blocker for our upcoming
>>>>> release*. It would be great if you could reply at your earliest
>>>>> convenience.
>>>>>
>>>>> My dataflow pipeline is stateful. I am using the Beam SDK for stateful
>>>>> processing (StateId, ValueState), and I have also implemented OnTimer for
>>>>> my stateful transform. On every dataflow start, I want to read from
>>>>> CloudSQL and build the cache. For that, I need to provide the pre-built
>>>>> cache as a side input to my current transform. However, there seems to be
>>>>> an issue when I add a side input to my stateful transform; I think I am
>>>>> hitting BEAM-6855 (https://issues.apache.org/jira/browse/BEAM-6855). Is
>>>>> there any workaround? Any help would be appreciated.
>>>>>
>>>>> Following is the definition of my transform. I am using Beam SDK 2.23.0
>>>>> with GlobalWindow.
>>>>>
>>>>> private class GetLatestState
>>>>>     extends DoFn<KV<DataTunnelStatusKey, DataTunnelStatus>, DataTunnelStateRelational> {
>>>>>   @TimerId("tunnelStatusExpiryTimer")
>>>>>   private final TimerSpec tunnelStatusExpiryTimer =
>>>>>       TimerSpecs.timer(TimeDomain.EVENT_TIME);
>>>>>
>>>>>   @StateId("tunnelStatus")
>>>>>   private final StateSpec<ValueState<DataTunnelStatus>> tunnelStatusCache =
>>>>>       StateSpecs.value(AvroCoder.of(DataTunnelStatus.class));
>>>>>
>>>>>   @ProcessElement
>>>>>   public void process(
>>>>>       @Element KV<DataTunnelStatusKey, DataTunnelStatus> input,
>>>>>       MultiOutputReceiver out,
>>>>>       @StateId("tunnelStatus") ValueState<DataTunnelStatus> tunnelStatusCache,
>>>>>       @TimerId("tunnelStatusExpiryTimer") Timer tunnelStatusExpiryTimer,
>>>>>       ProcessContext c)
>>>>>
>>>>> Thanks,
>>>>> Hemali Sutaria
>>>>>
>>>>>
