This is an automated email from the ASF dual-hosted git repository.
nicoloboschi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 1db89d7643d [improve][transactions] Add command to list transaction
coordinators (#17522)
1db89d7643d is described below
commit 1db89d7643d1df39ca77c0c33828b9025a015ee5
Author: Nicolò Boschi <[email protected]>
AuthorDate: Fri Nov 11 13:55:03 2022 +0100
[improve][transactions] Add command to list transaction coordinators
(#17522)
* [improve][transactions] Add command to list transaction coordinators url
* Address pr's comment and rebase
* checkstyle
* remove debug
* Remove useless call and fix style
* style
---
.../pulsar/broker/admin/impl/TransactionsBase.java | 29 +++++++++++++++++
.../pulsar/broker/admin/v3/Transactions.java | 11 +++++++
.../broker/admin/v3/AdminApiTransactionTest.java | 13 ++++++++
.../apache/pulsar/client/admin/Transactions.java | 17 ++++++++++
.../policies/data/TransactionCoordinatorInfo.java | 36 ++++++++++++++++++++++
.../client/admin/internal/TransactionsImpl.java | 13 ++++++++
.../pulsar/admin/cli/PulsarAdminToolTest.java | 4 +++
.../apache/pulsar/admin/cli/CmdTransactions.java | 19 ++++++++++++
8 files changed, 142 insertions(+)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java
index 4e96cc2fcda..f537f0ecdb9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java
@@ -39,6 +39,7 @@ import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.Transactions;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.naming.NamespaceName;
@@ -47,6 +48,7 @@ import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.TransactionBufferStats;
+import org.apache.pulsar.common.policies.data.TransactionCoordinatorInfo;
import org.apache.pulsar.common.policies.data.TransactionCoordinatorInternalStats;
import org.apache.pulsar.common.policies.data.TransactionCoordinatorStats;
import org.apache.pulsar.common.policies.data.TransactionInBufferStats;
@@ -68,6 +70,33 @@ import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore
@Slf4j
public abstract class TransactionsBase extends AdminResource {
+ protected void internalListCoordinators(AsyncResponse asyncResponse) {
+ final PulsarAdmin admin;
+ try {
+ admin = pulsar().getAdminClient();
+ } catch (PulsarServerException ex) {
+ asyncResponse.resume(new RestException(ex));
+ return;
+ }
+ Map<Integer, TransactionCoordinatorInfo> result = new HashMap<>();
+ admin.lookups()
+ .lookupPartitionedTopicAsync(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.getPartitionedTopicName())
+ .thenAccept(map -> {
+ map.forEach((topicPartition, brokerServiceUrl) -> {
+ final int coordinatorId = TopicName.getPartitionIndex(topicPartition);
+ result.put(coordinatorId, new TransactionCoordinatorInfo(coordinatorId, brokerServiceUrl));
+ });
+
+ asyncResponse.resume(result.values());
+ })
+ .exceptionally(ex -> {
+ log.error("[{}] Failed to list transaction coordinators: {}",
+ clientAppId(), ex.getMessage(), ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
+ }
+
protected void internalGetCoordinatorStats(AsyncResponse asyncResponse,
boolean authoritative,
Integer coordinatorId) {
if (coordinatorId != null) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java
index 37ec04601a2..aa24dbdcc3a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java
@@ -52,6 +52,17 @@ import org.apache.pulsar.common.util.FutureUtil;
@Slf4j
public class Transactions extends TransactionsBase {
+ @GET
+ @Path("/coordinators")
+ @ApiOperation(value = "List transaction coordinators.")
+ @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 503, message = "This Broker is not "
+ + "configured with transactionCoordinatorEnabled=true.")})
+ public void listCoordinators(@Suspended final AsyncResponse asyncResponse) {
+ checkTransactionCoordinatorEnabled();
+ internalListCoordinators(asyncResponse);
+ }
+
@GET
@Path("/coordinatorStats")
@ApiOperation(value = "Get transaction coordinator stats.")
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
index 563e2c9d758..b57c20bc4f0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
@@ -62,6 +62,7 @@ import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TransactionBufferStats;
+import org.apache.pulsar.common.policies.data.TransactionCoordinatorInfo;
import org.apache.pulsar.common.policies.data.TransactionCoordinatorInternalStats;
import org.apache.pulsar.common.policies.data.TransactionCoordinatorStats;
import org.apache.pulsar.common.policies.data.TransactionInBufferStats;
@@ -109,6 +110,18 @@ public class AdminApiTransactionTest extends MockedPulsarServiceBaseTest {
super.internalCleanup();
}
+ @Test(timeOut = 20000)
+ public void testListTransactionCoordinators() throws Exception {
+ initTransaction(4);
+ final List<TransactionCoordinatorInfo> result = admin
+ .transactions().listTransactionCoordinatorsAsync().get();
+ assertEquals(result.size(), 4);
+ final String expectedUrl = pulsar.getBrokerServiceUrl();
+ for (int i = 0; i < 4; i++) {
+ assertEquals(result.get(i).getBrokerServiceUrl(), expectedUrl);
+ }
+ }
+
@Test(timeOut = 20000)
public void testGetTransactionCoordinatorStats() throws Exception {
initTransaction(2);
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Transactions.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Transactions.java
index bb3d3121af1..57adf263a57 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Transactions.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Transactions.java
@@ -18,11 +18,13 @@
*/
package org.apache.pulsar.client.admin;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.policies.data.TransactionBufferStats;
+import org.apache.pulsar.common.policies.data.TransactionCoordinatorInfo;
import org.apache.pulsar.common.policies.data.TransactionCoordinatorInternalStats;
import org.apache.pulsar.common.policies.data.TransactionCoordinatorStats;
import org.apache.pulsar.common.policies.data.TransactionInBufferStats;
@@ -34,6 +36,21 @@ import org.apache.pulsar.common.stats.PositionInPendingAckStats;
public interface Transactions {
+ /**
+ * List transaction coordinators.
+ *
+ * @return the transaction coordinators list.
+ */
+ List<TransactionCoordinatorInfo> listTransactionCoordinators() throws PulsarAdminException;
+
+ /**
+ * List transaction coordinators.
+ *
+ * @return the future of the transaction coordinators list.
+ */
+ CompletableFuture<List<TransactionCoordinatorInfo>> listTransactionCoordinatorsAsync();
+
+
/**
* Get transaction metadataStore stats.
*
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionCoordinatorInfo.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionCoordinatorInfo.java
new file mode 100644
index 00000000000..8150f0125aa
--- /dev/null
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionCoordinatorInfo.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.policies.data;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.ToString;
+
+/**
+ * Transaction coordinator information.
+ */
+@Getter
+@NoArgsConstructor
+@AllArgsConstructor
+@ToString
+public class TransactionCoordinatorInfo {
+ private long id;
+ private String brokerServiceUrl;
+}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java
index fbc8b3698b4..5693ebc8f60 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.client.admin.internal;
import static com.google.common.base.Preconditions.checkArgument;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@@ -31,6 +32,7 @@ import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TransactionBufferStats;
+import org.apache.pulsar.common.policies.data.TransactionCoordinatorInfo;
import org.apache.pulsar.common.policies.data.TransactionCoordinatorInternalStats;
import org.apache.pulsar.common.policies.data.TransactionCoordinatorStats;
import org.apache.pulsar.common.policies.data.TransactionInBufferStats;
@@ -48,6 +50,17 @@ public class TransactionsImpl extends BaseResource implements Transactions {
adminV3Transactions = web.path("/admin/v3/transactions");
}
+ @Override
+ public List<TransactionCoordinatorInfo> listTransactionCoordinators() throws PulsarAdminException {
+ return sync(() -> listTransactionCoordinatorsAsync());
+ }
+
+ @Override
+ public CompletableFuture<List<TransactionCoordinatorInfo>> listTransactionCoordinatorsAsync() {
+ WebTarget path = adminV3Transactions.path("coordinators");
+ return asyncGetRequest(path, new FutureCallback<List<TransactionCoordinatorInfo>>(){});
+ }
+
@Override
public CompletableFuture<TransactionCoordinatorStats> getCoordinatorStatsByIdAsync(int coordinatorId) {
WebTarget path = adminV3Transactions.path("coordinatorStats");
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index a6207f64d42..7b81c2cf449 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -2314,6 +2314,10 @@ public class PulsarAdminToolTest {
cmdTransactions = new CmdTransactions(() -> admin);
cmdTransactions.run(split("position-stats-in-pending-ack -t test -s test -l 1 -e 1 -b 1"));
verify(transactions).getPositionStatsInPendingAck("test", "test", 1L, 1L, 1);
+
+ cmdTransactions = new CmdTransactions(() -> admin);
+ cmdTransactions.run(split("coordinators-list"));
+ verify(transactions).listTransactionCoordinators();
}
@Test
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTransactions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTransactions.java
index c291cddc772..08ffba1451f 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTransactions.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTransactions.java
@@ -23,8 +23,10 @@ import com.beust.jcommander.ParameterException;
import com.beust.jcommander.Parameters;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
+import java.util.stream.Collectors;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.common.policies.data.TransactionCoordinatorInfo;
import org.apache.pulsar.common.util.RelativeTimeUtil;
@Parameters(commandDescription = "Operations on transactions")
@@ -219,6 +221,22 @@ public class CmdTransactions extends CmdBase {
}
}
+ @Parameters(commandDescription = "List transaction coordinators")
+ private class ListTransactionCoordinators extends CliCommand {
+ @Override
+ void run() throws Exception {
+ print(getAdmin()
+ .transactions()
+ .listTransactionCoordinators()
+ .stream()
+ .collect(Collectors.toMap(
+ TransactionCoordinatorInfo::getId,
+ TransactionCoordinatorInfo::getBrokerServiceUrl
+ ))
+ );
+ }
+ }
+
public CmdTransactions(Supplier<PulsarAdmin> admin) {
super("transactions", admin);
@@ -233,6 +251,7 @@ public class CmdTransactions extends CmdBase {
jcommander.addCommand("slow-transactions", new GetSlowTransactions());
jcommander.addCommand("scale-transactionCoordinators", new ScaleTransactionCoordinators());
jcommander.addCommand("position-stats-in-pending-ack", new GetPositionStatsInPendingAck());
+ jcommander.addCommand("coordinators-list", new ListTransactionCoordinators());
}
}