+Brian Hulette <bhule...@google.com> I believe you worked on a way to load data on worker startup?
On Thu, Feb 18, 2021 at 1:00 PM Daniel Collins <dpcoll...@google.com> wrote: > The getState function should be static, sorry. "synchronized static > @NotNull MyState getState()" > > On Thu, Feb 18, 2021 at 3:41 PM Daniel Collins <dpcoll...@google.com> > wrote: > >> > On every dataflow start, I want to read from CloudSQL and build the >> cache >> >> If you do this outside of dataflow, you can use a static to do this on >> every worker start. Is that what you're looking for? For example: >> >> final class StateLoader { >> private StateLoader() {} >> >> @GuardedBy("this") >> private static @Nullable MyState state; >> >> synchronized @NotNull MyState getState() { >> if (state == null) { >> state = LoadStateFromSQL(); >> } >> return state; >> } >> } >> >> On Thu, Feb 18, 2021 at 2:50 PM Hemali Sutaria < >> hsuta...@paloaltonetworks.com> wrote: >> >>> Hi, >>> >>> I have one question. This is *kind of a blocker for our upcoming >>> release*. It would be great if you could reply at your earliest >>> convenience. >>> >>> My dataflow pipeline is stateful. I am using Beam SDK for stateful >>> processing (StateId, ValueState). I have also implemented OnTimer for my >>> stateful transformation. On every dataflow start, I want to read from >>> CloudSQL and build the cache. For that, I need to provide the pre-built >>> cache as side-input to my current transform. But, it looks like there is >>> some issue when I add side input to my stateful transform. I think I am >>> hitting BEAM-6855 issue (https://issues.apache.org/jira/browse/BEAM-6855). >>> Is there any workaround? Any help would be appreciated. >>> >>> Following is my definition of Transforms. I am using 2.23.0 beam SDK. I >>> am using GlobalWindow. >>> >>> private class GetLatestState extends DoFn<KV<DataTunnelStatusKey, >>> DataTunnelStatus>, DataTunnelStateRelational> { >>> @TimerId("tunnelStatusExpiryTimer") >>> private final TimerSpec tunnelStatusExpiryTimer = >>> TimerSpecs.timer(TimeDomain.EVENT_TIME); >>> >>> @StateId("tunnelStatus") >>> private final StateSpec<ValueState<DataTunnelStatus>> tunnelStatusCache >>> = >>> StateSpecs.value(AvroCoder.of(DataTunnelStatus.class)); >>> >>> @ProcessElement >>> public void process(@Element KV<DataTunnelStatusKey, DataTunnelStatus> >>> input, >>> MultiOutputReceiver out, >>> @StateId("tunnelStatus") ValueState<DataTunnelStatus> >>> tunnelStatusCache, >>> @TimerId("tunnelStatusExpiryTimer") Timer >>> tunnelStatusExpiryTimer, >>> ProcessContext c) >>> >>> >>> >>> Thanks, >>> Hemali Sutaria >>> >>>