Hello,

I have a use case with two PCollections (RecordA and RecordB) coming from
a real-time streaming source like Kafka.

Both records are correlated by a common key, let's say KEY.

The purpose is to enrich RecordA with RecordB's data, for which I am using
CoGroupByKey. Since RecordA and RecordB for a common key can arrive within
1-2 minutes of each other in event time, I maintain a sliding window over
both records and then apply CoGroupByKey to the two PCollections.

Any sliding window that finds both RecordA and RecordB for a common key
KEY emits the enriched output. Since multiple overlapping sliding windows
can emit the same output, I finally remove duplicates by feeding these
outputs into a global window, where I keep state tracking whether an
output has already been processed. Because it is a global window, I set a
timer on the state (for GC) to let it expire 10 minutes after the state
was written.

This works perfectly with respect to the expected results. However, I am
unable to stop the job gracefully, i.e. drain it. I see the following
exception:

java.lang.IllegalStateException:
org.apache.beam.runners.dataflow.worker.SimpleParDoFn@16b089a6 received
state cleanup timer for window
org.apache.beam.sdk.transforms.windowing.GlobalWindow@29ca0210 that is
before the appropriate cleanup time 294248-01-24T04:00:54.776Z
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:842)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processSystemTimer(SimpleParDoFn.java:384)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn.access$700(SimpleParDoFn.java:73)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn$TimerType$2.processTimer(SimpleParDoFn.java:444)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers(SimpleParDoFn.java:467)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processTimers(SimpleParDoFn.java:354)
org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.finish(ParDoOperation.java:52)
org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:85)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1316)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:149)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1049)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
(The same IllegalStateException is thrown for two more SimpleParDoFn
instances, with identical stack traces.)


My code snippet:

PCollection<KV<MyKey, RecordA>> windowedRecordA =
    incompleteRecordALogs.apply("Applying_Sliding_Window_RecordA",
        Window.<KV<MyKey, RecordA>>into(
                SlidingWindows.of(Duration.standardSeconds(90))
                    .every(Duration.standardSeconds(45)))
            .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
            .withAllowedLateness(Duration.standardSeconds(90))
            .discardingFiredPanes());


PCollection<KV<MyKey, RecordB>> windowedRecordB =
    recordBLogs.apply("Applying_Sliding_Window_RecordB",
        Window.<KV<MyKey, RecordB>>into(
                SlidingWindows.of(Duration.standardSeconds(90))
                    .every(Duration.standardSeconds(45)))
            .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
            .withAllowedLateness(Duration.standardSeconds(90))
            .discardingFiredPanes());

PCollection<KV<MyKey, CoGbkResult>> coGbkRecords =
    KeyedPCollectionTuple.of(TagRecordA, windowedRecordA)
        .and(TagRecordB, windowedRecordB)
        .apply("CoGroupByKey", CoGroupByKey.create());


PCollection<RecordA> enrichedRecordA =
    coGbkRecords.apply("EnrichRecordAWithRecordB",
        new EnrichIncompleteRecordA());


class EnrichIncompleteRecordA extends
        PTransform<PCollection<KV<MyKey, CoGbkResult>>, PCollection<RecordA>> {
    @Override
    public PCollection<RecordA> expand(PCollection<KV<MyKey, CoGbkResult>> input) {
        logger.info("Enriching Incomplete RecordA with RecordB");
        return input
            .apply("Add_RecordBInfo_To_RecordA", ParDo.of(new AddRecordBData()))
            .apply("Applying_Windowing",
                Window.<KV<MyKey, RecordA>>into(new GlobalWindows())
                    .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
                    .discardingFiredPanes())
            .apply("Emit_Unique_RecordA", ParDo.of(new EmitUniqueRecordA()));
    }

    private class AddRecordBData extends DoFn<KV<MyKey, CoGbkResult>, KV<MyKey, RecordA>> {
        @Setup
        public void setup() {
        }

        @StartBundle
        public void startBundle() {
        }

        @ProcessElement
        public void processElement(@Element KV<MyKey, CoGbkResult> input,
                                   OutputReceiver<KV<MyKey, RecordA>> out) {
            Iterable<RecordA> allRecordALogs = input.getValue().getAll(TagRecordA);
            Iterable<RecordB> allRecordBLogs = input.getValue().getAll(TagRecordB);

            // There should be at most 1 RecordB per MyKey.
            if (allRecordALogs.iterator().hasNext() && allRecordBLogs.iterator().hasNext()) {
                RecordB recordB = Iterables.getFirst(allRecordBLogs, null);
                for (RecordA recordA : allRecordALogs) {
                    if (null != recordB) {
                        logger.info("Enriching incomplete recordA [{}] with recordB: [{}]",
                            recordA, recordB);
                        // <code to populate recordA object with recordB data>
                        out.output(KV.of(input.getKey(), recordA));
                    } else {
                        logger.error("No recordB available for recordA log [{}]", recordA);
                    }
                }
            } else {
                logger.info("Either recordA or recordB not present for myKey: {}",
                    input.getKey());
            }
        }

        @FinishBundle
        public void finishBundle() {
        }

        @Teardown
        public void teardown() {
        }
    }


    private class EmitUniqueRecordA extends DoFn<KV<MyKey, RecordA>, RecordA> {
        @Setup
        public void setup() {
        }

        @StartBundle
        public void startBundle() {
        }

        // State holds the set of already-emitted RecordA values for this key.
        @StateId("processedRecordA")
        private final StateSpec<ValueState<Set<RecordA>>> processedRecordASpec =
            StateSpecs.value(SetCoder.of(AvroCoder.of(RecordA.class)));

        @TimerId("stateExpiry")
        private final TimerSpec stateExpirySpec =
            TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

        @ProcessElement
        public void processElement(@Element KV<MyKey, RecordA> input,
                                   OutputReceiver<RecordA> out,
                                   @StateId("processedRecordA")
                                       ValueState<Set<RecordA>> processedRecordAState,
                                   @TimerId("stateExpiry") Timer stateExpiryTimer) {
            // Check the state to see whether this recordA has already been processed.
            Set<RecordA> processedRecordASet = processedRecordAState.read();
            if (processedRecordASet == null) {
                processedRecordASet = new HashSet<>();
            }
            if (!processedRecordASet.contains(input.getValue())) {
                processedRecordASet.add(input.getValue());
                processedRecordAState.write(processedRecordASet);
                stateExpiryTimer.offset(Duration.standardMinutes(10)).setRelative();
                logger.info("Emitting unique recordA {} for myKey {}",
                    input.getValue(), input.getKey());
                out.output(input.getValue());
            }
        }

        @OnTimer("stateExpiry")
        public void onExpiry(
            OnTimerContext context,
            @StateId("processedRecordA") ValueState<Set<RecordA>> processedRecordAState) {
            logger.info("Expiring State after timer expiry");
            processedRecordAState.clear();
        }

        @FinishBundle
        public void finishBundle() {
        }

        @Teardown
        public void teardown() {
        }
    }
}

Any help or suggestions would be much appreciated.

Thanks and Regards
Mohil
