chia7712 commented on code in PR #19879:
URL: https://github.com/apache/kafka/pull/19879#discussion_r2121989155


##########
server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java:
##########
@@ -91,4 +97,113 @@ public int numRecoveryThreadsPerDataDir() {
     public int backgroundThreads() {
         return getInt(ServerConfigs.BACKGROUND_THREADS_CONFIG);
     }
+
+    public int brokerId() {
+        return getInt(ServerConfigs.BROKER_ID_CONFIG);
+    }
+
+    public int requestTimeoutMs() {
+        return getInt(ServerConfigs.REQUEST_TIMEOUT_MS_CONFIG);
+    }
+
+    public List<String> controllerListenerNames() {
+        return 
Csv.parseCsvList(getString(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG));
+    }
+
+    public ListenerName interBrokerListenerName() {
+        return getInterBrokerListenerNameAndSecurityProtocol().getKey();
+    }
+
+    public SecurityProtocol interBrokerSecurityProtocol() {
+        return getInterBrokerListenerNameAndSecurityProtocol().getValue();
+    }
+
+    public Map<ListenerName, SecurityProtocol> 
effectiveListenerSecurityProtocolMap() {
+        Map<ListenerName, SecurityProtocol> mapValue =
+                
getMap(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG,
+                        
getString(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG))
+                        .entrySet()
+                        .stream()
+                        .collect(Collectors.toMap(
+                                e -> ListenerName.normalised(e.getKey()),
+                                e -> getSecurityProtocol(
+                                        e.getValue(),
+                                        
SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG)));
+
+        if 
(!originals().containsKey(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG))
 {
+            // Using the default configuration since 
listener.security.protocol.map is not explicitly set.
+            // Before adding default PLAINTEXT mappings for controller 
listeners, verify that:
+            // 1. No SSL or SASL protocols are used in controller listeners
+            // 2. No SSL or SASL protocols are used in regular listeners 
(Note: controller listeners
+            //    are not included in 'listeners' config when 
process.roles=broker)
+            if 
(controllerListenerNames().stream().anyMatch(AbstractKafkaConfig::isSslOrSasl) 
||
+                    
Csv.parseCsvList(getString(SocketServerConfigs.LISTENERS_CONFIG)).stream()
+                            .anyMatch(listenerName -> 
isSslOrSasl(parseListenerName(listenerName)))) {
+                return mapValue;
+            } else {
+                // Add the PLAINTEXT mappings for all controller listener 
names that are not explicitly PLAINTEXT
+                mapValue.putAll(controllerListenerNames().stream()
+                        .filter(listenerName -> 
!SecurityProtocol.PLAINTEXT.name.equals(listenerName))
+                        .collect(Collectors.toMap(ListenerName::new, ignored 
-> SecurityProtocol.PLAINTEXT)));
+                return mapValue;
+            }
+        } else {
+            return mapValue;
+        }
+    }
+
+    public static Map<String, String> getMap(String propName, String 
propValue) {
+        try {
+            return Csv.parseCsvMap(propValue);
+        } catch (Exception e) {
+            throw new IllegalArgumentException(
+                    String.format("Error parsing configuration property '%s': 
%s", propName, e.getMessage()));
+        }
+    }
+
+    private static SecurityProtocol getSecurityProtocol(String protocolName, 
String configName) {
+        try {
+            return SecurityProtocol.forName(protocolName);
+        } catch (IllegalArgumentException e) {
+            throw new ConfigException(
+                    String.format("Invalid security protocol `%s` defined in 
%s", protocolName, configName));
+        }
+    }
+
+    private Map.Entry<ListenerName, SecurityProtocol> 
getInterBrokerListenerNameAndSecurityProtocol() {

Review Comment:
   `interBrokerListenerNameAndSecurityProtocol`



##########
server/src/main/java/org/apache/kafka/server/transaction/AddPartitionsToTxnManager.java:
##########
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.transaction;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.NetworkClient;
+import org.apache.kafka.clients.RequestCompletionHandler;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.internals.Topic;
+import 
org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopic;
+import 
org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopicCollection;
+import 
org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTransaction;
+import 
org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTransactionCollection;
+import org.apache.kafka.common.message.AddPartitionsToTxnResponseData;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AddPartitionsToTxnRequest;
+import org.apache.kafka.common.requests.AddPartitionsToTxnResponse;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.metadata.MetadataCache;
+import org.apache.kafka.server.config.AbstractKafkaConfig;
+import org.apache.kafka.server.metrics.KafkaMetricsGroup;
+import org.apache.kafka.server.util.InterBrokerSendThread;
+import org.apache.kafka.server.util.RequestAndCompletionHandler;
+
+import com.yammer.metrics.core.Histogram;
+import com.yammer.metrics.core.Meter;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public class AddPartitionsToTxnManager extends InterBrokerSendThread {
+
+    public static final String VERIFICATION_FAILURE_RATE_METRIC_NAME = 
"VerificationFailureRate";
+    public static final String VERIFICATION_TIME_MS_METRIC_NAME = 
"VerificationTimeMs";
+
+    @FunctionalInterface
+    public interface AppendCallback {
+        void complete(Map<TopicPartition, Errors> partitionErrors);
+    }
+
+    /**
+     * An interface which handles the Partition Response based on the Request 
Version and the exact operation.
+     */
+    @FunctionalInterface
+    public interface TransactionSupportedOperation {
+        boolean supportsEpochBump();
+    }
+
+    /**
+     * This is the default workflow which maps to cases when the Produce 
Request Version or the
+     * Txn_offset_commit request was lower than the first version supporting 
the new Error Class.
+     */
+    public static final TransactionSupportedOperation DEFAULT_ERROR = () -> 
false;
+
+    /**
+     * This maps to the case when the clients are updated to handle the 
TransactionAbortableException.
+     */
+    public static final TransactionSupportedOperation GENERIC_ERROR_SUPPORTED 
= () -> false;
+
+    /**
+     * This allows the partition to be added to the transactions inflight with 
the Produce and TxnOffsetCommit requests.
+     * Plus the behaviors in genericErrorSupported.
+     */
+    public static final TransactionSupportedOperation ADD_PARTITION = () -> 
true;
+
+    public static TransactionSupportedOperation 
produceRequestVersionToTransactionSupportedOperation(short version) {
+        if (version > 11) {
+            return ADD_PARTITION;
+        } else if (version > 10) {
+            return GENERIC_ERROR_SUPPORTED;
+        } else {
+            return DEFAULT_ERROR;
+        }
+    }
+
+    public static TransactionSupportedOperation 
txnOffsetCommitRequestVersionToTransactionSupportedOperation(int version) {
+        if (version > 4) {
+            return ADD_PARTITION;
+        } else if (version > 3) {
+            return GENERIC_ERROR_SUPPORTED;
+        } else {
+            return DEFAULT_ERROR;
+        }
+    }
+
+    /*
+     * Data structure to hold the transactional data to send to a node. Note 
-- at most one request per transactional ID
+     * will exist at a time in the map. If a given transactional ID exists in 
the map, and a new request with the same ID
+     * comes in, one request will be in the map and one will return to the 
producer with a response depending on the epoch.
+     */
+    public record TransactionDataAndCallbacks(
+            AddPartitionsToTxnTransactionCollection transactionData,
+            Map<String, AppendCallback> callbacks,
+            Map<String, Long> startTimeMs,
+            TransactionSupportedOperation transactionSupportedOperation) { }
+
+    private class AddPartitionsToTxnHandler implements 
RequestCompletionHandler {
+        private final Node node;
+        private final TransactionDataAndCallbacks transactionDataAndCallbacks;
+
+        public AddPartitionsToTxnHandler(Node node, 
TransactionDataAndCallbacks transactionDataAndCallbacks) {
+            this.node = node;
+            this.transactionDataAndCallbacks = transactionDataAndCallbacks;
+        }
+
+        @Override
+        public void onComplete(ClientResponse response) {
+            // Note: Synchronization is not needed on inflightNodes since it 
is always accessed from this thread.
+            inflightNodes.remove(node);
+            if (response.authenticationException() != null) {
+                log.error(String.format("AddPartitionsToTxnRequest failed for 
node %s with an authentication exception.", response.destination()),

Review Comment:
   ```
   log.error("AddPartitionsToTxnRequest failed for node {} with an 
authentication exception.", response.destination(), 
response.authenticationException());
   ```



##########
core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala:
##########
@@ -275,7 +275,7 @@ abstract class QuorumTestHarness extends Logging {
     formatter.addDirectory(metadataDir.getAbsolutePath)
     formatter.setReleaseVersion(metadataVersion)
     formatter.setUnstableFeatureVersionsEnabled(true)
-    formatter.setControllerListenerName(config.controllerListenerNames.head)
+    
formatter.setControllerListenerName(config.controllerListenerNames.stream.findFirst.orElseThrow)

Review Comment:
   ```
   formatter.setControllerListenerName(config.controllerListenerNames.get(0))
   ```



##########
server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java:
##########
@@ -91,4 +97,113 @@ public int numRecoveryThreadsPerDataDir() {
     public int backgroundThreads() {
         return getInt(ServerConfigs.BACKGROUND_THREADS_CONFIG);
     }
+
+    public int brokerId() {
+        return getInt(ServerConfigs.BROKER_ID_CONFIG);
+    }
+
+    public int requestTimeoutMs() {
+        return getInt(ServerConfigs.REQUEST_TIMEOUT_MS_CONFIG);
+    }
+
+    public List<String> controllerListenerNames() {
+        return 
Csv.parseCsvList(getString(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG));
+    }
+
+    public ListenerName interBrokerListenerName() {
+        return getInterBrokerListenerNameAndSecurityProtocol().getKey();
+    }
+
+    public SecurityProtocol interBrokerSecurityProtocol() {
+        return getInterBrokerListenerNameAndSecurityProtocol().getValue();
+    }
+
+    public Map<ListenerName, SecurityProtocol> 
effectiveListenerSecurityProtocolMap() {
+        Map<ListenerName, SecurityProtocol> mapValue =
+                
getMap(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG,
+                        
getString(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG))
+                        .entrySet()
+                        .stream()
+                        .collect(Collectors.toMap(
+                                e -> ListenerName.normalised(e.getKey()),
+                                e -> getSecurityProtocol(
+                                        e.getValue(),
+                                        
SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG)));
+
+        if 
(!originals().containsKey(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG))
 {
+            // Using the default configuration since 
listener.security.protocol.map is not explicitly set.
+            // Before adding default PLAINTEXT mappings for controller 
listeners, verify that:
+            // 1. No SSL or SASL protocols are used in controller listeners
+            // 2. No SSL or SASL protocols are used in regular listeners 
(Note: controller listeners
+            //    are not included in 'listeners' config when 
process.roles=broker)
+            if 
(controllerListenerNames().stream().anyMatch(AbstractKafkaConfig::isSslOrSasl) 
||
+                    
Csv.parseCsvList(getString(SocketServerConfigs.LISTENERS_CONFIG)).stream()
+                            .anyMatch(listenerName -> 
isSslOrSasl(parseListenerName(listenerName)))) {
+                return mapValue;
+            } else {
+                // Add the PLAINTEXT mappings for all controller listener 
names that are not explicitly PLAINTEXT
+                mapValue.putAll(controllerListenerNames().stream()
+                        .filter(listenerName -> 
!SecurityProtocol.PLAINTEXT.name.equals(listenerName))
+                        .collect(Collectors.toMap(ListenerName::new, ignored 
-> SecurityProtocol.PLAINTEXT)));
+                return mapValue;
+            }
+        } else {
+            return mapValue;
+        }
+    }
+
+    public static Map<String, String> getMap(String propName, String 
propValue) {
+        try {
+            return Csv.parseCsvMap(propValue);
+        } catch (Exception e) {
+            throw new IllegalArgumentException(
+                    String.format("Error parsing configuration property '%s': 
%s", propName, e.getMessage()));
+        }
+    }
+
+    private static SecurityProtocol getSecurityProtocol(String protocolName, 
String configName) {

Review Comment:
   `securityProtocol`



##########
server/src/main/java/org/apache/kafka/server/transaction/AddPartitionsToTxnManager.java:
##########
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.transaction;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.NetworkClient;
+import org.apache.kafka.clients.RequestCompletionHandler;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.internals.Topic;
+import 
org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopic;
+import 
org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopicCollection;
+import 
org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTransaction;
+import 
org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTransactionCollection;
+import org.apache.kafka.common.message.AddPartitionsToTxnResponseData;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AddPartitionsToTxnRequest;
+import org.apache.kafka.common.requests.AddPartitionsToTxnResponse;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.metadata.MetadataCache;
+import org.apache.kafka.server.config.AbstractKafkaConfig;
+import org.apache.kafka.server.metrics.KafkaMetricsGroup;
+import org.apache.kafka.server.util.InterBrokerSendThread;
+import org.apache.kafka.server.util.RequestAndCompletionHandler;
+
+import com.yammer.metrics.core.Histogram;
+import com.yammer.metrics.core.Meter;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public class AddPartitionsToTxnManager extends InterBrokerSendThread {
+
+    public static final String VERIFICATION_FAILURE_RATE_METRIC_NAME = 
"VerificationFailureRate";
+    public static final String VERIFICATION_TIME_MS_METRIC_NAME = 
"VerificationTimeMs";
+
+    @FunctionalInterface
+    public interface AppendCallback {
+        void complete(Map<TopicPartition, Errors> partitionErrors);
+    }
+
+    /**
+     * An interface which handles the Partition Response based on the Request 
Version and the exact operation.
+     */
+    @FunctionalInterface
+    public interface TransactionSupportedOperation {
+        boolean supportsEpochBump();
+    }
+
+    /**
+     * This is the default workflow which maps to cases when the Produce 
Request Version or the
+     * Txn_offset_commit request was lower than the first version supporting 
the new Error Class.
+     */
+    public static final TransactionSupportedOperation DEFAULT_ERROR = () -> 
false;
+
+    /**
+     * This maps to the case when the clients are updated to handle the 
TransactionAbortableException.
+     */
+    public static final TransactionSupportedOperation GENERIC_ERROR_SUPPORTED 
= () -> false;
+
+    /**
+     * This allows the partition to be added to the transactions inflight with 
the Produce and TxnOffsetCommit requests.
+     * Plus the behaviors in genericErrorSupported.
+     */
+    public static final TransactionSupportedOperation ADD_PARTITION = () -> 
true;
+
+    public static TransactionSupportedOperation 
produceRequestVersionToTransactionSupportedOperation(short version) {
+        if (version > 11) {
+            return ADD_PARTITION;
+        } else if (version > 10) {
+            return GENERIC_ERROR_SUPPORTED;
+        } else {
+            return DEFAULT_ERROR;
+        }
+    }
+
+    public static TransactionSupportedOperation 
txnOffsetCommitRequestVersionToTransactionSupportedOperation(int version) {
+        if (version > 4) {
+            return ADD_PARTITION;
+        } else if (version > 3) {
+            return GENERIC_ERROR_SUPPORTED;
+        } else {
+            return DEFAULT_ERROR;
+        }
+    }
+
+    /*
+     * Data structure to hold the transactional data to send to a node. Note 
-- at most one request per transactional ID
+     * will exist at a time in the map. If a given transactional ID exists in 
the map, and a new request with the same ID
+     * comes in, one request will be in the map and one will return to the 
producer with a response depending on the epoch.
+     */
+    public record TransactionDataAndCallbacks(
+            AddPartitionsToTxnTransactionCollection transactionData,
+            Map<String, AppendCallback> callbacks,
+            Map<String, Long> startTimeMs,
+            TransactionSupportedOperation transactionSupportedOperation) { }
+
+    private class AddPartitionsToTxnHandler implements 
RequestCompletionHandler {
+        private final Node node;
+        private final TransactionDataAndCallbacks transactionDataAndCallbacks;
+
+        public AddPartitionsToTxnHandler(Node node, 
TransactionDataAndCallbacks transactionDataAndCallbacks) {
+            this.node = node;
+            this.transactionDataAndCallbacks = transactionDataAndCallbacks;
+        }
+
+        @Override
+        public void onComplete(ClientResponse response) {
+            // Note: Synchronization is not needed on inflightNodes since it 
is always accessed from this thread.
+            inflightNodes.remove(node);
+            if (response.authenticationException() != null) {
+                log.error(String.format("AddPartitionsToTxnRequest failed for 
node %s with an authentication exception.", response.destination()),
+                        response.authenticationException());
+                
sendCallbacksToAll(Errors.forException(response.authenticationException()).code());
+            } else if (response.versionMismatch() != null) {
+                // We may see unsupported version exception if we try to send 
a verify only request to a broker that can't handle it.
+                // In this case, skip verification.
+                log.warn("AddPartitionsToTxnRequest failed for node {} with 
invalid version exception. " +
+                        "This suggests verification is not supported. 
Continuing handling the produce request.", response.destination());
+                transactionDataAndCallbacks.callbacks().forEach((txnId, 
callback) ->
+                        sendCallback(callback, Map.of(), 
transactionDataAndCallbacks.startTimeMs.get(txnId)));
+            } else if (response.wasDisconnected() || response.wasTimedOut()) {
+                log.warn("AddPartitionsToTxnRequest failed for node {} with a 
network exception.", response.destination());
+                sendCallbacksToAll(Errors.NETWORK_EXCEPTION.code());
+            } else {
+                AddPartitionsToTxnResponseData responseData = 
((AddPartitionsToTxnResponse) response.responseBody()).data();
+                if (responseData.errorCode() != 0) {
+                    log.error("AddPartitionsToTxnRequest for node {} returned 
with error {}.",
+                            response.destination(), 
Errors.forCode(responseData.errorCode()));
+                    // The client should not be exposed to 
CLUSTER_AUTHORIZATION_FAILED so modify the error to signify the verification 
did not complete.
+                    // Return INVALID_TXN_STATE.
+                    short finalError = responseData.errorCode() == 
Errors.CLUSTER_AUTHORIZATION_FAILED.code()
+                            ? Errors.INVALID_TXN_STATE.code() : 
responseData.errorCode();
+                    sendCallbacksToAll(finalError);
+                } else {
+                    for 
(AddPartitionsToTxnResponseData.AddPartitionsToTxnResult txnResult : 
responseData.resultsByTransaction()) {
+                        Map<TopicPartition, Errors> unverified = new 
HashMap<>();
+                        for 
(AddPartitionsToTxnResponseData.AddPartitionsToTxnTopicResult topicResult : 
txnResult.topicResults()) {
+                            for 
(AddPartitionsToTxnResponseData.AddPartitionsToTxnPartitionResult 
partitionResult : topicResult.resultsByPartition()) {
+                                TopicPartition tp = new 
TopicPartition(topicResult.name(), partitionResult.partitionIndex());
+                                if (partitionResult.partitionErrorCode() != 
Errors.NONE.code()) {
+                                    // Producers expect to handle 
INVALID_PRODUCER_EPOCH in this scenario.
+                                    short code;
+                                    if (partitionResult.partitionErrorCode() 
== Errors.PRODUCER_FENCED.code()) {
+                                        code = 
Errors.INVALID_PRODUCER_EPOCH.code();
+                                    } else if 
(partitionResult.partitionErrorCode() == Errors.TRANSACTION_ABORTABLE.code()
+                                            && 
transactionDataAndCallbacks.transactionSupportedOperation().equals(DEFAULT_ERROR))
 { // For backward compatibility with clients
+                                        code = Errors.INVALID_TXN_STATE.code();
+                                    } else {
+                                        code = 
partitionResult.partitionErrorCode();
+                                    }
+                                    unverified.put(tp, Errors.forCode(code));
+                                }
+                            }
+                        }
+                        verificationFailureRate.mark(unverified.size());
+                        AppendCallback callback = 
transactionDataAndCallbacks.callbacks().get(txnResult.transactionalId());
+                        sendCallback(callback, unverified, 
transactionDataAndCallbacks.startTimeMs.get(txnResult.transactionalId()));
+                    }
+                }
+            }
+            wakeup();
+        }
+
+        private Map<TopicPartition, Errors> buildErrorMap(String 
transactionalId, short errorCode) {
+            AddPartitionsToTxnTransaction transactionData = 
transactionDataAndCallbacks.transactionData.find(transactionalId);
+            return topicPartitionsToError(transactionData, 
Errors.forCode(errorCode));
+        }
+
+        private void sendCallbacksToAll(short errorCode) {
+            transactionDataAndCallbacks.callbacks.forEach((txnId, cb) ->
+                    sendCallback(cb, buildErrorMap(txnId, errorCode), 
transactionDataAndCallbacks.startTimeMs.get(txnId)));
+        }
+    }
+
+    private final MetadataCache metadataCache;
+    private final Function<String, Integer> partitionFor;
+    private final Time time;
+
+    private final ListenerName interBrokerListenerName;
+    private final Set<Node> inflightNodes = new HashSet<>();
+    private final Map<Node, TransactionDataAndCallbacks> nodesToTransactions = 
new HashMap<>();
+
+    // For compatibility - this metrics group was previously defined within
+    // a Scala class named `kafka.server.AddPartitionsToTxnManager`
+    private final KafkaMetricsGroup metricsGroup = new 
KafkaMetricsGroup("kafka.server", "AddPartitionsToTxnManager");
+    private final Meter verificationFailureRate = 
metricsGroup.newMeter(VERIFICATION_FAILURE_RATE_METRIC_NAME, "failures", 
TimeUnit.SECONDS);
+    private final Histogram verificationTimeMs = 
metricsGroup.newHistogram(VERIFICATION_TIME_MS_METRIC_NAME);
+
+    public AddPartitionsToTxnManager(
+            AbstractKafkaConfig config,
+            NetworkClient client,
+            MetadataCache metadataCache,
+            Function<String, Integer> partitionFor,
+            Time time) {
+        super("AddPartitionsToTxnSenderThread-" + config.brokerId(), client, 
config.requestTimeoutMs(), time);
+        this.interBrokerListenerName = config.interBrokerListenerName();
+        this.metadataCache = metadataCache;
+        this.partitionFor = partitionFor;
+        this.time = time;
+    }
+
+    public void addOrVerifyTransaction(
+            String transactionalId,
+            long producerId,
+            short producerEpoch,
+            Collection<TopicPartition> topicPartitions,
+            AppendCallback callback,
+            TransactionSupportedOperation transactionSupportedOperation) {
+        Optional<Node> coordinator = 
getTransactionCoordinator(partitionFor.apply(transactionalId));
+        if (coordinator.isEmpty()) {
+            callback.complete(topicPartitions.stream().collect(
+                    Collectors.toMap(Function.identity(), tp -> 
Errors.COORDINATOR_NOT_AVAILABLE)));
+        } else {
+            AddPartitionsToTxnTopicCollection topicCollection = new 
AddPartitionsToTxnTopicCollection();
+            
topicPartitions.stream().collect(Collectors.groupingBy(TopicPartition::topic)).forEach((topic,
 tps) -> {
+                topicCollection.add(new AddPartitionsToTxnTopic()
+                        .setName(topic)
+                        
.setPartitions(tps.stream().map(TopicPartition::partition).collect(Collectors.toList())));
+            });
+
+            AddPartitionsToTxnTransaction transactionData = new 
AddPartitionsToTxnTransaction()
+                    .setTransactionalId(transactionalId)
+                    .setProducerId(producerId)
+                    .setProducerEpoch(producerEpoch)
+                    
.setVerifyOnly(!transactionSupportedOperation.supportsEpochBump())
+                    .setTopics(topicCollection);
+
+            addTxnData(coordinator.get(), transactionData, callback, 
transactionSupportedOperation);
+        }
+    }
+
+    private void addTxnData(
+            Node node,
+            AddPartitionsToTxnTransaction transactionData,
+            AppendCallback callback,
+            TransactionSupportedOperation transactionSupportedOperation) {
+        synchronized (nodesToTransactions) {
+            long curTime = time.milliseconds();
+            // Check if we have already had either node or individual 
transaction. Add the Node if it isn't there.
+            TransactionDataAndCallbacks existingNodeAndTransactionData = 
nodesToTransactions.computeIfAbsent(node,
+                    ignored -> new TransactionDataAndCallbacks(
+                            new AddPartitionsToTxnTransactionCollection(1),
+                            new HashMap<>(),
+                            new HashMap<>(),
+                            transactionSupportedOperation));
+
+            AddPartitionsToTxnTransaction existingTransactionData = 
existingNodeAndTransactionData.transactionData.find(transactionData.transactionalId());
+
+            // There are 3 cases if we already have existing data
+            // 1. Incoming data has a higher epoch -- return 
INVALID_PRODUCER_EPOCH for existing data since it is fenced
+            // 2. Incoming data has the same epoch -- return NETWORK_EXCEPTION 
for existing data, since the client is likely retrying and we want another 
retriable exception
+            // 3. Incoming data has a lower epoch -- return 
INVALID_PRODUCER_EPOCH for the incoming data since it is fenced, do not add 
incoming data to verify
+            if (existingTransactionData != null) {
+                if (existingTransactionData.producerEpoch() <= 
transactionData.producerEpoch()) {
+                    Errors error = (existingTransactionData.producerEpoch() < 
transactionData.producerEpoch())
+                            ? Errors.INVALID_PRODUCER_EPOCH : 
Errors.NETWORK_EXCEPTION;
+                    AppendCallback oldCallback = 
existingNodeAndTransactionData.callbacks.get(transactionData.transactionalId());
+                    
existingNodeAndTransactionData.transactionData.remove(transactionData);
+                    sendCallback(oldCallback, 
topicPartitionsToError(existingTransactionData, error), 
existingNodeAndTransactionData.startTimeMs.get(transactionData.transactionalId()));
+                } else {
+                    // If the incoming transactionData's epoch is lower, we 
can return with INVALID_PRODUCER_EPOCH immediately.
+                    sendCallback(callback, 
topicPartitionsToError(transactionData, Errors.INVALID_PRODUCER_EPOCH), 
curTime);
+                    return;
+                }
+            }
+
+            
existingNodeAndTransactionData.transactionData.add(transactionData);
+            
existingNodeAndTransactionData.callbacks.put(transactionData.transactionalId(), 
callback);
+            
existingNodeAndTransactionData.startTimeMs.put(transactionData.transactionalId(),
 curTime);
+            wakeup();
+        }
+    }
+
+    private Optional<Node> getTransactionCoordinator(int partition) {
+        return 
metadataCache.getLeaderAndIsr(Topic.TRANSACTION_STATE_TOPIC_NAME, partition)
+                .filter(leaderAndIsr -> leaderAndIsr.leader() != 
MetadataResponse.NO_LEADER_ID)
+                .flatMap(metadata -> 
metadataCache.getAliveBrokerNode(metadata.leader(), interBrokerListenerName));
+    }
+
+    private Map<TopicPartition, Errors> 
topicPartitionsToError(AddPartitionsToTxnTransaction txnData, Errors error) {
+        Map<TopicPartition, Errors> topicPartitionsToError = new HashMap<>();
+        txnData.topics().forEach(topic ->
+            topic.partitions().forEach(partition ->
+                topicPartitionsToError.put(new TopicPartition(topic.name(), 
partition), error)));
+        verificationFailureRate.mark(topicPartitionsToError.size());
+        return topicPartitionsToError;
+    }
+
+    private void sendCallback(AppendCallback callback, Map<TopicPartition, 
Errors> errors, long startTimeMs) {
+        verificationTimeMs.update(time.milliseconds() - startTimeMs);
+        callback.complete(errors);
+    }
+
+    @Override
+    public Collection<RequestAndCompletionHandler> generateRequests() {
+        // build and add requests to the queue
+        List<RequestAndCompletionHandler> list = new ArrayList<>();

Review Comment:
   Please consider using `Iterator.remove` to optimize it.
   ```java
       public Collection<RequestAndCompletionHandler> generateRequests() {
           // build and add requests to the queue
           List<RequestAndCompletionHandler> list = new ArrayList<>();
           long currentTimeMs = time.milliseconds();
           synchronized (nodesToTransactions) {
               var iter = nodesToTransactions.entrySet().iterator();
               while (iter.hasNext()) {
                   var entry = iter.next();
                   var node = entry.getKey(); // 第一次 next()
                   var transactionDataAndCallbacks = entry.getValue(); // 第二次 
next(),這裡有問題
                   if (!inflightNodes.contains(node)) {
                       list.add(new RequestAndCompletionHandler(
                               currentTimeMs,
                               node,
                               
AddPartitionsToTxnRequest.Builder.forBroker(transactionDataAndCallbacks.transactionData()),
                               new AddPartitionsToTxnHandler(node, 
transactionDataAndCallbacks)
                       ));
                       inflightNodes.add(node);
                       iter.remove();
                   }
               }
           }
           return list;
       }
   ```



##########
test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/junit/RaftClusterInvocationContext.java:
##########
@@ -137,7 +137,9 @@ public ListenerName controllerListenerName() {
                     .next()
                     .config()
                     .controllerListenerNames()
-                    .head()
+                    .stream()

Review Comment:
   ```
               return new ListenerName(
                   Objects.requireNonNull(controllers()
                       .values()
                       .iterator()
                       .next()
                       .config()
                       .controllerListenerNames()
                       .get(0))
               );
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to