[ https://issues.apache.org/jira/browse/FLINK-19970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17247999#comment-17247999 ]
Thomas Wozniakowski commented on FLINK-19970:
---------------------------------------------

[~dwysakowicz] I've emailed you over the code + JAR. Please give me a shout here or over email if you need anything else from me.

> State leak in CEP Operators (expired events/keys not removed from state)
> ------------------------------------------------------------------------
>
>                 Key: FLINK-19970
>                 URL: https://issues.apache.org/jira/browse/FLINK-19970
>             Project: Flink
>          Issue Type: Bug
>          Components: Library / CEP
>    Affects Versions: 1.11.2
>         Environment: Flink 1.11.2 run using the official docker containers in
> AWS ECS Fargate.
> 1 Job Manager, 1 Taskmanager with 2vCPUs and 8GB memory
>            Reporter: Thomas Wozniakowski
>            Priority: Critical
>         Attachments: image-2020-11-04-11-35-12-126.png, screenshot-1.png,
> screenshot-2.png, screenshot-3.png
>
>
> We have been observing instability in our production environment recently,
> seemingly related to state backends. We ended up building a load testing
> environment to isolate factors and have discovered that the CEP library
> appears to have some serious problems with state expiry.
> h2. Job Topology
> Source: Kinesis (standard connector) -> keyBy() and forward to...
> CEP: Array of simple Keyed CEP Pattern operators (details below) -> forward
> output to...
> Sink: SQS (custom connector)
> The CEP Patterns in the test look like this:
> {code:java}
> Pattern.begin(SCANS_SEQUENCE, AfterMatchSkipStrategy.skipPastLastEvent())
>     .times(20)
>     .subtype(ScanEvent.class)
>     .within(Duration.minutes(30));
> {code}
> h2. Taskmanager Config
> {code:java}
> taskmanager.numberOfTaskSlots: $numberOfTaskSlots
> taskmanager.data.port: 6121
> taskmanager.rpc.port: 6122
> taskmanager.exit-on-fatal-akka-error: true
> taskmanager.memory.process.size: $memoryProcessSize
> taskmanager.memory.jvm-metaspace.size: 256m
> taskmanager.memory.managed.size: 0m
> jobmanager.rpc.port: 6123
> blob.server.port: 6130
> rest.port: 8081
> web.submit.enable: true
> fs.s3a.connection.maximum: 50
> fs.s3a.threads.max: 50
> akka.framesize: 250m
> akka.watch.threshold: 14
> state.checkpoints.dir: s3://$savepointBucketName/checkpoints
> state.savepoints.dir: s3://$savepointBucketName/savepoints
> state.backend: filesystem
> state.backend.async: true
> s3.access-key: $s3AccessKey
> s3.secret-key: $s3SecretKey
> {code}
> (the substitutions are controlled by terraform).
> h2. Tests
> h4. Test 1 (No key rotation)
> 8192 actors (different keys) emitting 1 Scan Event every 10 minutes
> indefinitely. Actors (keys) never rotate in or out.
> h4. Test 2 (Constant key rotation)
> 8192 actors that produce 2 Scan events 10 minutes apart, then retire and
> never emit again. The setup creates new actors (keys) as soon as one finishes
> so we always have 8192. This test basically constantly rotates the key space.
> h2. Results
> For both tests, the state size (checkpoint size) grows unbounded and linearly
> well past the 30 minute threshold that should have caused old keys or events
> to be discarded from the state. In the chart below, the left (steep) half is
> the 24 hours we ran Test 1, the right (shallow) half is Test 2. My
> understanding is that the checkpoint size should level off after ~45 minutes
> or so then stay constant.
> !image-2020-11-04-11-35-12-126.png!
> Could someone please assist us with this? Unless we have dramatically
> misunderstood how the CEP library is supposed to function this seems like a
> pretty severe bug.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)