[ https://issues.apache.org/jira/browse/BEAM-6465?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Pawel Bartoszek updated BEAM-6465:
----------------------------------
    Attachment: Screen Shot 2019-01-18 at 12.07.03.png

> Flink: State accumulation during restoring from a savepoint
> -----------------------------------------------------------
>
>                 Key: BEAM-6465
>                 URL: https://issues.apache.org/jira/browse/BEAM-6465
>             Project: Beam
>          Issue Type: Test
>          Components: beam-model
>    Affects Versions: 2.7.0
>            Reporter: Pawel Bartoszek
>            Assignee: Kenneth Knowles
>            Priority: Major
>         Attachments: Screen Shot 2019-01-18 at 12.07.03.png
>
>
> This ticket captures my findings when restoring a Beam job from a savepoint 
> on the Flink runner.
>  
> *The problem*
> When the job is restored from a savepoint taken a few hours earlier, we see 
> the checkpoint size start growing ridiculously high, which eventually leads 
> to the job failing with an out-of-heap-space error (we use the filesystem 
> state backend).
>  
> *Job structure*
> The job has two paths: the data lake path and the aggregate data path.
> *Data lake path*
> The data lake path is a dumb sink for all records received by the job; the 
> records are flushed to S3 (a sketch of such a windowed write follows the 
> trigger below).
> Data lake trigger:
> {code:java}
> input
>         .apply(WithTimestamps.of(extractTimestamp)
>                 .withAllowedTimestampSkew(standardDays(7)))
>         .apply(Window.<RDotRecord>into(FixedWindows.of(standardMinutes(1)))
>                 .triggering(
>                         AfterWatermark.pastEndOfWindow()
>                                 .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
>                                         .plusDelayOf(lateFiringsPeriod)))
>                 .withAllowedLateness(standardMinutes(30), FIRE_ALWAYS)
>                 .discardingFiredPanes());
> {code}
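> For reference, a minimal sketch of what the S3 flush on this path could look 
> like with Beam's windowed TextIO writes. The formatRecord function, the 
> output location, and the shard count are placeholders, not the job's real code:
> {code:java}
> // Hypothetical sketch only (assumes org.apache.beam.sdk.io.TextIO, MapElements
> // and TypeDescriptors are imported); not the job's actual sink.
> windowedRecords
>         .apply(MapElements.into(TypeDescriptors.strings())
>                 .via(record -> formatRecord(record)))      // placeholder formatter
>         .apply(TextIO.write()
>                 .to("s3://my-bucket/datalake/part")        // placeholder S3 prefix
>                 .withWindowedWrites()                      // emit files per window/pane
>                 .withNumShards(1));                        // required for unbounded input
> {code}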
>  
> *Aggregate path*
> The aggregate path applies group-by-key, count, and similar transformations 
> (a sketch of this kind of aggregation follows the trigger below).
>  
> Aggregate trigger:
> {code:java}
> input
>         .apply(WithTimestamps.of(extractTimestamp)
>                 .withAllowedTimestampSkew(standardDays(7)))
>         .apply(Window.<RDotRecord>into(FixedWindows.of(WINDOW_SIZE))
>                 .triggering(
>                         AfterWatermark.pastEndOfWindow()
>                                 .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
>                                         .plusDelayOf(lateFiringsPeriod)))
>                 .withAllowedLateness(standardMinutes(30), FIRE_ALWAYS)
>                 .accumulatingFiredPanes());
> {code}
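> For context, a minimal sketch of the kind of per-key counting this path runs 
> (extractKey and the JDBC-writing DoFn are hypothetical placeholders, not the 
> job's actual transforms):
> {code:java}
> // Hypothetical sketch of the aggregate path; extractKey and WriteCountsToJdbcFn
> // are placeholders only.
> windowedRecords
>         .apply(WithKeys.of(record -> extractKey(record))
>                 .withKeyType(TypeDescriptors.strings()))   // key records by some attribute
>         .apply(Count.perKey())                             // count per key, window and pane
>         .apply(ParDo.of(new WriteCountsToJdbcFn()));       // downstream JDBC write
> {code}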
>  
> *My investigation*
> Our team has written a tool to collect input watermarks from the Flink API. 
> It turned out that some sub-operators (running on particular Flink slots) run 
> slower, so their watermark falls behind the other slots. Look at the graph 
> below:
> [graph: per-slot input watermarks, not included in this mail]
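> The tool itself is not attached; a minimal sketch of how per-subtask low 
> watermarks can be read, assuming Flink's monitoring REST API and its 
> /jobs/:jobid/vertices/:vertexid/watermarks endpoint (host, job id and vertex 
> id are placeholders):
> {code:java}
> // Hypothetical sketch (java.net.URL, java.io and java.util.stream imports assumed).
> // Each entry in the response is one subtask's currentLowWatermark, so a subtask
> // whose value lags the others points at the slow slot.
> URL url = new URL("http://jobmanager-host:8081/jobs/" + jobId
>         + "/vertices/" + vertexId + "/watermarks");
> try (BufferedReader reader = new BufferedReader(
>         new InputStreamReader(url.openStream(), StandardCharsets.UTF_8))) {
>     // Response looks like: [{"id":"0.currentLowWatermark","value":"1547812023000"}, ...]
>     System.out.println(reader.lines().collect(Collectors.joining()));
> }
> {code}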
> *My findings*
> If state is accumulating because the watermark is held back on some operator 
> slots (Flink specific), then introducing *early firings* should help ... and 
> indeed it helped. The low watermark on the JDBC task (where I write to a 
> database) can be seen in the graph below:
> [graph: low watermark on the JDBC task, not included in this mail]
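> For clarity, the change amounts to adding early firings to the triggers shown 
> above; a minimal sketch (the one-minute early-firing period is a placeholder, 
> not the value we used):
> {code:java}
> // Hypothetical sketch: the same trigger as above, with speculative early firings
> // so panes are emitted even while the slowest subtask holds the watermark back.
> .triggering(
>         AfterWatermark.pastEndOfWindow()
>                 .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
>                         .plusDelayOf(standardMinutes(1)))   // placeholder period
>                 .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
>                         .plusDelayOf(lateFiringsPeriod)))
> .withAllowedLateness(standardMinutes(30), FIRE_ALWAYS)
> {code}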
> *Setup:*
>  * Job reads records from 32 Kinesis shards.
>  * Job parallelism: 8
>  * Running on Beam 2.7 with Flink 1.5
>  * *Hardware:*
>  ** Master: 1 x m5.xlarge
>  ** Core instances: 5 x r4.2xlarge
>  * *YARN session configuration:*
> {code:java}
> /usr/bin/flink run --class streaming.Main -m yarn-cluster --yarnstreaming \
>     --yarnjobManagerMemory 6272 --yarntaskManagerMemory 26000 \
>     -yD classloader.resolve-order=parent-first -yD parallelism.default=8 \
>     -yD containerized.heap-cutoff-ratio=0.15 -yD state.backend=filesystem \
>     -yD yarn.maximum-failed-containers=-1 -yD jobmanager.web.checkpoints.history=1000 \
>     -yD akka.ask.timeout=60s \
>     -XX:GCLogFileSize=20M -XX:NumberOfGCLogFiles=2 -XX:+PrintGCDetails \
>     -XX:+PrintGCTimeStamps -XX:+PrintTenuringDistribution -XX:+PrintGCCause \
>     -XX:+PrintGCDateStamps -XX:+UseG1GC \
>     /home/hadoop/job.jar \
>     --runner=FlinkRunner --awsRegion=eu-west-1 --appName=XXX \
>     --input=kinesis://XXX --outputFileSystemType=S3 --outputFileSystemRoot=XXX \
>     --outputDirectory=structured-streaming --externalizedCheckpointsEnabled=true \
>     --checkpointingInterval=300000 --checkpointTimeoutMillis=360000 \
>     --failOnCheckpointingErrors=false --minPauseBetweenCheckpoints=60000 \
>     --parallelism=8
> {code}
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
