[ https://issues.apache.org/jira/browse/BEAM-7204?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17122834#comment-17122834 ]
Beam JIRA Bot commented on BEAM-7204:
-------------------------------------

This issue is P2 but has been unassigned without any comment for 60 days, so it has been labeled "stale-P2". If this issue is still affecting you, we care! Please comment and remove the label. Otherwise, in 14 days the issue will be moved to P3.

Please see https://beam.apache.org/contribute/jira-priorities/ for a detailed explanation of what these priorities mean.

> ReduceFnRunner overhead
> -----------------------
>
> Key: BEAM-7204
> URL: https://issues.apache.org/jira/browse/BEAM-7204
> Project: Beam
> Issue Type: Improvement
> Components: runner-flink, sdk-java-core
> Reporter: Jozef Vilcek
> Priority: P2
> Labels: performance, stale-P2
>
> More context can be found in the discussion here:
> [http://mail-archives.apache.org/mod_mbox/beam-dev/201904.mbox/%3CCAOUjMkyKV8npYJfS_PF3Gzo=vwomb2frzute81zsrxnm13t...@mail.gmail.com%3E]
> I have found that in a FlinkRunner streaming pipeline there is overhead associated with processing each element in:
> # ReduceFnRunner.scheduleGarbageCollectionTimer() for the window
> # tracking PaneInfo
> This creates quite a lot of garbage for the JVM GC. At least the second item also involves interaction with the state backend.
>
> Relevant stacks for illustration:
> ScheduleGarbageCollectionTimer
> {noformat}
> ...
> [ 3] org.apache.flink.streaming.api.operators.HeapInternalTimerService.deleteEventTimeTimer
> [ 4] org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$FlinkTimerInternals.deleteTimer
> [ 5] org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$FlinkTimerInternals.cancelPendingTimerById
> [ 6] org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$FlinkTimerInternals.setTimer
> [ 7] org.apache.beam.runners.core.ReduceFnContextFactory$TimersImpl.setTimer
> [ 8] org.apache.beam.runners.core.ReduceFnRunner.scheduleGarbageCollectionTimer
> [ 9] org.apache.beam.runners.core.ReduceFnRunner.processElement
> [10] org.apache.beam.runners.core.ReduceFnRunner.processElements
> [11] org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.processElement
> [12] org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$DoFnInvoker.invokeProcessElement
> [13] org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement
> [14] org.apache.beam.runners.core.SimpleDoFnRunner.processElement
> [15] org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement
> [16] org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement
> [17] org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement
> [18] org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput
> [19] org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run
> [20] org.apache.flink.streaming.runtime.tasks.StreamTask.invoke
> [21] org.apache.flink.runtime.taskmanager.Task.run
> [22] java.lang.Thread.run
> {noformat}
> PaneInfoTracker: Read
> {noformat}
> [ 0] org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get
> [ 1] org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get
> [ 2] org.apache.flink.runtime.state.heap.HeapValueState.value
> [ 3] org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals$FlinkValueState.read
> [ 4] org.apache.beam.runners.core.PaneInfoTracker$1.read
> [ 5] org.apache.beam.runners.core.PaneInfoTracker$1.read
> [ 6] org.apache.beam.runners.core.ReduceFnRunner.onTrigger
> [ 7] org.apache.beam.runners.core.ReduceFnRunner.emit
> [ 8] org.apache.beam.runners.core.ReduceFnRunner.processElements
> [ 9] org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.processElement
> [10] org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$DoFnInvoker.invokeProcessElement
> [11] org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement
> [12] org.apache.beam.runners.core.SimpleDoFnRunner.processElement
> [13] org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement
> [14] org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement
> [15] org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement
> [16] org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput
> [17] org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run
> [18] org.apache.flink.streaming.runtime.tasks.StreamTask.invoke
> [19] org.apache.flink.runtime.taskmanager.Task.run
> [20] java.lang.Thread.run
> {noformat}
> PaneInfoTracker: Write
> {noformat}
> [ 0] org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.putEntry
> [ 1] org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.put
> [ 2] org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.put
> [ 3] org.apache.flink.runtime.state.heap.HeapValueState.update
> [ 4] org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals$FlinkValueState.write
> [ 5] org.apache.beam.runners.core.PaneInfoTracker.storeCurrentPaneInfo
> [ 6] org.apache.beam.runners.core.ReduceFnRunner.lambda$onTrigger$1
> [ 7] org.apache.beam.runners.core.ReduceFnRunner$$Lambda$101.211931975.output
> [ 8] org.apache.beam.runners.core.ReduceFnContextFactory$OnTriggerContextImpl.output
> [ 9] org.apache.beam.runners.core.SystemReduceFn.onTrigger
> [10] org.apache.beam.runners.core.ReduceFnRunner.onTrigger
> [11] org.apache.beam.runners.core.ReduceFnRunner.emit
> [12] org.apache.beam.runners.core.ReduceFnRunner.processElements
> [13] org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.processElement
> [14] org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$DoFnInvoker.invokeProcessElement
> [15] org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement
> [16] org.apache.beam.runners.core.SimpleDoFnRunner.processElement
> [17] org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement
> [18] org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement
> [19] org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement
> [20] org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput
> [21] org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run
> [22] org.apache.flink.streaming.runtime.tasks.StreamTask.invoke
> [23] org.apache.flink.runtime.taskmanager.Task.run
> [24] java.lang.Thread.run
> {noformat}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
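For illustration only: the first hot path in the issue (setting the GC timer on every element, which in the Flink runner goes through `cancelPendingTimerById` and `deleteEventTimeTimer` before re-registering) can be modelled in plain Java to show why per-element calls add up. Everything below is hypothetical: `CountingTimerService`, `DedupingGcScheduler`, the window ID `w1` and the GC timestamp are made up for this sketch and are not Beam or Flink APIs. It counts timer-service operations with and without a per-window check that skips re-setting a GC timer whose timestamp has not changed. This is one possible mitigation, not the fix adopted for BEAM-7204.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Illustrative sketch only: none of these classes are Beam or Flink APIs.
 * Models the per-element GC-timer pattern from the stack trace and a
 * hypothetical deduplicating alternative.
 */
public class GcTimerDedupSketch {

  /** Hypothetical timer service that counts backend operations. */
  static final class CountingTimerService {
    private final Map<String, Long> timersById = new HashMap<>();
    int deletes = 0;
    int registrations = 0;

    // Setting a timer under an existing ID first deletes the pending one,
    // similar to cancelPendingTimerById -> deleteEventTimeTimer in the trace.
    void setTimer(String timerId, long timestamp) {
      if (timersById.containsKey(timerId)) {
        deletes++;
      }
      timersById.put(timerId, timestamp);
      registrations++;
    }
  }

  /** Hypothetical scheduler that skips re-setting an unchanged GC timer. */
  static final class DedupingGcScheduler {
    private final CountingTimerService timers;
    private final Map<String, Long> lastGcTime = new HashMap<>();

    DedupingGcScheduler(CountingTimerService timers) {
      this.timers = timers;
    }

    void scheduleGc(String windowId, long gcTime) {
      Long previous = lastGcTime.get(windowId);
      if (previous != null && previous == gcTime) {
        return; // timer already set for this window at this timestamp
      }
      lastGcTime.put(windowId, gcTime);
      timers.setTimer("gc-" + windowId, gcTime);
    }
  }

  // Assumed GC time: window end plus allowed lateness (made-up values).
  static final long GC_TIME = 60_000L + 10_000L;

  /** Sets the GC timer once per element, as in the reported hot path. */
  static CountingTimerService runNaive(int elements) {
    CountingTimerService timers = new CountingTimerService();
    for (int i = 0; i < elements; i++) {
      timers.setTimer("gc-w1", GC_TIME);
    }
    return timers;
  }

  /** Routes the same per-element calls through the deduplicating scheduler. */
  static CountingTimerService runDeduped(int elements) {
    CountingTimerService timers = new CountingTimerService();
    DedupingGcScheduler scheduler = new DedupingGcScheduler(timers);
    for (int i = 0; i < elements; i++) {
      scheduler.scheduleGc("w1", GC_TIME);
    }
    return timers;
  }

  public static void main(String[] args) {
    CountingTimerService naive = runNaive(1000);
    CountingTimerService deduped = runDeduped(1000);
    System.out.println("naive:   registrations=" + naive.registrations + ", deletes=" + naive.deletes);
    System.out.println("deduped: registrations=" + deduped.registrations + ", deletes=" + deduped.deletes);
  }
}
```

With 1000 elements in one window, the per-element path performs 1000 registrations and 999 deletes, while the deduplicated path performs 1 registration and 0 deletes. The in-memory `lastGcTime` map is the main simplification: a real runner would have to keep that bookkeeping per key and window, survive checkpoint/restore, and clear it once the window is garbage-collected.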