[ 
https://issues.apache.org/jira/browse/KAFKA-16047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17800520#comment-17800520
 ] 

Greg Harris commented on KAFKA-16047:
-------------------------------------

It appears that the transaction timeout is used as the timeout to produce 
records to the __transaction_state topic in 
TransactionStateManager#appendTransactionToLog 
[https://github.com/apache/kafka/blob/d582d5aff517879b150bc2739bad99df07e15e2b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala#L769-L770]
which is in turn used during init, end, and add-partitions in 
TransactionCoordinator: 
[https://github.com/apache/kafka/blob/d582d5aff517879b150bc2739bad99df07e15e2b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala#L199]
 

I see that after expiration, 
TransactionStateManager#writeTombstonesForExpiredTransactionalIds uses the 
request.timeout.ms: 
[https://github.com/apache/kafka/blob/d582d5aff517879b150bc2739bad99df07e15e2b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala#L283-L284]
 

Should the TransactionStateManager be using the transaction timeout for a 
single operation, or should the request timeout be used throughout? Here the 
transaction timeout (1ms) is certainly shorter than we expect, but I wonder 
whether it is also longer than people expect in other situations. When the 
producer makes these requests, it only waits for max.block.ms (default 60s) 
for them to complete. After that point, the producer times out the request, 
while the broker may be left waiting for the transaction timeout (default 
60s) to expire.

* We can fix this on the broker side by changing the produce timeout to the 
value of "max(transaction timeout, request timeout)". Someone may have 
increased their max.block.ms & transaction timeout while keeping the request 
timeout short, and would still desire that the init call block for up to 
max.block.ms.
* We can fix this on the client side by changing the 1ms hardcoded value to use 
the same timeout as the overall fenceProducers request, which is either the 
default request timeout (configurable) or specified via the Options argument.

> Source connector with EOS enabled have some InitProducerId requests timing 
> out, effectively failing all the tasks & the whole connector
> ---------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-16047
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16047
>             Project: Kafka
>          Issue Type: Bug
>          Components: connect, mirrormaker
>    Affects Versions: 3.3.0, 3.4.0, 3.3.1, 3.3.2, 3.4.1, 3.6.0, 3.5.1, 3.5.2, 
> 3.6.1
>            Reporter: Angelos Kaltsikis
>            Priority: Major
>
> Source Connectors with 'exactly.once.support = required' may see the 
> InitProducerId requests that some of their tasks issue from the admin client 
> time out. In the case of MirrorSourceConnector, which was the source 
> connector in which I found the bug, the bug was effectively making all the 
> tasks (in the specific case of) become "FAILED". As soon as one of the tasks 
> became FAILED due to the 'COORDINATOR_NOT_AVAILABLE' messages (caused by 
> timeouts), no matter how many times I restarted the connector/tasks, I 
> couldn't get the MirrorSourceConnector back into a healthy RUNNING state.
> Due to the low timeout that has been [hard-coded in the 
> code|https://github.com/apache/kafka/blob/3.6.1/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java#L87]
>  (1ms), there is a chance that the `InitProducerId` requests time out in case 
> of "slower-than-expected" Kafka brokers (that do not process & respond to the 
> above request in <= 1ms). (Feel free to read more information about the issue 
> in the "More context & logs" section below.)
> [~ChrisEgerton] I would appreciate it if you could respond to the following 
> questions:
> - How and why was the 1ms magic number for the transaction timeout chosen?
> - Is there any specific reason why it can be guaranteed that the 
> `InitProducerId` request will be processed in such a small time window? 
> - I have tried the above in multiple different Kafka clusters that are hosted 
> on different underlying datacenter hosts, and I don't believe that those 
> brokers are "slow" for some reason. If you feel that the brokers are slower 
> than expected, I would appreciate any pointers on how I could find out what 
> the bottleneck is.
> h3. Temporary Mitigation
> I have increased the timeout to 1000ms (I picked this number arbitrarily, 
> just to give brokers enough time to always complete this type of request). 
> The fix can be found in my fork 
> https://github.com/akaltsikis/kafka/commit/8a47992e7dc63954f9d9ac54e8ed1f5a7737c97f
>  
> h3. Final solution
> The temporary mitigation is not ideal, as it still arbitrarily picks a 
> timeout for such an operation, which may be high enough but is not 
> guaranteed to always be high enough. Shall we introduce something 
> client-configurable?
> At the same time, I was wondering whether it makes sense to introduce some 
> tests that simulate brokers slower than the "blazing" fast mocked brokers 
> that exist in unit tests, so as to be able to catch this type of low timeout 
> that potentially makes some software features unusable.
> h3. What is affected
> The above bug exists in MirrorSourceConnector tasks running in a distributed 
> Kafka Connect cluster or MirrorMaker 2 jobs that run with distributed mode 
> enabled (a prerequisite for exactly.once.support to work). I believe this 
> should be true for other SourceConnectors as well (as the code path to blame 
> is Connect-specific & not MirrorMaker-specific).
> h3. More context & logs
> *Connector Logs*
> {code:java}
> Caused by: java.util.concurrent.CompletionException: 
> org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node 
> assignment. Call: fenceProducer(api=INIT_PRODUCER_ID)
> {code}
> *Broker Logs*
> {code:java}
> [2023-12-12 14:28:18,030] INFO [TransactionCoordinator id=<id>] Returning 
> COORDINATOR_NOT_AVAILABLE error code to client for 
> kafka-connect-uat-mm2-msc-20th-7's InitProducerId request 
> (kafka.coordinator.transaction.TransactionCoordinator)
> [2023-12-12 14:28:18,030] INFO [Transaction State Manager 1001]: 
> TransactionalId kafka-connect-uat-mm2-msc-20th-7 append transaction log for 
> TxnTransitMetadata(producerId=61137, lastProducerId=61137, producerEpoch=2, 
> lastProducerEpoch=-1, txnTimeoutMs=1, txnState=Empty, topicPartitions=Set(), 
> txnStartTimestamp=-1, txnLastUpdateTimestamp=1702391298028) transition failed 
> due to COORDINATOR_NOT_AVAILABLE, resetting pending state from Some(Empty), 
> aborting state transition and returning COORDINATOR_NOT_AVAILABLE in the 
> callback (kafka.coordinator.transaction.TransactionStateManager)
> {code}
> h3. How to reproduce it
> While the bug also exists in the standalone MM2 deployment, it's easier to 
> reproduce it by deploying the connector to a Kafka Connect cluster (as it is 
> possible to update the config/delete/restart/pause/stop/resume via the Kafka 
> Connect REST API).
> Thus, deploy a MirrorSourceConnector on a Kafka Connect cluster (with 
> `exactly.once.source.support = enabled`) and, after the initial start, update 
> its configuration or restart the connector & tasks. 
> To test whether my fork has fixed the issue once and for all, I have created 
> the following script, which repeatedly restarts the connector (90 seconds 
> after its tasks reach the RUNNING state). I have been running the script for 
> a few hours and the MirrorSourceConnector never got into a non-recoverable 
> state (as was happening on the upstream versions).
> {code:bash}
> #!/bin/bash
> # Source vars
> source /<path>/connect.sh
> # Kafka Connect API endpoint
> KAFKA_CONNECT_API=$KAFKA_CONNECT_URL
> # Kafka Connect connector name
> CONNECTOR_NAME="<connector_name>"
> while true; do
>     # Fetch the connector status
>     connector_status=$(curl -k -s \
>         -u "$KAFKA_CONNECT_BASIC_AUTH_USERNAME:$KAFKA_CONNECT_BASIC_AUTH_PASSWORD" \
>         "$KAFKA_CONNECT_API/connectors/$CONNECTOR_NAME/status")
>     # Check if connector is in FAILED state
>     if echo "$connector_status" | grep -q '"state":"FAILED"'; then
>         echo "Connector has failed. Exiting."
>         exit 1
>     fi
>     # Fetch and check all task statuses
>     task_statuses=$(echo "$connector_status" | jq '.tasks[].state')
>     all_running=true
>     for status in $task_statuses; do
>         if [ "$status" != '"RUNNING"' ]; then
>             all_running=false
>             break
>         fi
>     done
>     # If all tasks and the connector are RUNNING, restart them after
>     # 90 seconds
>     if $all_running; then
>         echo "All tasks are running. Restarting in 90 seconds."
>         sleep 90
>         date
>         curl -k -X POST -H "Content-Type: application/json" \
>             -u "$KAFKA_CONNECT_BASIC_AUTH_USERNAME:$KAFKA_CONNECT_BASIC_AUTH_PASSWORD" \
>             "$KAFKA_CONNECT_API/connectors/$CONNECTOR_NAME/restart?includeTasks=true"
>     else
>         echo "Not all tasks are running. Checking again..."
>     fi
>     # Sleep for a while before checking again
>     sleep 10
> done
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
