Hi Team,
I am facing an issue running a stateful Beam job on Flink.
*Problem Statement:*
A stateful Beam application with a TUMBLE window running on the Flink
runner, whose checkpoint size increases consistently over time.
*Observation:*
Memory usage keeps increasing over time, and the Kubernetes pods get
OOM-killed (exit code 137).
*Version:*
Beam version 2.32, Flink version 1.13.6, State backend -
EmbeddedRocksDB (com.ververica - frocksdbjni - 6.20.3-ververica-2.0)
*Assumption:*
State is never cleared in the state backend, even after the window closes.
*Questions:*
1. What is the significance of currentSideInputWatermark in
*org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator*,
and how does it affect an application without side inputs?
https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L767
Removing the check *if (currentSideInputWatermark >=
BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis())* and calling
*emitAllPushedBackData()* on every processWatermark call reduces the
checkpoint size, which otherwise keeps increasing.
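To make the suspected behavior concrete, here is a toy, self-contained model of that watermark gating (this is NOT Beam code; class and field names mirror DoFnOperator only loosely). My assumption: with no side inputs, currentSideInputWatermark never reaches TIMESTAMP_MAX_VALUE, so the guarded flush never fires and pushed-back elements accumulate in state:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the flush guard in
// DoFnOperator.processWatermark — illustrative only, not Beam code.
public class WatermarkGate {
  // Stand-in for BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()
  static final long TIMESTAMP_MAX_VALUE = Long.MAX_VALUE;

  long currentSideInputWatermark = Long.MIN_VALUE;
  final List<String> pushedBack = new ArrayList<>(); // stand-in for pushedBackElementsHandler
  final List<String> emitted = new ArrayList<>();

  void bufferElement(String e) {
    pushedBack.add(e);
  }

  // Guarded flush: buffered elements are only emitted once the
  // side-input watermark has advanced to MAX — which, if my reading is
  // right, never happens when there are no side inputs.
  void processWatermark(long mark) {
    if (currentSideInputWatermark >= TIMESTAMP_MAX_VALUE) {
      emitAllPushedBackData();
    }
  }

  // The change described above: flush unconditionally on every watermark.
  void processWatermarkUnconditional(long mark) {
    emitAllPushedBackData();
  }

  void emitAllPushedBackData() {
    emitted.addAll(pushedBack);
    pushedBack.clear();
  }
}
```

Under this model the guarded path retains the buffer forever, while the unconditional flush drains it, which would match the checkpoint-size observation.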
2. When is state cleared in the WindowDoFn (TUMBLE windows) on window
closure? When do global state and timers get cleared?
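My understanding (please correct me if wrong) is that window state becomes eligible for garbage collection at the window's max timestamp plus the allowed lateness. A toy calculation of that cleanup instant for a tumbling window (the class and method names here are illustrative, not Beam API):

```java
import java.time.Duration;
import java.time.Instant;

// Toy helper computing when a tumbling window's state should be
// garbage-collected, assuming cleanup fires at
// window.maxTimestamp() + allowedLateness. Illustrative only.
public class WindowGc {
  static Instant cleanupTime(Instant windowStart, Duration windowSize, Duration allowedLateness) {
    // Window end is exclusive, so maxTimestamp is end - 1 ms.
    Instant maxTimestamp = windowStart.plus(windowSize).minusMillis(1);
    return maxTimestamp.plus(allowedLateness);
  }
}
```

With allowed lateness of zero, a 1-minute window starting at 00:00:00 should be cleanable at 00:00:59.999 once the watermark passes it.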
3. Is clearing the timer and keyed-state information as follows enough
to prevent ever-increasing memory and checkpoint size?
*Flush on watermark:*
pushedBackElementsHandler.clear();
*Timer removal:*
keyedStateInternals.removeWatermarkHoldUsage(timer.getOutputTimestamp());
*Global removal:*
keyedStateInternals.clearGlobalState();
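The worry behind this question, as a toy model (again not Beam code): state written under the global window is keyed but never window-scoped, so unless something like clearGlobalState() runs, it grows with key cardinality:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of per-key global-window state growth.
// Without an explicit clear, the map only ever grows as new keys arrive.
public class GlobalStateModel {
  final Map<String, Long> globalState = new HashMap<>();

  void process(String key) {
    // One state entry per key, updated on every element.
    globalState.merge(key, 1L, Long::sum);
  }

  void clearGlobalState() {
    globalState.clear();
  }
}
```
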
-Hemant