Alexey Romanenko reassigned BEAM-6465:
--------------------------------------

Assignee: Maximilian Michels  (was: Kenneth Knowles)

Flink: State accumulation during restoring from a savepoint
------------------------------------------------------------

                Key: BEAM-6465
                URL: https://issues.apache.org/jira/browse/BEAM-6465
            Project: Beam
         Issue Type: Test
         Components: runner-flink
   Affects Versions: 2.7.0
           Reporter: Pawel Bartoszek
           Assignee: Maximilian Michels
           Priority: Major
        Attachments: Screen Shot 2019-01-18 at 11.38.02.png, Screen Shot 2019-01-18 at 12.07.03 copy.png

This ticket captures my findings when restoring a Beam job from a savepoint on the Flink runner.

*The problem*

When the job is restored from a savepoint taken a few hours earlier, the checkpoint size starts growing ridiculously high, which eventually leads to the job failing with an out-of-heap-space error. We use the filesystem state backend, which keeps state on the heap.

*Job structure*

The job has two paths: the data lake path and the aggregate path.

*Data lake path*

The data lake path is a dumb sink of all records received by the job. The records are flushed to S3.

Data lake trigger:
{code:java}
input.apply(
        WithTimestamps.of(extractTimestamp).withAllowedTimestampSkew(standardDays(7)))
     .apply(Window.<RDotRecord>into(FixedWindows.of(standardMinutes(1)))
        .triggering(
            AfterWatermark.pastEndOfWindow()
                .withLateFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(lateFiringsPeriod))
        )
        .withAllowedLateness(standardMinutes(30), FIRE_ALWAYS)
        .discardingFiredPanes()); // <-- IMPORTANT
{code}

*Aggregate path*

The aggregate path has some GroupByKey, Count, etc. transformations.

Aggregate trigger:
{code:java}
input.apply(
        WithTimestamps.of(extractTimestamp).withAllowedTimestampSkew(standardDays(7)))
     .apply(Window.<RDotRecord>into(FixedWindows.of(WINDOW_SIZE))
        .triggering(
            AfterWatermark.pastEndOfWindow()
                .withLateFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(lateFiringsPeriod))
        )
        .withAllowedLateness(standardMinutes(30), FIRE_ALWAYS)
        .accumulatingFiredPanes());
{code}

*My investigation*

Our team has written a tool that collects input watermarks from the Flink API. It turned out to be a common situation that, for pretty much every operator, some sub-operators (running on a particular Flink slot) run slower, so their watermark falls behind the other slots. Look at the graph below:

!Screen Shot 2019-01-18 at 12.07.03 copy.png!

The Y axis represents event time, the X axis wall-clock time. The graph shows the input watermarks for one operator in the job. Each line represents the input watermark of a specific slot (the job runs with parallelism 8) on which the operator is running.

At 17:55 the difference between the slowest and the fastest slot is already 20 minutes. This means that 20 minutes of data will be buffered in memory until the slowest slot's watermark crosses the end of the 1-minute window holding the buffered data.

Unfortunately it is very hard to tell why some slots are doing better than the others (I believe the data is properly balanced when it comes to hashing etc.).

Below I show the checkpoint size growing. Normally, when the job is not restored from a savepoint, the checkpoint size is around 3 GB.

!Screen Shot 2019-01-18 at 11.38.02.png!

*My findings*

If state is accumulating because the watermark on some operator slots is falling behind (a Flink-specific effect), then introducing *early firings with discarding mode* should help ... and indeed it helped.
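For illustration only, a minimal sketch of what the data lake trigger could look like with early firings added. It assumes the same statically imported helpers as the snippets above (Duration.standardMinutes/standardDays, Window.ClosingBehavior.FIRE_ALWAYS), and earlyFiringsPeriod is a hypothetical Duration, not a value taken from the actual job:

{code:java}
// Sketch: early firings on top of the existing data lake trigger, keeping discarding mode.
// earlyFiringsPeriod is a placeholder, e.g. Duration.standardMinutes(1).
input.apply(
        WithTimestamps.of(extractTimestamp).withAllowedTimestampSkew(standardDays(7)))
     .apply(Window.<RDotRecord>into(FixedWindows.of(standardMinutes(1)))
        .triggering(
            AfterWatermark.pastEndOfWindow()
                // Fire speculatively while the slow slots' watermark catches up, so buffered
                // elements are emitted instead of piling up on the heap.
                .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(earlyFiringsPeriod))
                .withLateFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(lateFiringsPeriod))
        )
        .withAllowedLateness(standardMinutes(30), FIRE_ALWAYS)
        .discardingFiredPanes());
{code}

Because the panes are discarded after firing, elements buffered for the slow slots are released from state once emitted; the trade-off is the speculative output discussed below.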
*My worry is that introducing early firings as a way to avoid OOM errors during catch-up seems hacky to me. The other downside is that early firings introduce speculative results, which might not be acceptable in some cases.*

*Setup:*
* The job reads records from 32 Kinesis shards.
* Job parallelism: 8
* Running on Beam 2.7 and Flink 1.5

*Hardware:*
* Master: 1 x m5.xlarge
* Core instances: 5 x r4.2xlarge

*YARN session configuration:*
{code}
/usr/bin/flink run --class streaming.Main -m yarn-cluster --yarnstreaming \
  --yarnjobManagerMemory 6272 --yarntaskManagerMemory 26000 \
  -yD classloader.resolve-order=parent-first -yD parallelism.default=8 \
  -yD containerized.heap-cutoff-ratio=0.15 -yD state.backend=filesystem \
  -yD yarn.maximum-failed-containers=-1 -yD jobmanager.web.checkpoints.history=1000 \
  -yD akka.ask.timeout=60s \
  -XX:GCLogFileSize=20M -XX:NumberOfGCLogFiles=2 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps \
  -XX:+PrintTenuringDistribution -XX:+PrintGCCause -XX:+PrintGCDateStamps -XX:+UseG1GC \
  /home/hadoop/job.jar \
  --runner=FlinkRunner --awsRegion=eu-west-1 --appName=XXX --input=kinesis://XXX \
  --outputFileSystemType=S3 --outputFileSystemRoot=XXX --outputDirectory=structured-streaming \
  --externalizedCheckpointsEnabled=true --checkpointingInterval=300000 \
  --checkpointTimeoutMillis=360000 --failOnCheckpointingErrors=false \
  --minPauseBetweenCheckpoints=60000 --parallelism=8
{code}
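For reference, a rough sketch of how the checkpoint-related flags in the command above are picked up through Beam's FlinkPipelineOptions. The class name and the reduced argument list are illustrative, and the application-specific flags (--input, --awsRegion, --appName, ...) would be declared on the job's own options interface rather than here:

{code:java}
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class CheckpointOptionsSketch {
  public static void main(String[] args) {
    // Only the runner/checkpoint flags from the command above are shown here.
    String[] flinkArgs = {
        "--runner=FlinkRunner",
        "--checkpointingInterval=300000",      // 5 min between checkpoints
        "--checkpointTimeoutMillis=360000",
        "--minPauseBetweenCheckpoints=60000",
        "--failOnCheckpointingErrors=false",
        "--externalizedCheckpointsEnabled=true",
        "--parallelism=8"
    };

    // Parse the flags into the Flink runner's pipeline options and build the pipeline on them.
    FlinkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(flinkArgs).withValidation().as(FlinkPipelineOptions.class);
    Pipeline pipeline = Pipeline.create(options);
    // ... the data lake and aggregate paths would be attached to `pipeline` here ...
  }
}
{code}

The -yD dynamic properties (state backend, heap cutoff, etc.) are consumed by the Flink client and YARN session themselves and never reach the Beam pipeline options.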