Hi Reuven, all,

I have opened the following JIRA to track this issue:
https://issues.apache.org/jira/browse/BEAM-10053

Thanks and Regards
Mohil

On Mon, May 18, 2020 at 12:28 PM Mohil Khare <mo...@prosimo.io> wrote:

> Hi Reuven,
>
> Thanks for your reply. I haven't filed a JIRA yet, but if it looks like a
> bug on Beam's end, I will file one.
>
> Regards
> Mohil
>
> On Mon, May 18, 2020 at 12:26 PM Reuven Lax <re...@google.com> wrote:
>
>> However, this still appears to be a bug: that exception should never be
>> thrown inside the Dataflow runner. Are you able to file a JIRA for this bug?
>>
>> On Mon, May 18, 2020 at 10:44 AM Robert Bradshaw <rober...@google.com> wrote:
>>
>>> Glad you were able to get this working; thanks for following up.
>>>
>>> On Mon, May 18, 2020 at 10:35 AM Mohil Khare <mo...@prosimo.io> wrote:
>>>
>>>> Hi,
>>>>
>>>> On another note, I think I was unnecessarily complicating things by
>>>> applying a sliding window here and then an extra global window to remove
>>>> duplicates. I replaced the sliding window with a session window (as I
>>>> know that my transaction, consisting of RecordA logs and RecordB logs for
>>>> a key "MyKey", won't last for more than 60-90 secs), and my use case
>>>> seems to be working fine. Even DRAIN is working successfully.
>>>>
>>>> Thanks
>>>> Mohil
>>>>
>>>> On Sun, May 17, 2020 at 3:37 PM Mohil Khare <mo...@prosimo.io> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> I have a use case where I have two PCollections (RecordA and RecordB)
>>>>> coming from a real-time streaming source such as Kafka.
>>>>>
>>>>> Both records are correlated by a common key, let's say KEY.
>>>>>
>>>>> The purpose is to enrich RecordA with RecordB's data, for which I am
>>>>> using CoGroupByKey. Since RecordA and RecordB for a common key can arrive
>>>>> within 1-2 minutes of each other in event time, I apply a sliding window
>>>>> to both records and then do a CoGroupByKey over both PCollections.
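The duplication described in this thread can be reproduced with a small plain-Java simulation (no Beam dependency) of window assignment for the windowing used in the code snippet: 90-second sliding windows every 45 seconds. It also models the session-window alternative from the May 18 follow-up. The event times (RecordA at 100 s, RecordB at 110 s, same key) and the 90-second session gap are hypothetical, since the thread does not state the gap actually used, and `WindowingSketch` and its methods are illustrative helpers, not Beam APIs. The assignment rule is intended to mirror Beam's `SlidingWindows` (windows start at multiples of the period; every window covering the timestamp is assigned) and the merge rule to mirror `Sessions` (overlapping `[ts, ts + gap)` intervals merge), but treat it as a model under those assumptions, not the SDK's code.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java simulation (no Beam dependency) of how sliding windows and
// session windows group two correlated events for one key.
// All timestamps and durations are event-time milliseconds.
final class WindowingSketch {

    // Sliding windows of length `size` starting every `period` (offset 0):
    // returns the start of every window [start, start + size) that contains ts.
    static List<Long> slidingWindowStarts(long ts, long size, long period) {
        List<Long> starts = new ArrayList<>();
        long lastStart = ts - Math.floorMod(ts, period);
        for (long start = lastStart; start > ts - size; start -= period) {
            starts.add(start);
        }
        return starts;
    }

    // Number of sliding windows containing both timestamps. A CoGroupByKey
    // produces one joined result per such window (one pane each, for on-time data).
    static int sharedSlidingWindows(long a, long b, long size, long period) {
        List<Long> startsB = slidingWindowStarts(b, size, period);
        int shared = 0;
        for (long start : slidingWindowStarts(a, size, period)) {
            if (startsB.contains(start)) {
                shared++;
            }
        }
        return shared;
    }

    // Session windows: each event opens [ts, ts + gap) and overlapping intervals
    // merge. Returns the number of merged sessions for ascending timestamps.
    static int sessionCount(long[] sortedTs, long gap) {
        if (sortedTs.length == 0) {
            return 0;
        }
        int sessions = 1;
        long end = sortedTs[0] + gap;
        for (int i = 1; i < sortedTs.length; i++) {
            if (sortedTs[i] < end) {
                end = Math.max(end, sortedTs[i] + gap); // overlaps: extend current session
            } else {
                sessions++;                              // gap exceeded: start a new session
                end = sortedTs[i] + gap;
            }
        }
        return sessions;
    }

    public static void main(String[] args) {
        long recordA = 100_000L; // hypothetical RecordA event time (100 s)
        long recordB = 110_000L; // hypothetical RecordB event time (110 s), same key
        System.out.println("sliding windows containing both: "
                + sharedSlidingWindows(recordA, recordB, 90_000L, 45_000L));
        System.out.println("session windows: "
                + sessionCount(new long[] {recordA, recordB}, 90_000L));
    }
}
```

Running `main` prints 2 for the sliding windows and 1 for the sessions. With on-time data the joined pair is produced once per shared sliding window, which is why a dedupe step was needed, whereas the overlapping session intervals merge into a single window. That is consistent with the follow-up reporting that the extra global-window step became unnecessary once session windows were used.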
>>>>>
>>>>> The sliding windows that find both RecordA and RecordB for a common key
>>>>> KEY emit enriched output. Since multiple sliding windows can emit the
>>>>> same output, I remove duplicate results by feeding these outputs into a
>>>>> global window, where I keep state to check whether an output has already
>>>>> been processed. Because it is a global window, I also set a timer on that
>>>>> state (for GC) so that it expires 10 minutes after the state was written.
>>>>>
>>>>> This works perfectly fine w.r.t. the expected results. However, I am
>>>>> unable to stop the job gracefully, i.e. to drain it. I see the following
>>>>> exception:
>>>>>
>>>>> java.lang.IllegalStateException:
>>>>> org.apache.beam.runners.dataflow.worker.SimpleParDoFn@16b089a6
>>>>> received state cleanup timer for window
>>>>> org.apache.beam.sdk.transforms.windowing.GlobalWindow@29ca0210 that
>>>>> is before the appropriate cleanup time 294248-01-24T04:00:54.776Z
>>>>>     org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:842)
>>>>>     org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processSystemTimer(SimpleParDoFn.java:384)
>>>>>     org.apache.beam.runners.dataflow.worker.SimpleParDoFn.access$700(SimpleParDoFn.java:73)
>>>>>     org.apache.beam.runners.dataflow.worker.SimpleParDoFn$TimerType$2.processTimer(SimpleParDoFn.java:444)
>>>>>     org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers(SimpleParDoFn.java:467)
>>>>>     org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers(SimpleParDoFn.java:354)
>>>>>     org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.finish(ParDoOperation.java:52)
>>>>>     org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:85)
>>>>>     org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1316)
>>>>>     org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:149)
>>>>>     org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1049)
>>>>>     java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>     java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>     java.lang.Thread.run(Thread.java:745)
>>>>>
>>>>> java.lang.IllegalStateException:
>>>>> org.apache.beam.runners.dataflow.worker.SimpleParDoFn@59902a10
>>>>> received state cleanup timer for window
>>>>> org.apache.beam.sdk.transforms.windowing.GlobalWindow@29ca0210 that
>>>>> is before the appropriate cleanup time 294248-01-24T04:00:54.776Z
>>>>>     org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:842)
>>>>>     org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processSystemTimer(SimpleParDoFn.java:384)
>>>>>     org.apache.beam.runners.dataflow.worker.SimpleParDoFn.access$700(SimpleParDoFn.java:73)
>>>>>     org.apache.beam.runners.dataflow.worker.SimpleParDoFn$TimerType$2.processTimer(SimpleParDoFn.java:444)
>>>>>     org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers(SimpleParDoFn.java:467)
>>>>>     org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers(SimpleParDoFn.java:354)
>>>>>     org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.finish(ParDoOperation.java:52)
>>>>>     org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:85)
>>>>>     org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1316)
>>>>>     org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:149)
>>>>>     org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1049)
>>>>>     java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>     java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>     java.lang.Thread.run(Thread.java:745)
>>>>>
>>>>> java.lang.IllegalStateException:
>>>>> org.apache.beam.runners.dataflow.worker.SimpleParDoFn@4316932b
>>>>> received state cleanup timer for window
>>>>> org.apache.beam.sdk.transforms.windowing.GlobalWindow@29ca0210 that
>>>>> is before the appropriate cleanup time 294248-01-24T04:00:54.776Z
>>>>>     org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:842)
>>>>>     org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processSystemTimer(SimpleParDoFn.java:384)
>>>>>     org.apache.beam.runners.dataflow.worker.SimpleParDoFn.access$700(SimpleParDoFn.java:73)
>>>>>     org.apache.beam.runners.dataflow.worker.SimpleParDoFn$TimerType$2.processTimer(SimpleParDoFn.java:444)
>>>>>     org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers(SimpleParDoFn.java:467)
>>>>>     org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers(SimpleParDoFn.java:354)
>>>>>     org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.finish(ParDoOperation.java:52)
>>>>>     org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:85)
>>>>>     org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1316)
>>>>>     org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:149)
>>>>>     org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1049)
>>>>>     java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>     java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>     java.lang.Thread.run(Thread.java:745)
>>>>>
>>>>> My code snippet:
>>>>>
>>>>> PCollection<KV<MyKey, RecordA>> windowedRecordA =
>>>>>     incompleteRecordALogs.apply("Applying_Sliding_Window_RecordA",
>>>>>         Window.<KV<MyKey, RecordA>>into(
>>>>>                 SlidingWindows.of(Duration.standardSeconds(90))
>>>>>                     .every(Duration.standardSeconds(45)))
>>>>>             .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
>>>>>             .withAllowedLateness(Duration.standardSeconds(90))
>>>>>             .discardingFiredPanes());
>>>>>
>>>>> PCollection<KV<MyKey, RecordB>> windowedRecordB =
>>>>>     recordBLogs.apply("Applying_Sliding_Window_RecordB",
>>>>>         Window.<KV<MyKey, RecordB>>into(
>>>>>                 SlidingWindows.of(Duration.standardSeconds(90))
>>>>>                     .every(Duration.standardSeconds(45)))
>>>>>             .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
>>>>>             .withAllowedLateness(Duration.standardSeconds(90))
>>>>>             .discardingFiredPanes());
>>>>>
>>>>> PCollection<KV<MyKey, CoGbkResult>> coGbkRecords =
>>>>>     KeyedPCollectionTuple.of(TagRecordA, windowedRecordA)
>>>>>         .and(TagRecordB, windowedRecordB)
>>>>>         .apply("CoGroupByKey", CoGroupByKey.create());
>>>>>
>>>>> PCollection<RecordA> enrichedRecordA =
>>>>>     coGbkRecords.apply("EnrichRecordAWithRecordB", new EnrichIncompleteRecordA());
>>>>>
>>>>> class EnrichIncompleteRecordA
>>>>>         extends PTransform<PCollection<KV<MyKey, CoGbkResult>>, PCollection<RecordA>> {
>>>>>     @Override
>>>>>     public PCollection<RecordA> expand(PCollection<KV<MyKey, CoGbkResult>> input) {
>>>>>         logger.info("Enriching Incomplete RecordA with RecordB");
>>>>>         return input
>>>>>             .apply("Add_RecordBInfo_To_RecordA", ParDo.of(new AddRecordBData()))
>>>>>             .apply("Applying_Windowing",
>>>>>                 Window.<KV<MyKey, RecordA>>into(new GlobalWindows())
>>>>>                     .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
>>>>>                     .discardingFiredPanes())
>>>>>             .apply("Emit_Unique_RecordA", ParDo.of(new EmitUniqueRecordA()));
>>>>>     }
>>>>>
>>>>>     private class AddRecordBData extends DoFn<KV<MyKey, CoGbkResult>, KV<MyKey, RecordA>> {
>>>>>         @Setup
>>>>>         public void setup() {
>>>>>         }
>>>>>
>>>>>         @StartBundle
>>>>>         public void startBundle() {
>>>>>         }
>>>>>
>>>>>         @ProcessElement
>>>>>         public void processElement(@Element KV<MyKey, CoGbkResult> input,
>>>>>                                    OutputReceiver<KV<MyKey, RecordA>> out) {
>>>>>             Iterable<RecordA> allRecordALogs = input.getValue().getAll(TagRecordA);
>>>>>             Iterable<RecordB> allRecordBLogs = input.getValue().getAll(TagRecordB);
>>>>>
>>>>>             // There should be at most 1 RecordB per MyKey.
>>>>>             if (allRecordALogs.iterator().hasNext() && allRecordBLogs.iterator().hasNext()) {
>>>>>                 RecordB recordB = Iterables.getFirst(allRecordBLogs, null);
>>>>>                 for (RecordA recordA : allRecordALogs) {
>>>>>                     if (null != recordB) {
>>>>>                         logger.info("Enriching incomplete recordA [{}] with recordB: [{}]",
>>>>>                                 recordA, recordB);
>>>>>                         // <code to populate recordA object with recordB data>
>>>>>
>>>>>                         out.output(KV.of(input.getKey(), recordA));
>>>>>                     } else {
>>>>>                         logger.error("No recordB available for recordA log [{}]", recordA);
>>>>>                     }
>>>>>                 }
>>>>>             } else {
>>>>>                 logger.info("Either recordA or recordB not present for myKey: {}",
>>>>>                         input.getKey());
>>>>>             }
>>>>>         }
>>>>>
>>>>>         @FinishBundle
>>>>>         public void finishBundle() {
>>>>>         }
>>>>>
>>>>>         @Teardown
>>>>>         public void teardown() {
>>>>>         }
>>>>>     }
>>>>>
>>>>>     private class EmitUniqueRecordA extends DoFn<KV<MyKey, RecordA>, RecordA> {
>>>>>         @Setup
>>>>>         public void setup() {
>>>>>         }
>>>>>
>>>>>         @StartBundle
>>>>>         public void startBundle() {
>>>>>         }
>>>>>
>>>>>         @StateId("processedRecordA")
>>>>>         private final
>>>>>             StateSpec<ValueState<Set<RecordA>>> processedRecordASpec =
>>>>>                 StateSpecs.value(SetCoder.of(AvroCoder.of(RecordA.class)));
>>>>>
>>>>>         @TimerId("stateExpiry")
>>>>>         private final TimerSpec stateExpirySpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
>>>>>
>>>>>         @ProcessElement
>>>>>         public void processElement(
>>>>>                 @Element KV<MyKey, RecordA> input,
>>>>>                 OutputReceiver<RecordA> out,
>>>>>                 @StateId("processedRecordA") ValueState<Set<RecordA>> processedRecordAState,
>>>>>                 @TimerId("stateExpiry") Timer stateExpiryTimer) {
>>>>>             MyKey myKey = input.getKey();
>>>>>             RecordA recordA = input.getValue();
>>>>>
>>>>>             // Check whether this recordA has already been emitted for myKey
>>>>>             // (relies on RecordA's equals/hashCode). Copy the stored set
>>>>>             // rather than mutating the object returned by read().
>>>>>             Set<RecordA> processedRecordASet = new HashSet<>();
>>>>>             Set<RecordA> stored = processedRecordAState.read();
>>>>>             if (stored != null) {
>>>>>                 processedRecordASet.addAll(stored);
>>>>>             }
>>>>>             if (processedRecordASet.add(recordA)) {
>>>>>                 processedRecordAState.write(processedRecordASet);
>>>>>                 // Re-setting the timer pushes state expiry out to 10 minutes from now.
>>>>>                 stateExpiryTimer.offset(Duration.standardMinutes(10)).setRelative();
>>>>>                 logger.info("Emitting unique recordA {} for myKey {}", recordA, myKey);
>>>>>                 out.output(recordA);
>>>>>             }
>>>>>         }
>>>>>
>>>>>         @OnTimer("stateExpiry")
>>>>>         public void onExpiry(
>>>>>                 OnTimerContext context,
>>>>>                 @StateId("processedRecordA") ValueState<Set<RecordA>> processedRecordAState) {
>>>>>             logger.info("Expiring State after timer expiry");
>>>>>             processedRecordAState.clear();
>>>>>         }
>>>>>
>>>>>         @FinishBundle
>>>>>         public void finishBundle() {
>>>>>         }
>>>>>
>>>>>         @Teardown
>>>>>         public void teardown() {
>>>>>         }
>>>>>     }
>>>>> }
>>>>>
>>>>> Any help or suggestions?
>>>>>
>>>>> Thanks and Regards
>>>>> Mohil
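The EmitUniqueRecordA step in the snippet combines per-key state with a processing-time timer. Below is a plain-Java model of that pattern, with no Beam dependency, so its behavior is easy to test: a per-key set of already-emitted values, and a 10-minute expiry that is pushed out each time a new value is recorded (in Beam, setting a timer again under the same timer ID replaces the earlier one). The class name `DedupWithExpiry`, the use of `java.time.Duration`, and the explicit `nowMillis` clock are hypothetical; in the real DoFn the runner delivers the timer callback, whereas this sketch checks for due timers lazily on the next element. It models only the deduplication logic and says nothing about the drain failure itself, which Reuven considered a Dataflow runner bug (tracked as BEAM-10053).

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Plain-Java model (no Beam dependency) of the EmitUniqueRecordA pattern:
// per-key "already emitted" state plus a processing-time expiry timer.
final class DedupWithExpiry<K, V> {
    private final long ttlMillis;
    // Stands in for ValueState<Set<RecordA>>, one entry per key.
    private final Map<K, Set<V>> emitted = new HashMap<>();
    // Stands in for the "stateExpiry" timer: when each key's state is cleared.
    private final Map<K, Long> expiryAt = new HashMap<>();

    DedupWithExpiry(Duration ttl) {
        this.ttlMillis = ttl.toMillis();
    }

    // Returns true if `value` should be emitted for `key` at processing time `nowMillis`.
    boolean process(K key, V value, long nowMillis) {
        fireExpiredTimers(nowMillis);
        Set<V> seen = emitted.computeIfAbsent(key, k -> new HashSet<>());
        if (!seen.add(value)) {
            return false; // duplicate, e.g. the same join result from an overlapping window
        }
        // Like offset(ttl).setRelative(): re-setting the timer replaces the earlier one.
        expiryAt.put(key, nowMillis + ttlMillis);
        return true;
    }

    // Like @OnTimer("stateExpiry"): clear the state of every key whose timer is due.
    void fireExpiredTimers(long nowMillis) {
        expiryAt.entrySet().removeIf(entry -> {
            if (entry.getValue() <= nowMillis) {
                emitted.remove(entry.getKey());
                return true;
            }
            return false;
        });
    }

    public static void main(String[] args) {
        DedupWithExpiry<String, String> dedup = new DedupWithExpiry<>(Duration.ofMinutes(10));
        System.out.println(dedup.process("myKey", "recordA-1", 0L));       // true: first emission
        System.out.println(dedup.process("myKey", "recordA-1", 45_000L));  // false: duplicate
        System.out.println(dedup.process("myKey", "recordA-1", 660_000L)); // true: state expired
    }
}
```

The last call shows the trade-off of expiring state: once the TTL has passed, the same value is treated as new again, so the TTL needs to exceed the spread between duplicate emissions (here, sliding-window firings 45 s apart plus allowed lateness).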