> On every dataflow start, I want to read from CloudSQL and build the cache
If you do this outside of dataflow, you can use a static to do this on every worker start. Is that what you're looking for? For example: final class StateLoader { private StateLoader() {} @GuardedBy("this") private static @Nullable MyState state; synchronized @NotNull MyState getState() { if (state == null) { state = LoadStateFromSQL(); } return state; } } On Thu, Feb 18, 2021 at 2:50 PM Hemali Sutaria < hsuta...@paloaltonetworks.com> wrote: > Hi, > > I have one question. This is *kind of a blocker for our upcoming release*. > It would be great if you could reply at your earliest convenience. > > My dataflow pipeline is stateful. I am using Beam SDK for stateful > processing (StateId, ValueState). I have also implemented OnTimer for my > stateful transformation. On every dataflow start, I want to read from > CloudSQL and build the cache. For that, I need to provide the pre-built > cache as side-input to my current transform. But, it looks like there is > some issue when I add side input to my stateful transform. I think I am > hitting BEAM-6855 issue (https://issues.apache.org/jira/browse/BEAM-6855). > Is there any workaround? Any help would be appreciated. > > Following is my definition of Transforms. I am using 2.23.0 beam SDK. I am > using GlobalWindow. > > private class GetLatestState extends DoFn<KV<DataTunnelStatusKey, > DataTunnelStatus>, DataTunnelStateRelational> { > @TimerId("tunnelStatusExpiryTimer") > private final TimerSpec tunnelStatusExpiryTimer = > TimerSpecs.timer(TimeDomain.EVENT_TIME); > > @StateId("tunnelStatus") > private final StateSpec<ValueState<DataTunnelStatus>> tunnelStatusCache = > StateSpecs.value(AvroCoder.of(DataTunnelStatus.class)); > > @ProcessElement > public void process(@Element KV<DataTunnelStatusKey, DataTunnelStatus> input, > MultiOutputReceiver out, > @StateId("tunnelStatus") ValueState<DataTunnelStatus> > tunnelStatusCache, > @TimerId("tunnelStatusExpiryTimer") Timer > tunnelStatusExpiryTimer, > ProcessContext c) > > > > Thanks, > Hemali Sutaria > >