Angelos Kaltsikis created KAFKA-16047:
-----------------------------------------

             Summary: Source connector with EOS enabled have some 
InitProducerId requests timing out
                 Key: KAFKA-16047
                 URL: https://issues.apache.org/jira/browse/KAFKA-16047
             Project: Kafka
          Issue Type: Bug
          Components: connect, mirrormaker
    Affects Versions: 3.6.1, 3.5.2, 3.5.1, 3.6.0, 3.4.1, 3.3.2, 3.3.1, 3.4.0, 
3.3.0
            Reporter: Angelos Kaltsikis


Source connectors with 'exactly.once.support = required' may have some of their 
tasks fail because the InitProducerId requests they issue time out. In the case 
of MirrorSourceConnector, the source connector in which I found the bug, it 
effectively caused all of the tasks (in my specific case) to become "FAILED". 
As soon as one of the tasks entered the FAILED state due to the 
'COORDINATOR_NOT_AVAILABLE' messages (caused by timeouts), no matter how many 
times I restarted the connector/tasks, I couldn't get the MirrorSourceConnector 
back into a healthy RUNNING state.

Because of the low timeout that has been [hard-coded in the 
code|https://github.com/apache/kafka/blob/3.6.1/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java#L87]
 (1ms), `InitProducerId` requests can time out when the Kafka brokers are 
"slower than expected", i.e. when they do not process and respond to the 
request within 1ms. (More information about the issue is in the "More context & 
logs" section below.)

[~ChrisEgerton] I would appreciate it if you could answer the following 
questions:
- How and why was the 1ms magic number chosen for the transaction timeout?
- Is there any specific reason to expect that the `InitProducerId` request can 
always be processed within such a small time window? 
- I have tried the above on multiple Kafka clusters hosted on different 
underlying datacenter hosts, and I don't believe that those brokers are "slow" 
for some reason. If you feel that the brokers are slower than expected, I would 
appreciate any pointers on how I could find the bottleneck.


h3. Temporary Mitigation

I have increased the timeout to 1000ms (I picked this number arbitrarily; I 
just wanted to give the brokers enough time to always complete this type of 
request). The fix can be found in my fork: 
https://github.com/akaltsikis/kafka/commit/8a47992e7dc63954f9d9ac54e8ed1f5a7737c97f
 

h3. Final solution

The temporary mitigation is not ideal, as it still picks an arbitrary timeout 
for such an operation, which may be high enough but is not guaranteed to always 
be high enough. Shall we introduce something client-configurable?
At the same time, I was wondering whether it makes sense to introduce some 
tests that simulate brokers slower than the "blazing fast" mocked brokers used 
in unit tests, so as to catch this type of low timeout, which can potentially 
make some software features unusable.


h3. What is affected

The above bug exists in MirrorSourceConnector tasks running in a distributed 
Kafka Connect cluster or in MirrorMaker 2 jobs that run with distributed mode 
enabled (a prerequisite for exactly.once.support to work). I believe this 
should be true for other source connectors as well (as the code path to blame 
is Connect-specific, not MirrorMaker-specific).

h3. More context & logs

*Connector Logs*
{code:java}
Caused by: java.util.concurrent.CompletionException: 
org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node 
assignment. Call: fenceProducer(api=INIT_PRODUCER_ID)
{code}

*Broker Logs*
{code:java}
[2023-12-12 14:28:18,030] INFO [TransactionCoordinator id=<id>] Returning 
COORDINATOR_NOT_AVAILABLE error code to client for 
kafka-connect-uat-mm2-msc-20th-7's InitProducerId request 
(kafka.coordinator.transaction.TransactionCoordinator)


[2023-12-12 14:28:18,030] INFO [Transaction State Manager 1001]: 
TransactionalId kafka-connect-uat-mm2-msc-20th-7 append transaction log for 
TxnTransitMetadata(producerId=61137, lastProducerId=61137, producerEpoch=2, 
lastProducerEpoch=-1, txnTimeoutMs=1, txnState=Empty, topicPartitions=Set(), 
txnStartTimestamp=-1, txnLastUpdateTimestamp=1702391298028) transition failed 
due to COORDINATOR_NOT_AVAILABLE, resetting pending state from Some(Empty), 
aborting state transition and returning COORDINATOR_NOT_AVAILABLE in the 
callback (kafka.coordinator.transaction.TransactionStateManager)
{code}
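
To see which transactional IDs are affected on a given broker, the coordinator 
log lines above can be counted per ID. This is only a rough sketch: it assumes 
that every log entry sits on a single line in the broker's log file (the 
entries above are wrapped by the mail client) and that the message wording 
matches the output shown above. The function name 
`summarize_init_producer_id_failures` is hypothetical.

```shell
#!/bin/bash

# Hypothetical helper: prints "<transactional id> <count>" for every
# transactional ID whose InitProducerId request was answered with
# COORDINATOR_NOT_AVAILABLE, sorted by ID.
# Assumption: one log entry per line, worded as in the broker logs above.
summarize_init_producer_id_failures() {
    local log_file="$1"
    sed -n "s/.*Returning COORDINATOR_NOT_AVAILABLE error code to client for \(.*\)'s InitProducerId request.*/\1/p" "$log_file" \
        | sort \
        | uniq -c \
        | awk '{print $2, $1}'
}
```

It would be called as `summarize_init_producer_id_failures /<path>/server.log`, 
where the path is whatever the broker's log location is in your deployment.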


h3. How to reproduce it

While the bug exists in the standalone MM2 deployment as well, it's easier to 
reproduce by deploying the connector to a Kafka Connect cluster (as it is 
possible to update the config and delete/restart/pause/stop/resume the 
connector via the Kafka Connect REST API).
Thus, deploy a MirrorSourceConnector on a Kafka Connect cluster (with 
`exactly.once.source.support = enabled`) and, after the initial start, update 
its configuration or restart the connector and tasks. 

To test whether my fork has fixed the issue for good, I created the following 
script, which constantly restarts the connector every few seconds (after its 
tasks reach the RUNNING state). I have been running the script for a few hours 
and the MirrorSourceConnector never got into an unrecoverable state (as was 
happening on the upstream versions).

{code:bash}
#!/bin/bash

# Source vars
source /<path>/connect.sh
# Kafka Connect API endpoint
KAFKA_CONNECT_API=$KAFKA_CONNECT_URL

# Kafka Connect connector name
CONNECTOR_NAME="<connector_name>"

while true; do
    # Fetch the connector status
    connector_status=$(curl -k -s \
        -u "$KAFKA_CONNECT_BASIC_AUTH_USERNAME:$KAFKA_CONNECT_BASIC_AUTH_PASSWORD" \
        "$KAFKA_CONNECT_API/connectors/$CONNECTOR_NAME/status")

    # Check if connector is in FAILED state
    if echo "$connector_status" | grep -q '"state":"FAILED"'; then
        echo "Connector has failed. Exiting."
        exit 1
    fi

    # Fetch and check all task statuses
    task_statuses=$(echo "$connector_status" | jq '.tasks[].state')
    all_running=true
    for status in $task_statuses; do
        if [ "$status" != '"RUNNING"' ]; then
            all_running=false
            break
        fi
    done

    # If all tasks and the connector are RUNNING, restart them after 90 seconds
    if $all_running; then
        echo "All tasks are running. Restarting in 90 seconds."
        sleep 90
        date
        curl -k -X POST -H "Content-Type: application/json" \
            -u "$KAFKA_CONNECT_BASIC_AUTH_USERNAME:$KAFKA_CONNECT_BASIC_AUTH_PASSWORD" \
            "$KAFKA_CONNECT_API/connectors/$CONNECTOR_NAME/restart?includeTasks=true"
    else
        echo "Not all tasks are running. Checking again..."
    fi

    # Sleep for a while before checking again
    sleep 10
done
{code}
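
When a run does end with FAILED tasks, the same status payload can be used to 
list which tasks failed and the first line of their stack trace. A minimal 
sketch, assuming `jq` is installed (the script above already relies on it) and 
that each failed task entry carries `id`, `state` and `trace` fields, as 
returned by the Kafka Connect status endpoint. `failed_tasks` is a hypothetical 
helper name.

```shell
#!/bin/bash

# Hypothetical helper: reads a connector status JSON document on stdin and
# prints one tab-separated line per FAILED task:
#   <task id><TAB><first line of the task's trace, empty if there is none>
failed_tasks() {
    jq -r '.tasks[]
           | select(.state == "FAILED")
           | [.id, ((.trace // "") | split("\n")[0])]
           | @tsv'
}
```

Inside the loop above it could be used as 
`echo "$connector_status" | failed_tasks`, for example to check whether the 
trace mentions `fenceProducer(api=INIT_PRODUCER_ID)`.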

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
