[ 
https://issues.apache.org/jira/browse/KAFKA-16047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17800557#comment-17800557
 ] 

Angelos Kaltsikis commented on KAFKA-16047:
-------------------------------------------

Yes sir. Will do so and ask for a review. Btw thanks for the extra context 👌🏽

> Source connector with EOS enabled have some InitProducerId requests timing 
> out, effectively failing all the tasks & the whole connector
> ---------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-16047
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16047
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect, mirrormaker
>    Affects Versions: 3.3.0, 3.4.0, 3.3.1, 3.3.2, 3.4.1, 3.6.0, 3.5.1, 3.5.2, 
> 3.6.1
>            Reporter: Angelos Kaltsikis
>            Priority: Major
>
> Source Connectors with 'exactly.once.support = required' may have some of 
> their tasks that issue InitProducerId requests from the admin client timeout. 
> In the case of MirrorSourceConnector, which was the source connector that i 
> found the bug, the bug was effectively making all the tasks (in the specific 
> case of) become "FAILED". As soon as one of the tasks gets FAILED due to the 
> 'COORDINATOR_NOT_AVAILABLE' messages (due to timeouts), no matter how many 
> restarts i did to the connector/tasks, i couldn't get the 
> MirrorSourceConnector in a healthy RUNNING state again.
> Due to the low timeout that has been [hard-coded in the 
> code|https://github.com/apache/kafka/blob/3.6.1/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java#L87]
>  (1ms), there is a chance that the `InitProducerId` requests timeout in case 
> of "slower-than-expected" Kafka brokers (that do not process & respond to the 
> above request in <= 1ms). (feel free to read more information about the issue 
> in the "More Context" section below)
> [~ChrisEgerton] I would appreciate it if you could respond to the following 
> questions
> - How and why was the 1ms magic number for transaction timeout has to be 
> chosen?
> - Is there any specific reason that it can be guaranteed that the 
> `InitProducerId` request can be processed in such a small time window? 
> - I have tried the above in multiple different Kafka clusters that are hosted 
> in different underlying datacenter hosts and i don't believe that those 
> brokers are "slow" for some reason. If you feel that the brokers are slower 
> than expected, i would appreciate any pointers on how could i find out what 
> is the bottleneck
> h3. Temporary Mitigation
> I have increased the timeout to 1000ms (randomly picked this number, just 
> wanted to give enough time to brokers to always complete those type of 
> requests). It fix can be found in my fork 
> https://github.com/akaltsikis/kafka/commit/8a47992e7dc63954f9d9ac54e8ed1f5a7737c97f
>  
> h3. Final solution
> The temporary mitigation is not ideal, as it still randomly picks a timeout 
> for such an operation which may high enough but it's not ensured that it will 
> always be high enough. Shall we introduce something client configurable ?
> At the same time, i was thinking whether it makes sense to introduce some 
> tests that simulate slower than the "blazing" fast mocked brokers that exist 
> in Unit Tests, so as to be able to catch this type of low timeouts that 
> potentially make some software features not usable.
> h3. What is affected
> The above bug exists in MirrorSourceConnector Tasks running in distributed 
> Kafka connect cluster or MIrrorMaker 2 jobs that run with distributed mode 
> enabled (pre-requisite for the exactly.once.support to work). I believe this 
> should be true for other SourceConnectors as well (as the code-path that was 
> the one to blame is Connect specific & not MirrorMaker specific).
> h3. More context & logs
> *Connector Logs*
> {code:java}
> Caused by: java.util.concurrent.CompletionException: 
> org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node 
> assignment. Call: fenceProducer(api=INIT_PRODUCER_ID)
> {code}
> *Broker Logs*
> {code:java}
> [2023-12-12 14:28:18,030] INFO [TransactionCoordinator id=<id>] Returning 
> COORDINATOR_NOT_AVAILABLE error code to client for 
> kafka-connect-uat-mm2-msc-20th-7's InitProducerId request 
> (kafka.coordinator.transaction.TransactionCoordinator)
> [2023-12-12 14:28:18,030] INFO [Transaction State Manager 1001]: 
> TransactionalId kafka-connect-uat-mm2-msc-20th-7 append transaction log for 
> TxnTransitMetadata(producerId=61137, lastProducerId=61137, producerEpoch=2, 
> lastProducerEpoch=-1, txnTimeoutMs=1, txnState=Empty, topicPartitions=Set(), 
> txnStartTimestamp=-1, txnLastUpdateTimestamp=1702391298028) transition failed 
> due to COORDINATOR_NOT_AVAILABLE, resetting pending state from Some(Empty), 
> aborting state transition and returning COORDINATOR_NOT_AVAILABLE in the 
> callback (kafka.coordinator.transaction.TransactionStateManager)
> {code}
> h3. How to reproduce it
> While the bug exists in both the Standalone MM2 deployment, it's easier to 
> reproduce it via deploying the connector to a Kafka Connect cluster (as it is 
> possible to update the config/delete/restart/pause/stop/resume via the Kafka 
> Connect REST API)
> Thus, Deploy a MirrorSourceConnector on a Kafka connect cluster (with 
> `exactly.once.source.support = enabled`) and after the initial start, update 
> it's configuration or restart the connector & tasks. 
> To test whether my fork has fixed the issue once and for good i have created 
> the following script, which constantly restarts the connector every few 
> seconds (after it's tasks get in RUNNING state). I have been running the 
> scripts for a few hours and the MirrorSourceConnector never got in a state 
> that was non recoverable (as it was happening on the upstream versions)
> {code:java}
> #!/bin/bash
> # Source vars
> source /<path>/connect.sh
> # Kafka Connect API endpoint
> KAFKA_CONNECT_API=$KAFKA_CONNECT_URL
> # Kafka Connect connector name
> CONNECTOR_NAME="<connector_name>"
> while true; do
>     # Fetch the connector status
>     connector_status=$(curl -k -u 
> $KAFKA_CONNECT_BASIC_AUTH_USERNAME:$KAFKA_CONNECT_BASIC_AUTH_PASSWORD -s 
> "$KAFKA_CONNECT_API/connectors/$CONNECTOR_NAME/status")
>     # Check if connector is in FAILED state
>     if echo "$connector_status" | grep -q '"state":"FAILED"'; then
>         echo "Connector has failed. Exiting."
>         exit 1
>     fi
>     # Fetch and check all task statuses
>     task_statuses=$(echo "$connector_status" | jq '.tasks[].state')
>     all_running=true
>     for status in $task_statuses; do
>         if [ "$status" != '"RUNNING"' ]; then
>             all_running=false
>             break
>         fi
>     done
>     # If all tasks and the connector are RUNNING, restart them after 90 
> seconds
>     if $all_running; then
>         echo "All tasks are running. Restarting in 90 seconds."
>         sleep 90
>         date;curl -k -X POST -H "Content-Type: application/json" -u 
> $KAFKA_CONNECT_BASIC_AUTH_USERNAME:$KAFKA_CONNECT_BASIC_AUTH_PASSWORD 
> $KAFKA_CONNECT_API/connectors/$CONNECTOR_NAME/restart\?includeTasks=true
>     else
>         echo "Not all tasks are running. Checking again..."
>     fi
>     # Sleep for a while before checking again
>     sleep 10
> done
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to