[ 
https://issues.apache.org/jira/browse/FLINK-19970?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Wozniakowski updated FLINK-19970:
----------------------------------------
    Attachment: screenshot-2.png

> State leak in CEP Operators (expired events/keys not removed from state)
> ------------------------------------------------------------------------
>
>                 Key: FLINK-19970
>                 URL: https://issues.apache.org/jira/browse/FLINK-19970
>             Project: Flink
>          Issue Type: Bug
>          Components: Library / CEP
>    Affects Versions: 1.11.2
>         Environment: Flink 1.11.2 run using the official docker containers in 
> AWS ECS Fargate.
> 1 Job Manager, 1 Taskmanager with 2vCPUs and 8GB memory
>            Reporter: Thomas Wozniakowski
>            Priority: Critical
>         Attachments: image-2020-11-04-11-35-12-126.png, screenshot-1.png, 
> screenshot-2.png
>
>
> We have been observing instability in our production environment recently, 
> seemingly related to state backends. We ended up building a load testing 
> environment to isolate factors and have discovered that the CEP library 
> appears to have some serious problems with state expiry.
> h2. Job Topology
> Source: Kinesis (standard connector) -> keyBy() and forward to...
> CEP: Array of simple Keyed CEP Pattern operators (details below) -> forward 
> output to...
> Sink: SQS (custom connector)
> The CEP Patterns in the test look like this:
> {code:java}
> Pattern.begin(SCANS_SEQUENCE, AfterMatchSkipStrategy.skipPastLastEvent())
>     .times(20)
>     .subtype(ScanEvent.class)
>     .within(Duration.minutes(30));
> {code}
> h2. Taskmanager Config
> {code:java}
> taskmanager.numberOfTaskSlots: $numberOfTaskSlots
> taskmanager.data.port: 6121
> taskmanager.rpc.port: 6122
> taskmanager.exit-on-fatal-akka-error: true
> taskmanager.memory.process.size: $memoryProcessSize
> taskmanager.memory.jvm-metaspace.size: 256m
> taskmanager.memory.managed.size: 0m
> jobmanager.rpc.port: 6123
> blob.server.port: 6130
> rest.port: 8081
> web.submit.enable: true
> fs.s3a.connection.maximum: 50
> fs.s3a.threads.max: 50
> akka.framesize: 250m
> akka.watch.threshold: 14
> state.checkpoints.dir: s3://$savepointBucketName/checkpoints
> state.savepoints.dir: s3://$savepointBucketName/savepoints
> state.backend: filesystem
> state.backend.async: true
> s3.access-key: $s3AccessKey
> s3.secret-key: $s3SecretKey
> {code}
> (the substitutions are controlled by terraform).
> h2. Tests
> h4. Test 1 (No key rotation)
> 8192 actors (different keys) emitting 1 Scan Event every 10 minutes 
> indefinitely. Actors (keys) never rotate in or out.
> h4. Test 2 (Constant key rotation)
> 8192 actors that produce 2 Scan events 10 minutes apart, then retire and 
> never emit again. The setup creates new actors (keys) as soon as one finishes 
> so we always have 8192. This test basically constantly rotates the key space.
> h2. Results
> For both tests, the state size (checkpoint size) grows unbounded and linearly 
> well past the 30 minute threshold that should have caused old keys or events 
> to be discard from the state. In the chart below, the left (steep) half is 
> the 24 hours we ran Test 1, the right (shallow) half is Test 2.  My 
> understanding is that the checkpoint size should level off after ~45 minutes 
> or so then stay constant.
> !image-2020-11-04-11-35-12-126.png! 
> Could someone please assist us with this? Unless we have dramatically 
> misunderstood how the CEP library is supposed to function this seems like a 
> pretty severe bug.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to