I added JvmInitializer [1] to do some one-time initialization per JVM before processing starts. It might be useful here. The intended use case was to perform quick configuration functions, but I suppose you could also use it to pull some data that you can reference later.
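
A minimal sketch of that idea, assuming the data fits in memory as a simple map. `StartupCache` and `loadOnce` are hypothetical names, not part of Beam, and the class only uses the JDK. In a pipeline, a JvmInitializer implementation (as I understand it, registered through ServiceLoader, e.g. with `@AutoService(JvmInitializer.class)`) could call `StartupCache.loadOnce(...)` from its `beforeProcessing(PipelineOptions)` method, and a DoFn could then read `StartupCache.get()`.

```java
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical per-JVM cache holder; illustrative only, not a Beam class. */
final class StartupCache {
  // Written once under the class lock, read without locking afterwards.
  private static volatile Map<String, String> cache;

  private StartupCache() {}

  /** Runs the loader the first time it is called; later calls are no-ops. */
  static synchronized void loadOnce(Supplier<Map<String, String>> loader) {
    if (cache == null) {
      // Immutable copy, so readers never observe later changes to the source map.
      cache = Map.copyOf(loader.get());
    }
  }

  /** Returns the loaded data, or fails loudly if loadOnce never ran. */
  static Map<String, String> get() {
    Map<String, String> snapshot = cache;
    if (snapshot == null) {
      throw new IllegalStateException("StartupCache.loadOnce was not called");
    }
    return snapshot;
  }
}
```

The loader passed to `loadOnce` is where a CloudSQL query would go. Since it runs once per worker JVM, the data is only as fresh as the last worker start.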

[1] https://beam.apache.org/releases/javadoc/2.27.0/org/apache/beam/sdk/harness/JvmInitializer.html

On Thu, Feb 18, 2021 at 1:03 PM Pablo Estrada <pabl...@google.com> wrote:

> +Brian Hulette <bhule...@google.com> I believe you worked on a way to
> load data on worker startup?
>
> On Thu, Feb 18, 2021 at 1:00 PM Daniel Collins <dpcoll...@google.com>
> wrote:
>
>> The getState function should be static, sorry. "synchronized static
>> @NotNull MyState getState()"
>>
>> On Thu, Feb 18, 2021 at 3:41 PM Daniel Collins <dpcoll...@google.com>
>> wrote:
>>
>>> > On every dataflow start, I want to read from CloudSQL and build the
>>> cache
>>>
>>> If you do this outside of dataflow, you can use a static to do this on
>>> every worker start. Is that what you're looking for? For example:
>>>
>>> final class StateLoader {
>>>   private StateLoader() {}
>>>
>>>   @GuardedBy("this")
>>>   private static @Nullable MyState state;
>>>
>>>   synchronized @NotNull MyState getState() {
>>>     if (state == null) {
>>>       state = LoadStateFromSQL();
>>>     }
>>>     return state;
>>>   }
>>> }
>>>
>>> On Thu, Feb 18, 2021 at 2:50 PM Hemali Sutaria <
>>> hsuta...@paloaltonetworks.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I have one question. This is *kind of a blocker for our upcoming
>>>> release*. It would be great if you could reply at your earliest
>>>> convenience.
>>>>
>>>> My dataflow pipeline is stateful. I am using Beam SDK for stateful
>>>> processing (StateId, ValueState). I have also implemented OnTimer for my
>>>> stateful transformation. On every dataflow start, I want to read from
>>>> CloudSQL and build the cache. For that, I need to provide the pre-built
>>>> cache as side-input to my current transform. But, it looks like there is
>>>> some issue when I add side input to my stateful transform. I think I am
>>>> hitting BEAM-6855 issue (
>>>> https://issues.apache.org/jira/browse/BEAM-6855). Is there any
>>>> workaround? Any help would be appreciated.
>>>>
>>>> Following is my definition of Transforms. I am using 2.23.0 beam SDK. I
>>>> am using GlobalWindow.
>>>>
>>>> private class GetLatestState extends DoFn<KV<DataTunnelStatusKey,
>>>> DataTunnelStatus>, DataTunnelStateRelational> {
>>>>   @TimerId("tunnelStatusExpiryTimer")
>>>>   private final TimerSpec tunnelStatusExpiryTimer =
>>>>       TimerSpecs.timer(TimeDomain.EVENT_TIME);
>>>>
>>>>   @StateId("tunnelStatus")
>>>>   private final StateSpec<ValueState<DataTunnelStatus>>
>>>>       tunnelStatusCache =
>>>>       StateSpecs.value(AvroCoder.of(DataTunnelStatus.class));
>>>>
>>>>   @ProcessElement
>>>>   public void process(@Element KV<DataTunnelStatusKey, DataTunnelStatus>
>>>>       input,
>>>>       MultiOutputReceiver out,
>>>>       @StateId("tunnelStatus") ValueState<DataTunnelStatus>
>>>>       tunnelStatusCache,
>>>>       @TimerId("tunnelStatusExpiryTimer") Timer
>>>>       tunnelStatusExpiryTimer,
>>>>       ProcessContext c)
>>>>
>>>> Thanks,
>>>> Hemali Sutaria