Hi,
We are building sessionization: we read clickstream hits from Kafka and
generate sessions from them. Our code uses Apache Beam and runs on the Samza
runner. We have a PCollection<KV<String, SegmentEvent>> where the key is the
user id and the value is a clickstream hit. We group by user id and compute
sessions.
We are using the following windowing strategy:
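// hits is the PCollection<KV<String, SegmentEvent>> keyed by user id
hits.apply("UserSessions",
    Window.<KV<String, SegmentEvent>>into(
            // Session windows per key, closed after a 30-minute gap
            Sessions.withGapDuration(Duration.standardMinutes(30)))
        // Fire repeatedly, 60 s of processing time after the first element in each pane
        .triggering(Repeatedly.forever(
            AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(Duration.standardSeconds(60))))
        .discardingFiredPanes()
        .withAllowedLateness(Duration.standardDays(200)));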
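To make the trigger part of this strategy concrete, here is a small plain-Java sketch of when panes would fire for a single window under "60 seconds of processing time after the first element in the pane, repeated forever". It is only an illustration under simplifying assumptions: it ignores window merging and lateness, it assumes arrival times are sorted, and the class and method names (PaneFiringSketch, fireTimes) are made up for this example and are not Beam APIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; this is not Beam's trigger implementation.
final class PaneFiringSketch {

    // Given processing-time arrivals (seconds, ascending), return the times at which
    // a "first element in pane + delay" trigger would fire when repeated forever.
    static List<Long> fireTimes(List<Long> arrivalsSec, long delaySec) {
        List<Long> fires = new ArrayList<>();
        Long pendingFire = null; // fire time of the currently open pane, if any
        for (long arrival : arrivalsSec) {
            // Assumption: an element arriving at or after the pending fire time
            // belongs to the next pane.
            if (pendingFire != null && arrival >= pendingFire) {
                fires.add(pendingFire);
                pendingFire = null;
            }
            if (pendingFire == null) {
                // First element of a new pane starts the delay.
                pendingFire = arrival + delaySec;
            }
        }
        if (pendingFire != null) {
            fires.add(pendingFire);
        }
        return fires;
    }

    public static void main(String[] args) {
        // Elements at 0, 10 and 59 s share the first pane; 61 s and 200 s each open a new one.
        System.out.println(fireTimes(Arrays.asList(0L, 10L, 59L, 61L, 200L), 60L));
        // prints [60, 121, 260]
    }
}
```

With discardingFiredPanes(), each firing would emit only the elements that arrived since the previous firing.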
However, the events we receive are out of order. So we take the timestamp
from each hit and set it as the event timestamp, so that the hit lands in the
correct session. We use WithTimestamps.of() for that.
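As an illustration of why an out-of-order hit matters for session windows, here is a minimal plain-Java sketch of gap-based session merging. It assumes each hit at event time t opens a proto-session [t, t + gap) and that overlapping proto-sessions are merged, which is how session windowing is generally described. The class and method names (SessionMergeSketch, sessions) are hypothetical and are not part of Beam; timestamps are in minutes for readability.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical illustration only; this is not Beam's Sessions implementation.
final class SessionMergeSketch {

    // Returns merged sessions as {start, end} pairs for one user's hit timestamps.
    static List<long[]> sessions(List<Long> hitTimestamps, long gap) {
        List<Long> sorted = new ArrayList<>(hitTimestamps);
        Collections.sort(sorted); // event time order, regardless of arrival order
        List<long[]> merged = new ArrayList<>();
        for (long t : sorted) {
            long[] last = merged.isEmpty() ? null : merged.get(merged.size() - 1);
            if (last != null && t < last[1]) {
                // The proto-session [t, t + gap) overlaps the previous session: extend it.
                last[1] = Math.max(last[1], t + gap);
            } else {
                merged.add(new long[] {t, t + gap});
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        // Hits at minute 0 and minute 50 are more than 30 minutes apart: two sessions.
        System.out.println(sessions(Arrays.asList(0L, 50L), 30L).size()); // prints 2
        // A hit with an earlier timestamp (minute 25) arriving later bridges both sessions.
        System.out.println(sessions(Arrays.asList(0L, 50L, 25L), 30L).size()); // prints 1
    }
}
```

In the second call the late hit forces two existing sessions to merge into one, which is the kind of situation in which a runner has to merge window state and timers.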
We save intermediate state in Kafka topics. We are getting an
IllegalStateException ("Duplicate key ... registration for the same timer").
After trying different scenarios, we found that it happens when events arrive
out of order: when a hit arrives for a user and a hit with an earlier
timestamp arrives later for the same user, the exception is thrown. All of
these events are written to the intermediate Kafka topic, and the exception is
thrown while processing messages from that topic. As soon as the first
out-of-order event is written to this topic, the job fails with the duplicate
key timer exception.
Stack trace:
ERROR o.a.b.r.samza.SamzaPipelineResult - org.apache.samza.SamzaException: Callback failed for task Partition 8, ssp SystemStreamPartition [kafka, cg-mint-session-take27-8c06b298-1091-4322-8da8-2f55d4f617d4-partition_by-gbk-8, 8], offset 825.
org.apache.samza.SamzaException: org.apache.samza.SamzaException: Callback failed for task Partition 8, ssp SystemStreamPartition [kafka, cg-mint-session-take27-8c06b298-1091-4322-8da8-2f55d4f617d4-partition_by-gbk-8, 8], offset 825.
    at org.apache.samza.task.AsyncRunLoop.run(AsyncRunLoop.java:150)
    at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:778)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.samza.SamzaException: Callback failed for task Partition 8, ssp SystemStreamPartition [kafka, cg-mint-session-take27-8c06b298-1091-4322-8da8-2f55d4f617d4-partition_by-gbk-8, 8], offset 825.
    at org.apache.samza.task.TaskCallbackImpl.failure(TaskCallbackImpl.java:89)
    at org.apache.samza.task.AsyncStreamTaskAdapter.process(AsyncStreamTaskAdapter.java:75)
    at org.apache.samza.task.AsyncStreamTaskAdapter.access$000(AsyncStreamTaskAdapter.java:33)
    at org.apache.samza.task.AsyncStreamTaskAdapter$1.run(AsyncStreamTaskAdapter.java:58)
    ... 5 common frames omitted
Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalStateException: Duplicate key org.apache.beam.runners.samza.runtime.KeyedTimerData@dffeb6cc registration for the same timer
    at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:34)
    at org.apache.beam.runners.samza.runtime.OpAdapter.apply(OpAdapter.java:96)
    at org.apache.beam.runners.samza.runtime.OpAdapter.apply(OpAdapter.java:37)
    at org.apache.samza.operators.impl.StreamOperatorImpl.handleMessage(StreamOperatorImpl.java:55)
    at org.apache.samza.operators.impl.OperatorImpl.onMessage(OperatorImpl.java:178)
    at org.apache.samza.operators.impl.OperatorImpl.lambda$null$1(OperatorImpl.java:194)
    at java.lang.Iterable.forEach(Iterable.java:75)
    at org.apache.samza.operators.impl.OperatorImpl.lambda$onMessage$2(OperatorImpl.java:193)
    at java.util.ArrayList.forEach(ArrayList.java:1257)
    at org.apache.samza.operators.impl.OperatorImpl.onMessage(OperatorImpl.java:192)
    at org.apache.samza.operators.impl.OperatorImpl.lambda$null$1(OperatorImpl.java:194)
    at java.lang.Iterable.forEach(Iterable.java:75)
    at org.apache.samza.operators.impl.OperatorImpl.lambda$onMessage$2(OperatorImpl.java:193)
    at java.util.ArrayList.forEach(ArrayList.java:1257)
    at org.apache.samza.operators.impl.OperatorImpl.onMessage(OperatorImpl.java:192)
    at org.apache.samza.operators.impl.OperatorImpl.lambda$null$1(OperatorImpl.java:194)
    at java.lang.Iterable.forEach(Iterable.java:75)
    at org.apache.samza.operators.impl.OperatorImpl.lambda$onMessage$2(OperatorImpl.java:193)
    at java.util.Collections$SingletonList.forEach(Collections.java:4822)
    at org.apache.samza.operators.impl.OperatorImpl.onMessage(OperatorImpl.java:192)
    at org.apache.samza.task.StreamOperatorTask.process(StreamOperatorTask.java:101)
    at org.apache.samza.task.AsyncStreamTaskAdapter.process(AsyncStreamTaskAdapter.java:72)
    ... 7 common frames omitted
Caused by: java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalStateException: Duplicate key org.apache.beam.runners.samza.runtime.KeyedTimerData@dffeb6cc registration for the same timer
    at org.apache.beam.runners.samza.metrics.DoFnRunnerWithMetrics.withMetrics(DoFnRunnerWithMetrics.java:84)
    at org.apache.beam.runners.samza.metrics.DoFnRunnerWithMetrics.processElement(DoFnRunnerWithMetrics.java:55)
    at org.apache.beam.runners.samza.runtime.GroupByKeyOp.processElement(GroupByKeyOp.java:191)
Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalStateException: Duplicate key org.apache.beam.runners.samza.runtime.KeyedTimerData@dffeb6cc registration for the same timer
    at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:34)
    at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
    at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:214)
    at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:179)
    at org.apache.beam.runners.samza.metrics.DoFnRunnerWithMetrics.lambda$processElement$1(DoFnRunnerWithMetrics.java:55)
    at org.apache.beam.runners.samza.metrics.DoFnRunnerWithMetrics.lambda$withMetrics$4(DoFnRunnerWithMetrics.java:80)
    at org.apache.beam.runners.samza.metrics.FnWithMetricsWrapper.wrap(FnWithMetricsWrapper.java:42)
    at org.apache.beam.runners.samza.metrics.DoFnRunnerWithMetrics.withMetrics(DoFnRunnerWithMetrics.java:78)
    at org.apache.beam.runners.samza.metrics.DoFnRunnerWithMetrics.processElement(DoFnRunnerWithMetrics.java:55)
    at org.apache.beam.runners.samza.runtime.GroupByKeyOp.processElement(GroupByKeyOp.java:191)
    at org.apache.beam.runners.samza.runtime.OpAdapter.apply(OpAdapter.java:82)
    at org.apache.beam.runners.samza.runtime.OpAdapter.apply(OpAdapter.java:37)
    at org.apache.samza.operators.impl.StreamOperatorImpl.handleMessage(StreamOperatorImpl.java:55)
    at org.apache.samza.operators.impl.OperatorImpl.onMessage(OperatorImpl.java:178)
    at org.apache.samza.operators.impl.OperatorImpl.lambda$null$1(OperatorImpl.java:194)
    at java.lang.Iterable.forEach(Iterable.java:75)
    at org.apache.samza.operators.impl.OperatorImpl.lambda$onMessage$2(OperatorImpl.java:193)
    at java.util.ArrayList.forEach(ArrayList.java:1257)
    at org.apache.samza.operators.impl.OperatorImpl.onMessage(OperatorImpl.java:192)
    at org.apache.samza.operators.impl.OperatorImpl.lambda$null$1(OperatorImpl.java:194)
    at java.lang.Iterable.forEach(Iterable.java:75)
    at org.apache.samza.operators.impl.OperatorImpl.lambda$onMessage$2(OperatorImpl.java:193)
    at java.util.ArrayList.forEach(ArrayList.java:1257)
    at org.apache.samza.operators.impl.OperatorImpl.onMessage(OperatorImpl.java:192)
    at org.apache.samza.operators.impl.OperatorImpl.lambda$null$1(OperatorImpl.java:194)
    at java.lang.Iterable.forEach(Iterable.java:75)
    at org.apache.samza.operators.impl.OperatorImpl.lambda$onMessage$2(OperatorImpl.java:193)
    at java.util.Collections$SingletonList.forEach(Collections.java:4822)
    at org.apache.samza.operators.impl.OperatorImpl.onMessage(OperatorImpl.java:192)
    at org.apache.samza.task.StreamOperatorTask.process(StreamOperatorTask.java:101)
    at org.apache.samza.task.AsyncStreamTaskAdapter.process(AsyncStreamTaskAdapter.java:72)
    at org.apache.samza.task.AsyncStreamTaskAdapter.access$000(AsyncStreamTaskAdapter.java:33)
    at org.apache.samza.task.AsyncStreamTaskAdapter$1.run(AsyncStreamTaskAdapter.java:58)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Duplicate key org.apache.beam.runners.samza.runtime.KeyedTimerData@dffeb6cc registration for the same timer
    at com.google.common.base.Preconditions.checkState(Preconditions.java:459)
    at org.apache.samza.task.EpochTimeScheduler.setTimer(EpochTimeScheduler.java:62)
    at org.apache.samza.scheduler.CallbackSchedulerImpl.scheduleCallback(CallbackSchedulerImpl.java:37)
    at org.apache.samza.operators.impl.OperatorImpl$1.schedule(OperatorImpl.java:446)
    at org.apache.beam.runners.samza.runtime.SamzaTimerInternalsFactory$SamzaTimerInternals.setTimer(SamzaTimerInternalsFactory.java:214)
    at org.apache.beam.runners.core.ReduceFnContextFactory$TimersImpl.setTimer(ReduceFnContextFactory.java:135)
    at org.apache.beam.runners.core.triggers.TriggerStateMachineContextFactory$TriggerTimers.setTimer(TriggerStateMachineContextFactory.java:188)
    at org.apache.beam.runners.core.triggers.TriggerStateMachineContextFactory$TriggerTimers.setTimer(TriggerStateMachineContextFactory.java:188)
    at org.apache.beam.runners.core.triggers.TriggerStateMachineContextFactory$TriggerTimers.setTimer(TriggerStateMachineContextFactory.java:188)
    at org.apache.beam.runners.core.triggers.TriggerStateMachineContextFactory$OnMergeContextImpl.setTimer(TriggerStateMachineContextFactory.java:478)
    at org.apache.beam.runners.core.triggers.AfterDelayFromFirstElementStateMachine.onMerge(AfterDelayFromFirstElementStateMachine.java:210)
    at org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine.invokeOnMerge(ExecutableTriggerStateMachine.java:129)
    at org.apache.beam.runners.core.triggers.RepeatedlyStateMachine.onMerge(RepeatedlyStateMachine.java:62)
    at org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine.invokeOnMerge(ExecutableTriggerStateMachine.java:129)
    at org.apache.beam.runners.core.triggers.TriggerStateMachineRunner.onMerge(TriggerStateMachineRunner.java:172)
    at org.apache.beam.runners.core.ReduceFnRunner$OnMergeCallback.onMerge(ReduceFnRunner.java:510)
    at org.apache.beam.runners.core.MergingActiveWindowSet$MergeContextImpl.recordMerges(MergingActiveWindowSet.java:211)
    at org.apache.beam.runners.core.MergingActiveWindowSet.merge(MergingActiveWindowSet.java:229)
    at org.apache.beam.runners.core.ReduceFnRunner.mergeWindows(ReduceFnRunner.java:436)
    at org.apache.beam.runners.core.ReduceFnRunner.processElements(ReduceFnRunner.java:329)
    at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.processElement(GroupAlsoByWindowViaWindowSetNewDoFn.java:136)
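The innermost cause in the trace is a checkState inside EpochTimeScheduler.setTimer, reached from AfterDelayFromFirstElementStateMachine.onMerge. One plausible reading is that a window merge sets a timer whose key is already registered. The plain-Java sketch below only illustrates that failure mode. It is not the Samza or Beam code: the class StrictTimerRegistrySketch, its methods, and the string key format are all hypothetical, and how KeyedTimerData actually defines equality is an assumption here.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a timer scheduler that refuses duplicate keys.
final class StrictTimerRegistrySketch {
    private final Map<String, Long> timers = new HashMap<>();

    // Registers a timer; fails if the same key is already registered.
    void setTimer(String timerKey, long fireAtMillis) {
        if (timers.containsKey(timerKey)) {
            throw new IllegalStateException(
                "Duplicate key " + timerKey + " registration for the same timer");
        }
        timers.put(timerKey, fireAtMillis);
    }

    // Removes a timer so that the same key can be registered again.
    void deleteTimer(String timerKey) {
        timers.remove(timerKey);
    }

    public static void main(String[] args) {
        StrictTimerRegistrySketch registry = new StrictTimerRegistrySketch();
        // Assumed key shape: user key + window + time domain.
        String key = "user-1|session[0,55)|processing-time";

        registry.setTimer(key, 60_000L); // timer set when the first hit arrives
        try {
            // A merge that yields the same window and sets the same timer again.
            registry.setTimer(key, 60_000L);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }

        // In this sketch, deleting before re-setting avoids the failure.
        registry.deleteTimer(key);
        registry.setTimer(key, 60_000L);
    }
}
```

Whether the runner is expected to delete the existing timer before setting it again during a merge is exactly what we are unsure about.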
Does anyone have suggestions, or has anyone run into a similar issue before?
Thanks,
Sandeep