[ https://issues.apache.org/jira/browse/KAFKA-16047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17851019#comment-17851019 ]
Edoardo Comar commented on KAFKA-16047:
---------------------------------------

[~gharris1727] please see [https://github.com/apache/kafka/pull/16151]

This works IMHO as a hotfix that requires no KIP.
Arguably, all appends to the transaction log could use this timeout rather than the transaction timeout, which has completely different semantics: it is tied to the transaction, not to the appending of a record and its replication.
However, I think such a change requires a KIP and may have a much larger impact than just fixing Admin.fenceProducers and distributed Connect.

> Source connector with EOS enabled have some InitProducerId requests timing out, effectively failing all the tasks & the whole connector
> ---------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-16047
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16047
>             Project: Kafka
>          Issue Type: Bug
>          Components: connect, mirrormaker
>    Affects Versions: 3.3.0, 3.4.0, 3.3.1, 3.3.2, 3.4.1, 3.6.0, 3.5.1, 3.5.2, 3.6.1
>            Reporter: Angelos Kaltsikis
>            Assignee: Edoardo Comar
>            Priority: Major
>
> Source connectors with 'exactly.once.support = required' may have some of the InitProducerId requests that their tasks issue through the admin client time out.
> In the case of MirrorSourceConnector, the source connector where I found the bug, it effectively made all the tasks become "FAILED". As soon as one of the tasks became FAILED because of the 'COORDINATOR_NOT_AVAILABLE' errors (caused by timeouts), I could not get the MirrorSourceConnector back into a healthy RUNNING state, no matter how many times I restarted the connector and its tasks.
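The failure described above, and the distinction the comment draws between the transaction timeout and the time needed to append and replicate a transaction-log record, can be illustrated with a toy model. This is a hypothetical sketch, not Kafka code. It assumes that the coordinator fails InitProducerId with COORDINATOR_NOT_AVAILABLE whenever the transaction-log append takes longer than the deadline it was given, which is what the broker logs in this issue suggest for `txnTimeoutMs=1`. The function name `append_result` and the 30000 ms alternative deadline (`REQUEST_TIMEOUT_MS`) are illustrative assumptions.

```shell
#!/usr/bin/env bash
# Hypothetical toy model, not Kafka code: the transaction coordinator gives the
# transaction-log append a deadline; if the record is not appended and
# replicated within it, the InitProducerId request fails with
# COORDINATOR_NOT_AVAILABLE.

# append_result LATENCY_MS DEADLINE_MS
# Prints the error code the modelled coordinator would return.
append_result() {
  local latency_ms=$1 deadline_ms=$2
  if (( latency_ms <= deadline_ms )); then
    echo "NONE"
  else
    echo "COORDINATOR_NOT_AVAILABLE"
  fi
}

# Deadline in the reported versions: the 1 ms transaction timeout sent by
# Admin.fenceProducers.
TXN_TIMEOUT_MS=1
# Assumed alternative deadline, independent of the transaction timeout.
REQUEST_TIMEOUT_MS=30000

for latency_ms in 0 1 2 15; do
  printf 'append latency %2d ms: txn-timeout deadline -> %s, separate deadline -> %s\n' \
    "$latency_ms" \
    "$(append_result "$latency_ms" "$TXN_TIMEOUT_MS")" \
    "$(append_result "$latency_ms" "$REQUEST_TIMEOUT_MS")"
done
```

In this model, with a 1 ms deadline, any append slower than a millisecond fails, however healthy the broker is. Giving the append its own deadline, decoupled from the transaction timeout, removes that sensitivity.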
> Due to the low timeout [hard-coded in the code|https://github.com/apache/kafka/blob/3.6.1/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java#L87] (1 ms), the `InitProducerId` requests may time out on "slower-than-expected" Kafka brokers, i.e. brokers that do not process and respond to the request within 1 ms. (See the "More context & logs" section below for more information about the issue.)
>
> [~ChrisEgerton] I would appreciate it if you could answer the following questions:
> - How and why was the magic number of 1 ms chosen for the transaction timeout?
> - Is there any specific reason why the `InitProducerId` request can be guaranteed to be processed within such a small time window?
> - I have tried the above on multiple Kafka clusters hosted on different underlying datacenter hosts, and I don't believe those brokers are "slow" for any reason. If you think the brokers are slower than expected, I would appreciate any pointers on how to find the bottleneck.
>
> h3. Temporary Mitigation
> I have increased the timeout to 1000 ms (an arbitrarily picked number; I just wanted to give the brokers enough time to always complete this type of request). The fix can be found in my fork: https://github.com/akaltsikis/kafka/commit/8a47992e7dc63954f9d9ac54e8ed1f5a7737c97f
>
> h3. Final solution
> The temporary mitigation is not ideal, because it still picks an arbitrary timeout for this operation, which may be high enough but is not guaranteed to always be high enough. Shall we make this timeout client-configurable?
> At the same time, I was wondering whether it makes sense to introduce tests that simulate brokers slower than the "blazing" fast mocked brokers used in the unit tests, so that timeouts low enough to make features unusable are caught.
>
> h3. What is affected
> The above bug affects MirrorSourceConnector tasks running in a distributed Kafka Connect cluster, and MirrorMaker 2 jobs running with distributed mode enabled (a prerequisite for exactly.once.support to work). I believe it also affects other source connectors, since the code path to blame is Connect-specific, not MirrorMaker-specific.
>
> h3. More context & logs
> *Connector Logs*
> {code:java}
> Caused by: java.util.concurrent.CompletionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: fenceProducer(api=INIT_PRODUCER_ID)
> {code}
> *Broker Logs*
> {code:java}
> [2023-12-12 14:28:18,030] INFO [TransactionCoordinator id=<id>] Returning COORDINATOR_NOT_AVAILABLE error code to client for kafka-connect-uat-mm2-msc-20th-7's InitProducerId request (kafka.coordinator.transaction.TransactionCoordinator)
> [2023-12-12 14:28:18,030] INFO [Transaction State Manager 1001]: TransactionalId kafka-connect-uat-mm2-msc-20th-7 append transaction log for TxnTransitMetadata(producerId=61137, lastProducerId=61137, producerEpoch=2, lastProducerEpoch=-1, txnTimeoutMs=1, txnState=Empty, topicPartitions=Set(), txnStartTimestamp=-1, txnLastUpdateTimestamp=1702391298028) transition failed due to COORDINATOR_NOT_AVAILABLE, resetting pending state from Some(Empty), aborting state transition and returning COORDINATOR_NOT_AVAILABLE in the callback (kafka.coordinator.transaction.TransactionStateManager)
> {code}
>
> h3. How to reproduce it
> Although the bug also exists in the standalone MM2 deployment, it is easier to reproduce by deploying the connector to a Kafka Connect cluster, because the Kafka Connect REST API can then be used to update the config and to delete, restart, pause, stop, or resume the connector.
> To reproduce, deploy a MirrorSourceConnector on a Kafka Connect cluster (with `exactly.once.source.support = enabled`) and, after the initial start, update its configuration or restart the connector and its tasks.
> To test whether my fork fixes the issue for good, I created the following script. It polls the connector status every 10 seconds and, once all tasks are RUNNING, waits 90 seconds and then restarts the connector and its tasks. I have been running the script for a few hours, and the MirrorSourceConnector never got into a non-recoverable state (as it did on the upstream versions).
> {code:bash}
> #!/bin/bash
> # Source vars
> source /<path>/connect.sh
> # Kafka Connect API endpoint
> KAFKA_CONNECT_API=$KAFKA_CONNECT_URL
> # Kafka Connect connector name
> CONNECTOR_NAME="<connector_name>"
>
> while true; do
>   # Fetch the connector status
>   connector_status=$(curl -k -s -u "$KAFKA_CONNECT_BASIC_AUTH_USERNAME:$KAFKA_CONNECT_BASIC_AUTH_PASSWORD" \
>     "$KAFKA_CONNECT_API/connectors/$CONNECTOR_NAME/status")
>   # Exit if the connector or any of its tasks is FAILED
>   if echo "$connector_status" | grep -q '"state":"FAILED"'; then
>     echo "Connector has failed. Exiting."
>     exit 1
>   fi
>   # Fetch all task states (jq -r strips the JSON quotes)
>   task_statuses=$(echo "$connector_status" | jq -r '.tasks[].state')
>   all_running=true
>   # Treat an empty task list (e.g. right after a restart) as not running
>   [ -z "$task_statuses" ] && all_running=false
>   for status in $task_statuses; do
>     if [ "$status" != "RUNNING" ]; then
>       all_running=false
>       break
>     fi
>   done
>   # If all tasks are RUNNING, restart the connector and its tasks after 90 seconds
>   if $all_running; then
>     echo "All tasks are running. Restarting in 90 seconds."
>     sleep 90
>     date
>     curl -k -X POST -H "Content-Type: application/json" \
>       -u "$KAFKA_CONNECT_BASIC_AUTH_USERNAME:$KAFKA_CONNECT_BASIC_AUTH_PASSWORD" \
>       "$KAFKA_CONNECT_API/connectors/$CONNECTOR_NAME/restart?includeTasks=true"
>   else
>     echo "Not all tasks are running. Checking again..."
>   fi
>   # Sleep for a while before checking again
>   sleep 10
> done
> {code}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)