This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit c89e1c6a840cc799087685cdd5e2332254386638
Author: Boyang Jerry Peng <[email protected]>
AuthorDate: Thu Nov 4 08:24:45 2021 -0700

    add additional error handling in auto partition update task 
MultiTopicsConsumerImpl (#12620)
    
    Co-authored-by: Jerry Peng <[email protected]>
    (cherry picked from commit 11cfbe4f5cf66ac93ec0fb6dd2ea160a0a8bf6cf)
---
 .../client/impl/MultiTopicsConsumerImpl.java       | 33 +++++++++++++---------
 .../client/impl/PartitionedProducerImpl.java       | 30 +++++++++++---------
 2 files changed, 37 insertions(+), 26 deletions(-)

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 520e7f3..fa7c4a2 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -1324,28 +1324,35 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
                     topicName, oldPartitionNumber, currentPartitionNumber);
                 return FutureUtil.failedFuture(new NotSupportedException("not 
support shrink topic partitions"));
             }
+        }).exceptionally(throwable -> {
+            log.warn("[{}] Failed to get partitions for topic to determine if 
new partitions are added", throwable);
+            return null;
         });
     }
 
     private TimerTask partitionsAutoUpdateTimerTask = new TimerTask() {
         @Override
         public void run(Timeout timeout) throws Exception {
-            if (timeout.isCancelled() || getState() != State.Ready) {
-                return;
-            }
+            try {
+                if (timeout.isCancelled() || getState() != State.Ready) {
+                    return;
+                }
 
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] run partitionsAutoUpdateTimerTask", topic);
-            }
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] run partitionsAutoUpdateTimerTask", topic);
+                }
 
-            // if last auto update not completed yet, do nothing.
-            if (partitionsAutoUpdateFuture == null || 
partitionsAutoUpdateFuture.isDone()) {
-                partitionsAutoUpdateFuture = 
topicsPartitionChangedListener.onTopicsExtended(partitionedTopics.keySet());
+                // if last auto update not completed yet, do nothing.
+                if (partitionsAutoUpdateFuture == null || 
partitionsAutoUpdateFuture.isDone()) {
+                    partitionsAutoUpdateFuture = 
topicsPartitionChangedListener.onTopicsExtended(partitionedTopics.keySet());
+                }
+            } catch (Throwable th) {
+                log.warn("Encountered error in partition auto update timer 
task for multi-topic consumer. Another task will be scheduled.", th);
+            } finally {
+                // schedule the next re-check task
+                partitionsAutoUpdateTimeout = client.timer()
+                        .newTimeout(partitionsAutoUpdateTimerTask, 
conf.getAutoUpdatePartitionsIntervalSeconds(), TimeUnit.SECONDS);
             }
-
-            // schedule the next re-check task
-            partitionsAutoUpdateTimeout = client.timer()
-                .newTimeout(partitionsAutoUpdateTimerTask, 
conf.getAutoUpdatePartitionsIntervalSeconds(), TimeUnit.SECONDS);
         }
     };
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
index 4525531..3311050 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
@@ -410,22 +410,26 @@ public class PartitionedProducerImpl<T> extends 
ProducerBase<T> {
     private TimerTask partitionsAutoUpdateTimerTask = new TimerTask() {
         @Override
         public void run(Timeout timeout) throws Exception {
-            if (timeout.isCancelled() || getState() != State.Ready) {
-                return;
-            }
+            try {
+                if (timeout.isCancelled() || getState() != State.Ready) {
+                    return;
+                }
 
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] run partitionsAutoUpdateTimerTask for 
partitioned producer", topic);
-            }
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] run partitionsAutoUpdateTimerTask for 
partitioned producer", topic);
+                }
 
-            // if last auto update not completed yet, do nothing.
-            if (partitionsAutoUpdateFuture == null || 
partitionsAutoUpdateFuture.isDone()) {
-                partitionsAutoUpdateFuture = 
topicsPartitionChangedListener.onTopicsExtended(ImmutableList.of(topic));
+                // if last auto update not completed yet, do nothing.
+                if (partitionsAutoUpdateFuture == null || 
partitionsAutoUpdateFuture.isDone()) {
+                    partitionsAutoUpdateFuture = 
topicsPartitionChangedListener.onTopicsExtended(ImmutableList.of(topic));
+                }
+            } catch (Throwable th) {
+                log.warn("Encountered error in partition auto update timer 
task for partition producer. Another task will be scheduled.", th);
+            } finally {
+                // schedule the next re-check task
+                partitionsAutoUpdateTimeout = client.timer()
+                        .newTimeout(partitionsAutoUpdateTimerTask, 
conf.getAutoUpdatePartitionsIntervalSeconds(), TimeUnit.SECONDS);
             }
-
-            // schedule the next re-check task
-            partitionsAutoUpdateTimeout = client.timer()
-                .newTimeout(partitionsAutoUpdateTimerTask, 
conf.getAutoUpdatePartitionsIntervalSeconds(), TimeUnit.SECONDS);
         }
     };
 

Reply via email to