Hi Reuven,

Thanks for your reply. I haven't filed a JIRA yet, but if this looks like a bug on Beam's end, I will file one.
Regards,
Mohil

On Mon, May 18, 2020 at 12:26 PM Reuven Lax <re...@google.com> wrote:

However, this still appears to be a bug: that exception should never be thrown inside the Dataflow runner. Are you able to file a JIRA for this bug?

On Mon, May 18, 2020 at 10:44 AM Robert Bradshaw <rober...@google.com> wrote:

Glad you were able to get this working; thanks for following up.

On Mon, May 18, 2020 at 10:35 AM Mohil Khare <mo...@prosimo.io> wrote:

Hi,

On another note, I think I was unnecessarily complicating things by applying a sliding window and then an extra global window to remove duplicates. I replaced the sliding window with a session window (I know that a transaction consisting of RecordA logs and RecordB logs for a key "MyKey" won't last more than 60-90 seconds), and my use case now works fine. Even DRAIN completes successfully.

Thanks,
Mohil

On Sun, May 17, 2020 at 3:37 PM Mohil Khare <mo...@prosimo.io> wrote:

Hello,

I have a use case with two PCollections (RecordA and RecordB) coming from a real-time streaming source such as Kafka.

Both records are correlated by a common key, say KEY.

The goal is to enrich RecordA with RecordB's data, for which I am using CoGroupByKey. Since RecordA and RecordB for a common key can arrive within 1-2 minutes of each other in event time, I apply a sliding window to both PCollections and then CoGroupByKey them.

The sliding windows that contain both RecordA and RecordB for a common KEY emit enriched output. Since multiple sliding windows can emit the same output, I remove duplicate results by feeding those outputs into a global window, where I keep state recording whether an output has already been processed.
Since it is a global window, I set a timer on the state (for GC) so that the state expires 10 minutes after it was written.

This works perfectly w.r.t. the expected results. However, I am unable to stop the job gracefully, i.e. drain it. I see the following exception:

java.lang.IllegalStateException: org.apache.beam.runners.dataflow.worker.SimpleParDoFn@16b089a6 received state cleanup timer for window org.apache.beam.sdk.transforms.windowing.GlobalWindow@29ca0210 that is before the appropriate cleanup time 294248-01-24T04:00:54.776Z
    org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:842)
    org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processSystemTimer(SimpleParDoFn.java:384)
    org.apache.beam.runners.dataflow.worker.SimpleParDoFn.access$700(SimpleParDoFn.java:73)
    org.apache.beam.runners.dataflow.worker.SimpleParDoFn$TimerType$2.processTimer(SimpleParDoFn.java:444)
    org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers(SimpleParDoFn.java:467)
    org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers(SimpleParDoFn.java:354)
    org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.finish(ParDoOperation.java:52)
    org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:85)
    org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1316)
    org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:149)
    org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1049)
    java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    java.lang.Thread.run(Thread.java:745)

My code snippet:

PCollection<KV<MyKey, RecordA>> windowedRecordA =
    incompleteRecordALogs.apply("Applying_Sliding_Window_RecordA",
        Window.<KV<MyKey, RecordA>>into(
                SlidingWindows.of(Duration.standardSeconds(90)).every(Duration.standardSeconds(45)))
            .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
            .withAllowedLateness(Duration.standardSeconds(90))
            .discardingFiredPanes());

PCollection<KV<MyKey, RecordB>> windowedRecordB =
    recordBLogs.apply("Applying_Sliding_Window_RecordB",
        Window.<KV<MyKey, RecordB>>into(
                SlidingWindows.of(Duration.standardSeconds(90)).every(Duration.standardSeconds(45)))
            .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
            .withAllowedLateness(Duration.standardSeconds(90))
            .discardingFiredPanes());

PCollection<KV<MyKey, CoGbkResult>> coGbkRecords =
    KeyedPCollectionTuple.of(TagRecordA, windowedRecordA)
        .and(TagRecordB, windowedRecordB)
        .apply("CoGroupByKey", CoGroupByKey.create());

PCollection<RecordA> enrichedRecordA =
    coGbkRecords.apply("EnrichRecordAWithRecordB", new EnrichIncompleteRecordA());

class EnrichIncompleteRecordA extends PTransform<PCollection<KV<MyKey, CoGbkResult>>, PCollection<RecordA>> {
    @Override
    public PCollection<RecordA> expand(PCollection<KV<MyKey, CoGbkResult>> input) {
        logger.info("Enriching Incomplete RecordA with RecordB");
        return input
            .apply("Add_RecordBInfo_To_RecordA", ParDo.of(new AddRecordBData()))
            .apply("Applying_Windowing",
                Window.<KV<MyKey, RecordA>>into(new GlobalWindows())
                    .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
                    .discardingFiredPanes())
            .apply("Emit_Unique_RecordA", ParDo.of(new EmitUniqueRecordA()));
    }

    private class AddRecordBData extends DoFn<KV<MyKey, CoGbkResult>, KV<MyKey, RecordA>> {
        @Setup
        public void setup() {
        }

        @StartBundle
        public void startBundle() {
        }

        @ProcessElement
        public void processElement(@Element KV<MyKey, CoGbkResult> input,
                                   OutputReceiver<KV<MyKey, RecordA>> out) {
            Iterable<RecordA> allRecordALogs =
                input.getValue().getAll(TagRecordA);
            Iterable<RecordB> allRecordBLogs =
                input.getValue().getAll(TagRecordB);

            /*
             There should be at most one RecordB per MyKey
            */
            if (allRecordALogs.iterator().hasNext() && allRecordBLogs.iterator().hasNext()) {
                RecordB recordB = Iterables.getFirst(allRecordBLogs, null);
                for (RecordA recordA : allRecordALogs) {
                    if (null != recordB) {
                        logger.info("Enriching incomplete recordA [{}] with recordB: [{}]", recordA, recordB);
                        // <code to populate recordA object with recordB data>

                        out.output(KV.of(input.getKey(), recordA));
                    } else {
                        logger.error("No recordB available for recordA log [{}]", recordA);
                    }
                }
            } else {
                logger.info("Either recordA or recordB not present for myKey: {}", input.getKey());
            }
        }

        @FinishBundle
        public void finishBundle() {
        }

        @Teardown
        public void teardown() {
        }
    }

    private class EmitUniqueRecordA extends DoFn<KV<MyKey, RecordA>, RecordA> {
        @Setup
        public void setup() {
        }

        @StartBundle
        public void startBundle() {
        }

        // The state holds a Set<RecordA>, so the StateSpec, @ProcessElement and @OnTimer
        // parameters must all use ValueState<Set<RecordA>> (SetCoder wraps the element coder).
        @StateId("processedRecordA")
        private final StateSpec<ValueState<Set<RecordA>>> processedRecordASpec =
            StateSpecs.value(SetCoder.of(AvroCoder.of(RecordA.class)));

        @TimerId("stateExpiry")
        private final TimerSpec stateExpirySpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

        @ProcessElement
        public void processElement(@Element KV<MyKey, RecordA> input,
                                   OutputReceiver<RecordA> out,
                                   @StateId("processedRecordA") ValueState<Set<RecordA>> processedRecordAState,
                                   @TimerId("stateExpiry") Timer stateExpiryTimer) {
            RecordA recordA = input.getValue();
            // One possible duplicate check (assumes RecordA implements value-based
            // equals()/hashCode()); the original check was elided.
            Set<RecordA> processedRecordASet = processedRecordAState.read();
            if (processedRecordASet == null) {
                processedRecordASet = new HashSet<>();
            }
            // Set.add() returns false if recordA was already emitted.
            if (processedRecordASet.add(recordA)) {
                processedRecordAState.write(processedRecordASet);
                // Clear the state 10 minutes (processing time) after this write.
                stateExpiryTimer.offset(Duration.standardMinutes(10)).setRelative();
                logger.info("Emitting unique recordA {} for myKey {}", recordA, input.getKey());
                out.output(recordA);
            }
        }

        @OnTimer("stateExpiry")
        public void onExpiry(
                OnTimerContext context,
                @StateId("processedRecordA") ValueState<Set<RecordA>> processedRecordAState) {
            logger.info("Expiring State after timer expiry");
            processedRecordAState.clear();
        }

        @FinishBundle
        public void finishBundle() {
        }

        @Teardown
        public void teardown() {
        }
    }
}

Any help or suggestions?

Thanks and regards,
Mohil
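Mohil's eventual fix was to replace the sliding windows with session windows. The plain-Java model below is not Beam code (the class WindowModel and its methods are hypothetical); it only illustrates why that change removes the need for a deduplicating global window. With SlidingWindows.of(90 s).every(45 s), every element is assigned to 90 / 45 = 2 overlapping windows, so a RecordA and a RecordB that arrive close together can be co-grouped in both windows and the enriched record emitted twice. Session windows instead merge each element's proto-window [timestamp, timestamp + gap) with any overlapping one, so the pair ends up in a single window. The 90-second gap is an assumption based on the 60-90 second transaction length mentioned above, and the merge rule is a simplification of Beam's Sessions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Plain-Java model of window assignment (NOT Beam code). Times are event-time millis.
class WindowModel {

    /** Start times of every sliding window [start, start + size) that contains ts. */
    static List<Long> slidingWindowStarts(long ts, long size, long period) {
        List<Long> starts = new ArrayList<>();
        // Latest period-aligned window start at or before ts.
        long lastStart = ts - Math.floorMod(ts, period);
        // Walk backwards while the window still covers ts.
        for (long start = lastStart; start > ts - size; start -= period) {
            starts.add(start);
        }
        return starts;
    }

    /** A merged session window [start, end). */
    static final class Session {
        final long start;
        final long end;

        Session(long start, long end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + ")";
        }
    }

    /** Merges overlapping per-element proto-windows [ts, ts + gap) (simplified session logic). */
    static List<Session> sessions(List<Long> timestamps, long gap) {
        List<Long> sorted = new ArrayList<>(timestamps);
        Collections.sort(sorted);
        List<Session> merged = new ArrayList<>();
        for (long ts : sorted) {
            int last = merged.size() - 1;
            if (last >= 0 && ts < merged.get(last).end) {
                // Overlaps the current session: extend it.
                Session s = merged.get(last);
                merged.set(last, new Session(s.start, Math.max(s.end, ts + gap)));
            } else {
                // Gap exceeded: start a new session.
                merged.add(new Session(ts, ts + gap));
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        long size = 90_000L, period = 45_000L, gap = 90_000L;
        // Hypothetical event times for one key: RecordA at 100 s, RecordB at 120 s.
        System.out.println("RecordA sliding windows: " + slidingWindowStarts(100_000L, size, period));
        System.out.println("RecordB sliding windows: " + slidingWindowStarts(120_000L, size, period));
        System.out.println("Sessions: " + sessions(Arrays.asList(100_000L, 120_000L), gap));
    }
}
```

For RecordA at t = 100 s and RecordB at t = 120 s, both elements fall into the sliding windows starting at 90 s and 45 s, so a CoGroupByKey over those windows would join them twice, whereas the session model yields the single window [100 s, 210 s). In the real pipeline the corresponding change is to window both PCollections with Beam's Sessions.withGapDuration(...) before the CoGroupByKey.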