Hi Reuven, all,

I have opened the following JIRA to track this issue:

https://issues.apache.org/jira/browse/BEAM-10053

Thanks and Regards
Mohil

On Mon, May 18, 2020 at 12:28 PM Mohil Khare <mo...@prosimo.io> wrote:

> Hi Reuven,
>
> Thanks for your reply. Well, I haven't filed a JIRA yet, but if it looks
> like a bug on Beam's end, I will file one.
>
> Regards
> Mohil
>
> On Mon, May 18, 2020 at 12:26 PM Reuven Lax <re...@google.com> wrote:
>
>> However, this still appears to be a bug - that exception should never be
>> thrown inside the Dataflow runner. Are you able to file a JIRA for this bug?
>>
>> On Mon, May 18, 2020 at 10:44 AM Robert Bradshaw <rober...@google.com>
>> wrote:
>>
>>> Glad you were able to get this working; thanks for following up.
>>>
>>> On Mon, May 18, 2020 at 10:35 AM Mohil Khare <mo...@prosimo.io> wrote:
>>>
>>>> Hi,
>>>> On another note, I think I was unnecessarily complicating things by
>>>> applying a sliding window here and then an extra global window to remove
>>>> duplicates. I replaced the *sliding window with a session window* (as I
>>>> know that my transaction, consisting of recordA logs and recordB logs for
>>>> a key "MyKey", won't last more than 60-90 secs), and my use case now
>>>> works fine. Even DRAIN completes successfully.
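>>>>
>>>> For reference, the session-window version looks roughly like this (a
>>>> minimal sketch: the 90-second gap duration is my choice based on the
>>>> 60-90 sec transaction lifetime above, and the variable names come from
>>>> my original snippet below):
>>>>
>>>> PCollection<KV<MyKey, RecordA>> sessionedRecordA =
>>>>     incompleteRecordALogs.apply("Applying_Session_Window_RecordA",
>>>>         Window.<KV<MyKey, RecordA>>into(Sessions.withGapDuration(Duration.standardSeconds(90)))
>>>>             .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
>>>>             .withAllowedLateness(Duration.standardSeconds(90))
>>>>             .discardingFiredPanes());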
>>>>
>>>> Thanks
>>>> Mohil
>>>>
>>>> On Sun, May 17, 2020 at 3:37 PM Mohil Khare <mo...@prosimo.io> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> I have a use case where two sets of PCollections (RecordA and RecordB)
>>>>> come from a real-time streaming source like Kafka.
>>>>>
>>>>> Both records are correlated by a common key, let's say KEY.
>>>>>
>>>>> The purpose is to enrich RecordA with RecordB's data, for which I am
>>>>> using CoGroupByKey. Since RecordA and RecordB for a common key can arrive
>>>>> within 1-2 minutes of each other in event time, I maintain a sliding
>>>>> window over both records and then apply CoGroupByKey to the two
>>>>> PCollections.
>>>>>
>>>>> Any sliding window that contains both RecordA and RecordB for a common
>>>>> key KEY emits the enriched output. Since multiple overlapping sliding
>>>>> windows can emit the same output, I then remove duplicates by feeding
>>>>> those outputs into a global window, where I keep state recording whether
>>>>> an output has already been processed. Because it is a global window, I
>>>>> also set a Timer on the state (for GC) so that it expires 10 minutes
>>>>> after the state is written.
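>>>>>
>>>>> In essence, the dedup step boils down to the following (a minimal
>>>>> sketch with the elided parts of my snippet below filled in; the
>>>>> set-based check, HashSet, and SetCoder are just one possible choice,
>>>>> and it assumes RecordA implements equals/hashCode):
>>>>>
>>>>> @StateId("processedRecordA")
>>>>> private final StateSpec<ValueState<Set<RecordA>>> processedRecordASpec =
>>>>>     StateSpecs.value(SetCoder.of(AvroCoder.of(RecordA.class)));
>>>>>
>>>>> @TimerId("stateExpiry")
>>>>> private final TimerSpec stateExpirySpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
>>>>>
>>>>> @ProcessElement
>>>>> public void processElement(@Element KV<MyKey, RecordA> input,
>>>>>                            OutputReceiver<RecordA> out,
>>>>>                            @StateId("processedRecordA") ValueState<Set<RecordA>> processed,
>>>>>                            @TimerId("stateExpiry") Timer gcTimer) {
>>>>>     Set<RecordA> seen = processed.read();
>>>>>     if (seen == null) {
>>>>>         seen = new HashSet<>();
>>>>>     }
>>>>>     if (seen.add(input.getValue())) {  // first time this record is seen for this key
>>>>>         processed.write(seen);
>>>>>         // GC: clear the state 10 minutes (processing time) after the last write.
>>>>>         gcTimer.offset(Duration.standardMinutes(10)).setRelative();
>>>>>         out.output(input.getValue());
>>>>>     }
>>>>> }
>>>>>
>>>>> @OnTimer("stateExpiry")
>>>>> public void onExpiry(@StateId("processedRecordA") ValueState<Set<RecordA>> processed) {
>>>>>     processed.clear();
>>>>> }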
>>>>>
>>>>> This is working perfectly fine w.r.t. the expected results. However, I
>>>>> am unable to stop the job gracefully, i.e. DRAIN it. I see the following
>>>>> exception:
>>>>>
>>>>> java.lang.IllegalStateException: org.apache.beam.runners.dataflow.worker.SimpleParDoFn@16b089a6 received state cleanup timer for window org.apache.beam.sdk.transforms.windowing.GlobalWindow@29ca0210 that is before the appropriate cleanup time 294248-01-24T04:00:54.776Z
>>>>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:842)
>>>>> org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processSystemTimer(SimpleParDoFn.java:384)
>>>>> org.apache.beam.runners.dataflow.worker.SimpleParDoFn.access$700(SimpleParDoFn.java:73)
>>>>> org.apache.beam.runners.dataflow.worker.SimpleParDoFn$TimerType$2.processTimer(SimpleParDoFn.java:444)
>>>>> org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers(SimpleParDoFn.java:467)
>>>>> org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers(SimpleParDoFn.java:354)
>>>>> org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.finish(ParDoOperation.java:52)
>>>>> org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:85)
>>>>> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1316)
>>>>> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:149)
>>>>> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1049)
>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>> java.lang.Thread.run(Thread.java:745)
>>>>>
>>>>> (The same exception, with an identical stack trace, is thrown by two
>>>>> more SimpleParDoFn instances.)
>>>>>
>>>>>
>>>>> My code snippet:
>>>>>
>>>>> PCollection<KV<MyKey, RecordA>> windowedRecordA =
>>>>>     incompleteRecordALogs.apply("Applying_Sliding_Window_RecordA",
>>>>>         Window.<KV<MyKey, RecordA>>into(
>>>>>                 SlidingWindows.of(Duration.standardSeconds(90)).every(Duration.standardSeconds(45)))
>>>>>             .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
>>>>>             .withAllowedLateness(Duration.standardSeconds(90))
>>>>>             .discardingFiredPanes());
>>>>>
>>>>> PCollection<KV<MyKey, RecordB>> windowedRecordB =
>>>>>     recordBLogs.apply("Applying_Sliding_Window_RecordB",
>>>>>         Window.<KV<MyKey, RecordB>>into(
>>>>>                 SlidingWindows.of(Duration.standardSeconds(90)).every(Duration.standardSeconds(45)))
>>>>>             .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
>>>>>             .withAllowedLateness(Duration.standardSeconds(90))
>>>>>             .discardingFiredPanes());
>>>>>
>>>>> PCollection<KV<MyKey, CoGbkResult>> coGbkRecords =
>>>>>     KeyedPCollectionTuple.of(TagRecordA, windowedRecordA)
>>>>>         .and(TagRecordB, windowedRecordB)
>>>>>         .apply("CoGroupByKey", CoGroupByKey.create());
>>>>>
>>>>>
>>>>> PCollection<RecordA> enrichedRecordA =
>>>>>     coGbkRecords.apply("EnrichRecordAWithRecordB",
>>>>>         new EnrichIncompleteRecordA());
>>>>>
>>>>>
>>>>> class EnrichIncompleteRecordA
>>>>>     extends PTransform<PCollection<KV<MyKey, CoGbkResult>>, PCollection<RecordA>> {
>>>>>
>>>>>     @Override
>>>>>     public PCollection<RecordA> expand(PCollection<KV<MyKey, CoGbkResult>> input) {
>>>>>         logger.info("Enriching Incomplete RecordA with RecordB");
>>>>>         return input
>>>>>             .apply("Add_RecordBInfo_To_RecordA", ParDo.of(new AddRecordBData()))
>>>>>             .apply("Applying_Windowing", Window.<KV<MyKey, RecordA>>into(new GlobalWindows())
>>>>>                 .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
>>>>>                 .discardingFiredPanes())
>>>>>             .apply("Emit_Unique_RecordA", ParDo.of(new EmitUniqueRecordA()));
>>>>>     }
>>>>>
>>>>>     private class AddRecordBData extends DoFn<KV<MyKey, CoGbkResult>, KV<MyKey, RecordA>> {
>>>>>         @Setup
>>>>>         public void setup() {
>>>>>         }
>>>>>
>>>>>         @StartBundle
>>>>>         public void startBundle() {
>>>>>         }
>>>>>
>>>>>         @ProcessElement
>>>>>         public void processElement(@Element KV<MyKey, CoGbkResult> input,
>>>>>                                    OutputReceiver<KV<MyKey, RecordA>> out) {
>>>>>             Iterable<RecordA> allRecordALogs = input.getValue().getAll(TagRecordA);
>>>>>             Iterable<RecordB> allRecordBLogs = input.getValue().getAll(TagRecordB);
>>>>>
>>>>>             // There should be at most 1 RecordB per MyKey.
>>>>>             if (allRecordALogs.iterator().hasNext() && allRecordBLogs.iterator().hasNext()) {
>>>>>                 RecordB recordB = Iterables.getFirst(allRecordBLogs, null);
>>>>>                 for (RecordA recordA : allRecordALogs) {
>>>>>                     if (null != recordB) {
>>>>>                         logger.info("Enriching incomplete recordA [{}] with recordB: [{}]", recordA, recordB);
>>>>>                         // <code to populate recordA object with recordB data>
>>>>>                         out.output(KV.of(input.getKey(), recordA));
>>>>>                     } else {
>>>>>                         logger.error("No recordB available for recordA log [{}]", recordA);
>>>>>                     }
>>>>>                 }
>>>>>             } else {
>>>>>                 logger.info("Either recordA or recordB not present for myKey: {}", input.getKey());
>>>>>             }
>>>>>         }
>>>>>
>>>>>         @FinishBundle
>>>>>         public void finishBundle() {
>>>>>         }
>>>>>
>>>>>         @Teardown
>>>>>         public void teardown() {
>>>>>         }
>>>>>     }
>>>>>
>>>>>
>>>>>     private class EmitUniqueRecordA extends DoFn<KV<MyKey, RecordA>, RecordA> {
>>>>>         // Declared as Set<RecordA> so it matches the @ProcessElement signature below
>>>>>         // (the original draft had a mismatched ValueState<RecordA> here).
>>>>>         @StateId("processedRecordA")
>>>>>         private final StateSpec<ValueState<Set<RecordA>>> processedRecordASpec =
>>>>>             StateSpecs.value(SetCoder.of(AvroCoder.of(RecordA.class)));
>>>>>
>>>>>         @TimerId("stateExpiry")
>>>>>         private final TimerSpec stateExpirySpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
>>>>>
>>>>>         @Setup
>>>>>         public void setup() {
>>>>>         }
>>>>>
>>>>>         @StartBundle
>>>>>         public void startBundle() {
>>>>>         }
>>>>>
>>>>>         @ProcessElement
>>>>>         public void processElement(@Element KV<MyKey, RecordA> input,
>>>>>                                    OutputReceiver<RecordA> out,
>>>>>                                    @StateId("processedRecordA") ValueState<Set<RecordA>> processedRecordAState,
>>>>>                                    @TimerId("stateExpiry") Timer stateExpiryTimer) {
>>>>>             // <code to check, via processedRecordAState, whether recordA has already been
>>>>>             //  processed; sets recordANeedsToBeEmitted and updates processedRecordASet>
>>>>>             if (recordANeedsToBeEmitted) {
>>>>>                 processedRecordAState.write(processedRecordASet);
>>>>>                 stateExpiryTimer.offset(Duration.standardMinutes(10)).setRelative();
>>>>>                 logger.info("Emitting unique recordA {} for myKey {}", input.getValue(), input.getKey());
>>>>>                 out.output(input.getValue());
>>>>>             }
>>>>>         }
>>>>>
>>>>>         @OnTimer("stateExpiry")
>>>>>         public void onExpiry(
>>>>>             OnTimerContext context,
>>>>>             @StateId("processedRecordA") ValueState<Set<RecordA>> processedRecordAState) {
>>>>>             logger.info("Expiring State after timer expiry");
>>>>>             processedRecordAState.clear();
>>>>>         }
>>>>>
>>>>>         @FinishBundle
>>>>>         public void finishBundle() {
>>>>>         }
>>>>>
>>>>>         @Teardown
>>>>>         public void teardown() {
>>>>>         }
>>>>>     }
>>>>> }
>>>>>
>>>>> Any help or suggestions?
>>>>>
>>>>> Thanks and Regards
>>>>> Mohil
>>>>>
>>>>
