[ 
https://issues.apache.org/jira/browse/BEAM-6465?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexey Romanenko updated BEAM-6465:
-----------------------------------
    Component/s:     (was: beam-model)
                 runner-flink

> Flink: State accumulation during restoring from a savepoint
> -----------------------------------------------------------
>
>                 Key: BEAM-6465
>                 URL: https://issues.apache.org/jira/browse/BEAM-6465
>             Project: Beam
>          Issue Type: Test
>          Components: runner-flink
>    Affects Versions: 2.7.0
>            Reporter: Pawel Bartoszek
>            Assignee: Kenneth Knowles
>            Priority: Major
>         Attachments: Screen Shot 2019-01-18 at 11.38.02.png, Screen Shot 
> 2019-01-18 at 12.07.03 copy.png
>
>
> This ticket captures my findings when restoring a BEAM job from a savepoint 
> on a Flink runner.
>  
> *The problem*
> When the job is restored from a savepoint taken a few hours earlier, we see the 
> checkpoint size start growing enormously, which eventually leads to the job 
> failing with an out-of-heap-space error. We use the filesystem state backend, 
> which keeps state on the heap.
>  
> *Job structure*
> The job has two paths: the data lake path and the aggregate path.
> *Data lake path*
> The data lake path is a simple pass-through sink for all records received by 
> the job; the records are flushed to S3.
> Data lake trigger:
> {code:java}
> input.apply(
>         WithTimestamps.of(extractTimestamp).withAllowedTimestampSkew(standardDays(7)))
>     .apply(Window.<RDotRecord>into(FixedWindows.of(standardMinutes(1)))
>         .triggering(
>             AfterWatermark.pastEndOfWindow()
>                 .withLateFirings(
>                     AfterProcessingTime.pastFirstElementInPane().plusDelayOf(lateFiringsPeriod)))
>         .withAllowedLateness(standardMinutes(30), FIRE_ALWAYS)
>         .discardingFiredPanes()); // <-- IMPORTANT: discarding mode
> {code}
>  
> *Aggregate path*
> The aggregate path has group-by-key, count and similar transformations.
> Aggregate trigger:
> {code:java}
> input.apply(
>         WithTimestamps.of(extractTimestamp).withAllowedTimestampSkew(standardDays(7)))
>     .apply(Window.<RDotRecord>into(FixedWindows.of(WINDOW_SIZE))
>         .triggering(
>             AfterWatermark.pastEndOfWindow()
>                 .withLateFirings(
>                     AfterProcessingTime.pastFirstElementInPane().plusDelayOf(lateFiringsPeriod)))
>         .withAllowedLateness(standardMinutes(30), FIRE_ALWAYS)
>         .accumulatingFiredPanes());
> {code}
>  
> *My investigation*
> Our team has written a tool to collect input watermarks from the Flink API. 
> It turned out to be a common situation that, for pretty much every operator, 
> some subtasks (running on particular Flink slots) run slower, so their 
> watermark falls behind the other slots. Look at the graph below: 
> !Screen Shot 2019-01-18 at 12.07.03 copy.png!
> The Y axis represents event time, the X axis wall clock time. The graph shows 
> the input watermarks of one operator in the job. Each line represents the 
> input watermark of a specific slot (the job runs with parallelism 8) on which 
> the operator is running.
> At 17:55 the difference between the slowest slot and the fastest one is already 
> 20 minutes. This means that 20 minutes of data will be buffered in memory until 
> the slowest slot's watermark crosses the end of the 1-minute window of the 
> buffered data.
> Unfortunately, it's very hard to tell why some slots do better than others 
> (I believe the data is properly balanced when it comes to hashing etc.).
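> One way to collect such per-subtask watermarks is the Flink monitoring REST 
> API. A minimal sketch is below; the job manager address, job id, vertex id and 
> the {{0.currentInputWatermark}} metric name are assumptions to adjust per 
> cluster and Flink version:
> {code:java}
> import java.io.BufferedReader;
> import java.io.InputStreamReader;
> import java.net.HttpURLConnection;
> import java.net.URL;
> 
> /** Polls the per-subtask input watermarks of one operator via the Flink REST API. */
> public class WatermarkPoller {
> 
>   public static void main(String[] args) throws Exception {
>     String jobManager = "http://localhost:8081"; // placeholder
>     String jobId = "<job-id>";                   // placeholder
>     String vertexId = "<vertex-id>";             // placeholder
>     int parallelism = 8;
> 
>     // Request "<subtask>.currentInputWatermark" for every subtask of the vertex.
>     StringBuilder metrics = new StringBuilder();
>     for (int subtask = 0; subtask < parallelism; subtask++) {
>       if (subtask > 0) {
>         metrics.append(',');
>       }
>       metrics.append(subtask).append(".currentInputWatermark");
>     }
> 
>     URL url = new URL(jobManager + "/jobs/" + jobId + "/vertices/" + vertexId
>         + "/metrics?get=" + metrics);
>     HttpURLConnection conn = (HttpURLConnection) url.openConnection();
>     try (BufferedReader reader =
>         new BufferedReader(new InputStreamReader(conn.getInputStream()))) {
>       // The response is a JSON array of {"id": ..., "value": ...} pairs;
>       // print it and compare the values to see how far apart the slots are.
>       String line;
>       while ((line = reader.readLine()) != null) {
>         System.out.println(line);
>       }
>     }
>   }
> }
> {code}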
>  
> Below I show the checkpoint size growing over time. Normally, when not 
> restoring from a savepoint, the checkpoint size is around 3 GB:
> !Screen Shot 2019-01-18 at 11.38.02.png!
>  
> *My findings*
> If state is being accumulated because the watermark lags behind on some 
> operator slots (Flink-specific), then introducing *early firings* *with 
> discarding mode* should help ... and indeed it helped.
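> A sketch of what the adjusted trigger looks like with early firings in 
> discarding mode; the {{earlyFiringsPeriod}} delay is a placeholder to tune 
> against the catch-up rate:
> {code:java}
> input.apply(
>         WithTimestamps.of(extractTimestamp).withAllowedTimestampSkew(standardDays(7)))
>     .apply(Window.<RDotRecord>into(FixedWindows.of(WINDOW_SIZE))
>         .triggering(
>             AfterWatermark.pastEndOfWindow()
>                 // speculative panes while the slow slots' watermark catches up
>                 .withEarlyFirings(
>                     AfterProcessingTime.pastFirstElementInPane().plusDelayOf(earlyFiringsPeriod))
>                 .withLateFirings(
>                     AfterProcessingTime.pastFirstElementInPane().plusDelayOf(lateFiringsPeriod)))
>         .withAllowedLateness(standardMinutes(30), FIRE_ALWAYS)
>         .discardingFiredPanes());
> {code}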
>  
> *My worry is that introducing early firings as a way to avoid OOM errors during 
> catch-up seems hacky to me. The other downside is that early firings introduce 
> speculative results, which might not be acceptable in some cases.*
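> For consumers that cannot accept speculative output, one possible mitigation 
> (a sketch only, with placeholder names) is to drop the EARLY panes downstream:
> {code:java}
> // Hypothetical downstream filter: pass through only non-speculative panes.
> PCollection<KV<String, Long>> finalResults =
>     aggregated.apply("DropEarlyPanes", ParDo.of(
>         new DoFn<KV<String, Long>, KV<String, Long>>() {
>           @ProcessElement
>           public void processElement(ProcessContext c) {
>             // Early panes are the speculative firings produced by withEarlyFirings.
>             if (c.pane().getTiming() != PaneInfo.Timing.EARLY) {
>               c.output(c.element());
>             }
>           }
>         }));
> {code}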
>  
> *Setup:*
>  * Job reads records from 32 Kinesis shards.
>  * Job parallelism: 8
>  * Running on Beam 2.7, Flink 1.5
>  * *Hardware:*
>  ** Master: 1 x m5.xlarge
>  ** Core instances: 5 x r4.2xlarge
>  * *YARN session configuration:*
> {code}
> /usr/bin/flink run --class streaming.Main -m yarn-cluster --yarnstreaming \
>   --yarnjobManagerMemory 6272 --yarntaskManagerMemory 26000 \
>   -yD classloader.resolve-order=parent-first -yD parallelism.default=8 \
>   -yD containerized.heap-cutoff-ratio=0.15 -yD state.backend=filesystem \
>   -yD yarn.maximum-failed-containers=-1 \
>   -yD jobmanager.web.checkpoints.history=1000 -yD akka.ask.timeout=60s \
>   -XX:GCLogFileSize=20M -XX:NumberOfGCLogFiles=2 -XX:+PrintGCDetails \
>   -XX:+PrintGCTimeStamps -XX:+PrintTenuringDistribution -XX:+PrintGCCause \
>   -XX:+PrintGCDateStamps -XX:+UseG1GC \
>   /home/hadoop/job.jar \
>   --runner=FlinkRunner --awsRegion=eu-west-1 --appName=XXX \
>   --input=kinesis://XXX --outputFileSystemType=S3 --outputFileSystemRoot=XXX \
>   --outputDirectory=structured-streaming --externalizedCheckpointsEnabled=true \
>   --checkpointingInterval=300000 --checkpointTimeoutMillis=360000 \
>   --failOnCheckpointingErrors=false --minPauseBetweenCheckpoints=60000 \
>   --parallelism=8
> {code}



