> On every dataflow start, I want to read from CloudSQL and build the cache

If you load the cache outside of the Dataflow pipeline itself (that is,
not as a side input), you can use a static field so the load happens once
on every worker start. Is that what you're looking for? For example:

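final class StateLoader {
  private StateLoader() {}

  // Static state is shared across the whole worker JVM, so it is guarded
  // by the class lock (static synchronized), not an instance lock.
  @GuardedBy("StateLoader.class")
  private static @Nullable MyState state;

  static synchronized @NotNull MyState getState() {
    if (state == null) {
      // Placeholder for the CloudSQL read; runs once per worker.
      state = loadStateFromSQL();
    }
    return state;
  }
}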
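
As a variant of the same idea, here is a minimal self-contained sketch that
uses the initialization-on-demand holder idiom instead of a synchronized
getter. The JVM's class-initialization rules then guarantee the load runs at
most once per worker. The Map<String, String> state, the LOAD_COUNT counter
and the hard-coded rows in loadStateFromSql() are assumptions made up for
illustration; a real loader would query CloudSQL there.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Per-JVM (per-worker) cache that is loaded lazily on first use. */
final class StateLoader {
  private StateLoader() {}

  // Illustration only: counts how many times the load actually ran.
  static final AtomicInteger LOAD_COUNT = new AtomicInteger();

  // Holder idiom: the JVM initializes Holder exactly once, thread-safely,
  // the first time getState() touches it.
  private static final class Holder {
    static final Map<String, String> STATE = loadStateFromSql();
  }

  static Map<String, String> getState() {
    return Holder.STATE;
  }

  // Hypothetical stand-in for the CloudSQL query.
  private static Map<String, String> loadStateFromSql() {
    LOAD_COUNT.incrementAndGet();
    Map<String, String> rows = new ConcurrentHashMap<>();
    rows.put("tunnel-1", "UP");
    rows.put("tunnel-2", "DOWN");
    return rows;
  }
}
```

Inside a DoFn you would call StateLoader.getState() from the @Setup or
@ProcessElement method; every DoFn instance on the same worker then shares
the one loaded map. Note that this gives each worker its own copy, loaded
when that worker first needs it, rather than a single pipeline-wide value.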

On Thu, Feb 18, 2021 at 2:50 PM Hemali Sutaria <
hsuta...@paloaltonetworks.com> wrote:

> Hi,
>
> I have one question. This is *kind of a blocker for our upcoming release*.
> It would be great if you could reply at your earliest convenience.
>
> My dataflow pipeline is stateful. I am using Beam SDK for stateful
> processing (StateId, ValueState). I have also implemented OnTimer for my
> stateful transformation. On every dataflow start, I want to read from
> CloudSQL and build the cache. For that, I need to provide the pre-built
> cache as side-input to my current transform. But, it looks like there is
> some issue when I add side input to my stateful transform. I think I am
> hitting BEAM-6855 issue (https://issues.apache.org/jira/browse/BEAM-6855).
> Is there any workaround? Any help would be appreciated.
>
> Following is my definition of Transforms. I am using 2.23.0 beam SDK. I am
> using GlobalWindow.
>
> private class GetLatestState extends DoFn<KV<DataTunnelStatusKey, DataTunnelStatus>, DataTunnelStateRelational> {
>     @TimerId("tunnelStatusExpiryTimer")
>     private final TimerSpec tunnelStatusExpiryTimer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
>
>     @StateId("tunnelStatus")
>     private final StateSpec<ValueState<DataTunnelStatus>> tunnelStatusCache =
>             StateSpecs.value(AvroCoder.of(DataTunnelStatus.class));
>
>     @ProcessElement
>     public void process(@Element KV<DataTunnelStatusKey, DataTunnelStatus> input,
>                         MultiOutputReceiver out,
>                         @StateId("tunnelStatus") ValueState<DataTunnelStatus> tunnelStatusCache,
>                         @TimerId("tunnelStatusExpiryTimer") Timer tunnelStatusExpiryTimer,
>                         ProcessContext c)
>
>
>
> Thanks,
> Hemali Sutaria
>
>
