Hi,

On another note, I think I was unnecessarily complicating things by applying a sliding window and then an extra global window to remove duplicates. I replaced the sliding window with a session window (since I know that a transaction consisting of recordA logs and recordB logs for a key "MyKey" won't last more than 60-90 secs), and my use case seems to be working fine. Even DRAIN now completes successfully.
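For reference, here is roughly what the change looks like (a minimal sketch: the trigger and allowed-lateness settings are unchanged from my earlier snippet below, and the 90 sec gap duration is my own guess based on the 60-90 sec transaction lifetime):

import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

// Session windows merge per key, so all recordA/recordB logs for "MyKey"
// that arrive within the 90 sec gap should land in a single window. CoGroupByKey
// then sees each transaction once, which removes the need for the
// global-window dedup state (and its GC timer) entirely.
PCollection<KV<MyKey, RecordA>> windowedRecordA =
    incompleteRecordALogs.apply("Applying_Session_Window_RecordA",
        Window.<KV<MyKey, RecordA>>into(Sessions.withGapDuration(Duration.standardSeconds(90)))
            .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
            .withAllowedLateness(Duration.standardSeconds(90))
            .discardingFiredPanes());

// windowedRecordB gets the same windowing, and both still feed CoGroupByKey as before.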
Thanks,
Mohil

On Sun, May 17, 2020 at 3:37 PM Mohil Khare <mo...@prosimo.io> wrote:
> Hello,
>
> I have a use case with two PCollections (RecordA and RecordB) coming from a
> real-time streaming source like Kafka.
>
> Both records are correlated by a common key, say KEY.
>
> The goal is to enrich RecordA with RecordB's data, for which I am using
> CoGroupByKey. Since RecordA and RecordB for a common key can arrive within
> 1-2 minutes of each other in event time, I apply a sliding window to both
> PCollections and then CoGroupByKey them.
>
> A sliding window that sees both RecordA and RecordB for a common key KEY
> emits the enriched output. Since multiple overlapping sliding windows can
> emit the same output, I then remove duplicates by feeding those outputs
> into a global window, where I keep state recording whether an output has
> already been processed. Because it is a global window, I set a timer on the
> state (for GC) to expire it 10 minutes after the state is written.
>
> This works perfectly w.r.t. the expected results. However, I am unable to
> stop the job gracefully, i.e. drain it. I see the following exception:
>
> java.lang.IllegalStateException:
> org.apache.beam.runners.dataflow.worker.SimpleParDoFn@16b089a6 received
> state cleanup timer for window
> org.apache.beam.sdk.transforms.windowing.GlobalWindow@29ca0210 that is
> before the appropriate cleanup time 294248-01-24T04:00:54.776Z
>     org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:842)
>     org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processSystemTimer(SimpleParDoFn.java:384)
>     org.apache.beam.runners.dataflow.worker.SimpleParDoFn.access$700(SimpleParDoFn.java:73)
>     org.apache.beam.runners.dataflow.worker.SimpleParDoFn$TimerType$2.processTimer(SimpleParDoFn.java:444)
>     org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers(SimpleParDoFn.java:467)
>     org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers(SimpleParDoFn.java:354)
>     org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.finish(ParDoOperation.java:52)
>     org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:85)
>     org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1316)
>     org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:149)
>     org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1049)
>     java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     java.lang.Thread.run(Thread.java:745)
>
> (The same IllegalStateException, with an identical stack trace, is thrown
> for two more SimpleParDoFn instances, @59902a10 and @4316932b.)
>
> My code snippet:
>
> PCollection<KV<MyKey, RecordA>> windowedRecordA =
>     incompleteRecordALogs.apply("Applying_Sliding_Window_RecordA",
>         Window.<KV<MyKey, RecordA>>into(
>                 SlidingWindows.of(Duration.standardSeconds(90)).every(Duration.standardSeconds(45)))
>             .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
>             .withAllowedLateness(Duration.standardSeconds(90))
>             .discardingFiredPanes());
>
> PCollection<KV<MyKey, RecordB>> windowedRecordB =
>     recordBLogs.apply("Applying_Sliding_Window_RecordB",
>         Window.<KV<MyKey, RecordB>>into(
>                 SlidingWindows.of(Duration.standardSeconds(90)).every(Duration.standardSeconds(45)))
>             .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
>             .withAllowedLateness(Duration.standardSeconds(90))
>             .discardingFiredPanes());
>
> PCollection<KV<MyKey, CoGbkResult>> coGbkRecords =
>     KeyedPCollectionTuple.of(TagRecordA, windowedRecordA)
>         .and(TagRecordB, windowedRecordB)
>         .apply("CoGroupByKey", CoGroupByKey.create());
>
> PCollection<RecordA> enrichedRecordA =
>     coGbkRecords.apply("EnrichRecordAWithRecordB", new EnrichIncompleteRecordA());
>
> class EnrichIncompleteRecordA
>     extends PTransform<PCollection<KV<MyKey, CoGbkResult>>, PCollection<RecordA>> {
>   @Override
>   public PCollection<RecordA> expand(PCollection<KV<MyKey, CoGbkResult>> input) {
>     logger.info("Enriching Incomplete RecordA with RecordB");
>     return input
>         .apply("Add_RecordBInfo_To_RecordA", ParDo.of(new AddRecordBData()))
>         // Re-window into a single global window so duplicates emitted by
>         // overlapping sliding windows can be filtered with keyed state.
>         .apply("Applying_Windowing", Window.<KV<MyKey, RecordA>>into(new GlobalWindows())
>             .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
>             .discardingFiredPanes())
>         .apply("Emit_Unique_RecordA", ParDo.of(new EmitUniqueRecordA()));
>   }
>
>   private class AddRecordBData extends DoFn<KV<MyKey, CoGbkResult>, KV<MyKey, RecordA>> {
>     @ProcessElement
>     public void processElement(@Element KV<MyKey, CoGbkResult> input,
>                                OutputReceiver<KV<MyKey, RecordA>> out) {
>       Iterable<RecordA> allRecordALogs = input.getValue().getAll(TagRecordA);
>       Iterable<RecordB> allRecordBLogs = input.getValue().getAll(TagRecordB);
>
>       // There should be at most one RecordB per MyKey.
>       if (allRecordALogs.iterator().hasNext() && allRecordBLogs.iterator().hasNext()) {
>         RecordB recordB = Iterables.getFirst(allRecordBLogs, null);
>         for (RecordA recordA : allRecordALogs) {
>           if (null != recordB) {
>             logger.info("Enriching incomplete recordA [{}] with recordB: [{}]", recordA, recordB);
>             <code to populate recordA object with recordB data>
>             out.output(KV.of(input.getKey(), recordA));
>           } else {
>             logger.error("No recordB available for recordA log [{}]", recordA);
>           }
>         }
>       } else {
>         logger.info("Either recordA or recordB not present for myKey: {}", input.getKey());
>       }
>     }
>   }
>
>   private class EmitUniqueRecordA extends DoFn<KV<MyKey, RecordA>, RecordA> {
>     // State holding the set of RecordA already emitted for this key.
>     @StateId("processedRecordA")
>     private final StateSpec<ValueState<Set<RecordA>>> processedRecordASpec =
>         StateSpecs.value(SetCoder.of(AvroCoder.of(RecordA.class)));
>
>     // Processing-time timer used to GC the state, since a global window never closes.
>     @TimerId("stateExpiry")
>     private final TimerSpec stateExpirySpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
>
>     @ProcessElement
>     public void processElement(@Element KV<MyKey, RecordA> input,
>                                OutputReceiver<RecordA> out,
>                                @StateId("processedRecordA") ValueState<Set<RecordA>> processedRecordAState,
>                                @TimerId("stateExpiry") Timer stateExpiryTimer) {
>       << code to check if recordA has already been processed by checking state >>
>       if (<recordA needs to be emitted>) {
>         processedRecordAState.write(processedRecordASet);
>         stateExpiryTimer.offset(Duration.standardMinutes(10)).setRelative();
>         logger.info("Emitting unique recordA {} for myKey {}", recordA, myKey);
>         out.output(input.getValue());
>       }
>     }
>
>     @OnTimer("stateExpiry")
>     public void onExpiry(OnTimerContext context,
>                          @StateId("processedRecordA") ValueState<Set<RecordA>> processedRecordAState) {
>       logger.info("Expiring state after timer expiry");
>       processedRecordAState.clear();
>     }
>   }
> }
>
> Any help or suggestions?
>
> Thanks and Regards,
> Mohil