chia7712 commented on a change in pull request #10483:
URL: https://github.com/apache/kafka/pull/10483#discussion_r612182543



##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeTransactionsHandler.java
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import org.apache.kafka.clients.admin.TransactionDescription;
+import org.apache.kafka.clients.admin.TransactionState;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
+import org.apache.kafka.common.errors.TransactionalIdNotFoundException;
+import org.apache.kafka.common.message.DescribeTransactionsRequestData;
+import org.apache.kafka.common.message.DescribeTransactionsResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.DescribeTransactionsRequest;
+import org.apache.kafka.common.requests.DescribeTransactionsResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class DescribeTransactionsHandler implements 
AdminApiHandler<CoordinatorKey, TransactionDescription> {
+    private final LogContext logContext;
+    private final Logger log;
+    private final Set<CoordinatorKey> keys;
+
+    public DescribeTransactionsHandler(
+        Collection<String> transactionalIds,
+        LogContext logContext
+    ) {
+        this.keys = buildKeySet(transactionalIds);
+        this.log = logContext.logger(DescribeTransactionsHandler.class);
+        this.logContext = logContext;
+    }
+
+    private static Set<CoordinatorKey> buildKeySet(Collection<String> 
transactionalIds) {
+        return transactionalIds.stream()
+            .map(CoordinatorKey::byTransactionalId)
+            .collect(Collectors.toSet());
+    }
+
+    @Override
+    public String apiName() {
+        return "describeTransactions";
+    }
+
+    @Override
+    public Keys<CoordinatorKey> initializeKeys() {
+        return Keys.dynamicMapped(keys, new CoordinatorStrategy(logContext));
+    }
+
+    @Override
+    public DescribeTransactionsRequest.Builder buildRequest(
+        Integer brokerId,

Review comment:
       why this type is `Integer` rather than `int`?

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeTransactionsHandler.java
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import org.apache.kafka.clients.admin.TransactionDescription;
+import org.apache.kafka.clients.admin.TransactionState;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
+import org.apache.kafka.common.errors.TransactionalIdNotFoundException;
+import org.apache.kafka.common.message.DescribeTransactionsRequestData;
+import org.apache.kafka.common.message.DescribeTransactionsResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.DescribeTransactionsRequest;
+import org.apache.kafka.common.requests.DescribeTransactionsResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class DescribeTransactionsHandler implements 
AdminApiHandler<CoordinatorKey, TransactionDescription> {
+    private final LogContext logContext;
+    private final Logger log;
+    private final Set<CoordinatorKey> keys;
+
+    public DescribeTransactionsHandler(
+        Collection<String> transactionalIds,
+        LogContext logContext
+    ) {
+        this.keys = buildKeySet(transactionalIds);
+        this.log = logContext.logger(DescribeTransactionsHandler.class);
+        this.logContext = logContext;
+    }
+
+    private static Set<CoordinatorKey> buildKeySet(Collection<String> 
transactionalIds) {
+        return transactionalIds.stream()
+            .map(CoordinatorKey::byTransactionalId)
+            .collect(Collectors.toSet());
+    }
+
+    @Override
+    public String apiName() {
+        return "describeTransactions";
+    }
+
+    @Override
+    public Keys<CoordinatorKey> initializeKeys() {
+        return Keys.dynamicMapped(keys, new CoordinatorStrategy(logContext));
+    }
+
+    @Override
+    public DescribeTransactionsRequest.Builder buildRequest(
+        Integer brokerId,
+        Set<CoordinatorKey> keys
+    ) {
+        DescribeTransactionsRequestData request = new 
DescribeTransactionsRequestData();
+        List<String> transactionalIds = keys.stream().map(key -> 
key.idValue).collect(Collectors.toList());

Review comment:
       Should it verify the `type`?

##########
File path: 
clients/src/test/java/org/apache/kafka/clients/admin/internals/CoordinatorStrategyTest.java
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.singleton;
+import static java.util.Collections.singletonMap;
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class CoordinatorStrategyTest {
+
+    @Test
+    public void testBuildLookupRequest() {
+        CoordinatorStrategy strategy = new CoordinatorStrategy(new 
LogContext());
+        FindCoordinatorRequest.Builder request = 
strategy.buildRequest(singleton(
+            CoordinatorKey.byGroupId("foo")));
+        assertEquals("foo", request.data().key());
+        assertEquals(CoordinatorType.GROUP, 
CoordinatorType.forId(request.data().keyType()));
+    }
+
+    @Test
+    public void testBuildLookupRequestRequiresOneKey() {
+        CoordinatorStrategy strategy = new CoordinatorStrategy(new 
LogContext());
+        assertThrows(IllegalArgumentException.class, () -> 
strategy.buildRequest(Collections.emptySet()));
+
+        CoordinatorKey group1 = CoordinatorKey.byGroupId("foo");
+        CoordinatorKey group2 = CoordinatorKey.byGroupId("bar");
+        assertThrows(IllegalArgumentException.class, () -> 
strategy.buildRequest(mkSet(group1, group2)));
+    }
+
+    @Test
+    public void testSuccessfulCoordinatorLookup() {
+        CoordinatorKey group = CoordinatorKey.byGroupId("foo");
+
+        FindCoordinatorResponseData responseData = new 
FindCoordinatorResponseData()
+            .setErrorCode(Errors.NONE.code())
+            .setHost("localhost")
+            .setPort(9092)
+            .setNodeId(1);
+
+        AdminApiLookupStrategy.LookupResult<CoordinatorKey> result = 
runLookup(group, responseData);
+        assertEquals(singletonMap(group, 1), result.mappedKeys);
+        assertEquals(emptyMap(), result.failedKeys);
+    }
+
+    @Test
+    public void testRetriableCoordinatorLookup() {
+        testRetriableCoordinatorLookup(Errors.COORDINATOR_LOAD_IN_PROGRESS);
+        testRetriableCoordinatorLookup(Errors.COORDINATOR_NOT_AVAILABLE);
+    }
+
+    public void testRetriableCoordinatorLookup(Errors error) {

Review comment:
       Could you change modifier from public to private? Otherwise, it looks 
like a bug that we forget to add test annotation.

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/TransactionState.java
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.HashMap;
+
+public enum TransactionState {

Review comment:
       Should it need `@InterfaceStability.Evolving`?

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeTransactionsHandler.java
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import org.apache.kafka.clients.admin.TransactionDescription;
+import org.apache.kafka.clients.admin.TransactionState;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
+import org.apache.kafka.common.errors.TransactionalIdNotFoundException;
+import org.apache.kafka.common.message.DescribeTransactionsRequestData;
+import org.apache.kafka.common.message.DescribeTransactionsResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.DescribeTransactionsRequest;
+import org.apache.kafka.common.requests.DescribeTransactionsResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class DescribeTransactionsHandler implements 
AdminApiHandler<CoordinatorKey, TransactionDescription> {
+    private final LogContext logContext;
+    private final Logger log;
+    private final Set<CoordinatorKey> keys;
+
+    public DescribeTransactionsHandler(
+        Collection<String> transactionalIds,
+        LogContext logContext
+    ) {
+        this.keys = buildKeySet(transactionalIds);
+        this.log = logContext.logger(DescribeTransactionsHandler.class);
+        this.logContext = logContext;
+    }
+
+    private static Set<CoordinatorKey> buildKeySet(Collection<String> 
transactionalIds) {
+        return transactionalIds.stream()
+            .map(CoordinatorKey::byTransactionalId)
+            .collect(Collectors.toSet());
+    }
+
+    @Override
+    public String apiName() {
+        return "describeTransactions";
+    }
+
+    @Override
+    public Keys<CoordinatorKey> initializeKeys() {
+        return Keys.dynamicMapped(keys, new CoordinatorStrategy(logContext));
+    }
+
+    @Override
+    public DescribeTransactionsRequest.Builder buildRequest(
+        Integer brokerId,
+        Set<CoordinatorKey> keys
+    ) {
+        DescribeTransactionsRequestData request = new 
DescribeTransactionsRequestData();
+        List<String> transactionalIds = keys.stream().map(key -> 
key.idValue).collect(Collectors.toList());
+        request.setTransactionalIds(transactionalIds);
+        return new DescribeTransactionsRequest.Builder(request);
+    }
+
+    @Override
+    public ApiResult<CoordinatorKey, TransactionDescription> handleResponse(
+        Integer brokerId,

Review comment:
       ditto

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/TransactionState.java
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.HashMap;
+
+public enum TransactionState {
+    ONGOING("Ongoing"),
+    PREPARE_ABORT("PrepareAbort"),
+    PREPARE_COMMIT("PrepareCommit"),
+    COMPLETE_ABORT("CompleteAbort"),
+    COMPLETE_COMMIT("CompleteCommit"),
+    EMPTY("Empty"),
+    PREPARE_EPOCH_FENCE("PrepareEpochFence"),
+    UNKNOWN("Unknown");
+
+    private final static HashMap<String, TransactionState> NAME_TO_ENUM;
+
+    static {
+        NAME_TO_ENUM = new HashMap<>();
+        for (TransactionState state : TransactionState.values()) {
+            NAME_TO_ENUM.put(state.name, state);
+        }
+    }
+
+    private final String name;
+
+    TransactionState(String name) {
+        this.name = name;
+    }
+
+    @Override
+    public String toString() {
+        return name;
+    }
+
+    public static TransactionState parse(String name) {

Review comment:
       The name is a part of serialization data now. Should we unify 
`TransactionState` for both scala and java? 

##########
File path: 
clients/src/test/java/org/apache/kafka/clients/admin/internals/CoordinatorStrategyTest.java
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.singleton;
+import static java.util.Collections.singletonMap;
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class CoordinatorStrategyTest {
+
+    @Test
+    public void testBuildLookupRequest() {
+        CoordinatorStrategy strategy = new CoordinatorStrategy(new 
LogContext());
+        FindCoordinatorRequest.Builder request = 
strategy.buildRequest(singleton(
+            CoordinatorKey.byGroupId("foo")));
+        assertEquals("foo", request.data().key());
+        assertEquals(CoordinatorType.GROUP, 
CoordinatorType.forId(request.data().keyType()));
+    }
+
+    @Test
+    public void testBuildLookupRequestRequiresOneKey() {

Review comment:
       `handleResponse` needs similar UT (XXXRequiresOneKey)

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeTransactionsHandler.java
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import org.apache.kafka.clients.admin.TransactionDescription;
+import org.apache.kafka.clients.admin.TransactionState;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
+import org.apache.kafka.common.errors.TransactionalIdNotFoundException;
+import org.apache.kafka.common.message.DescribeTransactionsRequestData;
+import org.apache.kafka.common.message.DescribeTransactionsResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.DescribeTransactionsRequest;
+import org.apache.kafka.common.requests.DescribeTransactionsResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class DescribeTransactionsHandler implements 
AdminApiHandler<CoordinatorKey, TransactionDescription> {
+    private final LogContext logContext;
+    private final Logger log;
+    private final Set<CoordinatorKey> keys;
+
+    public DescribeTransactionsHandler(
+        Collection<String> transactionalIds,
+        LogContext logContext
+    ) {
+        this.keys = buildKeySet(transactionalIds);
+        this.log = logContext.logger(DescribeTransactionsHandler.class);
+        this.logContext = logContext;
+    }
+
+    private static Set<CoordinatorKey> buildKeySet(Collection<String> 
transactionalIds) {
+        return transactionalIds.stream()
+            .map(CoordinatorKey::byTransactionalId)
+            .collect(Collectors.toSet());
+    }
+
+    @Override
+    public String apiName() {
+        return "describeTransactions";
+    }
+
+    @Override
+    public Keys<CoordinatorKey> initializeKeys() {
+        return Keys.dynamicMapped(keys, new CoordinatorStrategy(logContext));
+    }
+
+    @Override
+    public DescribeTransactionsRequest.Builder buildRequest(
+        Integer brokerId,
+        Set<CoordinatorKey> keys
+    ) {
+        DescribeTransactionsRequestData request = new 
DescribeTransactionsRequestData();
+        List<String> transactionalIds = keys.stream().map(key -> 
key.idValue).collect(Collectors.toList());
+        request.setTransactionalIds(transactionalIds);
+        return new DescribeTransactionsRequest.Builder(request);
+    }
+
+    @Override
+    public ApiResult<CoordinatorKey, TransactionDescription> handleResponse(
+        Integer brokerId,
+        Set<CoordinatorKey> keys,
+        AbstractResponse abstractResponse
+    ) {
+        DescribeTransactionsResponse response = (DescribeTransactionsResponse) 
abstractResponse;
+        Map<CoordinatorKey, TransactionDescription> completed = new 
HashMap<>();
+        Map<CoordinatorKey, Throwable> failed = new HashMap<>();
+        List<CoordinatorKey> unmapped = new ArrayList<>();
+
+        for (DescribeTransactionsResponseData.TransactionState 
transactionState : response.data().transactionStates()) {
+            CoordinatorKey transactionalIdKey = 
CoordinatorKey.byTransactionalId(
+                transactionState.transactionalId());
+            if (!keys.contains(transactionalIdKey)) {
+                log.warn("Response included transactionalId `{}`, which was 
not requested",
+                    transactionState.transactionalId());
+                continue;
+            }
+
+            Errors error = Errors.forCode(transactionState.errorCode());
+            if (error != Errors.NONE) {
+                handleError(transactionalIdKey, error, failed, unmapped);
+                continue;
+            }
+
+            OptionalLong transactionStartTimeMs = 
transactionState.transactionStartTimeMs() < 0 ?
+                OptionalLong.empty() :
+                OptionalLong.of(transactionState.transactionStartTimeMs());
+
+            completed.put(transactionalIdKey, new TransactionDescription(
+                brokerId,
+                TransactionState.parse(transactionState.transactionState()),
+                transactionState.producerId(),
+                transactionState.producerEpoch(),
+                transactionState.transactionTimeoutMs(),
+                transactionStartTimeMs,
+                collectTopicPartitions(transactionState)
+            ));
+        }
+
+        return new ApiResult<>(completed, failed, unmapped);
+    }
+
+    private Set<TopicPartition> collectTopicPartitions(
+        DescribeTransactionsResponseData.TransactionState transactionState
+    ) {
+        Set<TopicPartition> res = new HashSet<>();
+        for (DescribeTransactionsResponseData.TopicData topicData : 
transactionState.topics()) {
+            String topic = topicData.topic();
+            for (Integer partitionId : topicData.partitions()) {
+                res.add(new TopicPartition(topic, partitionId));
+            }
+        }
+        return res;
+    }
+
+    private void handleError(
+        CoordinatorKey transactionalIdKey,
+        Errors error,
+        Map<CoordinatorKey, Throwable> failed,
+        List<CoordinatorKey> unmapped
+    ) {
+        switch (error) {
+            case TRANSACTIONAL_ID_AUTHORIZATION_FAILED:
+                failed.put(transactionalIdKey, new 
TransactionalIdAuthorizationException(
+                    "DescribeTransactions request for transactionalId `" + 
transactionalIdKey.idValue + "` " +
+                        "failed due to authorization failure"));
+                break;
+
+            case TRANSACTIONAL_ID_NOT_FOUND:
+                failed.put(transactionalIdKey, new 
TransactionalIdNotFoundException(
+                    "DescribeTransactions request for transactionalId `" + 
transactionalIdKey.idValue + "` " +
+                        "failed because the ID could not be found"));
+                break;
+
+            case COORDINATOR_LOAD_IN_PROGRESS:
+                // If the coordinator is in the middle of loading, then we 
just need to retry
+                log.debug("DescribeTransactions request for transactionalId 
`{}` failed because the " +
+                        "coordinator is still in the process of loading state. 
Will retry",
+                    transactionalIdKey.idValue);
+                break;
+
+            case NOT_COORDINATOR:
+            case COORDINATOR_NOT_AVAILABLE:
+                // If the coordinator is unavailable or there was a 
coordinator change, then we unmap
+                // the key so that we retry the `FindCoordinator` request
+                unmapped.add(transactionalIdKey);
+                log.debug("DescribeTransactions request for transactionalId 
`{}` returned error {}. Will retry",
+                    transactionalIdKey.idValue, error);
+                break;
+
+            default:
+                failed.put(transactionalIdKey, 
error.exception("DescribeTransactions request for " +
+                    "transactionalId `" + transactionalIdKey.idValue + "` 
failed due to unexpected error "));

Review comment:
       redundant space: "to unexpected error"

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/TransactionState.java
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.HashMap;
+
+public enum TransactionState {
+    ONGOING("Ongoing"),
+    PREPARE_ABORT("PrepareAbort"),
+    PREPARE_COMMIT("PrepareCommit"),
+    COMPLETE_ABORT("CompleteAbort"),
+    COMPLETE_COMMIT("CompleteCommit"),
+    EMPTY("Empty"),
+    PREPARE_EPOCH_FENCE("PrepareEpochFence"),
+    UNKNOWN("Unknown");
+
+    private final static HashMap<String, TransactionState> NAME_TO_ENUM;

Review comment:
       How about using java stream?
   ```java
   Arrays.stream(TransactionState.values())
                   .collect(Collectors.toMap(TransactionState::name, 
Function.identity()))
   ```

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeTransactionsHandler.java
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import org.apache.kafka.clients.admin.TransactionDescription;
+import org.apache.kafka.clients.admin.TransactionState;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
+import org.apache.kafka.common.errors.TransactionalIdNotFoundException;
+import org.apache.kafka.common.message.DescribeTransactionsRequestData;
+import org.apache.kafka.common.message.DescribeTransactionsResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.DescribeTransactionsRequest;
+import org.apache.kafka.common.requests.DescribeTransactionsResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class DescribeTransactionsHandler implements 
AdminApiHandler<CoordinatorKey, TransactionDescription> {
+    private final LogContext logContext;
+    private final Logger log;
+    private final Set<CoordinatorKey> keys;
+
+    public DescribeTransactionsHandler(
+        Collection<String> transactionalIds,
+        LogContext logContext
+    ) {
+        this.keys = buildKeySet(transactionalIds);
+        this.log = logContext.logger(DescribeTransactionsHandler.class);
+        this.logContext = logContext;
+    }
+
+    private static Set<CoordinatorKey> buildKeySet(Collection<String> 
transactionalIds) {
+        return transactionalIds.stream()
+            .map(CoordinatorKey::byTransactionalId)
+            .collect(Collectors.toSet());
+    }
+
+    @Override
+    public String apiName() {
+        return "describeTransactions";
+    }
+
+    @Override
+    public Keys<CoordinatorKey> initializeKeys() {
+        return Keys.dynamicMapped(keys, new CoordinatorStrategy(logContext));
+    }
+
+    @Override
+    public DescribeTransactionsRequest.Builder buildRequest(
+        Integer brokerId,
+        Set<CoordinatorKey> keys
+    ) {
+        DescribeTransactionsRequestData request = new 
DescribeTransactionsRequestData();
+        List<String> transactionalIds = keys.stream().map(key -> 
key.idValue).collect(Collectors.toList());
+        request.setTransactionalIds(transactionalIds);
+        return new DescribeTransactionsRequest.Builder(request);
+    }
+
+    @Override
+    public ApiResult<CoordinatorKey, TransactionDescription> handleResponse(
+        Integer brokerId,
+        Set<CoordinatorKey> keys,
+        AbstractResponse abstractResponse
+    ) {
+        DescribeTransactionsResponse response = (DescribeTransactionsResponse) 
abstractResponse;
+        Map<CoordinatorKey, TransactionDescription> completed = new 
HashMap<>();
+        Map<CoordinatorKey, Throwable> failed = new HashMap<>();
+        List<CoordinatorKey> unmapped = new ArrayList<>();
+
+        for (DescribeTransactionsResponseData.TransactionState 
transactionState : response.data().transactionStates()) {
+            CoordinatorKey transactionalIdKey = 
CoordinatorKey.byTransactionalId(
+                transactionState.transactionalId());
+            if (!keys.contains(transactionalIdKey)) {
+                log.warn("Response included transactionalId `{}`, which was 
not requested",
+                    transactionState.transactionalId());
+                continue;
+            }
+
+            Errors error = Errors.forCode(transactionState.errorCode());
+            if (error != Errors.NONE) {
+                handleError(transactionalIdKey, error, failed, unmapped);
+                continue;
+            }
+
+            OptionalLong transactionStartTimeMs = 
transactionState.transactionStartTimeMs() < 0 ?
+                OptionalLong.empty() :
+                OptionalLong.of(transactionState.transactionStartTimeMs());
+
+            completed.put(transactionalIdKey, new TransactionDescription(
+                brokerId,
+                TransactionState.parse(transactionState.transactionState()),
+                transactionState.producerId(),
+                transactionState.producerEpoch(),
+                transactionState.transactionTimeoutMs(),
+                transactionStartTimeMs,
+                collectTopicPartitions(transactionState)
+            ));
+        }
+
+        return new ApiResult<>(completed, failed, unmapped);
+    }
+
+    private Set<TopicPartition> collectTopicPartitions(
+        DescribeTransactionsResponseData.TransactionState transactionState
+    ) {
+        Set<TopicPartition> res = new HashSet<>();
+        for (DescribeTransactionsResponseData.TopicData topicData : 
transactionState.topics()) {
+            String topic = topicData.topic();
+            for (Integer partitionId : topicData.partitions()) {
+                res.add(new TopicPartition(topic, partitionId));
+            }
+        }
+        return res;
+    }
+
+    private void handleError(
+        CoordinatorKey transactionalIdKey,
+        Errors error,
+        Map<CoordinatorKey, Throwable> failed,
+        List<CoordinatorKey> unmapped
+    ) {
+        switch (error) {
+            case TRANSACTIONAL_ID_AUTHORIZATION_FAILED:
+                failed.put(transactionalIdKey, new 
TransactionalIdAuthorizationException(
+                    "DescribeTransactions request for transactionalId `" + 
transactionalIdKey.idValue + "` " +
+                        "failed due to authorization failure"));
+                break;
+
+            case TRANSACTIONAL_ID_NOT_FOUND:
+                failed.put(transactionalIdKey, new 
TransactionalIdNotFoundException(
+                    "DescribeTransactions request for transactionalId `" + 
transactionalIdKey.idValue + "` " +
+                        "failed because the ID could not be found"));
+                break;
+
+            case COORDINATOR_LOAD_IN_PROGRESS:
+                // If the coordinator is in the middle of loading, then we 
just need to retry
+                log.debug("DescribeTransactions request for transactionalId 
`{}` failed because the " +
+                        "coordinator is still in the process of loading state. 
Will retry",
+                    transactionalIdKey.idValue);
+                break;
+
+            case NOT_COORDINATOR:
+            case COORDINATOR_NOT_AVAILABLE:
+                // If the coordinator is unavailable or there was a 
coordinator change, then we unmap
+                // the key so that we retry the `FindCoordinator` request
+                unmapped.add(transactionalIdKey);
+                log.debug("DescribeTransactions request for transactionalId 
`{}` returned error {}. Will retry",

Review comment:
       This error message seems to be inconsistent with comment "retry the 
`FindCoordinator` request"

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/TransactionDescription.java
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.Objects;
+import java.util.OptionalLong;
+import java.util.Set;
+
+public class TransactionDescription {

Review comment:
       Should it need `@InterfaceStability.Evolving`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to