yangl commented on a change in pull request #10797:
URL: https://github.com/apache/pulsar/pull/10797#discussion_r644702804



##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TransactionAggregator.java
##########
@@ -66,7 +67,7 @@ public static void generate(PulsarService pulsar, 
SimpleTextOutputStream stream,
                                     generateManageLedgerStats(managedLedger,
                                             stream, cluster, namespace, name, 
subscription.getName());
                                 } catch (Exception e) {
-                                    throw new IOException("Transaction pending 
ack generate managedLedgerStats fail!");
+                                    log.warn("Transaction pending ack generate 
managedLedgerStats fail!", e);

Review comment:
        subscription.pendingAckHandle instanceof `PendingAckHandleDisabled`, 
not PendingAckHandleImpl
   
   
![image](https://user-images.githubusercontent.com/231353/120634250-9eea5900-c49d-11eb-93fc-2852d506b90e.png)
   
   PersistentSubscription.java
   ```java
       public CompletableFuture<ManagedLedger> getPendingAckManageLedger() {
           if (this.pendingAckHandle instanceof PendingAckHandleImpl) {
               return ((PendingAckHandleImpl) 
this.pendingAckHandle).getStoreManageLedger();
           } else {
               return FutureUtil.failedFuture(new NotAllowedException("Pending 
ack handle don't use managedLedger!"));
           }
       }
   ```
   `return FutureUtil.failedFuture(new NotAllowedException("Pending ack handle 
don't use managedLedger!"));`
   
   PersistentSubscription.java
   ```java
   if 
(topic.getBrokerService().getPulsar().getConfig().isTransactionCoordinatorEnabled()
                   && !checkTopicIsEventsNames(topicName)
                   && 
!topicName.startsWith(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getLocalName())
                   && 
!topicName.startsWith(MLTransactionLogImpl.TRANSACTION_LOG_PREFIX)
                   && 
!topicName.endsWith(MLPendingAckStore.PENDING_ACK_STORE_SUFFIX)) {
               this.pendingAckHandle = new PendingAckHandleImpl(this);
           } else {
               this.pendingAckHandle = new PendingAckHandleDisabled();
           }
   ```
   

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TransactionAggregator.java
##########
@@ -66,7 +67,7 @@ public static void generate(PulsarService pulsar, 
SimpleTextOutputStream stream,
                                     generateManageLedgerStats(managedLedger,
                                             stream, cluster, namespace, name, 
subscription.getName());
                                 } catch (Exception e) {
-                                    throw new IOException("Transaction pending 
ack generate managedLedgerStats fail!");
+                                    log.warn("Transaction pending ack generate 
managedLedgerStats fail!", e);

Review comment:
       or add the `pendingAckHandle instanceof` logic, like this
   
   ```java
   PersistentSubscription sub = ((PersistentSubscription) subscription);
   Field pendingAckHandleField = 
PersistentSubscription.class.getDeclaredField("pendingAckHandle");
   pendingAckHandleField.setAccessible(true);
   PendingAckHandle pendingAckHandle =
           (PendingAckHandle) pendingAckHandleField.get(sub);
   if (pendingAckHandle instanceof PendingAckHandleImpl){
       generateManageLedgerStats(managedLedger,
               stream, cluster, namespace, name, subscription.getName());
   }
   
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to