[ https://issues.apache.org/jira/browse/FLINK-26793?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17522355#comment-17522355 ]
Etienne Chauchot commented on FLINK-26793:
------------------------------------------

[~bumblebee] I reproduced the behavior you observed: I ran [this streaming infinite pipeline|https://github.com/echauchot/flink-samples/blob/master/src/main/java/org/example/CassandraPojoSinkStreamingExample.java] for 3 days on a local Flink 1.14.4 cluster + a Cassandra 3.0 Docker container (I did not have the rights to instantiate a Cassandra cluster on Azure). The pipeline has checkpointing configured every 10 minutes with exactly-once semantics and no watermark defined. It was run at parallelism 16, which corresponds to the number of cores on my laptop. I created a source that emits POJOs every 100 ms. The source is mono-threaded, so it runs at parallelism 1. See all the screenshots.

I ran the pipeline for more than 72 hours, and indeed, after a little less than 72 h, I got an exception from the Cassandra cluster; see the task manager log:

{code:java}
2022-04-13 16:38:15,227 ERROR org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink [] - Error while sending value.
com.datastax.driver.core.exceptions.WriteTimeoutException: Cassandra timeout during write query at consistency LOCAL_ONE (1 replica were required but only 0 acknowledged the write)
{code}

This exception means that the Cassandra coordinator node (internal to Cassandra) waited too long for an internal replication (replication to another node in the same Cassandra "datacenter") and did not acknowledge the write. This led to a failure of the write task and to a restoration of the job from the last checkpoint; see the job manager log:

{code:java}
2022-04-13 16:38:20,847 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job Cassandra Pojo Sink Streaming example (dc7522bc1855f6f98038ac2b4eed4095) switched from state RESTARTING to RUNNING.
2022-04-13 16:38:20,850 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Restoring job dc7522bc1855f6f98038ac2b4eed4095 from Checkpoint 136 @ 1649858983772 for dc7522bc1855f6f98038ac2b4eed4095 located at file:/tmp/flink-checkpoints/dc7522bc1855f6f98038ac2b4eed4095/chk-136.
{code}

Obviously, this restoration led to the re-creation of the _CassandraPojoSink_ and to the re-creation of the related _MappingManager_.

So, in short, this is exactly what I supposed in my previous comments: restoring from checkpoints slows down your writes (job restart time + Cassandra driver state re-creation: connection, prepared statements, etc.). The problem is that the timeout comes from Cassandra itself, not from Flink, and it is normal that Flink restores the job in such circumstances.

What you can do is increase the Cassandra write timeout to fit your workload, so that such errors do not happen. For that, you need to raise the _write_request_timeout_in_ms_ configuration parameter in your _cassandra.yaml_. I do not recommend lowering the replication factor in your Cassandra cluster (I did that only for local tests with Flink), because it is mandatory that you do not lose data in case of a Cassandra cluster failure. Waiting for a single replica to acknowledge the write is the minimum level for this guarantee.

Best
Etienne

!Capture d’écran de 2022-04-14 16-34-59.png!

> Flink Cassandra connector performance issue
> --------------------------------------------
>
>                 Key: FLINK-26793
>                 URL: https://issues.apache.org/jira/browse/FLINK-26793
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Cassandra
>    Affects Versions: 1.14.4
>            Reporter: Jay Ghiya
>            Assignee: Etienne Chauchot
>            Priority: Major
>        Attachments: Capture d’écran de 2022-04-14 16-34-59.png
>
>
> A warning is observed during long runs of the Flink job stating “Insertions into
> scylla might be suffering.
> Expect performance problems unless this is resolved.”
>
> Upon initial analysis: “The Flink Cassandra connector is not keeping an instance of the MappingManager that is used to convert a POJO to a Cassandra row. Ideally, the MappingManager should have the same lifetime as the cluster and session objects, which are also created once when the driver is initialized.”
>
> Reference:
> https://stackoverflow.com/questions/59203418/cassandra-java-driver-warning

--
This message was sent by Atlassian Jira
(v8.20.1#820001)
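As a concrete illustration of the timeout tuning suggested in the comment above, the change would look like this in _cassandra.yaml_. The 10000 ms value is only an example, not a recommendation; the Cassandra 3.x default is 2000 ms, so choose a value suited to your workload:

{code:yaml}
# cassandra.yaml -- how long the coordinator waits for replica write acks
# before failing the write with a WriteTimeoutException.
# Default in Cassandra 3.x: 2000 ms. 10000 is an illustrative raised value.
write_request_timeout_in_ms: 10000
{code}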
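The fix hinted at in the issue description, keeping the _MappingManager_ alive for the lifetime of the session instead of rebuilding it per record, boils down to a memoization pattern. A minimal self-contained sketch of that pattern (the _Session_ and _MappingManager_ types below are stand-ins, not the real DataStax driver classes):

{code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class MapperCacheSketch {
    // Stand-in for com.datastax.driver.core.Session.
    public static class Session { }

    // Stand-in for com.datastax.driver.mapping.MappingManager;
    // counts instantiations to make reuse observable.
    public static class MappingManager {
        public static final AtomicInteger CREATED = new AtomicInteger();
        public MappingManager(Session s) { CREATED.incrementAndGet(); }
    }

    // One MappingManager per Session, created lazily and then reused,
    // so it shares the session's lifetime instead of being rebuilt per record.
    private static final Map<Session, MappingManager> CACHE = new ConcurrentHashMap<>();

    public static MappingManager managerFor(Session session) {
        return CACHE.computeIfAbsent(session, MappingManager::new);
    }
}
{code}

Repeated calls with the same session return the same cached instance, which is the behavior the description asks the connector to provide.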