[ 
https://issues.apache.org/jira/browse/KAFKA-16603?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-16603.
-----------------------------------
    Resolution: Not A Bug

> Data loss when kafka connect sending data to Kafka
> --------------------------------------------------
>
>                 Key: KAFKA-16603
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16603
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, producer 
>    Affects Versions: 3.3.1
>            Reporter: Anil Dasari
>            Priority: Major
>
> We are experiencing data loss when a Kafka source connector fails to send 
> data to the Kafka topic and the offsets topic. 
> Kafka cluster and Kafka Connect details:
>  # Kafka Connect version (client): Confluent Community 7.3.1, i.e. 
> Kafka 3.3.1
>  # Kafka version (server): 0.11.0
>  # Cluster size: 3 brokers
>  # Number of partitions in all topics: 3
>  # Replication factor: 3
>  # Min ISR: 2
>  # No transformations are used in the connector
>  # Default error tolerance, i.e. none
> Our connector checkpoints the offset information it receives in 
> SourceTask#commitRecord and resumes processing from the persisted 
> checkpoint.
> The data loss was noticed when a broker was unresponsive for a few minutes 
> due to high load and the Kafka connector was restarted. The connector's 
> graceful shutdown also failed.
> Logs:
>  
> {code:java}
> [Worker clientId=connect-1, 
> groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Discovered group 
> coordinator 10.75.100.176:31000 (id: 2147483647 rack: null)
> Apr 22, 2024 @ 15:56:16.152 [Worker clientId=connect-1, 
> groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Group coordinator 
> 10.75.100.176:31000 (id: 2147483647 rack: null) is unavailable or invalid due 
> to cause: coordinator unavailable. isDisconnected: false. Rediscovery will be 
> attempted.
> Apr 22, 2024 @ 15:56:16.153 [Worker clientId=connect-1, 
> groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Requesting disconnect from 
> last known coordinator 10.75.100.176:31000 (id: 2147483647 rack: null)
> Apr 22, 2024 @ 15:56:16.514 [Worker clientId=connect-1, 
> groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Node 0 disconnected.
> Apr 22, 2024 @ 15:56:16.708 [Producer 
> clientId=connector-producer-d094a5d7bbb046b99d62398cb84d648c-0] Node 0 
> disconnected.
> Apr 22, 2024 @ 15:56:16.710 [Worker clientId=connect-1, 
> groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Node 2147483647 
> disconnected.
> Apr 22, 2024 @ 15:56:16.731 [Worker clientId=connect-1, 
> groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Group coordinator 
> 10.75.100.176:31000 (id: 2147483647 rack: null) is unavailable or invalid due 
> to cause: coordinator unavailable. isDisconnected: true. Rediscovery will be 
> attempted.
> Apr 22, 2024 @ 15:56:19.103 == Trying to sleep while stop == (** custom log 
> **)
> Apr 22, 2024 @ 15:56:19.755 [Worker clientId=connect-1, 
> groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Broker coordinator was 
> unreachable for 3000ms. Revoking previous assignment Assignment{error=0, 
> leader='connect-1-8f41a1d2-6cc9-4956-9be3-1fbae9c6d305', 
> leaderUrl='http://10.75.100.46:8083/', offset=4, 
> connectorIds=[d094a5d7bbb046b99d62398cb84d648c], 
> taskIds=[d094a5d7bbb046b99d62398cb84d648c-0], revokedConnectorIds=[], 
> revokedTaskIds=[], delay=0} to avoid running tasks while not being a member 
> the group
> Apr 22, 2024 @ 15:56:19.866 Stopping connector 
> d094a5d7bbb046b99d62398cb84d648c
> Apr 22, 2024 @ 15:56:19.874 Stopping task d094a5d7bbb046b99d62398cb84d648c-0
> Apr 22, 2024 @ 15:56:19.880 Scheduled shutdown for 
> WorkerConnectorWorkerConnector{id=d094a5d7bbb046b99d62398cb84d648c}
> Apr 22, 2024 @ 15:56:24.105 Connector 'd094a5d7bbb046b99d62398cb84d648c' 
> failed to properly shut down, has become unresponsive, and may be consuming 
> external resources. Correct the configuration for this connector or remove 
> the connector. After fixing the connector, it may be necessary to restart 
> this worker to release any consumed resources.
> Apr 22, 2024 @ 15:56:24.110 [Producer 
> clientId=connector-producer-d094a5d7bbb046b99d62398cb84d648c-0] Closing the 
> Kafka producer with timeoutMillis = 0 ms.
> Apr 22, 2024 @ 15:56:24.110 [Producer 
> clientId=connector-producer-d094a5d7bbb046b99d62398cb84d648c-0] Proceeding to 
> force close the producer since pending requests could not be completed within 
> timeout 0 ms.
> Apr 22, 2024 @ 15:56:24.112 [Producer 
> clientId=connector-producer-d094a5d7bbb046b99d62398cb84d648c-0] Beginning 
> shutdown of Kafka producer I/O thread, sending remaining records.
> Apr 22, 2024 @ 15:56:24.112 [Producer 
> clientId=connector-producer-d094a5d7bbb046b99d62398cb84d648c-0] Aborting 
> incomplete batches due to forced shutdown
> Apr 22, 2024 @ 15:56:24.113 
> WorkerSourceTaskWorkerSourceTask{id=d094a5d7bbb046b99d62398cb84d648c-0} 
> Committing offsets
> Apr 22, 2024 @ 15:56:24.113 
> WorkerSourceTaskWorkerSourceTask{id=d094a5d7bbb046b99d62398cb84d648c-0} 
> Either no records were produced by the task since the last offset commit, or 
> every record has been filtered out by a transformation or dropped due to 
> transformation or conversion errors.
> Apr 22, 2024 @ 15:56:24.146 [Worker clientId=connect-1, 
> groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Finished stopping tasks in 
> preparation for rebalance
> Apr 22, 2024 @ 15:56:24.165 
> WorkerSourceTaskWorkerSourceTask{id=d094a5d7bbb046b99d62398cb84d648c-0} 
> Finished commitOffsets successfully in 52 ms
> Apr 22, 2024 @ 15:56:24.165 Committing offset '{server=fewtables}' for 
> partition '{transaction_id=null, lsn_proc=199129206664, 
> lsn_commit=199129206312, lsn=199129206664, txId=170008064, 
> ts_usec=1713826574699027}'
> Apr 22, 2024 @ 15:56:24.165 Flushing LSN to server: LSNLSN{2E/5D068E28}
> Apr 22, 2024 @ 15:56:24.166 == BASE Trying to sleep while stop == (*** custom 
> logs to fail the connector graceful shutdown ***)
> Apr 22, 2024 @ 15:56:24.167 [Producer 
> clientId=connector-producer-d094a5d7bbb046b99d62398cb84d648c-0] Shutdown of 
> Kafka producer I/O thread has completed.
> Apr 22, 2024 @ 15:56:24.172 Metrics scheduler closed
> Apr 22, 2024 @ 15:56:24.173 Closing reporter 
> org.apache.kafka.common.metrics.JmxReporter
> Apr 22, 2024 @ 15:56:24.173 Metrics reporters closed
> Apr 22, 2024 @ 15:56:24.175 App info kafka.producer for 
> connector-producer-d094a5d7bbb046b99d62398cb84d648c-0 unregistered
> Apr 22, 2024 @ 15:56:24.175 [Producer 
> clientId=connector-producer-d094a5d7bbb046b99d62398cb84d648c-0] Kafka 
> producer has been closed
> Apr 22, 2024 @ 15:56:24.944 Graceful stop of task 
> d094a5d7bbb046b99d62398cb84d648c-0 failed.
> Apr 22, 2024 @ 15:56:29.108 == Got out of sleep while stop == (*** custom 
> logs to fail the connector graceful shutdown ***)
> Apr 22, 2024 @ 15:56:29.109 Completed shutdown for 
> WorkerConnectorWorkerConnector{id=d094a5d7bbb046b99d62398cb84d648c}
> Apr 22, 2024 @ 15:56:29.854 [Worker clientId=connect-1, 
> groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Finished flushing status 
> backing store in preparation for rebalance
> Apr 22, 2024 @ 15:56:29.862 [Worker clientId=connect-1, 
> groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Discovered group 
> coordinator 10.75.102.132:31000 (id: 2147483645 rack: null)
> Apr 22, 2024 @ 15:56:29.863 [Worker clientId=connect-1, 
> groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Discovered group 
> coordinator 10.75.102.132:31000 (id: 2147483645 rack: null)
> Apr 22, 2024 @ 15:56:29.863 [Worker clientId=connect-1, 
> groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Rebalance started
> Apr 22, 2024 @ 15:56:29.863 [Worker clientId=connect-1, 
> groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] (Re-)joining group
> Apr 22, 2024 @ 15:56:29.869 [Worker clientId=connect-1, 
> groupId=pg-group-adf06ea08abb4394ad4f2787481fee17] Successfully joined group 
> with generation Generation{generationId=2, 
> memberId='connect-1-8f41a1d2-6cc9-4956-9be3-1fbae9c6d305', 
> protocol='sessioned'} {code}
> There was no indication of a partition leader change or a network partition 
> in the Kafka cluster during this period. 
>  
> According to 
> [this|https://github.com/apache/kafka/blob/b254e787cbdefb38344c6a0da2b965e6d7707d27/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L411]
>  code, {{SourceTask#commitRecord}} is triggered only after a successful Kafka 
> producer send.
> Is there a scenario where the Kafka producer invokes a callback but fails to 
> publish the message to the broker?
> Let me know if you have any questions. Thanks.
>  
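
The question above turns on one invariant: a checkpoint taken in commitRecord is only safe if commitRecord runs solely for sends whose callback reported no error. The sketch below models that invariant in plain Java, with no Kafka dependency. Every name in it (CommitRecordSketch, FakeProducer, CheckpointingTask, dispatch, runScenario) is hypothetical and invented for illustration; it is not the Connect runtime's code and it does not reproduce the reporter's connector. It assumes a simplified model in which a forced close completes every pending callback with an exception.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;

public class CommitRecordSketch {

    /** Hypothetical task: remembers the highest source offset passed to commitRecord. */
    static class CheckpointingTask {
        private long checkpoint = -1L;

        void commitRecord(long sourceOffset) {
            checkpoint = Math.max(checkpoint, sourceOffset);
        }

        long checkpoint() {
            return checkpoint;
        }
    }

    /** Hypothetical in-memory producer: sends stay pending until acked or aborted. */
    static class FakeProducer {
        private final Queue<Consumer<Exception>> pending = new ArrayDeque<>();

        void send(Consumer<Exception> callback) {
            pending.add(callback);
        }

        /** Completes the oldest pending send successfully (null error). */
        void ackOne() {
            Consumer<Exception> cb = pending.poll();
            if (cb != null) {
                cb.accept(null);
            }
        }

        /** Fails every pending send, as a forced close does in this simplified model. */
        void abortAll() {
            Consumer<Exception> cb;
            while ((cb = pending.poll()) != null) {
                cb.accept(new IllegalStateException("aborted before acknowledgement"));
            }
        }
    }

    /** Sends one offset and checkpoints it only if the callback reports no error. */
    static void dispatch(FakeProducer producer, CheckpointingTask task, long sourceOffset) {
        producer.send(error -> {
            if (error == null) {
                task.commitRecord(sourceOffset);
            }
        });
    }

    /** Three sends, one acknowledgement, then a forced abort of the remainder. */
    static long runScenario() {
        FakeProducer producer = new FakeProducer();
        CheckpointingTask task = new CheckpointingTask();
        dispatch(producer, task, 100L);
        dispatch(producer, task, 200L);
        dispatch(producer, task, 300L);
        producer.ackOne();   // offset 100 is acknowledged
        producer.abortAll(); // offsets 200 and 300 are never acknowledged
        return task.checkpoint();
    }

    public static void main(String[] args) {
        System.out.println("checkpoint = " + runScenario());
    }
}
```

In this model the checkpoint stays at 100 after the abort, so a restart would resume from the last acknowledged offset and re-read 200 and 300 rather than skip them. A connector that persists a position from any other signal than a successful callback would not have that guarantee.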



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
