This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 2596edc12779fbc15efb8c099860ca1a09bdde47 Author: fengyubiao <[email protected]> AuthorDate: Wed Apr 30 14:33:57 2025 +0800 [improve][client]Improve transaction log when a TXN command timeout (#24230) (cherry picked from commit 74c3b577c7c78a5f9d72fa28c99db72dda4460b7) --- .../client/impl/TransactionMetaStoreHandler.java | 44 ++++++++++++++++++---- 1 file changed, 37 insertions(+), 7 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java index c8c2fa83f94..a1fe78d7290 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java @@ -234,7 +234,8 @@ public class TransactionMetaStoreHandler extends HandlerState } long requestId = client.newRequestId(); ByteBuf cmd = Commands.newTxn(transactionCoordinatorId, requestId, unit.toMillis(timeout)); - OpForTxnIdCallBack op = OpForTxnIdCallBack.create(cmd, callback, client); + String description = String.format("Create new transaction %s", transactionCoordinatorId); + OpForTxnIdCallBack op = OpForTxnIdCallBack.create(cmd, callback, client, description, cnx()); internalPinnedExecutor.execute(() -> { pendingRequests.put(requestId, op); timeoutQueue.add(new RequestTime(System.currentTimeMillis(), requestId)); @@ -315,8 +316,10 @@ public class TransactionMetaStoreHandler extends HandlerState long requestId = client.newRequestId(); ByteBuf cmd = Commands.newAddPartitionToTxn( requestId, txnID.getLeastSigBits(), txnID.getMostSigBits(), partitions); + String description = String.format("Add partition %s to TXN %s", String.valueOf(partitions), + String.valueOf(txnID)); OpForVoidCallBack op = OpForVoidCallBack - .create(cmd, callback, client); + .create(cmd, callback, client, description, cnx()); internalPinnedExecutor.execute(() -> { pendingRequests.put(requestId, op); timeoutQueue.add(new RequestTime(System.currentTimeMillis(), requestId)); @@ -400,7 +403,9 @@ public class TransactionMetaStoreHandler extends HandlerState long requestId = client.newRequestId(); ByteBuf cmd = Commands.newAddSubscriptionToTxn( requestId, txnID.getLeastSigBits(), txnID.getMostSigBits(), subscriptionList); - OpForVoidCallBack op = OpForVoidCallBack.create(cmd, callback, client); + String description = String.format("Add subscription %s to TXN %s", toStringSubscriptionList(subscriptionList), + String.valueOf(txnID)); + OpForVoidCallBack op = OpForVoidCallBack.create(cmd, callback, client, description, cnx()); internalPinnedExecutor.execute(() -> { pendingRequests.put(requestId, op); timeoutQueue.add(new RequestTime(System.currentTimeMillis(), requestId)); @@ -411,6 +416,17 @@ public class TransactionMetaStoreHandler extends HandlerState return callback; } + private String toStringSubscriptionList(List<Subscription> list) { + if (list == null || list.isEmpty()) { + return "[]"; + } + StringBuilder builder = new StringBuilder("["); + for (Subscription subscription : list) { + builder.append(String.format("%s %s", subscription.getTopic(), subscription.getSubscription())); + } + return builder.append("]").toString(); + } + public void handleAddSubscriptionToTxnResponse(CommandAddSubscriptionToTxnResponse response) { final boolean hasError = response.hasError(); final ServerError error; @@ -482,7 +498,8 @@ public class TransactionMetaStoreHandler extends HandlerState long requestId = client.newRequestId(); BaseCommand cmd = Commands.newEndTxn(requestId, txnID.getLeastSigBits(), txnID.getMostSigBits(), action); ByteBuf buf = Commands.serializeWithSize(cmd); - OpForVoidCallBack op = OpForVoidCallBack.create(buf, callback, client); + String description = String.format("End [%s] TXN %s", String.valueOf(action), String.valueOf(txnID)); + OpForVoidCallBack op = OpForVoidCallBack.create(buf, callback, client, description, cnx()); internalPinnedExecutor.execute(() -> { pendingRequests.put(requestId, op); timeoutQueue.add(new RequestTime(System.currentTimeMillis(), requestId)); @@ -572,13 +589,16 @@ public class TransactionMetaStoreHandler extends HandlerState protected ByteBuf cmd; protected CompletableFuture<T> callback; protected Backoff backoff; + protected String description; + protected ClientCnx clientCnx; abstract void recycle(); } private static class OpForTxnIdCallBack extends OpBase<TxnID> { - static OpForTxnIdCallBack create(ByteBuf cmd, CompletableFuture<TxnID> callback, PulsarClientImpl client) { + static OpForTxnIdCallBack create(ByteBuf cmd, CompletableFuture<TxnID> callback, PulsarClientImpl client, + String description, ClientCnx clientCnx) { OpForTxnIdCallBack op = RECYCLER.get(); op.callback = callback; op.cmd = cmd; @@ -588,6 +608,8 @@ public class TransactionMetaStoreHandler extends HandlerState .setMax(client.getConfiguration().getMaxBackoffIntervalNanos() / 10, TimeUnit.NANOSECONDS) .setMandatoryStop(0, TimeUnit.MILLISECONDS) .create(); + op.description = description; + op.clientCnx = clientCnx; return op; } @@ -600,6 +622,8 @@ public class TransactionMetaStoreHandler extends HandlerState this.backoff = null; this.cmd = null; this.callback = null; + this.description = null; + this.clientCnx = null; recyclerHandle.recycle(this); } @@ -615,7 +639,8 @@ public class TransactionMetaStoreHandler extends HandlerState private static class OpForVoidCallBack extends OpBase<Void> { - static OpForVoidCallBack create(ByteBuf cmd, CompletableFuture<Void> callback, PulsarClientImpl client) { + static OpForVoidCallBack create(ByteBuf cmd, CompletableFuture<Void> callback, PulsarClientImpl client, + String description, ClientCnx clientCnx) { OpForVoidCallBack op = RECYCLER.get(); op.callback = callback; op.cmd = cmd; @@ -625,6 +650,8 @@ public class TransactionMetaStoreHandler extends HandlerState .setMax(client.getConfiguration().getMaxBackoffIntervalNanos() / 10, TimeUnit.NANOSECONDS) .setMandatoryStop(0, TimeUnit.MILLISECONDS) .create(); + op.description = description; + op.clientCnx = clientCnx; return op; } @@ -637,6 +664,8 @@ public class TransactionMetaStoreHandler extends HandlerState this.backoff = null; this.cmd = null; this.callback = null; + this.description = null; + this.clientCnx = null; recyclerHandle.recycle(this); } @@ -744,7 +773,8 @@ public class TransactionMetaStoreHandler extends HandlerState OpBase<?> op = pendingRequests.remove(lastPolled.requestId); if (op != null && !op.callback.isDone()) { op.callback.completeExceptionally(new PulsarClientException.TimeoutException( - "Could not get response from transaction meta store within given timeout.")); + String.format("%s failed due to timeout. connection: %s. pending-queue: %s", + op.description, op.clientCnx, pendingRequests.size()))); if (LOG.isDebugEnabled()) { LOG.debug("Transaction coordinator request {} is timeout.", lastPolled.requestId); }
