Repository: samza Updated Branches: refs/heads/master 7a32d30db -> e63d27c0c
SAMZA-1676: miscellaneous fix and improvement for eventhubs system Including these changes: - Log the metadata that we are fetching from the event hubs - Rename readLatency to consumptionLagMs - fix the issue that readLatency metric returns negative value Author: Hai Lu <h...@linkedin.com> Reviewers: Srini P<spun...@linkedin.com>, Jagadish <jagad...@apache.org> Closes #484 from lhaiesp/master Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/e63d27c0 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/e63d27c0 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/e63d27c0 Branch: refs/heads/master Commit: e63d27c0cd08973f35ae5c6967f71ddd3dff31c3 Parents: 7a32d30 Author: Hai Lu <h...@linkedin.com> Authored: Tue Apr 24 12:26:14 2018 -0700 Committer: Jagadish <jvenkatra...@linkedin.com> Committed: Tue Apr 24 12:26:14 2018 -0700 ---------------------------------------------------------------------- .../eventhub/SamzaEventHubClientManager.java | 2 + .../eventhub/admin/EventHubSystemAdmin.java | 47 ++++++++++++++++++-- .../consumer/EventHubSystemConsumer.java | 28 +++++++----- 3 files changed, 63 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/e63d27c0/samza-azure/src/main/java/org/apache/samza/system/eventhub/SamzaEventHubClientManager.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/SamzaEventHubClientManager.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/SamzaEventHubClientManager.java index a884a79..22621f3 100644 --- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/SamzaEventHubClientManager.java +++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/SamzaEventHubClientManager.java @@ -76,6 +76,7 @@ public class SamzaEventHubClientManager implements EventHubClientManager { @Override public void init() { String remoteHost = String.format(EVENTHUB_REMOTE_HOST_FORMAT, eventHubNamespace); + LOG.info("Initializing SamzaEventHubClientManager for namespace: " + eventHubNamespace); try { ConnectionStringBuilder connectionStringBuilder = new ConnectionStringBuilder() .setNamespaceName(eventHubNamespace) @@ -92,6 +93,7 @@ public class SamzaEventHubClientManager implements EventHubClientManager { LOG.error(msg, e); throw new SamzaException(msg, e); } + LOG.info("SamzaEventHubClientManager initialized for namespace: " + eventHubNamespace); } @Override http://git-wip-us.apache.org/repos/asf/samza/blob/e63d27c0/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java index 344a6a8..c86e31a 100644 --- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java +++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java @@ -19,6 +19,7 @@ package org.apache.samza.system.eventhub.admin; +import com.microsoft.azure.eventhubs.EventHubClient; import com.microsoft.azure.eventhubs.EventHubRuntimeInformation; import com.microsoft.azure.eventhubs.PartitionRuntimeInformation; import java.util.Arrays; @@ -76,6 +77,33 @@ public class EventHubSystemAdmin implements SystemAdmin { return results; } + // EventHubRuntimeInformation does not implement toString() + private String printEventHubRuntimeInfo(EventHubRuntimeInformation ehInfo) { + if (ehInfo == null) { + return "[EventHubRuntimeInformation: null]"; + } + return String.format("[EventHubRuntimeInformation: createAt=%s, partitionCount=%d, path=%s]", ehInfo.getCreatedAt(), + ehInfo.getPartitionCount(), ehInfo.getPath()); + } + + // PartitionRuntimeInformation does not implement toString() + private String printPartitionRuntimeInfo(PartitionRuntimeInformation runtimeInformation) { + if (runtimeInformation == null) { + return "[PartitionRuntimeInformation: null]"; + } + StringBuilder stringBuilder = new StringBuilder(); + stringBuilder.append("[PartitionRuntimeInformation:"); + stringBuilder.append(" eventHubPath=").append(runtimeInformation.getEventHubPath()); + stringBuilder.append(" partitionId=").append(runtimeInformation.getPartitionId()); + stringBuilder.append(" lastEnqueuedTimeUtc=").append(runtimeInformation.getLastEnqueuedTimeUtc().toString()); + stringBuilder.append(" lastEnqueuedOffset=").append(runtimeInformation.getLastEnqueuedOffset()); + // calculate the number of messages in the queue + stringBuilder.append(" numMessages=") + .append(runtimeInformation.getLastEnqueuedSequenceNumber() - runtimeInformation.getBeginSequenceNumber()); + stringBuilder.append("]"); + return stringBuilder.toString(); + } + @Override public Map<String, SystemStreamMetadata> getSystemStreamMetadata(Set<String> streamNames) { Map<String, SystemStreamMetadata> requestedMetadata = new HashMap<>(); @@ -86,11 +114,24 @@ public class EventHubSystemAdmin implements SystemAdmin { LOG.debug(String.format("Partition ids for Stream=%s not found", streamName)); EventHubClientManager eventHubClientManager = getOrCreateStreamEventHubClient(streamName); - CompletableFuture<EventHubRuntimeInformation> runtimeInfo = eventHubClientManager.getEventHubClient().getRuntimeInformation(); + EventHubClient ehClient = eventHubClientManager.getEventHubClient(); + CompletableFuture<EventHubRuntimeInformation> runtimeInfo = ehClient.getRuntimeInformation(); long timeoutMs = eventHubConfig.getRuntimeInfoWaitTimeMS(systemName); EventHubRuntimeInformation ehInfo = runtimeInfo.get(timeoutMs, TimeUnit.MILLISECONDS); - LOG.info(String.format("Adding partition ids=%s for stream=%s", Arrays.toString(ehInfo.getPartitionIds()), streamName)); + LOG.info(String.format("Adding partition ids=%s for stream=%s. EHRuntimetInfo=%s", + Arrays.toString(ehInfo.getPartitionIds()), streamName, printEventHubRuntimeInfo(ehInfo))); + + try { + for (String partitionId : ehInfo.getPartitionIds()) { + LOG.info(printPartitionRuntimeInfo( + ehClient.getPartitionRuntimeInformation(partitionId).get(timeoutMs, TimeUnit.MILLISECONDS))); + } + } catch (Exception e) { + // ignore failures as this is just for information logging + LOG.warn("Failed to fetch and print partition runtime info from EventHubs.", e); + } + streamPartitions.put(streamName, ehInfo.getPartitionIds()); } String[] partitionIds = streamPartitions.get(streamName); @@ -113,7 +154,7 @@ public class EventHubSystemAdmin implements SystemAdmin { private EventHubClientManager getOrCreateStreamEventHubClient(String streamName) { if (!eventHubClients.containsKey(streamName)) { - LOG.debug(String.format("Creating EventHubClient for Stream=%s", streamName)); + LOG.info(String.format("Creating EventHubClient for Stream=%s", streamName)); EventHubClientManager eventHubClientManager = eventHubClientManagerFactory .getEventHubClientManager(systemName, streamName, eventHubConfig); http://git-wip-us.apache.org/repos/asf/samza/blob/e63d27c0/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java index f00944b..0fd0f2c 100644 --- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java +++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java @@ -106,7 +106,7 @@ public class EventHubSystemConsumer extends BlockingEnvelopeMap { public static final String END_OF_STREAM = "-2"; public static final String EVENT_READ_RATE = "eventReadRate"; public static final String EVENT_BYTE_READ_RATE = "eventByteReadRate"; - public static final String READ_LATENCY = "readLatency"; + public static final String CONSUMPTION_LAG_MS = "consumptionLagMs"; public static final String READ_ERRORS = "readErrors"; public static final String AGGREGATE = "aggregate"; @@ -114,12 +114,12 @@ public class EventHubSystemConsumer extends BlockingEnvelopeMap { private static Counter aggEventReadRate = null; private static Counter aggEventByteReadRate = null; - private static SamzaHistogram aggReadLatency = null; + private static SamzaHistogram aggConsumptionLagMs = null; private static Counter aggReadErrors = null; private final Map<String, Counter> eventReadRates; private final Map<String, Counter> eventByteReadRates; - private final Map<String, SamzaHistogram> readLatencies; + private final Map<String, SamzaHistogram> consumptionLagMs; private final Map<String, Counter> readErrors; final ConcurrentHashMap<SystemStreamPartition, PartitionReceiveHandler> streamPartitionHandlers = @@ -162,8 +162,8 @@ public class EventHubSystemConsumer extends BlockingEnvelopeMap { streamIds.stream().collect(Collectors.toMap(Function.identity(), x -> registry.newCounter(x, EVENT_READ_RATE))); eventByteReadRates = streamIds.stream() .collect(Collectors.toMap(Function.identity(), x -> registry.newCounter(x, EVENT_BYTE_READ_RATE))); - readLatencies = streamIds.stream() - .collect(Collectors.toMap(Function.identity(), x -> new SamzaHistogram(registry, x, READ_LATENCY))); + consumptionLagMs = streamIds.stream() + .collect(Collectors.toMap(Function.identity(), x -> new SamzaHistogram(registry, x, CONSUMPTION_LAG_MS))); readErrors = streamIds.stream().collect(Collectors.toMap(Function.identity(), x -> registry.newCounter(x, READ_ERRORS))); @@ -172,7 +172,7 @@ public class EventHubSystemConsumer extends BlockingEnvelopeMap { if (aggEventReadRate == null) { aggEventReadRate = registry.newCounter(AGGREGATE, EVENT_READ_RATE); aggEventByteReadRate = registry.newCounter(AGGREGATE, EVENT_BYTE_READ_RATE); - aggReadLatency = new SamzaHistogram(registry, AGGREGATE, READ_LATENCY); + aggConsumptionLagMs = new SamzaHistogram(registry, AGGREGATE, CONSUMPTION_LAG_MS); aggReadErrors = registry.newCounter(AGGREGATE, READ_ERRORS); } } @@ -222,6 +222,7 @@ public class EventHubSystemConsumer extends BlockingEnvelopeMap { @Override public void start() { isStarted = true; + LOG.info("Starting EventHubSystemConsumer. Count of SSPs registered: " + streamPartitionOffsets.entrySet().size()); // Create receivers for Event Hubs for (Map.Entry<SystemStreamPartition, String> entry : streamPartitionOffsets.entrySet()) { @@ -258,7 +259,7 @@ public class EventHubSystemConsumer extends BlockingEnvelopeMap { PartitionReceiveHandler handler = new PartitionReceiverHandlerImpl(ssp, eventReadRates.get(streamId), eventByteReadRates.get(streamId), - readLatencies.get(streamId), readErrors.get(streamId), interceptors.getOrDefault(streamId, null), + consumptionLagMs.get(streamId), readErrors.get(streamId), interceptors.getOrDefault(streamId, null), config.getMaxEventCountPerPoll(systemName)); // Timeout for EventHubClient receive @@ -274,8 +275,9 @@ public class EventHubSystemConsumer extends BlockingEnvelopeMap { String.format("Failed to create receiver for EventHubs: namespace=%s, entity=%s, partitionId=%d", namespace, entityPath, partitionId), e); } - LOG.debug(String.format("Connection successfully started for namespace=%s, entity=%s ", namespace, entityPath)); + LOG.info(String.format("Connection successfully started for namespace=%s, entity=%s ", namespace, entityPath)); } + LOG.info("EventHubSystemConsumer started"); } @Override @@ -334,7 +336,7 @@ public class EventHubSystemConsumer extends BlockingEnvelopeMap { @Override public void stop() { - LOG.debug("Stopping event hub system consumer..."); + LOG.info("Stopping event hub system consumer..."); List<CompletableFuture<Void>> futures = new ArrayList<>(); streamPartitionReceivers.values().forEach((receiver) -> futures.add(receiver.close())); CompletableFuture<Void> future = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])); @@ -422,9 +424,9 @@ public class EventHubSystemConsumer extends BlockingEnvelopeMap { eventByteReadRate.inc(eventDataLength); aggEventByteReadRate.inc(eventDataLength); - long latencyMs = Duration.between(Instant.now(), event.getSystemProperties().getEnqueuedTime()).toMillis(); + long latencyMs = Duration.between(event.getSystemProperties().getEnqueuedTime(), Instant.now()).toMillis(); readLatency.update(latencyMs); - aggReadLatency.update(latencyMs); + aggConsumptionLagMs.update(latencyMs); } @Override @@ -440,12 +442,16 @@ public class EventHubSystemConsumer extends BlockingEnvelopeMap { // Only set to transient throwable if there has been no previous errors eventHubHandlerError.compareAndSet(null, throwable); + LOG.warn( + String.format("Received transient exception from EH client. Renew partition receiver for ssp: %s", ssp), + throwable); // Retry creating a receiver since error likely due to timeout renewPartitionReceiver(ssp); return; } } + LOG.error(String.format("Received non transient exception from EH client for ssp: %s", ssp), throwable); // Propagate non transient or unknown errors eventHubHandlerError.set(throwable); }