Hemali, would this be a reasonable workaround for your problem?

/cc +Kenneth Knowles <k...@google.com> - in case there is an alternative workaround to BEAM-6855.
/cc +Cosmin Arad <ca...@google.com>

On Thu, Feb 18, 2021 at 1:27 PM Brian Hulette <bhule...@google.com> wrote:

> I added JvmInitializer [1] to do some one-time initialization per JVM
> before processing starts. It might be useful here... the intended use-case
> was to perform quick configuration functions, but I suppose you could use
> it to pull some data that you can reference later.
>
> [1]
> https://beam.apache.org/releases/javadoc/2.27.0/org/apache/beam/sdk/harness/JvmInitializer.html
>
> On Thu, Feb 18, 2021 at 1:03 PM Pablo Estrada <pabl...@google.com> wrote:
>
>> +Brian Hulette <bhule...@google.com> I believe you worked on a way to
>> load data on worker startup?
>>
>> On Thu, Feb 18, 2021 at 1:00 PM Daniel Collins <dpcoll...@google.com>
>> wrote:
>>
>>> The getState function should be static, sorry. "synchronized static
>>> @NotNull MyState getState()"
>>>
>>> On Thu, Feb 18, 2021 at 3:41 PM Daniel Collins <dpcoll...@google.com>
>>> wrote:
>>>
>>>> > On every dataflow start, I want to read from CloudSQL and build the
>>>> cache
>>>>
>>>> If you do this outside of dataflow, you can use a static to do this on
>>>> every worker start. Is that what you're looking for? For example:
>>>>
>>>> final class StateLoader {
>>>>   private StateLoader() {}
>>>>
>>>>   @GuardedBy("this")
>>>>   private static @Nullable MyState state;
>>>>
>>>>   synchronized @NotNull MyState getState() {
>>>>     if (state == null) {
>>>>       state = LoadStateFromSQL();
>>>>     }
>>>>     return state;
>>>>   }
>>>> }
>>>>
>>>> On Thu, Feb 18, 2021 at 2:50 PM Hemali Sutaria <
>>>> hsuta...@paloaltonetworks.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I have one question. This is *kind of a blocker for our upcoming
>>>>> release*. It would be great if you could reply at your earliest
>>>>> convenience.
>>>>>
>>>>> My dataflow pipeline is stateful. I am using Beam SDK for stateful
>>>>> processing (StateId, ValueState). I have also implemented OnTimer for my
>>>>> stateful transformation. On every dataflow start, I want to read from
>>>>> CloudSQL and build the cache. For that, I need to provide the pre-built
>>>>> cache as side-input to my current transform. But, it looks like there is
>>>>> some issue when I add side input to my stateful transform. I think I am
>>>>> hitting BEAM-6855 issue (
>>>>> https://issues.apache.org/jira/browse/BEAM-6855). Is there any
>>>>> workaround? Any help would be appreciated.
>>>>>
>>>>> Following is my definition of Transforms. I am using 2.23.0 beam SDK.
>>>>> I am using GlobalWindow.
>>>>>
>>>>> private class GetLatestState extends DoFn<KV<DataTunnelStatusKey,
>>>>> DataTunnelStatus>, DataTunnelStateRelational> {
>>>>>   @TimerId("tunnelStatusExpiryTimer")
>>>>>   private final TimerSpec tunnelStatusExpiryTimer =
>>>>>       TimerSpecs.timer(TimeDomain.EVENT_TIME);
>>>>>
>>>>>   @StateId("tunnelStatus")
>>>>>   private final StateSpec<ValueState<DataTunnelStatus>>
>>>>>       tunnelStatusCache =
>>>>>       StateSpecs.value(AvroCoder.of(DataTunnelStatus.class));
>>>>>
>>>>>   @ProcessElement
>>>>>   public void process(@Element KV<DataTunnelStatusKey, DataTunnelStatus>
>>>>>       input,
>>>>>       MultiOutputReceiver out,
>>>>>       @StateId("tunnelStatus") ValueState<DataTunnelStatus>
>>>>>       tunnelStatusCache,
>>>>>       @TimerId("tunnelStatusExpiryTimer") Timer
>>>>>       tunnelStatusExpiryTimer,
>>>>>       ProcessContext c)
>>>>>
>>>>>
>>>>> Thanks,
>>>>> Hemali Sutaria
>>>>>
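
For anyone trying the static approach quoted above, a minimal self-contained sketch with the "static" correction applied might look like the following. The Map-based state, the loadStateFromSql stand-in, and the loadCount counter are illustrative assumptions and not code from this thread; a real version would run the CloudSQL query where the stand-in is. Note that a static synchronized method locks on StateLoader.class, not on "this".

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

final class StateLoader {
  private StateLoader() {}

  // Illustration only: counts how often the load actually runs in this JVM.
  static final AtomicInteger loadCount = new AtomicInteger();

  // Guarded by StateLoader.class, the monitor used by static synchronized methods.
  private static Map<String, String> state;

  // Lazily loads the state on first use, then returns the same instance
  // for every later call in this JVM (i.e. once per worker process).
  static synchronized Map<String, String> getState() {
    if (state == null) {
      state = loadStateFromSql();
    }
    return state;
  }

  // Hypothetical stand-in for the CloudSQL read; returns fixed sample data.
  private static Map<String, String> loadStateFromSql() {
    loadCount.incrementAndGet();
    Map<String, String> loaded = new HashMap<>();
    loaded.put("tunnel-1", "UP");
    return Collections.unmodifiableMap(loaded);
  }
}
```

A DoFn could then call StateLoader.getState() from its @Setup or @ProcessElement method instead of taking the cache as a side input. Each worker JVM would load its own copy, so this only suits data that is safe to read once at startup.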