Repository: samza
Updated Branches:
  refs/heads/master bbcf14eba -> 3895a9070


Minor fix for the EventHubs size limits on the event body and partition key

Author: Hai Lu <h...@linkedin.com>

Reviewers: Jagadish <jagad...@apache.org>, Srinivasulu Punuru <spun...@linkedin.com>

Closes #470 from lhaiesp/master


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/3895a907
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/3895a907
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/3895a907

Branch: refs/heads/master
Commit: 3895a90706a8b5db3e622a0bb7b581852b60f7fa
Parents: bbcf14e
Author: Hai Lu <h...@linkedin.com>
Authored: Thu Apr 12 13:31:39 2018 -0700
Committer: Jagadish <jvenkatra...@linkedin.com>
Committed: Thu Apr 12 13:31:39 2018 -0700

----------------------------------------------------------------------
 .../documentation/versioned/azure/eventhubs.md      |  4 ++++
 .../samza/system/eventhub/EventHubConfig.java       |  6 ++++--
 .../eventhub/producer/EventHubSystemProducer.java   | 16 +++++++++++++---
 3 files changed, 21 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/3895a907/docs/learn/documentation/versioned/azure/eventhubs.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/azure/eventhubs.md b/docs/learn/documentation/versioned/azure/eventhubs.md
index 7d76be3..ba7f760 100644
--- a/docs/learn/documentation/versioned/azure/eventhubs.md
+++ b/docs/learn/documentation/versioned/azure/eventhubs.md
@@ -58,6 +58,10 @@ collector.send(envelope);
 
 Each [OutgoingMessageEnvelope](https://samza.apache.org/learn/documentation/latest/api/javadocs/org/apache/samza/system/OutgoingMessageEnvelope.html) is converted into an [EventData](https://docs.microsoft.com/en-us/java/api/com.microsoft.azure.eventhubs._event_data) instance whose body is set to the `message` in the envelope. Additionally, the `key` and the `produce timestamp` are set as properties in the EventData before sending it to EventHubs.
 
+#### Size limit of partition key:
+
+Note that EventHubs has a limit on the length of partition key (128 characters). In [EventHubSystemProducer](https://github.com/apache/samza/blob/master/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java) we truncate the partition key if the size of the key exceeds the limit.
+
 ### Advanced configuration:
 
 ##### Producer partitioning: 

http://git-wip-us.apache.org/repos/asf/samza/blob/3895a907/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java
----------------------------------------------------------------------
diff --git 
a/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java
 
b/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java
index 6639dd8..e40b3c2 100644
--- 
a/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java
+++ 
b/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java
@@ -72,8 +72,10 @@ public class EventHubConfig extends MapConfig {
   public static final String CONFIG_CONSUMER_BUFFER_CAPACITY = 
"systems.%s.eventhubs.receive.queue.size";
   public static final int DEFAULT_CONFIG_CONSUMER_BUFFER_CAPACITY = 100;
 
-  // By default we will skip messages larger than 1MB.
-  private static final int DEFAULT_MAX_MESSAGE_SIZE = 1024 * 1024;
+  // By default we want to skip messages larger than 1MB. Also allow some 
buffer (24KB) to account for the overhead of
+  // metadata and key. So the default max message size will be 1000 KB 
(instead of precisely 1MB)
+  private static final int MESSAGE_HEADER_OVERHEAD = 24 * 1024;
+  private static final int DEFAULT_MAX_MESSAGE_SIZE = 1024 * 1024 - 
MESSAGE_HEADER_OVERHEAD;
 
   private final Map<String, String> physcialToId = new HashMap<>();
 

http://git-wip-us.apache.org/repos/asf/samza/blob/3895a907/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java
----------------------------------------------------------------------
diff --git 
a/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java
 
b/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java
index 3639bbc..55a2ae0 100644
--- 
a/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java
+++ 
b/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java
@@ -23,6 +23,7 @@ import com.microsoft.azure.eventhubs.EventData;
 import com.microsoft.azure.eventhubs.EventHubClient;
 import com.microsoft.azure.eventhubs.EventHubException;
 import com.microsoft.azure.eventhubs.PartitionSender;
+import com.microsoft.azure.eventhubs.impl.ClientConstants;
 import com.microsoft.azure.eventhubs.impl.EventDataImpl;
 import java.nio.charset.Charset;
 import java.time.Duration;
@@ -211,6 +212,8 @@ public class EventHubSystemProducer extends 
AsyncSystemProducer {
     }
 
     EventData eventData = createEventData(streamId, envelope);
+    // SAMZA-1654: waiting for the client library to expose the API to 
calculate the exact size of the AMQP message
+    // https://github.com/Azure/azure-event-hubs-java/issues/305
     int eventDataLength = eventData.getBytes() == null ? 0 : 
eventData.getBytes().length;
 
     // If the maxMessageSize is lesser than zero, then it means there is no 
message size restriction.
@@ -265,15 +268,22 @@ public class EventHubSystemProducer extends 
AsyncSystemProducer {
   }
 
   private String convertPartitionKeyToString(Object partitionKey) {
+    String partitionKeyStr;
     if (partitionKey instanceof String) {
-      return (String) partitionKey;
+      partitionKeyStr = (String) partitionKey;
     } else if (partitionKey instanceof Integer) {
-      return String.valueOf(partitionKey);
+      partitionKeyStr = String.valueOf(partitionKey);
     } else if (partitionKey instanceof byte[]) {
-      return new String((byte[]) partitionKey, Charset.defaultCharset());
+      partitionKeyStr = new String((byte[]) partitionKey, 
Charset.defaultCharset());
     } else {
       throw new SamzaException("Unsupported key type: " + 
partitionKey.getClass().toString());
     }
+    if (partitionKeyStr != null && partitionKeyStr.length() > 
ClientConstants.MAX_PARTITION_KEY_LENGTH) {
+      LOG.debug("Length of partition key: {} exceeds limit: {}. Truncating.", 
partitionKeyStr.length(),
+          ClientConstants.MAX_PARTITION_KEY_LENGTH);
+      partitionKeyStr = partitionKeyStr.substring(0, 
ClientConstants.MAX_PARTITION_KEY_LENGTH);
+    }
+    return partitionKeyStr;
   }
 
   protected EventData createEventData(String streamId, OutgoingMessageEnvelope 
envelope) {

Reply via email to