This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit c89e1c6a840cc799087685cdd5e2332254386638 Author: Boyang Jerry Peng <[email protected]> AuthorDate: Thu Nov 4 08:24:45 2021 -0700 add additional error handling in auto partition update task MultiTopicsConsumerImpl (#12620) Co-authored-by: Jerry Peng <[email protected]> (cherry picked from commit 11cfbe4f5cf66ac93ec0fb6dd2ea160a0a8bf6cf) --- .../client/impl/MultiTopicsConsumerImpl.java | 33 +++++++++++++--------- .../client/impl/PartitionedProducerImpl.java | 30 +++++++++++--------- 2 files changed, 37 insertions(+), 26 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index 520e7f3..fa7c4a2 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -1324,28 +1324,35 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { topicName, oldPartitionNumber, currentPartitionNumber); return FutureUtil.failedFuture(new NotSupportedException("not support shrink topic partitions")); } + }).exceptionally(throwable -> { + log.warn("[{}] Failed to get partitions for topic to determine if new partitions are added", throwable); + return null; }); } private TimerTask partitionsAutoUpdateTimerTask = new TimerTask() { @Override public void run(Timeout timeout) throws Exception { - if (timeout.isCancelled() || getState() != State.Ready) { - return; - } + try { + if (timeout.isCancelled() || getState() != State.Ready) { + return; + } - if (log.isDebugEnabled()) { - log.debug("[{}] run partitionsAutoUpdateTimerTask", topic); - } + if (log.isDebugEnabled()) { + log.debug("[{}] run partitionsAutoUpdateTimerTask", topic); + } - // if last auto update not completed yet, do nothing. 
- if (partitionsAutoUpdateFuture == null || partitionsAutoUpdateFuture.isDone()) { - partitionsAutoUpdateFuture = topicsPartitionChangedListener.onTopicsExtended(partitionedTopics.keySet()); + // if last auto update not completed yet, do nothing. + if (partitionsAutoUpdateFuture == null || partitionsAutoUpdateFuture.isDone()) { + partitionsAutoUpdateFuture = topicsPartitionChangedListener.onTopicsExtended(partitionedTopics.keySet()); + } + } catch (Throwable th) { + log.warn("Encountered error in partition auto update timer task for multi-topic consumer. Another task will be scheduled.", th); + } finally { + // schedule the next re-check task + partitionsAutoUpdateTimeout = client.timer() + .newTimeout(partitionsAutoUpdateTimerTask, conf.getAutoUpdatePartitionsIntervalSeconds(), TimeUnit.SECONDS); } - - // schedule the next re-check task - partitionsAutoUpdateTimeout = client.timer() - .newTimeout(partitionsAutoUpdateTimerTask, conf.getAutoUpdatePartitionsIntervalSeconds(), TimeUnit.SECONDS); } }; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java index 4525531..3311050 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java @@ -410,22 +410,26 @@ public class PartitionedProducerImpl<T> extends ProducerBase<T> { private TimerTask partitionsAutoUpdateTimerTask = new TimerTask() { @Override public void run(Timeout timeout) throws Exception { - if (timeout.isCancelled() || getState() != State.Ready) { - return; - } + try { + if (timeout.isCancelled() || getState() != State.Ready) { + return; + } - if (log.isDebugEnabled()) { - log.debug("[{}] run partitionsAutoUpdateTimerTask for partitioned producer", topic); - } + if (log.isDebugEnabled()) {
+ log.debug("[{}] run partitionsAutoUpdateTimerTask for partitioned producer", topic); + } - // if last auto update not completed yet, do nothing. - if (partitionsAutoUpdateFuture == null || partitionsAutoUpdateFuture.isDone()) { - partitionsAutoUpdateFuture = topicsPartitionChangedListener.onTopicsExtended(ImmutableList.of(topic)); + // if last auto update not completed yet, do nothing. + if (partitionsAutoUpdateFuture == null || partitionsAutoUpdateFuture.isDone()) { + partitionsAutoUpdateFuture = topicsPartitionChangedListener.onTopicsExtended(ImmutableList.of(topic)); + } + } catch (Throwable th) { + log.warn("Encountered error in partition auto update timer task for partition producer. Another task will be scheduled.", th); + } finally { + // schedule the next re-check task + partitionsAutoUpdateTimeout = client.timer() + .newTimeout(partitionsAutoUpdateTimerTask, conf.getAutoUpdatePartitionsIntervalSeconds(), TimeUnit.SECONDS); } - - // schedule the next re-check task - partitionsAutoUpdateTimeout = client.timer() - .newTimeout(partitionsAutoUpdateTimerTask, conf.getAutoUpdatePartitionsIntervalSeconds(), TimeUnit.SECONDS); } };
