[ https://issues.apache.org/jira/browse/IGNITE-19459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17724257#comment-17724257 ]

Vedran Ljubovic commented on IGNITE-19459:
------------------------------------------

After some experiments, I have found that simply moving the extractor 
initialization ahead of the other initialization steps fixes the problem. 
Apparently StreamerContext.getStreamer() can sometimes take a few seconds to 
complete; KC starts streaming messages before it is done, and those messages 
are dropped because the key is null. A patch is included; a sketch of the 
reordering follows below.
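
For illustration, a minimal sketch of the idea. The field names, the config 
key and the streamer call shown here are assumptions, not the contents of the 
attached patch; the point is only the ordering: create the extractor before 
the first call that touches StreamerContext.getStreamer().

{code:java}
// Sketch only: the surrounding sink task class, the "extractor" and "stopped"
// fields and the config keys are assumed, not copied from the patch.
@Override public void start(Map<String, String> props) {
    try {
        // 1. Create the extractor first, so early records never see a null extractor.
        String extractorCls = props.get("singleTupleExtractorCls"); // assumed config key
        if (extractorCls != null && !extractorCls.isEmpty())
            extractor = (StreamSingleTupleExtractor<SinkRecord, Object, Object>)
                Class.forName(extractorCls).getDeclaredConstructor().newInstance();

        // 2. Only then touch the streamer, which may block for a few seconds.
        StreamerContext.getStreamer().allowOverwrite(
            Boolean.parseBoolean(props.getOrDefault("cacheAllowOverwrite", "false")));

        stopped = false;
    }
    catch (Exception e) {
        // Rethrow so KC marks the task FAILED instead of silently dropping records.
        throw new ConnectException("Failed to start the Ignite sink task", e);
    }
}
{code}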

> Kafka Connect IgniteSinkConnector drops messages in case of error
> -----------------------------------------------------------------
>
>                 Key: IGNITE-19459
>                 URL: https://issues.apache.org/jira/browse/IGNITE-19459
>             Project: Ignite
>          Issue Type: Bug
>          Components: extensions
>    Affects Versions: 2.15
>            Reporter: Vedran Ljubovic
>            Priority: Major
>         Attachments: event_dropping.patch
>
>
> We are using Kafka Connect (KC) to stream messages from Kafka to Ignite. 
> Since the Kafka topic uses null keys, we have created a custom 
> SingleTupleExtractor to generate keys from the payload (see the sketch after 
> this paragraph). This works very well when everything is fine. However, if 
> there is any kind of issue with starting a cache on Ignite (for example, the 
> cluster state is inactive or the cache has lost partitions), we expect KC to 
> fail to start. Instead, KC starts and appears to be running, but the messages 
> are dropped - which means that even after the problems are resolved, KC will 
> not attempt to resend the messages, not even after a restart! This is 
> unacceptable for us; we believe that the system should be reliable and 
> fault-tolerant.
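> For context, a minimal sketch of such an extractor, assuming the record 
> value is a plain String and the key is derived from it (the class name and 
> the key-derivation logic here are illustrative only):
> {code:java}
> import java.util.AbstractMap;
> import java.util.Map;
> import org.apache.ignite.stream.StreamSingleTupleExtractor;
> import org.apache.kafka.connect.sink.SinkRecord;
> 
> // Hypothetical extractor: derives a cache key from the record payload
> // because the Kafka records themselves carry a null key.
> public class PayloadKeyExtractor implements StreamSingleTupleExtractor<SinkRecord, String, String> {
>     @Override public Map.Entry<String, String> extract(SinkRecord rec) {
>         String val = String.valueOf(rec.value());
>         // Illustrative key derivation only; a real extractor would parse the payload.
>         return new AbstractMap.SimpleEntry<>(Integer.toHexString(val.hashCode()), val);
>     }
> }
> {code}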
> In the logs we notice errors such as:
> {code:java}
> Failed to stream a record with null key! {code}
> which is useless, since we do have a SingleTupleExtractor for exactly this 
> purpose and we can see that it is not being called at all!
> When the KC REST API [1] is used, it reports the state as RUNNING, which 
> means we have no way to detect this error other than parsing the logs, which 
> is unreliable.
> Upon investigating this issue, we found the following:
>  * The Ignite connection and the IgniteDataStreamer are declared as private 
> static final fields of an inner class, so they are initialized when the 
> start() method of IgniteSinkConnector is called. From the KC docs [2], we 
> conclude that the initialize() method should be overridden and the 
> connections created there, and that appropriate exception types should be 
> thrown so that KC knows the connection has failed and can terminate the 
> task/connector.
>  * When the start() method is called, StreamerContext.getStreamer() in line 
> 72 fails with an exception. This exception is not handled by KC, so it does 
> not know that the task failed to start. In addition, the code never reaches 
> line 91 where the SingleTupleExtractor is created, so there is no extractor. 
> A solution would be to catch all types of exceptions and rethrow them as 
> exception types that KC detects as critical errors. Alternatively, the put() 
> method should throw an exception if stopped is true.
>  * When the put() method is called, if there is no key and no extractor, 
> line 121 shows that the error is only logged and no exception is thrown, so 
> KC assumes the message was streamed successfully. Here, a ConnectException 
> should be thrown (see the sketch after this list). If users want the current 
> behavior (stream Kafka messages that have a key and skip those without one), 
> they can set errors.tolerance = all in the connector config [3].
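> A minimal sketch of the proposed put() behavior; the "extractor" field and 
> the exact structure of the existing method are assumptions, only 
> StreamerContext.getStreamer() and the null-key branch come from the source:
> {code:java}
> @Override public void put(Collection<SinkRecord> records) {
>     for (SinkRecord rec : records) {
>         if (extractor != null) {
>             Map.Entry<Object, Object> entry = extractor.extract(rec);
>             StreamerContext.getStreamer().addData(entry.getKey(), entry.getValue());
>         }
>         else if (rec.key() != null)
>             StreamerContext.getStreamer().addData(rec.key(), rec.value());
>         else
>             // Proposed change: fail the task instead of silently dropping the record.
>             // With errors.tolerance = all the framework can still skip such records.
>             throw new ConnectException("Failed to stream a record with null key!");
>     }
> }
> {code}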
> [1] https://docs.confluent.io/platform/current/connect/references/restapi.html
> [2] https://kafka.apache.org/34/javadoc/org/apache/kafka/connect/sink/SinkTask.html
> [3] https://www.confluent.io/blog/kafka-connect-deep-dive-error-handling-dead-letter-queues/
>  


