Repository: samza Updated Branches: refs/heads/master 525e8e2d8 -> d131d4c55
SAMZA-1775: add some delay before renew under transient EH exception There is no delay at all before we renew the partition. This sometimes lead to spam in the log for the following messages: Received transient exception from EH client. Renew partition receiver for ssp ... Author: Hai Lu <h...@linkedin.com> Reviewers: Jagadish <jagad...@apache.org> Closes #571 from lhaiesp/master Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/d131d4c5 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/d131d4c5 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/d131d4c5 Branch: refs/heads/master Commit: d131d4c5571480d3d32c2bab09e2431d75a9de87 Parents: 525e8e2 Author: Hai Lu <h...@linkedin.com> Authored: Thu Jul 19 20:23:30 2018 -0700 Committer: Jagadish <jvenkatra...@linkedin.com> Committed: Thu Jul 19 20:23:30 2018 -0700 ---------------------------------------------------------------------- .../samza/system/eventhub/consumer/EventHubSystemConsumer.java | 6 ++++++ 1 file changed, 6 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/d131d4c5/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java index 3fa95c2..6b3f344 100644 --- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java +++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java @@ -478,6 +478,12 @@ public class EventHubSystemConsumer extends BlockingEnvelopeMap { LOG.warn( String.format("Received transient exception from EH client. Renew partition receiver for ssp: %s", ssp), throwable); + try { + // Add a fixed delay so that we don't keep retrying when there are long-lasting failures + Thread.sleep(Duration.ofSeconds(2).toMillis()); + } catch (InterruptedException e) { + LOG.warn("Interrupted during sleep before renew", e); + } // Retry creating a receiver since error likely due to timeout renewPartitionReceiver(ssp); return;