Repository: samza Updated Branches: refs/heads/master 8e000f398 -> fda1e37d0
Upgrade to latest event hub version (1.0.1) * Upgrade to the latest event hub version 1.0.1 * Adding configs for prefetchCount and maxEventPerPoll * Fix the high cpu usage issue in SamzaHistogram * Fixing a race condition in event hub system producer where the future was getting removed while it was being checked for completion resulting in NPE. Author: Srinivasulu Punuru <spun...@linkedin.com> Reviewers: Prateek Maheshwari <pmaheshw...@apache.org> Closes #467 from srinipunuru/upgrade-eh-1.0.1 Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/fda1e37d Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/fda1e37d Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/fda1e37d Branch: refs/heads/master Commit: fda1e37d0862493cb59ed0907c597a2dc973ef6f Parents: 8e000f3 Author: Srinivasulu Punuru <spun...@linkedin.com> Authored: Tue Apr 10 15:39:21 2018 -0700 Committer: Prateek Maheshwari <pmahe...@linkedin.com> Committed: Tue Apr 10 15:39:21 2018 -0700 ---------------------------------------------------------------------- build.gradle | 2 +- .../versioned/jobs/configuration-table.html | 28 ++- .../eventhub/EventHubClientManagerFactory.java | 3 +- .../samza/system/eventhub/EventHubConfig.java | 38 ++++ .../eventhub/SamzaEventHubClientManager.java | 37 ++-- .../eventhub/admin/EventHubSystemAdmin.java | 8 +- .../consumer/EventHubSystemConsumer.java | 179 ++++++++++--------- .../system/eventhub/metrics/SamzaHistogram.java | 69 ++++--- .../eventhub/producer/AsyncSystemProducer.java | 1 - .../producer/EventHubSystemProducer.java | 7 +- .../samza/system/eventhub/MockEventData.java | 21 ++- .../MockEventHubClientManagerFactory.java | 25 ++- .../system/eventhub/TestMetricsRegistry.java | 7 +- .../consumer/TestEventHubSystemConsumer.java | 6 +- .../producer/ITestEventHubSystemProducer.java | 10 +- .../producer/TestEventHubSystemProducer.java | 4 +- .../samza/tools/EventHubConsoleConsumer.java | 64 ++++--- 
17 files changed, 322 insertions(+), 187 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/fda1e37d/build.gradle ---------------------------------------------------------------------- diff --git a/build.gradle b/build.gradle index d96ec96..2f27a03 100644 --- a/build.gradle +++ b/build.gradle @@ -202,7 +202,7 @@ project(':samza-azure') { dependencies { compile "com.microsoft.azure:azure-storage:5.3.1" - compile "com.microsoft.azure:azure-eventhubs:0.14.5" + compile "com.microsoft.azure:azure-eventhubs:1.0.1" compile "com.fasterxml.jackson.core:jackson-core:2.8.8" compile "io.dropwizard.metrics:metrics-core:3.1.2" compile project(':samza-api') http://git-wip-us.apache.org/repos/asf/samza/blob/fda1e37d/docs/learn/documentation/versioned/jobs/configuration-table.html ---------------------------------------------------------------------- diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html index 5c41596..4495cb1 100644 --- a/docs/learn/documentation/versioned/jobs/configuration-table.html +++ b/docs/learn/documentation/versioned/jobs/configuration-table.html @@ -2279,30 +2279,48 @@ </tr> <tr> - <td class="property" id="eventhub-stream-namespace">systems.<span class="system">system-name</span>.<br>streams.<span class="stream">stream-id</span>.<br>eventhubs.namespace</td> + <td class="property" id="eventhub-stream-namespace">streams.<span class="stream">stream-id</span>.<br>eventhubs.namespace</td> <td class="default"></td> <td class="description">Namespace of the associated <span class="stream">stream-ids</span>. 
Required to access the Eventhubs entity per stream.</td> </tr> <tr> - <td class="property" id="eventhub-stream-entity">systems.<span class="system">system-name</span>.<br>streams.<span class="stream">stream-id</span>.<br>eventhubs.entitypath</td> + <td class="property" id="eventhub-stream-entity">streams.<span class="stream">stream-id</span>.<br>eventhubs.entitypath</td> <td class="default"></td> <td class="description">Entity of the associated <span class="stream">stream-ids</span>. Required to access the Eventhubs entity per stream.</td> </tr> <tr> - <td class="property" id="eventhub-stream-sas-keyname">systems.<span class="system">system-name</span>.<br>streams.<span class="stream">stream-id</span>.<br>eventhubs.sas.keyname</td> + <td class="property" id="eventhub-stream-sas-keyname">sensitive.streams.<span class="stream">stream-id</span>.<br>eventhubs.sas.keyname</td> <td class="default"></td> <td class="description">SAS Keyname of the associated <span class="stream">stream-ids</span>. Required to access the Eventhubs entity per stream.</td> </tr> <tr> - <td class="property" id="eventhub-stream-sas-token">systems.<span class="system">system-name</span>.<br>streams.<span class="stream">stream-id</span>.<br>eventhubs.sas.token</td> + <td class="property" id="eventhub-stream-sas-token">sensitive.streams.<span class="stream">stream-id</span>.<br>eventhubs.sas.token</td> <td class="default"></td> <td class="description">SAS Token of the associated <span class="stream">stream-ids</span>. Required to access the Eventhubs entity per stream.</td> </tr> <tr> + <td class="property" id="eventhub-client-threads">streams.<span class="system">stream-name</span>.<br>eventhubs.numClientThreads</td> + <td class="default">10</td> + <td class="description">Number of threads in the thread pool that will be used by the EventHubClient. 
See <a href="https://docs.microsoft.com/en-us/java/api/com.microsoft.azure.eventhubs._event_hub_client.create">here </a> for more details.</td> + </tr> + + <tr> + <td class="property" id="eventhub-prefetch-count">systems.<span class="system">system-name</span>.<br>eventhubs.prefetchCount</td> + <td class="default">999</td> + <td class="description">Number of events that EventHub client should prefetch from the server. See <a href="https://docs.microsoft.com/en-us/java/api/com.microsoft.azure.eventhubs._partition_receiver.setprefetchcount">here </a> for more details.</td> + </tr> + + <tr> + <td class="property" id="eventhub-max-event-count">systems.<span class="system">system-name</span>.<br>eventhubs.maxEventCountPerPoll</td> + <td class="default">50</td> + <td class="description">Maximum number of events that EventHub client can return in a receive call. See <a href="https://docs.microsoft.com/en-us/java/api/com.microsoft.azure.eventhubs._partition_receive_handler.getmaxeventcount#com_microsoft_azure_eventhubs__partition_receive_handler_getMaxEventCount__">here </a> for more details.</td> + </tr> + + <tr> <td class="property" id="eventhub-runtime-timeout">systems.<span class="system">system-name</span>.<br>eventhubs.runtime.info.timeout</td> <td class="default">60000</td> <td class="description">Timeout for fetching the runtime metadata from an Eventhub entity on startup in millis.</td> @@ -2333,7 +2351,7 @@ </tr> <tr> - <td class="property" id="eventhub-consumer-group">systems.<span class="system">system-name</span>.<br>streams.<span class="stream">stream-id</span>.<br>eventhubs.consumer.group</td> + <td class="property" id="eventhub-consumer-group">streams.<span class="stream">stream-id</span>.<br>eventhubs.consumer.group</td> <td class="default"><code>$Default</code></td> <td class="description"> Consumer only config. Set the consumer group from the upstream Eventhub that the consumer is part of. 
Defaults to the <code>$Default</code> group that is initially present in all Eventhub entities (unless removed) http://git-wip-us.apache.org/repos/asf/samza/blob/fda1e37d/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubClientManagerFactory.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubClientManagerFactory.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubClientManagerFactory.java index 0578a50..879a07f 100644 --- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubClientManagerFactory.java +++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubClientManagerFactory.java @@ -26,7 +26,8 @@ public class EventHubClientManagerFactory { String entityPath = config.getStreamEntityPath(systemName, streamName); String sasKeyName = config.getStreamSasKeyName(systemName, streamName); String sasToken = config.getStreamSasToken(systemName, streamName); + int numClientThreads = config.getNumClientThreads(systemName); - return new SamzaEventHubClientManager(eventHubNamespace, entityPath, sasKeyName, sasToken); + return new SamzaEventHubClientManager(eventHubNamespace, entityPath, sasKeyName, sasToken, numClientThreads); } } http://git-wip-us.apache.org/repos/asf/samza/blob/fda1e37d/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java index 7df92c0..6639dd8 100644 --- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java +++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java @@ -20,6 +20,7 @@ package org.apache.samza.system.eventhub; import 
com.microsoft.azure.eventhubs.EventHubClient; +import com.microsoft.azure.eventhubs.PartitionReceiver; import org.apache.samza.SamzaException; import org.apache.samza.config.Config; import org.apache.samza.config.MapConfig; @@ -49,6 +50,15 @@ public class EventHubConfig extends MapConfig { public static final String CONFIG_STREAM_CONSUMER_GROUP = "streams.%s.eventhubs.consumer.group"; public static final String DEFAULT_CONFIG_STREAM_CONSUMER_GROUP = EventHubClient.DEFAULT_CONSUMER_GROUP_NAME; + public static final String CONFIG_SYSTEM_NUM_CLIENT_THREADS = "streams.%s.eventhubs.numClientThreads"; + public static final int DEFAULT_CONFIG_SYSTEM_NUM_CLIENT_THREADS = 10; + + public static final String CONFIG_PREFETCH_COUNT = "systems.%s.eventhubs.prefetchCount"; + public static final int DEFAULT_CONFIG_PREFETCH_COUNT = PartitionReceiver.DEFAULT_PREFETCH_COUNT; + + public static final String CONFIG_MAX_EVENT_COUNT_PER_POLL = "systems.%s.eventhubs.maxEventCountPerPoll"; + public static final int DEFAULT_CONFIG_MAX_EVENT_COUNT_PER_POLL = 50; + public static final String CONFIG_PRODUCER_PARTITION_METHOD = "systems.%s.eventhubs.partition.method"; public static final String DEFAULT_CONFIG_PRODUCER_PARTITION_METHOD = EventHubSystemProducer .PartitioningMethod.EVENT_HUB_HASHING.name(); @@ -144,6 +154,34 @@ public class EventHubConfig extends MapConfig { } /** + * Get the number of client threads. This is used to create the ThreadPool executor that is passed to the + * {@link EventHubClient#create} + * @param systemName Name of the system. + * @return Num of client threads to use. 
+ */ + public Integer getNumClientThreads(String systemName) { + return getInt(String.format(CONFIG_SYSTEM_NUM_CLIENT_THREADS, systemName), DEFAULT_CONFIG_SYSTEM_NUM_CLIENT_THREADS); + } + + /** + * Get the max event count returned per poll + * @param systemName Name of the system + * @return Max number of events returned per poll + */ + public Integer getMaxEventCountPerPoll(String systemName) { + return getInt(String.format(CONFIG_MAX_EVENT_COUNT_PER_POLL, systemName), DEFAULT_CONFIG_MAX_EVENT_COUNT_PER_POLL); + } + + /** + * Get the per partition prefetch count for the event hub client + * @param systemName Name of the system. + * @return Per partition Prefetch count for the event hub client. + */ + public Integer getPrefetchCount(String systemName) { + return getInt(String.format(CONFIG_PREFETCH_COUNT, systemName), DEFAULT_CONFIG_PREFETCH_COUNT); + } + + /** * Get the EventHubs max Message size * * @param systemName name of the system http://git-wip-us.apache.org/repos/asf/samza/blob/fda1e37d/samza-azure/src/main/java/org/apache/samza/system/eventhub/SamzaEventHubClientManager.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/SamzaEventHubClientManager.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/SamzaEventHubClientManager.java index 977e252..a884a79 100644 --- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/SamzaEventHubClientManager.java +++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/SamzaEventHubClientManager.java @@ -19,12 +19,15 @@ package org.apache.samza.system.eventhub; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.microsoft.azure.eventhubs.EventHubClient; -import com.microsoft.azure.servicebus.ClientConstants; -import com.microsoft.azure.servicebus.ConnectionStringBuilder; -import com.microsoft.azure.servicebus.RetryExponential; -import 
com.microsoft.azure.servicebus.RetryPolicy; -import com.microsoft.azure.servicebus.ServiceBusException; +import com.microsoft.azure.eventhubs.impl.ClientConstants; +import com.microsoft.azure.eventhubs.ConnectionStringBuilder; +import com.microsoft.azure.eventhubs.impl.RetryExponential; +import com.microsoft.azure.eventhubs.RetryPolicy; +import com.microsoft.azure.eventhubs.EventHubException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import org.apache.samza.SamzaException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,6 +46,7 @@ public class SamzaEventHubClientManager implements EventHubClientManager { private static final Duration MAX_RETRY_BACKOFF = Duration.ofMillis(11000); private static final int MAX_RETRY_COUNT = 100; private static final String SAMZA_EVENTHUB_RETRY = "SAMZA_CONNECTOR_RETRY"; + private final int numClientThreads; private EventHubClient eventHubClient; @@ -51,30 +55,38 @@ public class SamzaEventHubClientManager implements EventHubClientManager { private final String sasKeyName; private final String sasKey; private final RetryPolicy retryPolicy; + private ExecutorService eventHubClientExecutor; - public SamzaEventHubClientManager(String eventHubNamespace, String entityPath, String sasKeyName, String sasKey) { + public SamzaEventHubClientManager(String eventHubNamespace, String entityPath, String sasKeyName, String sasKey, + Integer numClientThreads) { this(eventHubNamespace, entityPath, sasKeyName, sasKey, - new RetryExponential(MIN_RETRY_BACKOFF, MAX_RETRY_BACKOFF, MAX_RETRY_COUNT, SAMZA_EVENTHUB_RETRY)); + new RetryExponential(MIN_RETRY_BACKOFF, MAX_RETRY_BACKOFF, MAX_RETRY_COUNT, SAMZA_EVENTHUB_RETRY), numClientThreads); } public SamzaEventHubClientManager(String eventHubNamespace, String entityPath, String sasKeyName, String sasKey, - RetryPolicy retryPolicy) { + RetryPolicy retryPolicy, int numClientThreads) { this.eventHubNamespace = eventHubNamespace; this.entityPath = 
entityPath; this.sasKeyName = sasKeyName; this.sasKey = sasKey; this.retryPolicy = retryPolicy; + this.numClientThreads = numClientThreads; } @Override public void init() { String remoteHost = String.format(EVENTHUB_REMOTE_HOST_FORMAT, eventHubNamespace); try { - ConnectionStringBuilder connectionStringBuilder = - new ConnectionStringBuilder(eventHubNamespace, entityPath, sasKeyName, sasKey); + ConnectionStringBuilder connectionStringBuilder = new ConnectionStringBuilder() + .setNamespaceName(eventHubNamespace) + .setEventHubName(entityPath) + .setSasKeyName(sasKeyName) + .setSasKey(sasKey); - eventHubClient = EventHubClient.createFromConnectionStringSync(connectionStringBuilder.toString(), retryPolicy); - } catch (IOException | ServiceBusException e) { + ThreadFactoryBuilder threadFactoryBuilder = new ThreadFactoryBuilder().setNameFormat("Samza EventHubClient Thread-%d"); + eventHubClientExecutor = Executors.newFixedThreadPool(numClientThreads, threadFactoryBuilder.build()); + eventHubClient = EventHubClient.createSync(connectionStringBuilder.toString(), retryPolicy, eventHubClientExecutor); + } catch (IOException | EventHubException e) { String msg = String.format("Creation of EventHub client failed for eventHub EntityPath: %s on remote host %s:%d", entityPath, remoteHost, ClientConstants.AMQPS_PORT); LOG.error(msg, e); @@ -92,6 +104,7 @@ public class SamzaEventHubClientManager implements EventHubClientManager { try { if (timeoutMS == EventHubClientManager.BLOCK_UNTIL_CLOSE) { eventHubClient.closeSync(); + eventHubClientExecutor.shutdown(); } else { CompletableFuture<Void> future = eventHubClient.close(); future.get(timeoutMS, TimeUnit.MILLISECONDS); http://git-wip-us.apache.org/repos/asf/samza/blob/fda1e37d/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java ---------------------------------------------------------------------- diff --git 
a/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java index 5564747..acb1775 100644 --- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java +++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java @@ -19,8 +19,8 @@ package org.apache.samza.system.eventhub.admin; -import com.microsoft.azure.eventhubs.EventHubPartitionRuntimeInformation; import com.microsoft.azure.eventhubs.EventHubRuntimeInformation; +import com.microsoft.azure.eventhubs.PartitionRuntimeInformation; import org.apache.samza.Partition; import org.apache.samza.SamzaException; import org.apache.samza.system.SystemAdmin; @@ -144,10 +144,10 @@ public class EventHubSystemAdmin implements SystemAdmin { private Map<Partition, SystemStreamPartitionMetadata> getPartitionMetadata(String streamName, String[] partitionIds) { EventHubClientManager eventHubClientManager = getOrCreateStreamEventHubClient(streamName); Map<Partition, SystemStreamPartitionMetadata> sspMetadataMap = new HashMap<>(); - Map<String, CompletableFuture<EventHubPartitionRuntimeInformation>> ehRuntimeInfos = new HashMap<>(); + Map<String, CompletableFuture<PartitionRuntimeInformation>> ehRuntimeInfos = new HashMap<>(); for (String partition : partitionIds) { - CompletableFuture<EventHubPartitionRuntimeInformation> partitionRuntimeInfo = eventHubClientManager + CompletableFuture<PartitionRuntimeInformation> partitionRuntimeInfo = eventHubClientManager .getEventHubClient() .getPartitionRuntimeInformation(partition); @@ -157,7 +157,7 @@ public class EventHubSystemAdmin implements SystemAdmin { ehRuntimeInfos.forEach((partitionId, ehPartitionRuntimeInfo) -> { try { long timeoutMs = eventHubConfig.getRuntimeInfoWaitTimeMS(systemName); - EventHubPartitionRuntimeInformation ehPartitionInfo = ehPartitionRuntimeInfo.get(timeoutMs, 
TimeUnit.MILLISECONDS); + PartitionRuntimeInformation ehPartitionInfo = ehPartitionRuntimeInfo.get(timeoutMs, TimeUnit.MILLISECONDS); // Set offsets String startingOffset = EventHubSystemConsumer.START_OF_STREAM; http://git-wip-us.apache.org/repos/asf/samza/blob/fda1e37d/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java index 90c73dc..f00944b 100644 --- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java +++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java @@ -1,45 +1,31 @@ /* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package org.apache.samza.system.eventhub.consumer; import com.microsoft.azure.eventhubs.EventData; -import com.microsoft.azure.eventhubs.EventHubPartitionRuntimeInformation; +import com.microsoft.azure.eventhubs.EventHubException; +import com.microsoft.azure.eventhubs.EventPosition; import com.microsoft.azure.eventhubs.PartitionReceiveHandler; import com.microsoft.azure.eventhubs.PartitionReceiver; -import com.microsoft.azure.servicebus.ServiceBusException; -import org.apache.samza.SamzaException; -import org.apache.samza.metrics.Counter; -import org.apache.samza.metrics.MetricsRegistry; -import org.apache.samza.system.IncomingMessageEnvelope; -import org.apache.samza.system.SystemStreamPartition; -import org.apache.samza.system.eventhub.EventHubClientManagerFactory; -import org.apache.samza.system.eventhub.EventHubClientManager; -import org.apache.samza.system.eventhub.EventHubConfig; -import org.apache.samza.system.eventhub.Interceptor; -import org.apache.samza.system.eventhub.admin.EventHubSystemAdmin; -import org.apache.samza.system.eventhub.metrics.SamzaHistogram; -import org.apache.samza.system.eventhub.producer.EventHubSystemProducer; -import org.apache.samza.util.BlockingEnvelopeMap; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - +import com.microsoft.azure.eventhubs.PartitionRuntimeInformation; +import com.microsoft.azure.eventhubs.impl.ClientConstants; 
import java.time.Duration; import java.time.Instant; import java.util.ArrayList; @@ -51,11 +37,26 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.stream.Collectors; +import org.apache.samza.SamzaException; +import org.apache.samza.metrics.Counter; +import org.apache.samza.metrics.MetricsRegistry; +import org.apache.samza.system.IncomingMessageEnvelope; +import org.apache.samza.system.SystemStreamPartition; +import org.apache.samza.system.eventhub.EventHubClientManager; +import org.apache.samza.system.eventhub.EventHubClientManagerFactory; +import org.apache.samza.system.eventhub.EventHubConfig; +import org.apache.samza.system.eventhub.Interceptor; +import org.apache.samza.system.eventhub.admin.EventHubSystemAdmin; +import org.apache.samza.system.eventhub.metrics.SamzaHistogram; +import org.apache.samza.system.eventhub.producer.EventHubSystemProducer; +import org.apache.samza.util.BlockingEnvelopeMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** @@ -96,13 +97,12 @@ import java.util.stream.Collectors; */ public class EventHubSystemConsumer extends BlockingEnvelopeMap { private static final Logger LOG = LoggerFactory.getLogger(EventHubSystemConsumer.class); - private static final int MAX_EVENT_COUNT_PER_PARTITION_POLL = 50; // Overall timeout for EventHubClient exponential backoff policy private static final Duration DEFAULT_EVENTHUB_RECEIVER_TIMEOUT = Duration.ofMinutes(10L); private static final long DEFAULT_SHUTDOWN_TIMEOUT_MILLIS = Duration.ofMinutes(1L).toMillis(); - public static final String START_OF_STREAM = PartitionReceiver.START_OF_STREAM; // -1 + public static 
final String START_OF_STREAM = ClientConstants.START_OF_STREAM; // -1 public static final String END_OF_STREAM = "-2"; public static final String EVENT_READ_RATE = "eventReadRate"; public static final String EVENT_BYTE_READ_RATE = "eventByteReadRate"; @@ -122,11 +122,14 @@ public class EventHubSystemConsumer extends BlockingEnvelopeMap { private final Map<String, SamzaHistogram> readLatencies; private final Map<String, Counter> readErrors; - final ConcurrentHashMap<SystemStreamPartition, PartitionReceiveHandler> streamPartitionHandlers = new ConcurrentHashMap<>(); - private final ConcurrentHashMap<SystemStreamPartition, PartitionReceiver> streamPartitionReceivers = new ConcurrentHashMap<>(); + final ConcurrentHashMap<SystemStreamPartition, PartitionReceiveHandler> streamPartitionHandlers = + new ConcurrentHashMap<>(); + private final ConcurrentHashMap<SystemStreamPartition, PartitionReceiver> streamPartitionReceivers = + new ConcurrentHashMap<>(); private final ConcurrentHashMap<String, EventHubClientManager> streamEventHubManagers = new ConcurrentHashMap<>(); private final ConcurrentHashMap<SystemStreamPartition, String> streamPartitionOffsets = new ConcurrentHashMap<>(); private final Map<String, Interceptor> interceptors; + private final Integer prefetchCount; private boolean isStarted = false; private final EventHubConfig config; private final String systemName; @@ -135,8 +138,8 @@ public class EventHubSystemConsumer extends BlockingEnvelopeMap { private final AtomicReference<Throwable> eventHubHandlerError = new AtomicReference<>(null); public EventHubSystemConsumer(EventHubConfig config, String systemName, - EventHubClientManagerFactory eventHubClientManagerFactory, - Map<String, Interceptor> interceptors, MetricsRegistry registry) { + EventHubClientManagerFactory eventHubClientManagerFactory, Map<String, Interceptor> interceptors, + MetricsRegistry registry) { super(registry, System::currentTimeMillis); this.config = config; @@ -145,21 +148,24 @@ public 
class EventHubSystemConsumer extends BlockingEnvelopeMap { List<String> streamIds = config.getStreams(systemName); // Create and initiate connections to Event Hubs for (String streamId : streamIds) { - EventHubClientManager eventHubClientManager = eventHubClientManagerFactory - .getEventHubClientManager(systemName, streamId, config); + EventHubClientManager eventHubClientManager = + eventHubClientManagerFactory.getEventHubClientManager(systemName, streamId, config); streamEventHubManagers.put(streamId, eventHubClientManager); eventHubClientManager.init(); } + prefetchCount = config.getPrefetchCount(systemName); + + // Initiate metrics - eventReadRates = streamIds.stream() - .collect(Collectors.toMap(Function.identity(), x -> registry.newCounter(x, EVENT_READ_RATE))); + eventReadRates = + streamIds.stream().collect(Collectors.toMap(Function.identity(), x -> registry.newCounter(x, EVENT_READ_RATE))); eventByteReadRates = streamIds.stream() - .collect(Collectors.toMap(Function.identity(), x -> registry.newCounter(x, EVENT_BYTE_READ_RATE))); + .collect(Collectors.toMap(Function.identity(), x -> registry.newCounter(x, EVENT_BYTE_READ_RATE))); readLatencies = streamIds.stream() - .collect(Collectors.toMap(Function.identity(), x -> new SamzaHistogram(registry, x, READ_LATENCY))); - readErrors = streamIds.stream() - .collect(Collectors.toMap(Function.identity(), x -> registry.newCounter(x, READ_ERRORS))); + .collect(Collectors.toMap(Function.identity(), x -> new SamzaHistogram(registry, x, READ_LATENCY))); + readErrors = + streamIds.stream().collect(Collectors.toMap(Function.identity(), x -> registry.newCounter(x, READ_ERRORS))); // Locking to ensure that these aggregated metrics will be created only once across multiple system consumers. 
synchronized (AGGREGATE_METRICS_LOCK) { @@ -183,7 +189,9 @@ public class EventHubSystemConsumer extends BlockingEnvelopeMap { if (streamPartitionOffsets.containsKey(systemStreamPartition)) { // Only update if new offset is lower than previous offset - if (END_OF_STREAM.equals(offset)) return; + if (END_OF_STREAM.equals(offset)) { + return; + } String prevOffset = streamPartitionOffsets.get(systemStreamPartition); if (!END_OF_STREAM.equals(prevOffset) && EventHubSystemAdmin.compareOffsets(offset, prevOffset) > -1) { return; @@ -192,20 +200,20 @@ public class EventHubSystemConsumer extends BlockingEnvelopeMap { streamPartitionOffsets.put(systemStreamPartition, offset); } - private String getNewestEventHubOffset(EventHubClientManager eventHubClientManager, String streamName, Integer partitionId) { - CompletableFuture<EventHubPartitionRuntimeInformation> partitionRuntimeInfoFuture = eventHubClientManager - .getEventHubClient() - .getPartitionRuntimeInformation(partitionId.toString()); + private String getNewestEventHubOffset(EventHubClientManager eventHubClientManager, String streamName, + Integer partitionId) { + CompletableFuture<PartitionRuntimeInformation> partitionRuntimeInfoFuture = + eventHubClientManager.getEventHubClient().getPartitionRuntimeInformation(partitionId.toString()); try { long timeoutMs = config.getRuntimeInfoWaitTimeMS(systemName); - EventHubPartitionRuntimeInformation partitionRuntimeInformation = partitionRuntimeInfoFuture - .get(timeoutMs, TimeUnit.MILLISECONDS); + PartitionRuntimeInformation partitionRuntimeInformation = + partitionRuntimeInfoFuture.get(timeoutMs, TimeUnit.MILLISECONDS); return partitionRuntimeInformation.getLastEnqueuedOffset(); } catch (InterruptedException | ExecutionException | TimeoutException e) { - String msg = String.format( - "Error while fetching EventHubPartitionRuntimeInfo for System:%s, Stream:%s, Partition:%s", + String msg = + String.format("Error while fetching EventHubPartitionRuntimeInfo for System:%s, 
Stream:%s, Partition:%s", systemName, streamName, partitionId); throw new SamzaException(msg); } @@ -235,21 +243,23 @@ public class EventHubSystemConsumer extends BlockingEnvelopeMap { // If the offset is greater than the newest offset, use the use current Instant as // offset to fetch in Eventhub. receiver = eventHubClientManager.getEventHubClient() - .createReceiverSync(consumerGroup, partitionId.toString(), Instant.now()); + .createReceiverSync(consumerGroup, partitionId.toString(), EventPosition.fromEnqueuedTime(Instant.now())); } else { // If the offset is less or equal to the newest offset in the system, it can be // used as the starting offset to receive from. EventHub will return the first // message AFTER the offset that was specified in the fetch request. // If no such offset exists Eventhub will return an error. receiver = eventHubClientManager.getEventHubClient() - .createReceiverSync(consumerGroup, partitionId.toString(), offset, - !offset.equals(EventHubSystemConsumer.START_OF_STREAM)); + .createReceiverSync(consumerGroup, partitionId.toString(), + EventPosition.fromOffset(offset, !offset.equals(EventHubSystemConsumer.START_OF_STREAM))); } - PartitionReceiveHandler handler = new PartitionReceiverHandlerImpl(ssp, eventReadRates.get(streamId), - eventByteReadRates.get(streamId), readLatencies.get(streamId), readErrors.get(streamId), - interceptors.getOrDefault(streamId, null)); + receiver.setPrefetchCount(prefetchCount); + PartitionReceiveHandler handler = + new PartitionReceiverHandlerImpl(ssp, eventReadRates.get(streamId), eventByteReadRates.get(streamId), + readLatencies.get(streamId), readErrors.get(streamId), interceptors.getOrDefault(streamId, null), + config.getMaxEventCountPerPoll(systemName)); // Timeout for EventHubClient receive receiver.setReceiveTimeout(DEFAULT_EVENTHUB_RECEIVER_TIMEOUT); @@ -261,16 +271,16 @@ public class EventHubSystemConsumer extends BlockingEnvelopeMap { streamPartitionReceivers.put(ssp, receiver); } catch (Exception e) 
{ throw new SamzaException( - String.format("Failed to create receiver for EventHubs: namespace=%s, entity=%s, partitionId=%d", - namespace, entityPath, partitionId), e); + String.format("Failed to create receiver for EventHubs: namespace=%s, entity=%s, partitionId=%d", namespace, + entityPath, partitionId), e); } LOG.debug(String.format("Connection successfully started for namespace=%s, entity=%s ", namespace, entityPath)); - } } @Override - public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(Set<SystemStreamPartition> systemStreamPartitions, long timeout) throws InterruptedException { + public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll( + Set<SystemStreamPartition> systemStreamPartitions, long timeout) throws InterruptedException { Throwable handlerError = eventHubHandlerError.get(); if (handlerError != null) { @@ -305,8 +315,10 @@ public class EventHubSystemConsumer extends BlockingEnvelopeMap { // Recreate receiver PartitionReceiver receiver = eventHubClientManager.getEventHubClient() - .createReceiverSync(consumerGroup, partitionId.toString(), offset, - !offset.equals(EventHubSystemConsumer.START_OF_STREAM)); + .createReceiverSync(consumerGroup, partitionId.toString(), + EventPosition.fromOffset(offset, !offset.equals(EventHubSystemConsumer.START_OF_STREAM))); + + receiver.setPrefetchCount(prefetchCount); // Timeout for EventHubClient receive receiver.setReceiveTimeout(DEFAULT_EVENTHUB_RECEIVER_TIMEOUT); @@ -314,10 +326,9 @@ public class EventHubSystemConsumer extends BlockingEnvelopeMap { // Create and start receiver thread with handler receiver.setReceiveHandler(streamPartitionHandlers.get(ssp)); streamPartitionReceivers.put(ssp, receiver); - } catch (Exception e) { eventHubHandlerError.set(new SamzaException( - String.format("Failed to recreate receiver for EventHubs after ReceiverHandlerError (ssp=%s)", ssp), e)); + String.format("Failed to recreate receiver for EventHubs after ReceiverHandlerError (ssp=%s)", ssp), 
e)); } } @@ -336,9 +347,9 @@ public class EventHubSystemConsumer extends BlockingEnvelopeMap { } private boolean isErrorTransient(Throwable throwable) { - if (throwable instanceof ServiceBusException) { - ServiceBusException serviceBusException = (ServiceBusException) throwable; - return serviceBusException.getIsTransient(); + if (throwable instanceof EventHubException) { + EventHubException eventHubException = (EventHubException) throwable; + return eventHubException.getIsTransient(); } return false; } @@ -348,24 +359,30 @@ public class EventHubSystemConsumer extends BlockingEnvelopeMap { return new LinkedBlockingQueue<>(config.getConsumerBufferCapacity(systemName)); } - protected class PartitionReceiverHandlerImpl extends PartitionReceiveHandler { + protected class PartitionReceiverHandlerImpl implements PartitionReceiveHandler { private final Counter eventReadRate; private final Counter eventByteReadRate; private final SamzaHistogram readLatency; private final Counter errorRate; private final Interceptor interceptor; + private final Integer maxEventCount; SystemStreamPartition ssp; PartitionReceiverHandlerImpl(SystemStreamPartition ssp, Counter eventReadRate, Counter eventByteReadRate, - SamzaHistogram readLatency, Counter readErrors, Interceptor interceptor) { - super(MAX_EVENT_COUNT_PER_PARTITION_POLL); + SamzaHistogram readLatency, Counter readErrors, Interceptor interceptor, int maxEventCount) { this.ssp = ssp; this.eventReadRate = eventReadRate; this.eventByteReadRate = eventByteReadRate; this.readLatency = readLatency; this.errorRate = readErrors; this.interceptor = interceptor; + this.maxEventCount = maxEventCount; + } + + @Override + public int getMaxEventCount() { + return this.maxEventCount; } @Override @@ -415,8 +432,8 @@ public class EventHubSystemConsumer extends BlockingEnvelopeMap { errorRate.inc(); aggReadErrors.inc(); - if (throwable instanceof ServiceBusException) { - ServiceBusException busException = (ServiceBusException) throwable; + if 
(throwable instanceof EventHubException) { + EventHubException busException = (EventHubException) throwable; if (busException.getIsTransient()) { http://git-wip-us.apache.org/repos/asf/samza/blob/fda1e37d/samza-azure/src/main/java/org/apache/samza/system/eventhub/metrics/SamzaHistogram.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/metrics/SamzaHistogram.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/metrics/SamzaHistogram.java index 7d6d408..03fc114 100644 --- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/metrics/SamzaHistogram.java +++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/metrics/SamzaHistogram.java @@ -1,35 +1,36 @@ /* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package org.apache.samza.system.eventhub.metrics; import com.codahale.metrics.ExponentiallyDecayingReservoir; import com.codahale.metrics.Histogram; import com.codahale.metrics.Snapshot; -import org.apache.samza.metrics.Gauge; -import org.apache.samza.metrics.MetricsRegistry; - import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.function.Function; import java.util.stream.Collectors; +import org.apache.samza.metrics.Gauge; +import org.apache.samza.metrics.MetricsRegistry; +import org.apache.samza.metrics.MetricsVisitor; + /** * Creates a {@link Histogram} metric using {@link ExponentiallyDecayingReservoir} @@ -49,14 +50,34 @@ public class SamzaHistogram { this.histogram = new Histogram(new ExponentiallyDecayingReservoir()); this.percentiles = percentiles; this.gauges = this.percentiles.stream() - .filter(x -> x > 0 && x <= 100) - .collect( - Collectors.toMap(Function.identity(), x -> registry.newGauge(group, name + "_" + String.valueOf(0), 0D))); + .filter(x -> x > 0 && x <= 100) + .collect(Collectors.toMap(Function.identity(), + x -> registry.newGauge(group, new HistogramGauge(x, name + "_" + String.valueOf(x), 0D)))); } public void update(long value) { histogram.update(value); + } + + public void updateGaugeValues(double percentile) { Snapshot values = histogram.getSnapshot(); - percentiles.forEach(x -> gauges.get(x).set(values.getValue(x / 100))); + gauges.get(percentile).set(values.getValue(percentile / 100)); + } + + /** + * Custom gauge whose value is set based on the underlying Histogram + */ + private 
class HistogramGauge extends Gauge<Double> { + private final Double percentile; + + public HistogramGauge(Double percentile, String name, double value) { + super(name, value); + this.percentile = percentile; + } + + public void visit(MetricsVisitor visitor) { + updateGaugeValues(percentile); + visitor.gauge(this); + } } } http://git-wip-us.apache.org/repos/asf/samza/blob/fda1e37d/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/AsyncSystemProducer.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/AsyncSystemProducer.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/AsyncSystemProducer.java index 901dd6a..307b8f6 100644 --- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/AsyncSystemProducer.java +++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/AsyncSystemProducer.java @@ -139,7 +139,6 @@ public abstract class AsyncSystemProducer implements SystemProducer { // Auto update the metrics and possible throwable when futures are complete. 
sendResult.handle((aVoid, throwable) -> { - pendingFutures.remove(sendResult); long callbackLatencyMs = System.currentTimeMillis() - afterSendTimeMs; sendCallbackLatency.get(streamId).update(callbackLatencyMs); aggSendCallbackLatency.update(callbackLatencyMs); http://git-wip-us.apache.org/repos/asf/samza/blob/fda1e37d/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java index 5139dc6..3639bbc 100644 --- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java +++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java @@ -21,8 +21,9 @@ package org.apache.samza.system.eventhub.producer; import com.microsoft.azure.eventhubs.EventData; import com.microsoft.azure.eventhubs.EventHubClient; +import com.microsoft.azure.eventhubs.EventHubException; import com.microsoft.azure.eventhubs.PartitionSender; -import com.microsoft.azure.servicebus.ServiceBusException; +import com.microsoft.azure.eventhubs.impl.EventDataImpl; import java.nio.charset.Charset; import java.time.Duration; import java.util.ArrayList; @@ -164,7 +165,7 @@ public class EventHubSystemProducer extends AsyncSystemProducer { } catch (InterruptedException | ExecutionException | TimeoutException e) { String msg = "Failed to fetch number of Event Hub partitions for partition sender creation"; throw new SamzaException(msg, e); - } catch (ServiceBusException | IllegalArgumentException e) { + } catch (EventHubException | IllegalArgumentException e) { String msg = "Creation of partition sender failed with exception"; throw new SamzaException(msg, e); } @@ -282,7 +283,7 @@ public class EventHubSystemProducer extends 
AsyncSystemProducer { eventValue = interceptor.get().intercept(eventValue); } - EventData eventData = new EventData(eventValue); + EventData eventData = new EventDataImpl(eventValue); eventData.getProperties().put(PRODUCE_TIMESTAMP, Long.toString(System.currentTimeMillis())); http://git-wip-us.apache.org/repos/asf/samza/blob/fda1e37d/samza-azure/src/test/java/org/apache/samza/system/eventhub/MockEventData.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/test/java/org/apache/samza/system/eventhub/MockEventData.java b/samza-azure/src/test/java/org/apache/samza/system/eventhub/MockEventData.java index b5b55dc..1e3d4f5 100644 --- a/samza-azure/src/test/java/org/apache/samza/system/eventhub/MockEventData.java +++ b/samza-azure/src/test/java/org/apache/samza/system/eventhub/MockEventData.java @@ -21,15 +21,17 @@ package org.apache.samza.system.eventhub; import com.microsoft.azure.eventhubs.EventData; +import com.microsoft.azure.eventhubs.impl.EventDataImpl; import java.nio.charset.Charset; import java.util.*; -public class MockEventData extends EventData { +public class MockEventData implements EventData { + EventData eventData; private EventData.SystemProperties overridedSystemProperties; private MockEventData(byte[] data, String partitionKey, String offset) { - super(data); + eventData = new EventDataImpl(data); HashMap<String, Object> properties = new HashMap<>(); properties.put("x-opt-offset", offset); properties.put("x-opt-partition-key", partitionKey); @@ -51,6 +53,21 @@ public class MockEventData extends EventData { } @Override + public Object getObject() { + return eventData.getObject(); + } + + @Override + public byte[] getBytes() { + return eventData.getBytes(); + } + + @Override + public Map<String, Object> getProperties() { + return eventData.getProperties(); + } + + @Override public EventData.SystemProperties getSystemProperties() { return overridedSystemProperties; } 
http://git-wip-us.apache.org/repos/asf/samza/blob/fda1e37d/samza-azure/src/test/java/org/apache/samza/system/eventhub/MockEventHubClientManagerFactory.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/test/java/org/apache/samza/system/eventhub/MockEventHubClientManagerFactory.java b/samza-azure/src/test/java/org/apache/samza/system/eventhub/MockEventHubClientManagerFactory.java index 368087a..6ee9bcf 100644 --- a/samza-azure/src/test/java/org/apache/samza/system/eventhub/MockEventHubClientManagerFactory.java +++ b/samza-azure/src/test/java/org/apache/samza/system/eventhub/MockEventHubClientManagerFactory.java @@ -26,7 +26,6 @@ import org.junit.Assert; import org.mockito.stubbing.Answer; import org.powermock.api.mockito.PowerMockito; -import java.time.Instant; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -39,7 +38,7 @@ import static org.mockito.Matchers.*; public class MockEventHubClientManagerFactory extends EventHubClientManagerFactory { private Map<SystemStreamPartition, List<EventData>> eventData; private Map<String, Map<String, Map<Integer, List<EventData>>>> receivedData; - private Map<String, String> startingOffsets = new HashMap<>(); + private Map<String, EventPosition> startingOffsets = new HashMap<>(); public MockEventHubClientManagerFactory() { this.receivedData = new HashMap<>(); @@ -71,7 +70,7 @@ public class MockEventHubClientManagerFactory extends EventHubClientManagerFacto handlers.forEach((ssp, value) -> value.onReceive(eventData.get(ssp))); } - public String getPartitionOffset(String partitionId) { + public EventPosition getPartitionOffset(String partitionId) { return startingOffsets.getOrDefault(partitionId, null); } @@ -101,10 +100,10 @@ public class MockEventHubClientManagerFactory extends EventHubClientManagerFacto } return null; }); - EventHubPartitionRuntimeInformation mockPartitionRuntimeInfo = 
PowerMockito.mock(EventHubPartitionRuntimeInformation.class); + PartitionRuntimeInformation mockPartitionRuntimeInfo = PowerMockito.mock(PartitionRuntimeInformation.class); PowerMockito.when(mockPartitionRuntimeInfo.getLastEnqueuedOffset()) .thenReturn(EventHubSystemConsumer.START_OF_STREAM); - CompletableFuture<EventHubPartitionRuntimeInformation> partitionFuture = new MockPartitionFuture(mockPartitionRuntimeInfo); + CompletableFuture<PartitionRuntimeInformation> partitionFuture = new MockPartitionFuture(mockPartitionRuntimeInfo); // Producer mocks PartitionSender mockPartitionSender0 = PowerMockito.mock(PartitionSender.class); @@ -128,16 +127,16 @@ public class MockEventHubClientManagerFactory extends EventHubClientManagerFacto try { // Consumer calls - PowerMockito.when(mockEventHubClient.createReceiverSync(anyString(), anyString(), any(Instant.class))) + PowerMockito.when(mockEventHubClient.createReceiverSync(anyString(), anyString(), anyObject())) .then((Answer<PartitionReceiver>) invocationOnMock -> { String partitionId = invocationOnMock.getArgumentAt(1, String.class); - startingOffsets.put(partitionId, EventHubSystemConsumer.END_OF_STREAM); + startingOffsets.put(partitionId, EventPosition.fromEndOfStream()); return mockPartitionReceiver; }); - PowerMockito.when(mockEventHubClient.createReceiverSync(anyString(), anyString(), anyString(), anyBoolean())) + PowerMockito.when(mockEventHubClient.createReceiverSync(anyString(), anyString(), anyObject())) .then((Answer<PartitionReceiver>) invocationOnMock -> { String partitionId = invocationOnMock.getArgumentAt(1, String.class); - String offset = invocationOnMock.getArgumentAt(2, String.class); + EventPosition offset = invocationOnMock.getArgumentAt(2, EventPosition.class); startingOffsets.put(partitionId, offset); return mockPartitionReceiver; }); @@ -196,15 +195,15 @@ public class MockEventHubClientManagerFactory extends EventHubClientManagerFacto } } - private class MockPartitionFuture extends 
CompletableFuture<EventHubPartitionRuntimeInformation> { - EventHubPartitionRuntimeInformation runtimeInformation; + private class MockPartitionFuture extends CompletableFuture<PartitionRuntimeInformation> { + PartitionRuntimeInformation runtimeInformation; - MockPartitionFuture(EventHubPartitionRuntimeInformation runtimeInformation) { + MockPartitionFuture(PartitionRuntimeInformation runtimeInformation) { this.runtimeInformation = runtimeInformation; } @Override - public EventHubPartitionRuntimeInformation get(long timeout, TimeUnit unit) { + public PartitionRuntimeInformation get(long timeout, TimeUnit unit) { return runtimeInformation; } } http://git-wip-us.apache.org/repos/asf/samza/blob/fda1e37d/samza-azure/src/test/java/org/apache/samza/system/eventhub/TestMetricsRegistry.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/test/java/org/apache/samza/system/eventhub/TestMetricsRegistry.java b/samza-azure/src/test/java/org/apache/samza/system/eventhub/TestMetricsRegistry.java index a421cbd..d29b975 100644 --- a/samza-azure/src/test/java/org/apache/samza/system/eventhub/TestMetricsRegistry.java +++ b/samza-azure/src/test/java/org/apache/samza/system/eventhub/TestMetricsRegistry.java @@ -70,7 +70,12 @@ public class TestMetricsRegistry implements MetricsRegistry { @Override public <T> Gauge<T> newGauge(String group, Gauge<T> value) { - return null; + if (!gauges.containsKey(group)) { + gauges.put(group, new ArrayList<>()); + } + + gauges.get(group).add(value); + return value; } @Override http://git-wip-us.apache.org/repos/asf/samza/blob/fda1e37d/samza-azure/src/test/java/org/apache/samza/system/eventhub/consumer/TestEventHubSystemConsumer.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/test/java/org/apache/samza/system/eventhub/consumer/TestEventHubSystemConsumer.java 
b/samza-azure/src/test/java/org/apache/samza/system/eventhub/consumer/TestEventHubSystemConsumer.java index 4fced77..b40d86d 100644 --- a/samza-azure/src/test/java/org/apache/samza/system/eventhub/consumer/TestEventHubSystemConsumer.java +++ b/samza-azure/src/test/java/org/apache/samza/system/eventhub/consumer/TestEventHubSystemConsumer.java @@ -42,7 +42,7 @@ import java.util.stream.Collectors; import static org.apache.samza.system.eventhub.MockEventHubConfigFactory.*; @RunWith(PowerMockRunner.class) -@PrepareForTest({EventHubRuntimeInformation.class, EventHubPartitionRuntimeInformation.class, +@PrepareForTest({EventHubRuntimeInformation.class, PartitionRuntimeInformation.class, EventHubClient.class, PartitionReceiver.class, PartitionSender.class}) public class TestEventHubSystemConsumer { private static final String MOCK_ENTITY_1 = "mocktopic1"; @@ -99,8 +99,8 @@ public class TestEventHubSystemConsumer { consumer.register(ssp, EventHubSystemConsumer.START_OF_STREAM); consumer.start(); - Assert.assertEquals(EventHubSystemConsumer.START_OF_STREAM, - eventHubClientWrapperFactory.getPartitionOffset(String.valueOf(partitionId))); + Assert.assertEquals(EventPosition.fromOffset(EventHubSystemConsumer.START_OF_STREAM, false).toString(), + eventHubClientWrapperFactory.getPartitionOffset(String.valueOf(partitionId)).toString()); } @Test http://git-wip-us.apache.org/repos/asf/samza/blob/fda1e37d/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/ITestEventHubSystemProducer.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/ITestEventHubSystemProducer.java b/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/ITestEventHubSystemProducer.java index 32b1604..63f6daa 100644 --- a/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/ITestEventHubSystemProducer.java +++ 
b/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/ITestEventHubSystemProducer.java @@ -21,13 +21,13 @@ package org.apache.samza.system.eventhub.producer; import com.microsoft.azure.eventhubs.EventData; import com.microsoft.azure.eventhubs.EventHubClient; +import com.microsoft.azure.eventhubs.EventHubException; +import com.microsoft.azure.eventhubs.EventPosition; import com.microsoft.azure.eventhubs.PartitionReceiver; -import com.microsoft.azure.servicebus.ServiceBusException; import org.apache.samza.SamzaException; import org.apache.samza.config.Config; import org.apache.samza.system.*; import org.apache.samza.system.eventhub.*; -import org.apache.samza.system.eventhub.consumer.EventHubSystemConsumer; import org.apache.samza.util.NoOpMetricsRegistry; import org.junit.Assert; import org.junit.Ignore; @@ -92,7 +92,7 @@ public class ITestEventHubSystemProducer { } @Test - public void testReceive() throws ServiceBusException { + public void testReceive() throws EventHubException { EventHubClientManagerFactory clientFactory = new EventHubClientManagerFactory(); EventHubClientManager wrapper = clientFactory .getEventHubClientManager(SYSTEM_NAME, STREAM_NAME1, new EventHubConfig(createEventHubConfig())); @@ -100,11 +100,11 @@ public class ITestEventHubSystemProducer { EventHubClient client = wrapper.getEventHubClient(); PartitionReceiver receiver = client.createReceiverSync(EventHubClient.DEFAULT_CONSUMER_GROUP_NAME, "0", - EventHubSystemConsumer.START_OF_STREAM, true); + EventPosition.fromStartOfStream()); receiveMessages(receiver, 300); } - private void receiveMessages(PartitionReceiver receiver, int numMessages) throws ServiceBusException { + private void receiveMessages(PartitionReceiver receiver, int numMessages) throws EventHubException { int count = 0; while (count < numMessages) { http://git-wip-us.apache.org/repos/asf/samza/blob/fda1e37d/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/TestEventHubSystemProducer.java 
---------------------------------------------------------------------- diff --git a/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/TestEventHubSystemProducer.java b/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/TestEventHubSystemProducer.java index b5206bb..9a3bf7d 100644 --- a/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/TestEventHubSystemProducer.java +++ b/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/TestEventHubSystemProducer.java @@ -20,9 +20,9 @@ package org.apache.samza.system.eventhub.producer; import com.microsoft.azure.eventhubs.EventHubClient; -import com.microsoft.azure.eventhubs.EventHubPartitionRuntimeInformation; import com.microsoft.azure.eventhubs.EventHubRuntimeInformation; import com.microsoft.azure.eventhubs.PartitionReceiver; +import com.microsoft.azure.eventhubs.PartitionRuntimeInformation; import com.microsoft.azure.eventhubs.PartitionSender; import java.util.ArrayList; import java.util.HashMap; @@ -49,7 +49,7 @@ import static org.apache.samza.system.eventhub.MockEventHubConfigFactory.*; @RunWith(PowerMockRunner.class) -@PrepareForTest({EventHubRuntimeInformation.class, EventHubPartitionRuntimeInformation.class, EventHubClient.class, PartitionReceiver.class, PartitionSender.class}) +@PrepareForTest({EventHubRuntimeInformation.class, PartitionRuntimeInformation.class, EventHubClient.class, PartitionReceiver.class, PartitionSender.class}) public class TestEventHubSystemProducer { private static final String SOURCE = "TestEventHubSystemProducer"; http://git-wip-us.apache.org/repos/asf/samza/blob/fda1e37d/samza-tools/src/main/java/org/apache/samza/tools/EventHubConsoleConsumer.java ---------------------------------------------------------------------- diff --git a/samza-tools/src/main/java/org/apache/samza/tools/EventHubConsoleConsumer.java b/samza-tools/src/main/java/org/apache/samza/tools/EventHubConsoleConsumer.java index 096e12d..a72c8b7 100644 --- 
a/samza-tools/src/main/java/org/apache/samza/tools/EventHubConsoleConsumer.java +++ b/samza-tools/src/main/java/org/apache/samza/tools/EventHubConsoleConsumer.java @@ -1,38 +1,41 @@ /* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ package org.apache.samza.tools; +import com.microsoft.azure.eventhubs.ConnectionStringBuilder; import com.microsoft.azure.eventhubs.EventData; import com.microsoft.azure.eventhubs.EventHubClient; +import com.microsoft.azure.eventhubs.EventHubException; import com.microsoft.azure.eventhubs.EventHubRuntimeInformation; +import com.microsoft.azure.eventhubs.EventPosition; import com.microsoft.azure.eventhubs.PartitionReceiver; -import com.microsoft.azure.servicebus.ConnectionStringBuilder; -import com.microsoft.azure.servicebus.ServiceBusException; import java.io.IOException; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; import org.apache.commons.cli.BasicParser; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.HelpFormatter; import org.apache.commons.cli.Options; + /** * Tool to read events from Microsoft Azure event hubs. */ @@ -59,17 +62,17 @@ public class EventHubConsoleConsumer { private static final String OPT_DESC_TOKEN = "Token corresponding to the key."; public static void main(String[] args) - throws ServiceBusException, IOException, ExecutionException, InterruptedException { + throws EventHubException, IOException, ExecutionException, InterruptedException { Options options = new Options(); options.addOption( CommandLineHelper.createOption(OPT_SHORT_EVENTHUB_NAME, OPT_LONG_EVENTHUB_NAME, OPT_ARG_EVENTHUB_NAME, true, - OPT_DESC_EVENTHUB_NAME)); + OPT_DESC_EVENTHUB_NAME)); - options.addOption( - CommandLineHelper.createOption(OPT_SHORT_NAMESPACE, OPT_LONG_NAMESPACE, OPT_ARG_NAMESPACE, true, OPT_DESC_NAMESPACE)); + options.addOption(CommandLineHelper.createOption(OPT_SHORT_NAMESPACE, OPT_LONG_NAMESPACE, OPT_ARG_NAMESPACE, true, + OPT_DESC_NAMESPACE)); - options.addOption( - CommandLineHelper.createOption(OPT_SHORT_KEY_NAME, OPT_LONG_KEY_NAME, OPT_ARG_KEY_NAME, true, OPT_DESC_KEY_NAME)); + 
options.addOption(CommandLineHelper.createOption(OPT_SHORT_KEY_NAME, OPT_LONG_KEY_NAME, OPT_ARG_KEY_NAME, true, + OPT_DESC_KEY_NAME)); options.addOption( CommandLineHelper.createOption(OPT_SHORT_TOKEN, OPT_LONG_TOKEN, OPT_ARG_TOKEN, true, OPT_DESC_TOKEN)); @@ -93,17 +96,20 @@ public class EventHubConsoleConsumer { } private static void consumeEvents(String ehName, String namespace, String keyName, String token) - throws ServiceBusException, IOException, ExecutionException, InterruptedException { - ConnectionStringBuilder connStr = new ConnectionStringBuilder(namespace, ehName, keyName, token); + throws EventHubException, IOException, ExecutionException, InterruptedException { + ConnectionStringBuilder connStr = new ConnectionStringBuilder().setNamespaceName(namespace) + .setEventHubName(ehName) + .setSasKeyName(keyName) + .setSasKey(token); - EventHubClient client = EventHubClient.createFromConnectionStringSync(connStr.toString()); + EventHubClient client = EventHubClient.createSync(connStr.toString(), Executors.newFixedThreadPool(10)); EventHubRuntimeInformation runTimeInfo = client.getRuntimeInformation().get(); int numPartitions = runTimeInfo.getPartitionCount(); for (int partition = 0; partition < numPartitions; partition++) { PartitionReceiver receiver = client.createReceiverSync(EventHubClient.DEFAULT_CONSUMER_GROUP_NAME, String.valueOf(partition), - PartitionReceiver.START_OF_STREAM); + EventPosition.fromStartOfStream()); receiver.receive(10).handle((records, throwable) -> handleComplete(receiver, records, throwable)); } }