Hi Rajath,

I think you need to guarantee that the PCollection will have a single
element so that you can use .asSingleton(). As written, every refresh tick
adds another map to the same global window, so the view eventually sees
more than one element, which is what the IllegalArgumentException is
complaining about.

A possible approach is to keep only the latest element in the window prior
to .asSingleton(), using Latest.globally():

PCollectionView<Map<String, String>> targetCountryByJourneyId =
    p.apply("Generate a sequence for extracting resource_id->targetCountry map",
            GenerateSequence.from(0).withRate(1,
                Duration.standardSeconds(
                    Long.parseLong(options.getRulesRefreshDuration().get()))))
     .apply("Get all rules from Redis for resourceId->targetCountry map",
            ParDo.of(new GetAllRulesFromRedis(
                Integer.parseInt(options.getRedisPort().get()),
                options.getRedisHost().get(), statsdHost, statsdPort)))
     .apply("Transform all Rules into journeyId->targetCountry map",
            ParDo.of(new TransformRulesAsJourneyIdToMetadataMap(
                statsdHost, statsdPort, Rule.TARGET_COUNTRY)))
     .apply("Window journeyId->targetCountry map",
            Window.<Map<String, String>>into(new GlobalWindows())
                .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
                .discardingFiredPanes())
     .apply("Combine to Latest", Latest.globally())  // <-- the new step
     .apply("singleton view of journeyId -> targetCountry map",
            View.asSingleton());
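
For reference, here is a minimal, self-contained sketch of the same
pattern. The 60-second refresh interval and the map contents are
placeholders standing in for your Redis transforms, so treat it as an
illustration of Latest.globally() + View.asSingleton() rather than a
drop-in replacement:

import java.util.Collections;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Latest;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;

public class LatestSingletonSideInput {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    PCollectionView<Map<String, String>> view =
        p.apply("Tick periodically",  // placeholder: 60s refresh interval
                GenerateSequence.from(0).withRate(1, Duration.standardSeconds(60)))
         .apply("Build map",  // stand-in for the Redis read + transform
                MapElements.into(TypeDescriptors.maps(
                        TypeDescriptors.strings(), TypeDescriptors.strings()))
                    .via(seq -> Collections.singletonMap("journey-1", "US")))
         .apply("Re-window and fire per element",
                Window.<Map<String, String>>into(new GlobalWindows())
                    .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
                    .discardingFiredPanes())
         // Latest.globally() collapses the collection to at most one element
         // (the most recent by timestamp), which is what asSingleton() needs.
         .apply("Keep only the latest", Latest.globally())
         .apply("Singleton view", View.asSingleton());

    p.run();
  }
}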


Best,
Bruno
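
P.S. On your second point: the view simply holds whatever the last Redis
read produced, so if a refresh returns an empty map, lookups will find
nothing until the next firing. It may be worth a defensive lookup in the
DoFn. A hypothetical sketch of the access side (the class and names are
placeholders, not your EliminateUserByTargetCountry):

import java.util.Map;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.PCollectionView;

class LookupTargetCountryFn extends DoFn<String, String> {
  private final PCollectionView<Map<String, String>> countryView;

  LookupTargetCountryFn(PCollectionView<Map<String, String>> countryView) {
    this.countryView = countryView;
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    // c.sideInput(...) returns the single latest map for this element's window.
    Map<String, String> countries = c.sideInput(countryView);
    // Guard against a map that has not been populated yet, instead of
    // assuming the key is present.
    String target = countries.get(c.element());
    if (target != null) {
      c.output(target);
    }
  }
}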

On Fri, Oct 20, 2023 at 9:05 AM Rajath BK <rajath.u...@gmail.com> wrote:

>
> Hi,
>    We are using a few side inputs to access data that is used to filter
> out elements in the pipelines. The source of the side input data is a
> Redis instance in Google Cloud Memorystore.
> The jobs are deployed from generated pipeline templates via the Dataflow
> service APIs.
>
> The side inputs are generated as below
> PCollectionView<Map<String, String>> targetCountryByJourneyId =
>     p.apply("Generate a sequence for extracting resource_id->targetCountry map",
>             GenerateSequence.from(0).withRate(1,
>                 Duration.standardSeconds(
>                     Long.parseLong(options.getRulesRefreshDuration().get()))))
>      .apply("Get all rules from Redis for resourceId->targetCountry map",
>             ParDo.of(new GetAllRulesFromRedis(
>                 Integer.parseInt(options.getRedisPort().get()),
>                 options.getRedisHost().get(), statsdHost, statsdPort)))
>      .apply("Transform all Rules into journeyId->targetCountry map",
>             ParDo.of(new TransformRulesAsJourneyIdToMetadataMap(
>                 statsdHost, statsdPort, Rule.TARGET_COUNTRY)))
>      .apply("Window journeyId->targetCountry map",
>             Window.<Map<String, String>>into(new GlobalWindows())
>                 .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
>                 .discardingFiredPanes())
>      .apply("singleton view of journeyId -> targetCountry map",
>             View.asSingleton());
>
> and accessed as below
>
> PCollection<UserData> qualifiedUser = userValidatedEntry
>     .apply("Eliminate user by target country",
>         ParDo.of(new EliminateUserByTargetCountry(
>                 options.getProjectNameForBT().get(),
>                 options.getInstanceNameForBT().get(),
>                 bigTableAppProfileId,
>                 targetCountryByJourneyId,
>                 statsdHost,
>                 statsdPort))
>             .withSideInputs(targetCountryByJourneyId));
>
> The pipelines are being run on Google cloud Dataflow and
> We have been noticing 2 kinds of issues
> 1. Despite closing the window on the side input collection when the first
> element arrives, we are seeing the error below. The strange thing is that
> we don't see these errors across all the workers in the job, and we have
> also noticed the issue resolve itself after a new deployment, only to
> resurface later.
>
> java.lang.IllegalArgumentException: PCollection with more than one element
> accessed as a singleton view. Consider using
> Combine.globally().asSingleton() to combine the PCollection into a single value
>     at org.apache.beam.sdk.transforms.View$SingletonCombineFn.apply(View.java:434)
>     at org.apache.beam.sdk.transforms.Combine$BinaryCombineFn.addInput(Combine.java:524)
>     at org.apache.beam.sdk.transforms.Combine$BinaryCombineFn.addInput(Combine.java:493)
>     at org.apache.beam.runners.dataflow.worker.WindmillStateInternals$WindmillCombiningState.add(WindmillStateInternals.java:2051)
>     at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SystemReduceFn.processValue(SystemReduceFn.java:119)
>     at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.processElement(ReduceFnRunner.java:613)
>     at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.processElements(ReduceFnRunner.java:360)
>     at org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:96)
>     at org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:43)
>     at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:121)
>     at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:73)
>     at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
>     at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:137)
>     at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
>     at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
>     at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:218)
>     at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:169)
>     at org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:83)
>     at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1450)
>     at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:165)
>     at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1125)
>     at org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor.lambda$executeLockHeld$0(BoundedQueueExecutor.java:133)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
>
> 2. Secondly, we have noticed that the side input sometimes contains no
> data even when the underlying data exists. Since this side input view is a
> map, when this occurs we see empty keys, which breaks our code when
> accessing it.
>
> Has anyone faced similar issues? It would greatly help if someone could
> shed some light on this seemingly non-deterministic behavior of side
> inputs created as singletons.
>
> -Thanks in advance
> Rajath
>
