[ 
https://issues.apache.org/jira/browse/FLINK-26793?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17522355#comment-17522355
 ] 

Etienne Chauchot commented on FLINK-26793:
------------------------------------------

[~bumblebee] I reproduced the behavior you observed: I ran [this streaming 
infinite 
pipeline|https://github.com/echauchot/flink-samples/blob/master/src/main/java/org/example/CassandraPojoSinkStreamingExample.java]
 for 3 days on a local flink 1.14.4 cluster + cassandra 3.0 docker (I did not 
have rights to instanciate a cassandra cluster on Azure). The pipeline has 
checkpointing configured every 10 min with exactly once semantics and no 
watermark defined. It was run at parallelism 16 which corresponds to the number 
of cores on my laptop. I created a source that gives pojos every 100 ms.  The 
source is mono-threaded so at parallelism 1.  See all the screenshots

I ran the pipeline for more than 72 hours and indeed after little less than 
72h, I got an exception from Cassandra cluster see task manager log:

{code:java}
2022-04-13 16:38:15,227 ERROR 
org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink [] - Error 
while sending value.
com.datastax.driver.core.exceptions.WriteTimeoutException: Cassandra timeout 
during write query at consistency LOCAL_ONE (1 replica were required but only 0 
acknowledged the write)
{code}
This exception means that Cassandra coordinator node (internal Cassandra) 
waited too long for an internal replication (raplication to another node in the 
same casssandra "datacenter") and did not ack the write. 

This led to a failure of the write task and to a restoration of the job from 
the last checkpoint see job manager log:

{code:java}
2022-04-13 16:38:20,847 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job Cassandra 
Pojo Sink Streaming example (dc7522bc1855f6f98038ac2b4eed4095) switched from 
state RESTARTING to RUNNING.
2022-04-13 16:38:20,850 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Restoring job 
dc7522bc1855f6f98038ac2b4eed4095 from Checkpoint 136 @ 1649858983772 for 
dc7522bc1855f6f98038ac2b4eed4095 located at 
file:/tmp/flink-checkpoints/dc7522bc1855f6f98038ac2b4eed4095/chk-136.
{code}

Obviously, this restoration led to the re-creation of the _CassandraPojoSink_ 
and to the re-creation of the related _MappingManager_

So in short, this is exactly what I supposed in my previous comments. Restoring 
from checkpoints  slow down you writes (job restart time +  cassandra driver 
state re-creation - connection,  prepared statements etc... -)

The problem is that the timeout comes from Cassandra itself not from Flink and 
it is normal that Flink restores the job in such circumstances. 
What you can do is to increase the Cassandra write timeout to your workload in 
your Cassandra cluster so that such errors do not happen. For that you need to 
raise _write_request_timeout_in_ms_  conf parameter in your _cassandra.yml_.

I do not recommend that you lower the replication factor in your Cassandra 
cluster (I did that only from local tests on Flink) because it is mandatory 
that you do not loose data in case of your Cassandra cluster failure. Waiting 
for a single replica for write acknowledge is the minimum level for this 
guarantee.

Best
Etienne

  !Capture d’écran de 2022-04-14 16-34-59.png! 

> Flink Cassandra connector performance issue 
> --------------------------------------------
>
>                 Key: FLINK-26793
>                 URL: https://issues.apache.org/jira/browse/FLINK-26793
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Cassandra
>    Affects Versions: 1.14.4
>            Reporter: Jay Ghiya
>            Assignee: Etienne Chauchot
>            Priority: Major
>         Attachments: Capture d’écran de 2022-04-14 16-34-59.png
>
>
> A warning is observed during long runs of flink job stating “Insertions into 
> scylla might be suffering. Expect performance problems unless this is 
> resolved.”
> Upon initial analysis - “flink cassandra connector is not keeping instance of 
> mapping manager that is used to convert a pojo to cassandra row. Ideally the 
> mapping manager should have the same life time as cluster and session objects 
> which are also created once when the driver is initialized”
> Reference: 
> https://stackoverflow.com/questions/59203418/cassandra-java-driver-warning



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to