MOHIL created BEAM-10053:
----------------------------

             Summary: Timers exception on "Job Drain" while using stateful beam 
processing in global window
                 Key: BEAM-10053
                 URL: https://issues.apache.org/jira/browse/BEAM-10053
             Project: Beam
          Issue Type: Bug
          Components: beam-community, runner-dataflow, sdk-java-core
    Affects Versions: 2.19.0
            Reporter: MOHIL
            Assignee: Aizhamal Nurmamat kyzy


Hello,
 
I have a use case where I have two sets of PCollections (RecordA and RecordB) 
coming from a real time streaming source like Kafka.
 
Both Records are correlated with a common key, let's say KEY.
 
The purpose is to enrich RecordA with RecordB's data for which I am using 
CoGroupByKey. Since RecordA and RecordB for a common key can come within 1-2 
minutes of event time, I am maintaining a sliding window for both records and 
then do CoGroupByKey for both PCollections.
 
The sliding windows that will find both RecordA and RecordB for a common key 
KEY, will emit enriched output. Now, since multiple sliding windows can emit 
the same output, I finally remove duplicate results by feeding aforementioned 
outputs to a global window where I maintain a state to check whether output has 
already been processed or not. Since it is a global window, I maintain a Timer 
on state (for GC) to let it expire after 10 minutes have elapsed since state 
has been written.
 
This is working perfectly fine w.r.t the expected results. However, I am unable 
to stop job gracefully i.e. Drain the job gracefully. I see following exception:
 
java.lang.IllegalStateException: 
org.apache.beam.runners.dataflow.worker.SimpleParDoFn@16b089a6 received state 
cleanup timer for window 
org.apache.beam.sdk.transforms.windowing.GlobalWindow@29ca0210 that is before 
the appropriate cleanup time 294248-01-24T04:00:54.776Z
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:842)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processSystemTimer(SimpleParDoFn.java:384)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn.access$700(SimpleParDoFn.java:73)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn$TimerType$2.processTimer(SimpleParDoFn.java:444)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers(SimpleParDoFn.java:467)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers(SimpleParDoFn.java:354)
org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.finish(ParDoOperation.java:52)
org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:85)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1316)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:149)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1049)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
java.lang.IllegalStateException: 
org.apache.beam.runners.dataflow.worker.SimpleParDoFn@59902a10 received state 
cleanup timer for window 
org.apache.beam.sdk.transforms.windowing.GlobalWindow@29ca0210 that is before 
the appropriate cleanup time 294248-01-24T04:00:54.776Z
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:842)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processSystemTimer(SimpleParDoFn.java:384)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn.access$700(SimpleParDoFn.java:73)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn$TimerType$2.processTimer(SimpleParDoFn.java:444)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers(SimpleParDoFn.java:467)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers(SimpleParDoFn.java:354)
org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.finish(ParDoOperation.java:52)
org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:85)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1316)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:149)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1049)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
java.lang.IllegalStateException: 
org.apache.beam.runners.dataflow.worker.SimpleParDoFn@4316932b received state 
cleanup timer for window 
*org.apache.beam.sdk.transforms.windowing.GlobalWindow@29ca0210 that is before 
the appropriate cleanup time 294248-01-24T04:00:54.776Z*
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:842)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processSystemTimer(SimpleParDoFn.java:384)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn.access$700(SimpleParDoFn.java:73)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn$TimerType$2.processTimer(SimpleParDoFn.java:444)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers(SimpleParDoFn.java:467)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers(SimpleParDoFn.java:354)
org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.finish(ParDoOperation.java:52)
org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:85)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1316)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:149)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1049)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
 
 
My code snippet:
 
PCollection<KV<MyKey, RecordA>> windowedRecordA =
 incompleteRecordALogs.apply("Applying_Sliding_Window_RecordA", 
Window.<KV<MyKey, 
RecordA>>into(SlidingWindows.of(Duration.standardSeconds(90)).every(Duration.standardSeconds(45)))
 
.triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow())).withAllowedLateness(Duration.standardSeconds(90)).discardingFiredPanes());
 PCollection<KV<MyKey, RecordB>> windowedRecordB =
 recordBLogs.apply("Applying_Sliding_Window_RecordB", Window.<KV<MyKey, 
RecordB>>into(SlidingWindows.of(Duration.standardSeconds(90)).every(Duration.standardSeconds(45)))
 
.triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow())).withAllowedLateness(Duration.standardSeconds(90)).discardingFiredPanes());PCollection<KV<MyKey,
 CoGbkResult>> coGbkRecords =
 KeyedPCollectionTuple.of(TagRecordA, windowedRecordA)
 .and(TagRecordB, windowedRecordB)
 .apply("CoGroupByKey", CoGroupByKey.create()); PCollection<RecordA> 
enrichedRecordA =
 coGbkRecords.apply("EnrichRecordAWithRecordB",
 new EnrichIncompleteRecordA()); class EnrichIncompleteRecordA extends 
PTransform<PCollection<KV<MyKey, CoGbkResult>>, PCollection<RecordA>> {
 @Override
 public PCollection<RecordA> expand(PCollection<KV<MyKey, CoGbkResult>> input) {
 logger.info("Enriching Incomplete RecordA with RecordB");
 return input
 .apply("Add_RecordBInfo_To_RecordA", ParDo.of(new AddRecordBData()))
 .apply("Applying_Windowing", Window.<KV<MyKey, RecordA>>into(new 
GlobalWindows())
 .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
 .discardingFiredPanes())
 .apply("Emit_Unique_RecordA", ParDo.of(new EmitUniqueRecordA()));

 }

 private class AddRecordBData extends DoFn<KV<MyKey, CoGbkResult>, KV<MyKey, 
RecordA>> {
 @Setup
 public void setup() {
 }

 @StartBundle
 public void startBundle() {

 }

 @ProcessElement
 public void processElement(@Element KV<MyKey, CoGbkResult> input, 
OutputReceiver<KV<MyKey, RecordA>> out) {
 Iterable<RecordA> allRecordALogs = input.getValue().getAll(TagRecordA);
 Iterable<RecordB> allRecordBLogs = input.getValue().getAll(TagRecordB);

 /*
 There should be max 1 RecordB per MyKey
 */
 if (allRecordALogs.iterator().hasNext() && 
allRecordBLogs.iterator().hasNext()) {
 RecordB recordB = Iterables.getFirst(allRecordBLogs, null);
 for (RecordA recordA : allRecordALogs) {
 if (null != recordB) {
 logger.info("Enriching incomplete recordA [{}] with recordB: [{}]", recordA, 
recordB);
 <code to populate recordA object with recordB data> 
out.output(KV.of(input.getKey(), recordA));
 } else {
 logger.error("No recordB available for recordA log [{}]", recordA);
 }
 }
 } else {
 logger.info("Either recordA or recordB not present for myKey: {}", 
input.getKey());
 }
 }

 @FinishBundle
 public void finishBundle() {

 }

 @Teardown
 public void teardown() {
 }
 }


 private class EmitUniqueRecordA extends DoFn<KV<MyKey, RecordA>, RecordA> {
 @Setup
 public void setup() {
 }

 @StartBundle
 public void startBundle() {
 }

 @StateId("processedRecordA")
 private final StateSpec<ValueState<RecordA>> processedRecordASpec = 
StateSpecs.value(AvroCoder.of(RecordA.class));

 @TimerId("stateExpiry")
 private final TimerSpec stateExpirySpec = 
TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

 @ProcessElement
 public void processElement(@Element KV<MyKey, RecordA> input, 
OutputReceiver<RecordA> out,
 @StateId("processedRecordA") ValueState<Set<RecordA>> processedRecordAState,
 @TimerId("stateExpiry") Timer stateExpiryTimer) {
 << code to check if recordA has already been processed by checking state >>
 if (recordA need to be emitted) {
 processedRecordAState.write(processedRecordASet);
 stateExpiryTimer.offset(Duration.standardMinutes(10)).setRelative();
 logger.info("Emitting unique recordA {} for myKey {}", recordA, myKey);
 out.output(input.getValue());
 }
 }

 @OnTimer("stateExpiry")
 public void onExpiry(
 OnTimerContext context,
 @StateId("processedRecordA") ValueState<RecordA> processedRecordAState) {
 logger.info("Expiring State after timer expiry");
 processedRecordAState.clear();
 }

 @FinishBundle
 public void finishBundle() {
 }

 @Teardown
 public void teardown() {
 }
 }
}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to