Repository: samza
Updated Branches:
  refs/heads/master 525e8e2d8 -> d131d4c55


SAMZA-1775: add some delay before renew under transient EH exception

There is no delay at all before we renew the partition receiver. This sometimes
leads to log spam from the following message:

Received transient exception from EH client. Renew partition receiver for ssp 
...

Author: Hai Lu <h...@linkedin.com>

Reviewers: Jagadish <jagad...@apache.org>

Closes #571 from lhaiesp/master


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/d131d4c5
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/d131d4c5
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/d131d4c5

Branch: refs/heads/master
Commit: d131d4c5571480d3d32c2bab09e2431d75a9de87
Parents: 525e8e2
Author: Hai Lu <h...@linkedin.com>
Authored: Thu Jul 19 20:23:30 2018 -0700
Committer: Jagadish <jvenkatra...@linkedin.com>
Committed: Thu Jul 19 20:23:30 2018 -0700

----------------------------------------------------------------------
 .../samza/system/eventhub/consumer/EventHubSystemConsumer.java | 6 ++++++
 1 file changed, 6 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/d131d4c5/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java
index 3fa95c2..6b3f344 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java
@@ -478,6 +478,12 @@ public class EventHubSystemConsumer extends BlockingEnvelopeMap {
           LOG.warn(
               String.format("Received transient exception from EH client. Renew partition receiver for ssp: %s", ssp),
               throwable);
+          try {
+            // Add a fixed delay so that we don't keep retrying when there are long-lasting failures
+            Thread.sleep(Duration.ofSeconds(2).toMillis());
+          } catch (InterruptedException e) {
+            LOG.warn("Interrupted during sleep before renew", e);
+          }
           // Retry creating a receiver since error likely due to timeout
           renewPartitionReceiver(ssp);
           return;
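
The fixed delay in the hunk above can also be written as a small helper. This
is a minimal sketch and not part of the commit: the class name RenewDelay and
the method pauseBeforeRenew are hypothetical. Unlike the committed code, which
only logs the InterruptedException, this variant restores the thread's
interrupt flag and reports the interruption to the caller, on the assumption
that the caller may want to skip the renew during shutdown.

```java
import java.time.Duration;

// Hypothetical helper, not from the Samza code base.
final class RenewDelay {
    private RenewDelay() {}

    /**
     * Sleeps for the given delay before a renew attempt.
     * Returns true if the full delay elapsed, false if the thread was
     * interrupted. On interruption the interrupt flag is set again so
     * that code further up the stack can still observe it.
     */
    static boolean pauseBeforeRenew(Duration delay) {
        try {
            Thread.sleep(delay.toMillis());
            return true;
        } catch (InterruptedException e) {
            // Thread.sleep clears the flag when it throws; restore it.
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

A caller could guard the renew with it, for example
if (RenewDelay.pauseBeforeRenew(Duration.ofSeconds(2))) { renewPartitionReceiver(ssp); },
although whether skipping the renew on interruption is appropriate depends on
how the consumer is shut down.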
