chia7712 commented on code in PR #19879:
URL: https://github.com/apache/kafka/pull/19879#discussion_r2121989155
##########
server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java:
##########
@@ -91,4 +97,113 @@ public int numRecoveryThreadsPerDataDir() {
public int backgroundThreads() {
return getInt(ServerConfigs.BACKGROUND_THREADS_CONFIG);
}
+
+ public int brokerId() {
+ return getInt(ServerConfigs.BROKER_ID_CONFIG);
+ }
+
+ public int requestTimeoutMs() {
+ return getInt(ServerConfigs.REQUEST_TIMEOUT_MS_CONFIG);
+ }
+
+ public List<String> controllerListenerNames() {
+ return
Csv.parseCsvList(getString(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG));
+ }
+
+ public ListenerName interBrokerListenerName() {
+ return getInterBrokerListenerNameAndSecurityProtocol().getKey();
+ }
+
+ public SecurityProtocol interBrokerSecurityProtocol() {
+ return getInterBrokerListenerNameAndSecurityProtocol().getValue();
+ }
+
+ public Map<ListenerName, SecurityProtocol>
effectiveListenerSecurityProtocolMap() {
+ Map<ListenerName, SecurityProtocol> mapValue =
+
getMap(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG,
+
getString(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG))
+ .entrySet()
+ .stream()
+ .collect(Collectors.toMap(
+ e -> ListenerName.normalised(e.getKey()),
+ e -> getSecurityProtocol(
+ e.getValue(),
+
SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG)));
+
+ if
(!originals().containsKey(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG))
{
+ // Using the default configuration since
listener.security.protocol.map is not explicitly set.
+ // Before adding default PLAINTEXT mappings for controller
listeners, verify that:
+ // 1. No SSL or SASL protocols are used in controller listeners
+ // 2. No SSL or SASL protocols are used in regular listeners
(Note: controller listeners
+ // are not included in 'listeners' config when
process.roles=broker)
+ if
(controllerListenerNames().stream().anyMatch(AbstractKafkaConfig::isSslOrSasl)
||
+
Csv.parseCsvList(getString(SocketServerConfigs.LISTENERS_CONFIG)).stream()
+ .anyMatch(listenerName ->
isSslOrSasl(parseListenerName(listenerName)))) {
+ return mapValue;
+ } else {
+ // Add the PLAINTEXT mappings for all controller listener
names that are not explicitly PLAINTEXT
+ mapValue.putAll(controllerListenerNames().stream()
+ .filter(listenerName ->
!SecurityProtocol.PLAINTEXT.name.equals(listenerName))
+ .collect(Collectors.toMap(ListenerName::new, ignored
-> SecurityProtocol.PLAINTEXT)));
+ return mapValue;
+ }
+ } else {
+ return mapValue;
+ }
+ }
+
+ public static Map<String, String> getMap(String propName, String
propValue) {
+ try {
+ return Csv.parseCsvMap(propValue);
+ } catch (Exception e) {
+ throw new IllegalArgumentException(
+ String.format("Error parsing configuration property '%s':
%s", propName, e.getMessage()));
+ }
+ }
+
+ private static SecurityProtocol getSecurityProtocol(String protocolName,
String configName) {
+ try {
+ return SecurityProtocol.forName(protocolName);
+ } catch (IllegalArgumentException e) {
+ throw new ConfigException(
+ String.format("Invalid security protocol `%s` defined in
%s", protocolName, configName));
+ }
+ }
+
+ private Map.Entry<ListenerName, SecurityProtocol>
getInterBrokerListenerNameAndSecurityProtocol() {
Review Comment:
Please rename this method to `interBrokerListenerNameAndSecurityProtocol` — dropping the `get` prefix keeps it consistent with the accessor naming style used elsewhere in this class.
##########
server/src/main/java/org/apache/kafka/server/transaction/AddPartitionsToTxnManager.java:
##########
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.transaction;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.NetworkClient;
+import org.apache.kafka.clients.RequestCompletionHandler;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.internals.Topic;
+import
org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopic;
+import
org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopicCollection;
+import
org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTransaction;
+import
org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTransactionCollection;
+import org.apache.kafka.common.message.AddPartitionsToTxnResponseData;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AddPartitionsToTxnRequest;
+import org.apache.kafka.common.requests.AddPartitionsToTxnResponse;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.metadata.MetadataCache;
+import org.apache.kafka.server.config.AbstractKafkaConfig;
+import org.apache.kafka.server.metrics.KafkaMetricsGroup;
+import org.apache.kafka.server.util.InterBrokerSendThread;
+import org.apache.kafka.server.util.RequestAndCompletionHandler;
+
+import com.yammer.metrics.core.Histogram;
+import com.yammer.metrics.core.Meter;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public class AddPartitionsToTxnManager extends InterBrokerSendThread {
+
+ public static final String VERIFICATION_FAILURE_RATE_METRIC_NAME =
"VerificationFailureRate";
+ public static final String VERIFICATION_TIME_MS_METRIC_NAME =
"VerificationTimeMs";
+
+ @FunctionalInterface
+ public interface AppendCallback {
+ void complete(Map<TopicPartition, Errors> partitionErrors);
+ }
+
+ /**
+ * An interface which handles the Partition Response based on the Request
Version and the exact operation.
+ */
+ @FunctionalInterface
+ public interface TransactionSupportedOperation {
+ boolean supportsEpochBump();
+ }
+
+ /**
+ * This is the default workflow which maps to cases when the Produce
Request Version or the
+ * Txn_offset_commit request was lower than the first version supporting
the new Error Class.
+ */
+ public static final TransactionSupportedOperation DEFAULT_ERROR = () ->
false;
+
+ /**
+ * This maps to the case when the clients are updated to handle the
TransactionAbortableException.
+ */
+ public static final TransactionSupportedOperation GENERIC_ERROR_SUPPORTED
= () -> false;
+
+ /**
+ * This allows the partition to be added to the transactions inflight with
the Produce and TxnOffsetCommit requests.
+ * Plus the behaviors in genericErrorSupported.
+ */
+ public static final TransactionSupportedOperation ADD_PARTITION = () ->
true;
+
+ public static TransactionSupportedOperation
produceRequestVersionToTransactionSupportedOperation(short version) {
+ if (version > 11) {
+ return ADD_PARTITION;
+ } else if (version > 10) {
+ return GENERIC_ERROR_SUPPORTED;
+ } else {
+ return DEFAULT_ERROR;
+ }
+ }
+
+ public static TransactionSupportedOperation
txnOffsetCommitRequestVersionToTransactionSupportedOperation(int version) {
+ if (version > 4) {
+ return ADD_PARTITION;
+ } else if (version > 3) {
+ return GENERIC_ERROR_SUPPORTED;
+ } else {
+ return DEFAULT_ERROR;
+ }
+ }
+
+ /*
+ * Data structure to hold the transactional data to send to a node. Note
-- at most one request per transactional ID
+ * will exist at a time in the map. If a given transactional ID exists in
the map, and a new request with the same ID
+ * comes in, one request will be in the map and one will return to the
producer with a response depending on the epoch.
+ */
+ public record TransactionDataAndCallbacks(
+ AddPartitionsToTxnTransactionCollection transactionData,
+ Map<String, AppendCallback> callbacks,
+ Map<String, Long> startTimeMs,
+ TransactionSupportedOperation transactionSupportedOperation) { }
+
+ private class AddPartitionsToTxnHandler implements
RequestCompletionHandler {
+ private final Node node;
+ private final TransactionDataAndCallbacks transactionDataAndCallbacks;
+
+ public AddPartitionsToTxnHandler(Node node,
TransactionDataAndCallbacks transactionDataAndCallbacks) {
+ this.node = node;
+ this.transactionDataAndCallbacks = transactionDataAndCallbacks;
+ }
+
+ @Override
+ public void onComplete(ClientResponse response) {
+ // Note: Synchronization is not needed on inflightNodes since it
is always accessed from this thread.
+ inflightNodes.remove(node);
+ if (response.authenticationException() != null) {
+ log.error(String.format("AddPartitionsToTxnRequest failed for
node %s with an authentication exception.", response.destination()),
Review Comment:
```
log.error("AddPartitionsToTxnRequest failed for node {} with an
authentication exception.", response.destination(),
response.authenticationException());
```
##########
core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala:
##########
@@ -275,7 +275,7 @@ abstract class QuorumTestHarness extends Logging {
formatter.addDirectory(metadataDir.getAbsolutePath)
formatter.setReleaseVersion(metadataVersion)
formatter.setUnstableFeatureVersionsEnabled(true)
- formatter.setControllerListenerName(config.controllerListenerNames.head)
+
formatter.setControllerListenerName(config.controllerListenerNames.stream.findFirst.orElseThrow)
Review Comment:
```
formatter.setControllerListenerName(config.controllerListenerNames.get(0))
```
##########
server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java:
##########
@@ -91,4 +97,113 @@ public int numRecoveryThreadsPerDataDir() {
public int backgroundThreads() {
return getInt(ServerConfigs.BACKGROUND_THREADS_CONFIG);
}
+
+ public int brokerId() {
+ return getInt(ServerConfigs.BROKER_ID_CONFIG);
+ }
+
+ public int requestTimeoutMs() {
+ return getInt(ServerConfigs.REQUEST_TIMEOUT_MS_CONFIG);
+ }
+
+ public List<String> controllerListenerNames() {
+ return
Csv.parseCsvList(getString(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG));
+ }
+
+ public ListenerName interBrokerListenerName() {
+ return getInterBrokerListenerNameAndSecurityProtocol().getKey();
+ }
+
+ public SecurityProtocol interBrokerSecurityProtocol() {
+ return getInterBrokerListenerNameAndSecurityProtocol().getValue();
+ }
+
+ public Map<ListenerName, SecurityProtocol>
effectiveListenerSecurityProtocolMap() {
+ Map<ListenerName, SecurityProtocol> mapValue =
+
getMap(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG,
+
getString(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG))
+ .entrySet()
+ .stream()
+ .collect(Collectors.toMap(
+ e -> ListenerName.normalised(e.getKey()),
+ e -> getSecurityProtocol(
+ e.getValue(),
+
SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG)));
+
+ if
(!originals().containsKey(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG))
{
+ // Using the default configuration since
listener.security.protocol.map is not explicitly set.
+ // Before adding default PLAINTEXT mappings for controller
listeners, verify that:
+ // 1. No SSL or SASL protocols are used in controller listeners
+ // 2. No SSL or SASL protocols are used in regular listeners
(Note: controller listeners
+ // are not included in 'listeners' config when
process.roles=broker)
+ if
(controllerListenerNames().stream().anyMatch(AbstractKafkaConfig::isSslOrSasl)
||
+
Csv.parseCsvList(getString(SocketServerConfigs.LISTENERS_CONFIG)).stream()
+ .anyMatch(listenerName ->
isSslOrSasl(parseListenerName(listenerName)))) {
+ return mapValue;
+ } else {
+ // Add the PLAINTEXT mappings for all controller listener
names that are not explicitly PLAINTEXT
+ mapValue.putAll(controllerListenerNames().stream()
+ .filter(listenerName ->
!SecurityProtocol.PLAINTEXT.name.equals(listenerName))
+ .collect(Collectors.toMap(ListenerName::new, ignored
-> SecurityProtocol.PLAINTEXT)));
+ return mapValue;
+ }
+ } else {
+ return mapValue;
+ }
+ }
+
+ public static Map<String, String> getMap(String propName, String
propValue) {
+ try {
+ return Csv.parseCsvMap(propValue);
+ } catch (Exception e) {
+ throw new IllegalArgumentException(
+ String.format("Error parsing configuration property '%s':
%s", propName, e.getMessage()));
+ }
+ }
+
+ private static SecurityProtocol getSecurityProtocol(String protocolName,
String configName) {
Review Comment:
Please rename this method to `securityProtocol` — dropping the `get` prefix keeps it consistent with the other accessors in this class.
##########
server/src/main/java/org/apache/kafka/server/transaction/AddPartitionsToTxnManager.java:
##########
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.transaction;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.NetworkClient;
+import org.apache.kafka.clients.RequestCompletionHandler;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.internals.Topic;
+import
org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopic;
+import
org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopicCollection;
+import
org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTransaction;
+import
org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTransactionCollection;
+import org.apache.kafka.common.message.AddPartitionsToTxnResponseData;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AddPartitionsToTxnRequest;
+import org.apache.kafka.common.requests.AddPartitionsToTxnResponse;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.metadata.MetadataCache;
+import org.apache.kafka.server.config.AbstractKafkaConfig;
+import org.apache.kafka.server.metrics.KafkaMetricsGroup;
+import org.apache.kafka.server.util.InterBrokerSendThread;
+import org.apache.kafka.server.util.RequestAndCompletionHandler;
+
+import com.yammer.metrics.core.Histogram;
+import com.yammer.metrics.core.Meter;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public class AddPartitionsToTxnManager extends InterBrokerSendThread {
+
+ public static final String VERIFICATION_FAILURE_RATE_METRIC_NAME =
"VerificationFailureRate";
+ public static final String VERIFICATION_TIME_MS_METRIC_NAME =
"VerificationTimeMs";
+
+ @FunctionalInterface
+ public interface AppendCallback {
+ void complete(Map<TopicPartition, Errors> partitionErrors);
+ }
+
+ /**
+ * An interface which handles the Partition Response based on the Request
Version and the exact operation.
+ */
+ @FunctionalInterface
+ public interface TransactionSupportedOperation {
+ boolean supportsEpochBump();
+ }
+
+ /**
+ * This is the default workflow which maps to cases when the Produce
Request Version or the
+ * Txn_offset_commit request was lower than the first version supporting
the new Error Class.
+ */
+ public static final TransactionSupportedOperation DEFAULT_ERROR = () ->
false;
+
+ /**
+ * This maps to the case when the clients are updated to handle the
TransactionAbortableException.
+ */
+ public static final TransactionSupportedOperation GENERIC_ERROR_SUPPORTED
= () -> false;
+
+ /**
+ * This allows the partition to be added to the transactions inflight with
the Produce and TxnOffsetCommit requests.
+ * Plus the behaviors in genericErrorSupported.
+ */
+ public static final TransactionSupportedOperation ADD_PARTITION = () ->
true;
+
+ public static TransactionSupportedOperation
produceRequestVersionToTransactionSupportedOperation(short version) {
+ if (version > 11) {
+ return ADD_PARTITION;
+ } else if (version > 10) {
+ return GENERIC_ERROR_SUPPORTED;
+ } else {
+ return DEFAULT_ERROR;
+ }
+ }
+
+ public static TransactionSupportedOperation
txnOffsetCommitRequestVersionToTransactionSupportedOperation(int version) {
+ if (version > 4) {
+ return ADD_PARTITION;
+ } else if (version > 3) {
+ return GENERIC_ERROR_SUPPORTED;
+ } else {
+ return DEFAULT_ERROR;
+ }
+ }
+
+ /*
+ * Data structure to hold the transactional data to send to a node. Note
-- at most one request per transactional ID
+ * will exist at a time in the map. If a given transactional ID exists in
the map, and a new request with the same ID
+ * comes in, one request will be in the map and one will return to the
producer with a response depending on the epoch.
+ */
+ public record TransactionDataAndCallbacks(
+ AddPartitionsToTxnTransactionCollection transactionData,
+ Map<String, AppendCallback> callbacks,
+ Map<String, Long> startTimeMs,
+ TransactionSupportedOperation transactionSupportedOperation) { }
+
+ private class AddPartitionsToTxnHandler implements
RequestCompletionHandler {
+ private final Node node;
+ private final TransactionDataAndCallbacks transactionDataAndCallbacks;
+
+ public AddPartitionsToTxnHandler(Node node,
TransactionDataAndCallbacks transactionDataAndCallbacks) {
+ this.node = node;
+ this.transactionDataAndCallbacks = transactionDataAndCallbacks;
+ }
+
+ @Override
+ public void onComplete(ClientResponse response) {
+ // Note: Synchronization is not needed on inflightNodes since it
is always accessed from this thread.
+ inflightNodes.remove(node);
+ if (response.authenticationException() != null) {
+ log.error(String.format("AddPartitionsToTxnRequest failed for
node %s with an authentication exception.", response.destination()),
+ response.authenticationException());
+
sendCallbacksToAll(Errors.forException(response.authenticationException()).code());
+ } else if (response.versionMismatch() != null) {
+ // We may see unsupported version exception if we try to send
a verify only request to a broker that can't handle it.
+ // In this case, skip verification.
+ log.warn("AddPartitionsToTxnRequest failed for node {} with
invalid version exception. " +
+ "This suggests verification is not supported.
Continuing handling the produce request.", response.destination());
+ transactionDataAndCallbacks.callbacks().forEach((txnId,
callback) ->
+ sendCallback(callback, Map.of(),
transactionDataAndCallbacks.startTimeMs.get(txnId)));
+ } else if (response.wasDisconnected() || response.wasTimedOut()) {
+ log.warn("AddPartitionsToTxnRequest failed for node {} with a
network exception.", response.destination());
+ sendCallbacksToAll(Errors.NETWORK_EXCEPTION.code());
+ } else {
+ AddPartitionsToTxnResponseData responseData =
((AddPartitionsToTxnResponse) response.responseBody()).data();
+ if (responseData.errorCode() != 0) {
+ log.error("AddPartitionsToTxnRequest for node {} returned
with error {}.",
+ response.destination(),
Errors.forCode(responseData.errorCode()));
+ // The client should not be exposed to
CLUSTER_AUTHORIZATION_FAILED so modify the error to signify the verification
did not complete.
+ // Return INVALID_TXN_STATE.
+ short finalError = responseData.errorCode() ==
Errors.CLUSTER_AUTHORIZATION_FAILED.code()
+ ? Errors.INVALID_TXN_STATE.code() :
responseData.errorCode();
+ sendCallbacksToAll(finalError);
+ } else {
+ for
(AddPartitionsToTxnResponseData.AddPartitionsToTxnResult txnResult :
responseData.resultsByTransaction()) {
+ Map<TopicPartition, Errors> unverified = new
HashMap<>();
+ for
(AddPartitionsToTxnResponseData.AddPartitionsToTxnTopicResult topicResult :
txnResult.topicResults()) {
+ for
(AddPartitionsToTxnResponseData.AddPartitionsToTxnPartitionResult
partitionResult : topicResult.resultsByPartition()) {
+ TopicPartition tp = new
TopicPartition(topicResult.name(), partitionResult.partitionIndex());
+ if (partitionResult.partitionErrorCode() !=
Errors.NONE.code()) {
+ // Producers expect to handle
INVALID_PRODUCER_EPOCH in this scenario.
+ short code;
+ if (partitionResult.partitionErrorCode()
== Errors.PRODUCER_FENCED.code()) {
+ code =
Errors.INVALID_PRODUCER_EPOCH.code();
+ } else if
(partitionResult.partitionErrorCode() == Errors.TRANSACTION_ABORTABLE.code()
+ &&
transactionDataAndCallbacks.transactionSupportedOperation().equals(DEFAULT_ERROR))
{ // For backward compatibility with clients
+ code = Errors.INVALID_TXN_STATE.code();
+ } else {
+ code =
partitionResult.partitionErrorCode();
+ }
+ unverified.put(tp, Errors.forCode(code));
+ }
+ }
+ }
+ verificationFailureRate.mark(unverified.size());
+ AppendCallback callback =
transactionDataAndCallbacks.callbacks().get(txnResult.transactionalId());
+ sendCallback(callback, unverified,
transactionDataAndCallbacks.startTimeMs.get(txnResult.transactionalId()));
+ }
+ }
+ }
+ wakeup();
+ }
+
+ private Map<TopicPartition, Errors> buildErrorMap(String
transactionalId, short errorCode) {
+ AddPartitionsToTxnTransaction transactionData =
transactionDataAndCallbacks.transactionData.find(transactionalId);
+ return topicPartitionsToError(transactionData,
Errors.forCode(errorCode));
+ }
+
+ private void sendCallbacksToAll(short errorCode) {
+ transactionDataAndCallbacks.callbacks.forEach((txnId, cb) ->
+ sendCallback(cb, buildErrorMap(txnId, errorCode),
transactionDataAndCallbacks.startTimeMs.get(txnId)));
+ }
+ }
+
+ private final MetadataCache metadataCache;
+ private final Function<String, Integer> partitionFor;
+ private final Time time;
+
+ private final ListenerName interBrokerListenerName;
+ private final Set<Node> inflightNodes = new HashSet<>();
+ private final Map<Node, TransactionDataAndCallbacks> nodesToTransactions =
new HashMap<>();
+
+ // For compatibility - this metrics group was previously defined within
+ // a Scala class named `kafka.server.AddPartitionsToTxnManager`
+ private final KafkaMetricsGroup metricsGroup = new
KafkaMetricsGroup("kafka.server", "AddPartitionsToTxnManager");
+ private final Meter verificationFailureRate =
metricsGroup.newMeter(VERIFICATION_FAILURE_RATE_METRIC_NAME, "failures",
TimeUnit.SECONDS);
+ private final Histogram verificationTimeMs =
metricsGroup.newHistogram(VERIFICATION_TIME_MS_METRIC_NAME);
+
+ public AddPartitionsToTxnManager(
+ AbstractKafkaConfig config,
+ NetworkClient client,
+ MetadataCache metadataCache,
+ Function<String, Integer> partitionFor,
+ Time time) {
+ super("AddPartitionsToTxnSenderThread-" + config.brokerId(), client,
config.requestTimeoutMs(), time);
+ this.interBrokerListenerName = config.interBrokerListenerName();
+ this.metadataCache = metadataCache;
+ this.partitionFor = partitionFor;
+ this.time = time;
+ }
+
+ public void addOrVerifyTransaction(
+ String transactionalId,
+ long producerId,
+ short producerEpoch,
+ Collection<TopicPartition> topicPartitions,
+ AppendCallback callback,
+ TransactionSupportedOperation transactionSupportedOperation) {
+ Optional<Node> coordinator =
getTransactionCoordinator(partitionFor.apply(transactionalId));
+ if (coordinator.isEmpty()) {
+ callback.complete(topicPartitions.stream().collect(
+ Collectors.toMap(Function.identity(), tp ->
Errors.COORDINATOR_NOT_AVAILABLE)));
+ } else {
+ AddPartitionsToTxnTopicCollection topicCollection = new
AddPartitionsToTxnTopicCollection();
+
topicPartitions.stream().collect(Collectors.groupingBy(TopicPartition::topic)).forEach((topic,
tps) -> {
+ topicCollection.add(new AddPartitionsToTxnTopic()
+ .setName(topic)
+
.setPartitions(tps.stream().map(TopicPartition::partition).collect(Collectors.toList())));
+ });
+
+ AddPartitionsToTxnTransaction transactionData = new
AddPartitionsToTxnTransaction()
+ .setTransactionalId(transactionalId)
+ .setProducerId(producerId)
+ .setProducerEpoch(producerEpoch)
+
.setVerifyOnly(!transactionSupportedOperation.supportsEpochBump())
+ .setTopics(topicCollection);
+
+ addTxnData(coordinator.get(), transactionData, callback,
transactionSupportedOperation);
+ }
+ }
+
+ private void addTxnData(
+ Node node,
+ AddPartitionsToTxnTransaction transactionData,
+ AppendCallback callback,
+ TransactionSupportedOperation transactionSupportedOperation) {
+ synchronized (nodesToTransactions) {
+ long curTime = time.milliseconds();
+ // Check if we have already had either node or individual
transaction. Add the Node if it isn't there.
+ TransactionDataAndCallbacks existingNodeAndTransactionData =
nodesToTransactions.computeIfAbsent(node,
+ ignored -> new TransactionDataAndCallbacks(
+ new AddPartitionsToTxnTransactionCollection(1),
+ new HashMap<>(),
+ new HashMap<>(),
+ transactionSupportedOperation));
+
+ AddPartitionsToTxnTransaction existingTransactionData =
existingNodeAndTransactionData.transactionData.find(transactionData.transactionalId());
+
+ // There are 3 cases if we already have existing data
+ // 1. Incoming data has a higher epoch -- return
INVALID_PRODUCER_EPOCH for existing data since it is fenced
+ // 2. Incoming data has the same epoch -- return NETWORK_EXCEPTION
for existing data, since the client is likely retrying and we want another
retriable exception
+ // 3. Incoming data has a lower epoch -- return
INVALID_PRODUCER_EPOCH for the incoming data since it is fenced, do not add
incoming data to verify
+ if (existingTransactionData != null) {
+ if (existingTransactionData.producerEpoch() <=
transactionData.producerEpoch()) {
+ Errors error = (existingTransactionData.producerEpoch() <
transactionData.producerEpoch())
+ ? Errors.INVALID_PRODUCER_EPOCH :
Errors.NETWORK_EXCEPTION;
+ AppendCallback oldCallback =
existingNodeAndTransactionData.callbacks.get(transactionData.transactionalId());
+
existingNodeAndTransactionData.transactionData.remove(transactionData);
+ sendCallback(oldCallback,
topicPartitionsToError(existingTransactionData, error),
existingNodeAndTransactionData.startTimeMs.get(transactionData.transactionalId()));
+ } else {
+ // If the incoming transactionData's epoch is lower, we
can return with INVALID_PRODUCER_EPOCH immediately.
+ sendCallback(callback,
topicPartitionsToError(transactionData, Errors.INVALID_PRODUCER_EPOCH),
curTime);
+ return;
+ }
+ }
+
+
existingNodeAndTransactionData.transactionData.add(transactionData);
+
existingNodeAndTransactionData.callbacks.put(transactionData.transactionalId(),
callback);
+
existingNodeAndTransactionData.startTimeMs.put(transactionData.transactionalId(),
curTime);
+ wakeup();
+ }
+ }
+
+ private Optional<Node> getTransactionCoordinator(int partition) {
+ return
metadataCache.getLeaderAndIsr(Topic.TRANSACTION_STATE_TOPIC_NAME, partition)
+ .filter(leaderAndIsr -> leaderAndIsr.leader() !=
MetadataResponse.NO_LEADER_ID)
+ .flatMap(metadata ->
metadataCache.getAliveBrokerNode(metadata.leader(), interBrokerListenerName));
+ }
+
+ private Map<TopicPartition, Errors>
topicPartitionsToError(AddPartitionsToTxnTransaction txnData, Errors error) {
+ Map<TopicPartition, Errors> topicPartitionsToError = new HashMap<>();
+ txnData.topics().forEach(topic ->
+ topic.partitions().forEach(partition ->
+ topicPartitionsToError.put(new TopicPartition(topic.name(),
partition), error)));
+ verificationFailureRate.mark(topicPartitionsToError.size());
+ return topicPartitionsToError;
+ }
+
+ private void sendCallback(AppendCallback callback, Map<TopicPartition,
Errors> errors, long startTimeMs) {
+ verificationTimeMs.update(time.milliseconds() - startTimeMs);
+ callback.complete(errors);
+ }
+
+ @Override
+ public Collection<RequestAndCompletionHandler> generateRequests() {
+ // build and add requests to the queue
+ List<RequestAndCompletionHandler> list = new ArrayList<>();
Review Comment:
Please consider using `Iterator.remove` to optimize it.
```java
public Collection<RequestAndCompletionHandler> generateRequests() {
// build and add requests to the queue
List<RequestAndCompletionHandler> list = new ArrayList<>();
long currentTimeMs = time.milliseconds();
synchronized (nodesToTransactions) {
var iter = nodesToTransactions.entrySet().iterator();
while (iter.hasNext()) {
var entry = iter.next();
                var node = entry.getKey(); // first next()
                var transactionDataAndCallbacks = entry.getValue(); // second next() — there is a problem here
if (!inflightNodes.contains(node)) {
list.add(new RequestAndCompletionHandler(
currentTimeMs,
node,
AddPartitionsToTxnRequest.Builder.forBroker(transactionDataAndCallbacks.transactionData()),
new AddPartitionsToTxnHandler(node,
transactionDataAndCallbacks)
));
inflightNodes.add(node);
iter.remove();
}
}
}
return list;
}
```
##########
test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/junit/RaftClusterInvocationContext.java:
##########
@@ -137,7 +137,9 @@ public ListenerName controllerListenerName() {
.next()
.config()
.controllerListenerNames()
- .head()
+ .stream()
Review Comment:
```
return new ListenerName(
Objects.requireNonNull(controllers()
.values()
.iterator()
.next()
.config()
.controllerListenerNames()
.get(0))
);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]