Another workaround might be to create a PCollection that is the tagged union of the main input and the side input. I think you can avoid the per-element overhead of checking which input each element came from by setting some sort of timer or threshold, after which you switch a hardcoded lambda to the "main input only" path.
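Below is a rough Java sketch of the tagged-union idea with no Beam dependency. Cache rows and main events travel in one stream. The processor checks each element's tag only until a threshold number of elements has been seen, then swaps its handler to a main-only path. Everything here is hypothetical: `Tagged`, `UnionProcessor`, the string payloads, and the element-count threshold. In Beam, the union would be built before the stateful DoFn, for example by mapping both inputs to a common type and flattening them. The map would become per-key `ValueState`, and the cache rows would need to carry the same key as the main events, because state is scoped per key and window. The sketch also assumes that a key's cache rows arrive before its events and that no rows arrive after the switch. Beam does not order elements across flattened inputs, so a real DoFn might need to buffer early events.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical tagged union: an element is either a cache row (what would
// have been the side input) or a main-input event, carried in one stream.
final class Tagged {
  final boolean fromSide;
  final String key;
  final String value;

  private Tagged(boolean fromSide, String key, String value) {
    this.fromSide = fromSide;
    this.key = key;
    this.value = value;
  }

  static Tagged side(String key, String value) { return new Tagged(true, key, value); }
  static Tagged main(String key, String value) { return new Tagged(false, key, value); }
}

// Hypothetical stand-in for the stateful DoFn. The map plays the role of
// per-key state; a real DoFn would keep a ValueState for each key.
final class UnionProcessor {
  final Map<String, String> cache = new HashMap<>();
  final List<String> outputs = new ArrayList<>();
  private final long switchAfter; // elements to see before going main-only
  private long seen = 0;
  private Consumer<Tagged> handler = this::mixedPath; // the swappable "hardcoded lambda"

  UnionProcessor(long switchAfter) { this.switchAfter = switchAfter; }

  void process(Tagged t) { handler.accept(t); }

  // Startup path: inspects the tag of every element.
  private void mixedPath(Tagged t) {
    if (t.fromSide) {
      cache.put(t.key, t.value);
    } else {
      emit(t);
    }
    if (++seen >= switchAfter) {
      handler = this::mainOnlyPath; // swap once; later elements skip the tag check
    }
  }

  // Steady-state path: assumes no further cache rows will arrive.
  private void mainOnlyPath(Tagged t) { emit(t); }

  // Output format key:event:cachedValue ("missing" when nothing is cached).
  private void emit(Tagged t) {
    outputs.add(t.key + ":" + t.value + ":" + cache.getOrDefault(t.key, "missing"));
  }
}
```

With a threshold of 2, feeding one cache row for `k1` followed by events `e1` and `e2` for `k1` and `e3` for `k2` yields `k1:e1:UP`, `k1:e2:UP`, and `k2:e3:missing`. A time-based switch, like the "timer" mentioned above, would replace the element counter.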
Kenn

On Tue, Feb 23, 2021 at 5:07 PM Ahmet Altay <al...@google.com> wrote:

> Hemali, would this be a reasonable workaround for your problem?
>
> /cc +Kenneth Knowles <k...@google.com> - In case there is an alternative workaround to BEAM-6855.
> /cc +Cosmin Arad <ca...@google.com>
>
> On Thu, Feb 18, 2021 at 1:27 PM Brian Hulette <bhule...@google.com> wrote:
>
>> I added JvmInitializer [1] to do some one-time initialization per JVM before processing starts. It might be useful here... the intended use-case was to perform quick configuration functions, but I suppose you could use it to pull some data that you can reference later.
>>
>> [1] https://beam.apache.org/releases/javadoc/2.27.0/org/apache/beam/sdk/harness/JvmInitializer.html
>>
>> On Thu, Feb 18, 2021 at 1:03 PM Pablo Estrada <pabl...@google.com> wrote:
>>
>>> +Brian Hulette <bhule...@google.com> I believe you worked on a way to load data on worker startup?
>>>
>>> On Thu, Feb 18, 2021 at 1:00 PM Daniel Collins <dpcoll...@google.com> wrote:
>>>
>>>> The getState function should be static, sorry. "synchronized static @NotNull MyState getState()"
>>>>
>>>> On Thu, Feb 18, 2021 at 3:41 PM Daniel Collins <dpcoll...@google.com> wrote:
>>>>
>>>>> > On every dataflow start, I want to read from CloudSQL and build the cache
>>>>>
>>>>> If you do this outside of dataflow, you can use a static to do this on every worker start. Is that what you're looking for? For example:
>>>>>
>>>>> final class StateLoader {
>>>>>   private StateLoader() {}
>>>>>
>>>>>   @GuardedBy("this")
>>>>>   private static @Nullable MyState state;
>>>>>
>>>>>   synchronized @NotNull MyState getState() {
>>>>>     if (state == null) {
>>>>>       state = LoadStateFromSQL();
>>>>>     }
>>>>>     return state;
>>>>>   }
>>>>> }
>>>>>
>>>>> On Thu, Feb 18, 2021 at 2:50 PM Hemali Sutaria <hsuta...@paloaltonetworks.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I have one question. This is *kind of a blocker for our upcoming release*. It would be great if you could reply at your earliest convenience.
>>>>>>
>>>>>> My dataflow pipeline is stateful. I am using Beam SDK for stateful processing (StateId, ValueState). I have also implemented OnTimer for my stateful transformation. On every dataflow start, I want to read from CloudSQL and build the cache. For that, I need to provide the pre-built cache as side-input to my current transform. But, it looks like there is some issue when I add side input to my stateful transform. I think I am hitting BEAM-6855 issue (https://issues.apache.org/jira/browse/BEAM-6855). Is there any workaround? Any help would be appreciated.
>>>>>>
>>>>>> Following is my definition of Transforms. I am using 2.23.0 beam SDK. I am using GlobalWindow.
>>>>>>
>>>>>> private class GetLatestState extends DoFn<KV<DataTunnelStatusKey, DataTunnelStatus>, DataTunnelStateRelational> {
>>>>>>   @TimerId("tunnelStatusExpiryTimer")
>>>>>>   private final TimerSpec tunnelStatusExpiryTimer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
>>>>>>
>>>>>>   @StateId("tunnelStatus")
>>>>>>   private final StateSpec<ValueState<DataTunnelStatus>> tunnelStatusCache =
>>>>>>       StateSpecs.value(AvroCoder.of(DataTunnelStatus.class));
>>>>>>
>>>>>>   @ProcessElement
>>>>>>   public void process(@Element KV<DataTunnelStatusKey, DataTunnelStatus> input,
>>>>>>                       MultiOutputReceiver out,
>>>>>>                       @StateId("tunnelStatus") ValueState<DataTunnelStatus> tunnelStatusCache,
>>>>>>                       @TimerId("tunnelStatusExpiryTimer") Timer tunnelStatusExpiryTimer,
>>>>>>                       ProcessContext c)
>>>>>>
>>>>>> Thanks,
>>>>>> Hemali Sutaria
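Daniel's lazy static loader can be written as a minimal self-contained Java sketch, with the `static` fix from his follow-up applied. The `Map<String, String>` cache type, `loadStateFromSql`, and the `LOADS` counter are hypothetical stand-ins for `MyState` and `LoadStateFromSQL()`. The `@GuardedBy`, `@Nullable`, and `@NotNull` annotations are dropped so the block compiles without extra libraries. A `static synchronized` method locks the `StateLoader.class` object rather than an instance, so that object is the real guard for the field. The cache is built on first use in each worker JVM, is shared by every DoFn instance in that JVM, and is not refreshed until the JVM restarts.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

// Loads the cache at most once per JVM (i.e. once per worker process).
final class StateLoader {
  private StateLoader() {}

  // Guarded by StateLoader.class: static synchronized methods lock the
  // Class object, so @GuardedBy("this") would name the wrong lock.
  private static Map<String, String> state;

  // Hypothetical counter, only here so the load-once behavior can be checked.
  static final AtomicInteger LOADS = new AtomicInteger();

  static synchronized Map<String, String> getState() {
    if (state == null) {
      state = loadStateFromSql();
    }
    return state;
  }

  // Hypothetical stand-in for the CloudSQL query; returns fixed rows here.
  private static Map<String, String> loadStateFromSql() {
    LOADS.incrementAndGet();
    Map<String, String> rows = new HashMap<>();
    rows.put("tunnel-1", "UP");
    return Collections.unmodifiableMap(rows);
  }
}
```

Every caller gets the same immutable map, and the loader runs only once no matter how many DoFn instances call `getState()`. To load before the first element instead of on first use, Brian's JvmInitializer suggestion could trigger the same call at worker startup; that wiring is not shown here.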