Hemali, would this be a reasonable workaround for your problem?

/cc +Kenneth Knowles <[email protected]> - In case there is an alternative workaround to BEAM-6855.
/cc +Cosmin Arad <[email protected]>
On Thu, Feb 18, 2021 at 1:27 PM Brian Hulette <[email protected]> wrote:
> I added JvmInitializer [1] to do some one-time initialization per JVM
> before processing starts. It might be useful here... the intended use-case
> was to perform quick configuration functions, but I suppose you could use
> it to pull some data that you can reference later.
>
> [1]
> https://beam.apache.org/releases/javadoc/2.27.0/org/apache/beam/sdk/harness/JvmInitializer.html
>
> On Thu, Feb 18, 2021 at 1:03 PM Pablo Estrada <[email protected]> wrote:
>
>> +Brian Hulette <[email protected]> I believe you worked on a way to
>> load data on worker startup?
>>
>> On Thu, Feb 18, 2021 at 1:00 PM Daniel Collins <[email protected]>
>> wrote:
>>
>>> The getState function should be static, sorry. "synchronized static
>>> @NotNull MyState getState()"
>>>
>>> On Thu, Feb 18, 2021 at 3:41 PM Daniel Collins <[email protected]>
>>> wrote:
>>>
>>>> > On every dataflow start, I want to read from CloudSQL and build the
>>>> cache
>>>>
>>>> If you do this outside of dataflow, you can use a static to do this on
>>>> every worker start. Is that what you're looking for? For example:
>>>>
>>>> final class StateLoader {
>>>>   private StateLoader() {}
>>>>
>>>>   @GuardedBy("this")
>>>>   private static @Nullable MyState state;
>>>>
>>>>   synchronized @NotNull MyState getState() {
>>>>     if (state == null) {
>>>>       state = LoadStateFromSQL();
>>>>     }
>>>>     return state;
>>>>   }
>>>> }
>>>>
>>>> On Thu, Feb 18, 2021 at 2:50 PM Hemali Sutaria <
>>>> [email protected]> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I have one question. This is *kind of a blocker for our upcoming
>>>>> release*. It would be great if you could reply at your earliest
>>>>> convenience.
>>>>>
>>>>> My dataflow pipeline is stateful. I am using Beam SDK for stateful
>>>>> processing (StateId, ValueState). I have also implemented OnTimer for my
>>>>> stateful transformation. On every dataflow start, I want to read from
>>>>> CloudSQL and build the cache. For that, I need to provide the pre-built
>>>>> cache as side-input to my current transform. But, it looks like there is
>>>>> some issue when I add side input to my stateful transform. I think I am
>>>>> hitting BEAM-6855 issue (
>>>>> https://issues.apache.org/jira/browse/BEAM-6855). Is there any
>>>>> workaround? Any help would be appreciated.
>>>>>
>>>>> Following is my definition of Transforms. I am using 2.23.0 beam SDK.
>>>>> I am using GlobalWindow.
>>>>>
>>>>> private class GetLatestState extends DoFn<KV<DataTunnelStatusKey, DataTunnelStatus>, DataTunnelStateRelational> {
>>>>>   @TimerId("tunnelStatusExpiryTimer")
>>>>>   private final TimerSpec tunnelStatusExpiryTimer =
>>>>>       TimerSpecs.timer(TimeDomain.EVENT_TIME);
>>>>>
>>>>>   @StateId("tunnelStatus")
>>>>>   private final StateSpec<ValueState<DataTunnelStatus>> tunnelStatusCache =
>>>>>       StateSpecs.value(AvroCoder.of(DataTunnelStatus.class));
>>>>>
>>>>>   @ProcessElement
>>>>>   public void process(@Element KV<DataTunnelStatusKey, DataTunnelStatus> input,
>>>>>       MultiOutputReceiver out,
>>>>>       @StateId("tunnelStatus") ValueState<DataTunnelStatus> tunnelStatusCache,
>>>>>       @TimerId("tunnelStatusExpiryTimer") Timer tunnelStatusExpiryTimer,
>>>>>       ProcessContext c)
>>>>>
>>>>> Thanks,
>>>>> Hemali Sutaria
>>>>>
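
For reference, here is a minimal sketch of the static loader suggested in the thread, with the follow-up correction applied (getState is static, so the lock is the class monitor rather than "this"). MyState's fields, loadStateFromSql, and loadCount are hypothetical stand-ins for illustration only; a real pipeline would query CloudSQL in the loader, and the annotations from the original snippet are left out so the example compiles without extra dependencies.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for the cache that would be built from CloudSQL. */
final class MyState {
  final Map<String, String> entries;

  MyState(Map<String, String> entries) {
    this.entries = entries;
  }
}

/** Loads MyState lazily, at most once per JVM (that is, once per worker). */
final class StateLoader {
  private StateLoader() {}

  // Counts loader invocations; present only so the behaviour can be checked.
  static final AtomicInteger loadCount = new AtomicInteger();

  // Guarded by the StateLoader.class monitor, which the static synchronized
  // method below acquires.
  private static MyState state;

  static synchronized MyState getState() {
    if (state == null) {
      state = loadStateFromSql();
    }
    return state;
  }

  // Placeholder (assumption): a real implementation would read from CloudSQL.
  private static MyState loadStateFromSql() {
    loadCount.incrementAndGet();
    return new MyState(Collections.singletonMap("tunnel-1", "UP"));
  }
}
```

A DoFn could then call StateLoader.getState() from its processing method instead of taking the cache as a side input. Note that in this sketch the data is loaded once per worker JVM and never refreshed, so it only fits if a snapshot taken at worker start is acceptable.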
