Repository: samza
Updated Branches:
  refs/heads/master 7a32d30db -> e63d27c0c


SAMZA-1676: Miscellaneous fixes and improvements for the EventHubs system

This commit includes the following changes:
- Log the metadata that is fetched from Event Hubs
- Rename the readLatency metric to consumptionLagMs
- Fix the readLatency metric reporting negative values (the arguments to
  Duration.between were in the wrong order)

Author: Hai Lu <h...@linkedin.com>

Reviewers: Srini P<spun...@linkedin.com>, Jagadish <jagad...@apache.org>

Closes #484 from lhaiesp/master


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/e63d27c0
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/e63d27c0
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/e63d27c0

Branch: refs/heads/master
Commit: e63d27c0cd08973f35ae5c6967f71ddd3dff31c3
Parents: 7a32d30
Author: Hai Lu <h...@linkedin.com>
Authored: Tue Apr 24 12:26:14 2018 -0700
Committer: Jagadish <jvenkatra...@linkedin.com>
Committed: Tue Apr 24 12:26:14 2018 -0700

----------------------------------------------------------------------
 .../eventhub/SamzaEventHubClientManager.java    |  2 +
 .../eventhub/admin/EventHubSystemAdmin.java     | 47 ++++++++++++++++++--
 .../consumer/EventHubSystemConsumer.java        | 28 +++++++-----
 3 files changed, 63 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/e63d27c0/samza-azure/src/main/java/org/apache/samza/system/eventhub/SamzaEventHubClientManager.java
----------------------------------------------------------------------
diff --git 
a/samza-azure/src/main/java/org/apache/samza/system/eventhub/SamzaEventHubClientManager.java
 
b/samza-azure/src/main/java/org/apache/samza/system/eventhub/SamzaEventHubClientManager.java
index a884a79..22621f3 100644
--- 
a/samza-azure/src/main/java/org/apache/samza/system/eventhub/SamzaEventHubClientManager.java
+++ 
b/samza-azure/src/main/java/org/apache/samza/system/eventhub/SamzaEventHubClientManager.java
@@ -76,6 +76,7 @@ public class SamzaEventHubClientManager implements 
EventHubClientManager {
   @Override
   public void init() {
     String remoteHost = String.format(EVENTHUB_REMOTE_HOST_FORMAT, 
eventHubNamespace);
+    LOG.info("Initializing SamzaEventHubClientManager for namespace: " + 
eventHubNamespace);
     try {
       ConnectionStringBuilder connectionStringBuilder = new 
ConnectionStringBuilder()
           .setNamespaceName(eventHubNamespace)
@@ -92,6 +93,7 @@ public class SamzaEventHubClientManager implements 
EventHubClientManager {
       LOG.error(msg, e);
       throw new SamzaException(msg, e);
     }
+    LOG.info("SamzaEventHubClientManager initialized for namespace: " + 
eventHubNamespace);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/samza/blob/e63d27c0/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java
----------------------------------------------------------------------
diff --git 
a/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java
 
b/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java
index 344a6a8..c86e31a 100644
--- 
a/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java
+++ 
b/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java
@@ -19,6 +19,7 @@
 
 package org.apache.samza.system.eventhub.admin;
 
+import com.microsoft.azure.eventhubs.EventHubClient;
 import com.microsoft.azure.eventhubs.EventHubRuntimeInformation;
 import com.microsoft.azure.eventhubs.PartitionRuntimeInformation;
 import java.util.Arrays;
@@ -76,6 +77,33 @@ public class EventHubSystemAdmin implements SystemAdmin {
     return results;
   }
 
+  // EventHubRuntimeInformation does not implement toString()
+  private String printEventHubRuntimeInfo(EventHubRuntimeInformation ehInfo) {
+    if (ehInfo == null) {
+      return "[EventHubRuntimeInformation: null]";
+    }
+    return String.format("[EventHubRuntimeInformation: createAt=%s, 
partitionCount=%d, path=%s]", ehInfo.getCreatedAt(),
+        ehInfo.getPartitionCount(), ehInfo.getPath());
+  }
+
+  // PartitionRuntimeInformation does not implement toString()
+  private String printPartitionRuntimeInfo(PartitionRuntimeInformation 
runtimeInformation) {
+    if (runtimeInformation == null) {
+      return "[PartitionRuntimeInformation: null]";
+    }
+    StringBuilder stringBuilder = new StringBuilder();
+    stringBuilder.append("[PartitionRuntimeInformation:");
+    stringBuilder.append(" 
eventHubPath=").append(runtimeInformation.getEventHubPath());
+    stringBuilder.append(" 
partitionId=").append(runtimeInformation.getPartitionId());
+    stringBuilder.append(" 
lastEnqueuedTimeUtc=").append(runtimeInformation.getLastEnqueuedTimeUtc().toString());
+    stringBuilder.append(" 
lastEnqueuedOffset=").append(runtimeInformation.getLastEnqueuedOffset());
+    // calculate the number of messages in the queue
+    stringBuilder.append(" numMessages=")
+        .append(runtimeInformation.getLastEnqueuedSequenceNumber() - 
runtimeInformation.getBeginSequenceNumber());
+    stringBuilder.append("]");
+    return stringBuilder.toString();
+  }
+
   @Override
   public Map<String, SystemStreamMetadata> getSystemStreamMetadata(Set<String> 
streamNames) {
     Map<String, SystemStreamMetadata> requestedMetadata = new HashMap<>();
@@ -86,11 +114,24 @@ public class EventHubSystemAdmin implements SystemAdmin {
           LOG.debug(String.format("Partition ids for Stream=%s not found", 
streamName));
 
           EventHubClientManager eventHubClientManager = 
getOrCreateStreamEventHubClient(streamName);
-          CompletableFuture<EventHubRuntimeInformation> runtimeInfo = 
eventHubClientManager.getEventHubClient().getRuntimeInformation();
+          EventHubClient ehClient = eventHubClientManager.getEventHubClient();
 
+          CompletableFuture<EventHubRuntimeInformation> runtimeInfo = 
ehClient.getRuntimeInformation();
           long timeoutMs = eventHubConfig.getRuntimeInfoWaitTimeMS(systemName);
           EventHubRuntimeInformation ehInfo = runtimeInfo.get(timeoutMs, 
TimeUnit.MILLISECONDS);
-          LOG.info(String.format("Adding partition ids=%s for stream=%s", 
Arrays.toString(ehInfo.getPartitionIds()), streamName));
+          LOG.info(String.format("Adding partition ids=%s for stream=%s. 
EHRuntimetInfo=%s",
+              Arrays.toString(ehInfo.getPartitionIds()), streamName, 
printEventHubRuntimeInfo(ehInfo)));
+
+          try {
+            for (String partitionId : ehInfo.getPartitionIds()) {
+              LOG.info(printPartitionRuntimeInfo(
+                  
ehClient.getPartitionRuntimeInformation(partitionId).get(timeoutMs, 
TimeUnit.MILLISECONDS)));
+            }
+          } catch (Exception e) {
+            // ignore failures as this is just for information logging
+            LOG.warn("Failed to fetch and print partition runtime info from 
EventHubs.", e);
+          }
+
           streamPartitions.put(streamName, ehInfo.getPartitionIds());
         }
         String[] partitionIds = streamPartitions.get(streamName);
@@ -113,7 +154,7 @@ public class EventHubSystemAdmin implements SystemAdmin {
 
   private EventHubClientManager getOrCreateStreamEventHubClient(String 
streamName) {
     if (!eventHubClients.containsKey(streamName)) {
-      LOG.debug(String.format("Creating EventHubClient for Stream=%s", 
streamName));
+      LOG.info(String.format("Creating EventHubClient for Stream=%s", 
streamName));
 
       EventHubClientManager eventHubClientManager = 
eventHubClientManagerFactory
               .getEventHubClientManager(systemName, streamName, 
eventHubConfig);

http://git-wip-us.apache.org/repos/asf/samza/blob/e63d27c0/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java
----------------------------------------------------------------------
diff --git 
a/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java
 
b/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java
index f00944b..0fd0f2c 100644
--- 
a/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java
+++ 
b/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java
@@ -106,7 +106,7 @@ public class EventHubSystemConsumer extends 
BlockingEnvelopeMap {
   public static final String END_OF_STREAM = "-2";
   public static final String EVENT_READ_RATE = "eventReadRate";
   public static final String EVENT_BYTE_READ_RATE = "eventByteReadRate";
-  public static final String READ_LATENCY = "readLatency";
+  public static final String CONSUMPTION_LAG_MS = "consumptionLagMs";
   public static final String READ_ERRORS = "readErrors";
   public static final String AGGREGATE = "aggregate";
 
@@ -114,12 +114,12 @@ public class EventHubSystemConsumer extends 
BlockingEnvelopeMap {
 
   private static Counter aggEventReadRate = null;
   private static Counter aggEventByteReadRate = null;
-  private static SamzaHistogram aggReadLatency = null;
+  private static SamzaHistogram aggConsumptionLagMs = null;
   private static Counter aggReadErrors = null;
 
   private final Map<String, Counter> eventReadRates;
   private final Map<String, Counter> eventByteReadRates;
-  private final Map<String, SamzaHistogram> readLatencies;
+  private final Map<String, SamzaHistogram> consumptionLagMs;
   private final Map<String, Counter> readErrors;
 
   final ConcurrentHashMap<SystemStreamPartition, PartitionReceiveHandler> 
streamPartitionHandlers =
@@ -162,8 +162,8 @@ public class EventHubSystemConsumer extends 
BlockingEnvelopeMap {
         streamIds.stream().collect(Collectors.toMap(Function.identity(), x -> 
registry.newCounter(x, EVENT_READ_RATE)));
     eventByteReadRates = streamIds.stream()
         .collect(Collectors.toMap(Function.identity(), x -> 
registry.newCounter(x, EVENT_BYTE_READ_RATE)));
-    readLatencies = streamIds.stream()
-        .collect(Collectors.toMap(Function.identity(), x -> new 
SamzaHistogram(registry, x, READ_LATENCY)));
+    consumptionLagMs = streamIds.stream()
+        .collect(Collectors.toMap(Function.identity(), x -> new 
SamzaHistogram(registry, x, CONSUMPTION_LAG_MS)));
     readErrors =
         streamIds.stream().collect(Collectors.toMap(Function.identity(), x -> 
registry.newCounter(x, READ_ERRORS)));
 
@@ -172,7 +172,7 @@ public class EventHubSystemConsumer extends 
BlockingEnvelopeMap {
       if (aggEventReadRate == null) {
         aggEventReadRate = registry.newCounter(AGGREGATE, EVENT_READ_RATE);
         aggEventByteReadRate = registry.newCounter(AGGREGATE, 
EVENT_BYTE_READ_RATE);
-        aggReadLatency = new SamzaHistogram(registry, AGGREGATE, READ_LATENCY);
+        aggConsumptionLagMs = new SamzaHistogram(registry, AGGREGATE, 
CONSUMPTION_LAG_MS);
         aggReadErrors = registry.newCounter(AGGREGATE, READ_ERRORS);
       }
     }
@@ -222,6 +222,7 @@ public class EventHubSystemConsumer extends 
BlockingEnvelopeMap {
   @Override
   public void start() {
     isStarted = true;
+    LOG.info("Starting EventHubSystemConsumer. Count of SSPs registered: " + 
streamPartitionOffsets.entrySet().size());
     // Create receivers for Event Hubs
     for (Map.Entry<SystemStreamPartition, String> entry : 
streamPartitionOffsets.entrySet()) {
 
@@ -258,7 +259,7 @@ public class EventHubSystemConsumer extends 
BlockingEnvelopeMap {
 
         PartitionReceiveHandler handler =
             new PartitionReceiverHandlerImpl(ssp, 
eventReadRates.get(streamId), eventByteReadRates.get(streamId),
-                readLatencies.get(streamId), readErrors.get(streamId), 
interceptors.getOrDefault(streamId, null),
+                consumptionLagMs.get(streamId), readErrors.get(streamId), 
interceptors.getOrDefault(streamId, null),
                 config.getMaxEventCountPerPoll(systemName));
 
         // Timeout for EventHubClient receive
@@ -274,8 +275,9 @@ public class EventHubSystemConsumer extends 
BlockingEnvelopeMap {
             String.format("Failed to create receiver for EventHubs: 
namespace=%s, entity=%s, partitionId=%d", namespace,
                 entityPath, partitionId), e);
       }
-      LOG.debug(String.format("Connection successfully started for 
namespace=%s, entity=%s ", namespace, entityPath));
+      LOG.info(String.format("Connection successfully started for 
namespace=%s, entity=%s ", namespace, entityPath));
     }
+    LOG.info("EventHubSystemConsumer started");
   }
 
   @Override
@@ -334,7 +336,7 @@ public class EventHubSystemConsumer extends 
BlockingEnvelopeMap {
 
   @Override
   public void stop() {
-    LOG.debug("Stopping event hub system consumer...");
+    LOG.info("Stopping event hub system consumer...");
     List<CompletableFuture<Void>> futures = new ArrayList<>();
     streamPartitionReceivers.values().forEach((receiver) -> 
futures.add(receiver.close()));
     CompletableFuture<Void> future = 
CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
@@ -422,9 +424,9 @@ public class EventHubSystemConsumer extends 
BlockingEnvelopeMap {
       eventByteReadRate.inc(eventDataLength);
       aggEventByteReadRate.inc(eventDataLength);
 
-      long latencyMs = Duration.between(Instant.now(), 
event.getSystemProperties().getEnqueuedTime()).toMillis();
+      long latencyMs = 
Duration.between(event.getSystemProperties().getEnqueuedTime(), 
Instant.now()).toMillis();
       readLatency.update(latencyMs);
-      aggReadLatency.update(latencyMs);
+      aggConsumptionLagMs.update(latencyMs);
     }
 
     @Override
@@ -440,12 +442,16 @@ public class EventHubSystemConsumer extends 
BlockingEnvelopeMap {
           // Only set to transient throwable if there has been no previous 
errors
           eventHubHandlerError.compareAndSet(null, throwable);
 
+          LOG.warn(
+              String.format("Received transient exception from EH client. 
Renew partition receiver for ssp: %s", ssp),
+              throwable);
           // Retry creating a receiver since error likely due to timeout
           renewPartitionReceiver(ssp);
           return;
         }
       }
 
+      LOG.error(String.format("Received non transient exception from EH client 
for ssp: %s", ssp), throwable);
       // Propagate non transient or unknown errors
       eventHubHandlerError.set(throwable);
     }

Reply via email to