GitHub user rmetzger commented on the issue:

    https://github.com/apache/flink/pull/1771
  
    I've reviewed the connector again.
    The issues I saw previously (failure on restart) are resolved.
    However, I found new issues:
    - The Cassandra sink doesn't fail (at least not within 15 minutes) if Cassandra becomes unavailable. It's probably just a matter of configuring the Cassandra driver to fail after a certain amount of time.
    - We should probably introduce a (configurable) limit (number of records or GBs of data) for the write-ahead log. It seemed to me that, because of the other failed instance, no checkpoints could complete anymore (some of the Cassandra sinks were stuck in notifyCheckpointComplete()), while other sinks kept accepting data into the WAL. This led to a lot of data being written into the state backend. I think the Cassandra sink should stop at some point in such a situation.
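Both points above amount to giving the sink upper bounds. A minimal sketch of what such bounds could look like, as a hypothetical `WalGuard` helper that is not part of the connector or of Flink: it counts records and bytes buffered in the write-ahead log and fails once either configured limit is exceeded, and it fails once writes have kept failing for longer than a configured duration. The limit values, the byte sizes passed in, and where the sink would call these methods are all assumptions.

```java
import java.time.Duration;

/**
 * Hypothetical guard for a write-ahead-log sink (not an existing Flink or connector class).
 * It enforces two assumed limits: a cap on buffered WAL data and a cap on how long the
 * external system may keep rejecting writes before the sink gives up.
 */
final class WalGuard {
    private final long maxRecords;
    private final long maxBytes;
    private final long maxUnavailableMillis;

    private long bufferedRecords;
    private long bufferedBytes;
    private long firstFailureMillis = -1; // -1: no failure since the last successful commit

    WalGuard(long maxRecords, long maxBytes, Duration maxUnavailable) {
        this.maxRecords = maxRecords;
        this.maxBytes = maxBytes;
        this.maxUnavailableMillis = maxUnavailable.toMillis();
    }

    /** Called for every record appended to the WAL; throws once either limit is exceeded. */
    void onRecordBuffered(long sizeBytes) {
        bufferedRecords++;
        bufferedBytes += sizeBytes;
        if (bufferedRecords > maxRecords || bufferedBytes > maxBytes) {
            throw new IllegalStateException("WAL limit exceeded: " + bufferedRecords
                    + " records, " + bufferedBytes + " bytes");
        }
    }

    /** Called after the records of a completed checkpoint were committed to the external system. */
    void onCommitted(long records, long bytes) {
        bufferedRecords -= records;
        bufferedBytes -= bytes;
        firstFailureMillis = -1; // a successful commit ends the current outage
    }

    /** Called when a write fails; throws once failures have persisted longer than allowed. */
    void onWriteFailed(long nowMillis) {
        if (firstFailureMillis < 0) {
            firstFailureMillis = nowMillis;
        } else if (nowMillis - firstFailureMillis > maxUnavailableMillis) {
            throw new IllegalStateException("External system unavailable for "
                    + (nowMillis - firstFailureMillis) + " ms");
        }
    }

    long bufferedRecords() {
        return bufferedRecords;
    }
}
```

A real sink would presumably call `onRecordBuffered` when appending to the WAL, `onCommitted` from `notifyCheckpointComplete()`, and `onWriteFailed` from its error path. Throwing an exception hands the problem to Flink's regular failure and restart handling instead of letting the state backend grow without bound.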
    
    Also, I would like to test the exactly-once behavior on a cluster more thoroughly. So far, I've only tested whether the connector fails and restores properly, but I haven't verified that the written data is actually correct.
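One way to check correctness is to compare what the test job emitted with what ends up in Cassandra. A minimal sketch, assuming a test source that emits each record ID in `[0, expectedCount)` exactly once, and a test table keyed so that a replayed record shows up as an extra row (for example, with a per-write UUID in the primary key). If the record ID alone were the primary key, Cassandra's upsert semantics would silently merge duplicates and the check could not see them. `ExactlyOnceCheck` is a hypothetical name, and reading the IDs back from Cassandra is left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical checker comparing the IDs a test job emitted with the IDs read back from the sink. */
final class ExactlyOnceCheck {
    /** Outcome of the comparison; exactly-once holds when both lists are empty. */
    static final class Result {
        final List<Long> missing = new ArrayList<>();
        final List<Long> duplicated = new ArrayList<>();

        boolean isExactlyOnce() {
            return missing.isEmpty() && duplicated.isEmpty();
        }
    }

    /**
     * Assumes the source emitted every ID in [0, expectedCount) once and that duplicate
     * writes are visible as repeated IDs in writtenIds (see the table-key assumption above).
     */
    static Result check(long expectedCount, Iterable<Long> writtenIds) {
        // Count how often each ID was written.
        Map<Long, Integer> counts = new HashMap<>();
        for (long id : writtenIds) {
            counts.merge(id, 1, Integer::sum);
        }
        Result result = new Result();
        for (long id = 0; id < expectedCount; id++) {
            int n = counts.getOrDefault(id, 0);
            if (n == 0) {
                result.missing.add(id);      // lost record: at-most-once behavior
            } else if (n > 1) {
                result.duplicated.add(id);   // replayed record: at-least-once behavior
            }
        }
        return result;
    }
}
```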
    
    However, since the code seems to work under normal operation, I would suggest merging the connector now and filing follow-up JIRAs for the remaining issues.
    This makes collaboration and reviews easier and allows our users to help test the Cassandra connector.
    
    
    
    Some log output:
    ```
    2016-06-03 12:28:36,478 ERROR org.apache.flink.streaming.runtime.operators.GenericWriteAheadSink  - Error while sending value.
    com.datastax.driver.core.exceptions.UnavailableException: Not enough replicas available for query at consistency LOCAL_ONE (1 required but only 0 alive)
        at com.datastax.driver.core.exceptions.UnavailableException.copy(UnavailableException.java:128)
        at com.datastax.driver.core.Responses$Error.asException(Responses.java:114)
        at com.datastax.driver.core.RequestHandler$SpeculativeExecution.onSet(RequestHandler.java:477)
        at com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:1005)
        at com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:928)
        at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
        at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:254)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
        at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
        at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:618)
        at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:329)
        at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:250)
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
        at java.lang.Thread.run(Thread.java:745)
    Caused by: com.datastax.driver.core.exceptions.UnavailableException: Not enough replicas available for query at consistency LOCAL_ONE (1 required but only 0 alive)
        at com.datastax.driver.core.Responses$Error$1.decode(Responses.java:50)
        at com.datastax.driver.core.Responses$Error$1.decode(Responses.java:37)
        at com.datastax.driver.core.Message$ProtocolDecoder.decode(Message.java:266)
        at com.datastax.driver.core.Message$ProtocolDecoder.decode(Message.java:246)
        at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:89)
        ... 11 more
    2016-06-03 12:28:57,473 ERROR com.datastax.driver.core.ControlConnection                    - [Control connection] Cannot connect to any host, scheduling retry in 1000 milliseconds
    2016-06-03 12:28:57,487 ERROR com.datastax.driver.core.ControlConnection                    - [Control connection] Cannot connect to any host, scheduling retry in 1000 milliseconds
    2016-06-03 12:29:02,939 ERROR com.datastax.driver.core.ControlConnection                    - [Control connection] Cannot connect to any host, scheduling retry in 2000 milliseconds
    2016-06-03 12:29:02,970 ERROR com.datastax.driver.core.ControlConnection                    - [Control connection] Cannot connect to any host, scheduling retry in 2000 milliseconds
    2016-06-03 12:29:12,945 ERROR com.datastax.driver.core.ControlConnection                    - [Control connection] Cannot connect to any host, scheduling retry in 4000 milliseconds
    2016-06-03 12:29:12,974 ERROR com.datastax.driver.core.ControlConnection                    - [Control connection] Cannot connect to any host, scheduling retry in 4000 milliseconds
    2016-06-03 12:29:17,947 ERROR com.datastax.driver.core.ControlConnection                    - [Control connection] Cannot connect to any host, scheduling retry in 8000 milliseconds
    2016-06-03 12:29:17,977 ERROR com.datastax.driver.core.ControlConnection                    - [Control connection] Cannot connect to any host, scheduling retry in 8000 milliseconds
    2016-06-03 12:29:28,481 ERROR com.datastax.driver.core.ControlConnection                    - [Control connection] Cannot connect to any host, scheduling retry in 16000 milliseconds
    2016-06-03 12:29:28,974 ERROR com.datastax.driver.core.ControlConnection                    - [Control connection] Cannot connect to any host, scheduling retry in 16000 milliseconds
    2016-06-03 12:29:44,482 ERROR com.datastax.driver.core.ControlConnection                    - [Control connection] Cannot connect to any host, scheduling retry in 32000 milliseconds
    2016-06-03 12:29:44,975 ERROR com.datastax.driver.core.ControlConnection                    - [Control connection] Cannot connect to any host, scheduling retry in 32000 milliseconds
    2016-06-03 12:30:16,482 ERROR com.datastax.driver.core.ControlConnection                    - [Control connection] Cannot connect to any host, scheduling retry in 64000 milliseconds
    2016-06-03 12:30:16,975 ERROR com.datastax.driver.core.ControlConnection                    - [Control connection] Cannot connect to any host, scheduling retry in 64000 milliseconds
    2016-06-03 12:31:20,483 ERROR com.datastax.driver.core.ControlConnection                    - [Control connection] Cannot connect to any host, scheduling retry in 128000 milliseconds
    2016-06-03 12:31:20,976 ERROR com.datastax.driver.core.ControlConnection                    - [Control connection] Cannot connect to any host, scheduling retry in 128000 milliseconds
    2016-06-03 12:33:28,484 ERROR com.datastax.driver.core.ControlConnection                    - [Control connection] Cannot connect to any host, scheduling retry in 256000 milliseconds
    2016-06-03 12:33:28,976 ERROR com.datastax.driver.core.ControlConnection                    - [Control connection] Cannot connect to any host, scheduling retry in 256000 milliseconds
    2016-06-03 12:37:44,484 ERROR com.datastax.driver.core.ControlConnection                    - [Control connection] Cannot connect to any host, scheduling retry in 512000 milliseconds
    2016-06-03 12:37:44,977 ERROR com.datastax.driver.core.ControlConnection                    - [Control connection] Cannot connect to any host, scheduling retry in 512000 milliseconds
    2016-06-03 12:46:16,485 ERROR com.datastax.driver.core.ControlConnection                    - [Control connection] Cannot connect to any host, scheduling retry in 600000 milliseconds
    2016-06-03 12:46:16,977 ERROR com.datastax.driver.core.ControlConnection                    - [Control connection] Cannot connect to any host, scheduling retry in 600000 milliseconds
    2016-06-03 12:46:54,906 INFO  org.apache.flink.runtime.taskmanager.Task                     - Attempting to cancel task Source: Custom Source (1/2)
    2016-06-03 12:46:54,907 INFO  org.apache.flink.runtime.taskmanager.Task                     - Source: Custom Source (1/2) switched to CANCELING
    2016-06-03 12:46:54,907 INFO  org.apache.flink.runtime.taskmanager.Task                     - Triggering cancellation of task code Source: Custom Source (1/2) (dec8d24e486ca9937739b7c6e07fbb05).
    2016-06-03 12:46:54,909 INFO  org.apache.flink.runtime.taskmanager.Task                     - Attempting to cancel task Cassandra Sink (1/2)
    2016-06-03 12:46:54,909 INFO  org.apache.flink.runtime.taskmanager.Task                     - Cassandra Sink (1/2) switched to CANCELING
    2016-06-03 12:46:54,909 INFO  org.apache.flink.runtime.taskmanager.Task                     - Triggering cancellation of task code Cassandra Sink (1/2) (96511ef6293a893b0ef35dd211aea2b9).
    2016-06-03 12:46:55,389 INFO  com.dataartisans.Job                                          - Received cancel in EventGenerator
    2016-06-03 12:46:55,392 INFO  org.apache.flink.runtime.taskmanager.Task                     - Source: Custom Source (1/2) switched to CANCELED
    2016-06-03 12:46:55,392 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for Source: Custom Source (1/2)
    2016-06-03 12:46:55,394 INFO  org.apache.flink.yarn.YarnTaskManager                         - Un-registering task and sending final execution state CANCELED to JobManager for task Source: Custom Source (dec8d24e486ca9937739b7c6e07fbb05)
    2016-06-03 12:47:24,911 WARN  org.apache.flink.runtime.taskmanager.Task                     - Task 'Cassandra Sink (1/2)' did not react to cancelling signal, but is stuck in method:
     org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:166)
    org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
    org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
    org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    java.lang.Thread.run(Thread.java:745)

    2016-06-03 12:47:54,912 WARN  org.apache.flink.runtime.taskmanager.Task                     - Task 'Cassandra Sink (1/2)' did not react to cancelling signal, but is stuck in method:
     org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:166)
    org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
    org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
    org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    java.lang.Thread.run(Thread.java:745)

    2016-06-03 12:48:24,913 WARN  org.apache.flink.runtime.taskmanager.Task                     - Task 'Cassandra Sink (1/2)' did not react to cancelling signal, but is stuck in method:
     org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:166)
    org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
    org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
    org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    java.lang.Thread.run(Thread.java:745)

    2016-06-03 12:48:54,915 WARN  org.apache.flink.runtime.taskmanager.Task                     - Task 'Cassandra Sink (1/2)' did not react to cancelling signal, but is stuck in method:
     org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:166)
    org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
    org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
    org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    java.lang.Thread.run(Thread.java:745)

    2016-06-03 12:49:24,916 WARN  org.apache.flink.runtime.taskmanager.Task                     - Task 'Cassandra Sink (1/2)' did not react to cancelling signal, but is stuck in method:
     org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:166)
    org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
    org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
    org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    java.lang.Thread.run(Thread.java:745)

    2016-06-03 12:49:54,918 WARN  org.apache.flink.runtime.taskmanager.Task                     - Task 'Cassandra Sink (1/2)' did not react to cancelling signal, but is stuck in method:
     org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:166)
    org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
    org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
    org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    java.lang.Thread.run(Thread.java:745)

    2016-06-03 12:50:24,919 WARN  org.apache.flink.runtime.taskmanager.Task                     - Task 'Cassandra Sink (1/2)' did not react to cancelling signal, but is stuck in method:
     org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:166)
    org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
    org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
    org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    java.lang.Thread.run(Thread.java:745)
    ```
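The ControlConnection entries in the log show the driver's reconnection delay doubling from 1000 ms and then staying at 600000 ms, which is presumably why the sink kept running for so long without failing. A minimal sketch that reproduces that schedule, with the base delay and cap read off the log rather than taken from the driver's configuration, and sums the waiting time. `BackoffSchedule` is a hypothetical name; it mimics the observed delays and is not the driver's own reconnection policy class.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper reproducing the delays in the log: doubling from a base, capped at a maximum. */
final class BackoffSchedule {
    /** Returns the first {@code attempts} reconnection delays, in milliseconds. */
    static List<Long> delays(long baseDelayMs, long maxDelayMs, int attempts) {
        List<Long> result = new ArrayList<>();
        long delay = Math.min(baseDelayMs, maxDelayMs);
        for (int i = 0; i < attempts; i++) {
            result.add(delay);
            // Double the delay but never exceed the cap (comparing against max/2 avoids overflow).
            delay = delay > maxDelayMs / 2 ? maxDelayMs : delay * 2;
        }
        return result;
    }

    /** Total time spent waiting across the given delays, in milliseconds. */
    static long totalMs(List<Long> delays) {
        long sum = 0;
        for (long d : delays) {
            sum += d;
        }
        return sum;
    }
}
```

With a base of 1000 ms and a cap of 600000 ms, the first ten delays alone add up to 1,023,000 ms (about 17 minutes) before the cap is reached, and the schedule itself places no limit on the number of attempts, so a bound on total unavailability would have to come from somewhere else.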

