Hi,
On another note, I think I was unnecessarily complicating things by
applying a sliding window here and then an extra global window to remove
duplicates. I replaced the sliding window with a session window (since I
know that a transaction consisting of recordA logs and recordB logs for a
key "MyKey" won't last more than 60-90 secs), and my use case seems to be
working fine. Even DRAIN now completes successfully.
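
For reference, the replacement looks roughly like this. This is only a
minimal sketch: the 90 sec gap duration is just the upper bound I assumed
for one transaction, and the variable names are illustrative:

PCollection<KV<MyKey, RecordA>> sessionedRecordA =
    incompleteRecordALogs.apply("Applying_Session_Window_RecordA",
        Window.<KV<MyKey, RecordA>>into(Sessions.withGapDuration(Duration.standardSeconds(90)))
            .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
            .withAllowedLateness(Duration.standardSeconds(90))
            .discardingFiredPanes());

(and similarly for RecordB before the CoGroupByKey). Since a session
window closes once a key sees no activity for the gap duration, each
key's transaction lands in exactly one window, so the CoGroupByKey output
is no longer duplicated across overlapping windows and the extra
global-window dedup stage can be dropped.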

Thanks
Mohil

On Sun, May 17, 2020 at 3:37 PM Mohil Khare <mo...@prosimo.io> wrote:

> Hello,
>
> I have a use case where I have two PCollections (RecordA and
> RecordB) coming from a real-time streaming source like Kafka.
>
> Both Records are correlated with a common key, let's say KEY.
>
> The purpose is to enrich RecordA with RecordB's data, for which I am
> using CoGroupByKey. Since RecordA and RecordB for a common key can arrive
> within 1-2 minutes of each other in event time, I apply a sliding window
> to both records and then run CoGroupByKey on the two PCollections.
>
> Any sliding window that sees both RecordA and RecordB for a common key
> KEY emits enriched output. Since multiple overlapping sliding windows can
> emit the same output, I then remove duplicates by feeding those outputs
> into a global window, where I keep state recording whether an output has
> already been processed. Since it is a global window, I also set a Timer
> on the state (for GC) so that the state expires 10 minutes after it was
> last written (see EmitUniqueRecordA in the code below).
>
> This works perfectly fine w.r.t. the expected results. However, I am
> unable to stop the job gracefully, i.e. to Drain it. I see the following
> exception:
>
> java.lang.IllegalStateException:
> org.apache.beam.runners.dataflow.worker.SimpleParDoFn@16b089a6 received
> state cleanup timer for window
> org.apache.beam.sdk.transforms.windowing.GlobalWindow@29ca0210 that is
> before the appropriate cleanup time 294248-01-24T04:00:54.776Z
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:842)
> org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processSystemTimer(SimpleParDoFn.java:384)
> org.apache.beam.runners.dataflow.worker.SimpleParDoFn.access$700(SimpleParDoFn.java:73)
> org.apache.beam.runners.dataflow.worker.SimpleParDoFn$TimerType$2.processTimer(SimpleParDoFn.java:444)
> org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers(SimpleParDoFn.java:467)
> org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers(SimpleParDoFn.java:354)
> org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.finish(ParDoOperation.java:52)
> org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:85)
> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1316)
> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:149)
> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1049)
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> java.lang.Thread.run(Thread.java:745)
> (The same IllegalStateException and stack trace are repeated twice more,
> for SimpleParDoFn@59902a10 and SimpleParDoFn@4316932b.)
>
>
> My code snippet:
>
> PCollection<KV<MyKey, RecordA>> windowedRecordA =
>     incompleteRecordALogs.apply("Applying_Sliding_Window_RecordA",
>         Window.<KV<MyKey, RecordA>>into(
>                 SlidingWindows.of(Duration.standardSeconds(90))
>                     .every(Duration.standardSeconds(45)))
>             .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
>             .withAllowedLateness(Duration.standardSeconds(90))
>             .discardingFiredPanes());
>
> PCollection<KV<MyKey, RecordB>> windowedRecordB =
>     recordBLogs.apply("Applying_Sliding_Window_RecordB",
>         Window.<KV<MyKey, RecordB>>into(
>                 SlidingWindows.of(Duration.standardSeconds(90))
>                     .every(Duration.standardSeconds(45)))
>             .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
>             .withAllowedLateness(Duration.standardSeconds(90))
>             .discardingFiredPanes());
>
> PCollection<KV<MyKey, CoGbkResult>> coGbkRecords =
>     KeyedPCollectionTuple.of(TagRecordA, windowedRecordA)
>         .and(TagRecordB, windowedRecordB)
>         .apply("CoGroupByKey", CoGroupByKey.create());
>
>
> PCollection<RecordA> enrichedRecordA =
>     coGbkRecords.apply("EnrichRecordAWithRecordB",
>         new EnrichIncompleteRecordA());
>
>
> class EnrichIncompleteRecordA extends PTransform<PCollection<KV<MyKey, CoGbkResult>>, PCollection<RecordA>> {
>     @Override
>     public PCollection<RecordA> expand(PCollection<KV<MyKey, CoGbkResult>> input) {
>         logger.info("Enriching Incomplete RecordA with RecordB");
>         return input
>             .apply("Add_RecordBInfo_To_RecordA", ParDo.of(new AddRecordBData()))
>             .apply("Applying_Windowing", Window.<KV<MyKey, RecordA>>into(new GlobalWindows())
>                 .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
>                 .discardingFiredPanes())
>             .apply("Emit_Unique_RecordA", ParDo.of(new EmitUniqueRecordA()));
>     }
>
>     private class AddRecordBData extends DoFn<KV<MyKey, CoGbkResult>, KV<MyKey, RecordA>> {
>         @ProcessElement
>         public void processElement(@Element KV<MyKey, CoGbkResult> input,
>                                    OutputReceiver<KV<MyKey, RecordA>> out) {
>             Iterable<RecordA> allRecordALogs = input.getValue().getAll(TagRecordA);
>             Iterable<RecordB> allRecordBLogs = input.getValue().getAll(TagRecordB);
>
>             // There should be at most 1 RecordB per MyKey.
>             if (allRecordALogs.iterator().hasNext() && allRecordBLogs.iterator().hasNext()) {
>                 RecordB recordB = Iterables.getFirst(allRecordBLogs, null);
>                 for (RecordA recordA : allRecordALogs) {
>                     if (null != recordB) {
>                         logger.info("Enriching incomplete recordA [{}] with recordB: [{}]", recordA, recordB);
>                         <code to populate recordA object with recordB data>
>                         out.output(KV.of(input.getKey(), recordA));
>                     } else {
>                         logger.error("No recordB available for recordA log [{}]", recordA);
>                     }
>                 }
>             } else {
>                 logger.info("Either recordA or recordB not present for myKey: {}", input.getKey());
>             }
>         }
>     }
>
>
>     private class EmitUniqueRecordA extends DoFn<KV<MyKey, RecordA>, RecordA> {
>         // Set of RecordA already emitted for this key, used for de-duplication.
>         @StateId("processedRecordA")
>         private final StateSpec<ValueState<Set<RecordA>>> processedRecordASpec =
>             StateSpecs.value(SetCoder.of(AvroCoder.of(RecordA.class)));
>
>         // Processing-time timer used to garbage-collect the state above.
>         @TimerId("stateExpiry")
>         private final TimerSpec stateExpirySpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
>
>         @ProcessElement
>         public void processElement(@Element KV<MyKey, RecordA> input, OutputReceiver<RecordA> out,
>                                    @StateId("processedRecordA") ValueState<Set<RecordA>> processedRecordAState,
>                                    @TimerId("stateExpiry") Timer stateExpiryTimer) {
>             // << code to check if recordA has already been processed by checking
>             //    state; assume it yields a boolean shouldEmit and an updated
>             //    processedRecordASet >>
>             if (shouldEmit) {
>                 processedRecordAState.write(processedRecordASet);
>                 // Reset the GC timer: state expires 10 minutes after the last write.
>                 stateExpiryTimer.offset(Duration.standardMinutes(10)).setRelative();
>                 logger.info("Emitting unique recordA {} for myKey {}", input.getValue(), input.getKey());
>                 out.output(input.getValue());
>             }
>         }
>
>         @OnTimer("stateExpiry")
>         public void onExpiry(OnTimerContext context,
>                              @StateId("processedRecordA") ValueState<Set<RecordA>> processedRecordAState) {
>             logger.info("Expiring State after timer expiry");
>             processedRecordAState.clear();
>         }
>     }
> }
>
> Any help or suggestions?
>
> Thanks and Regards
> Mohil
>
