Repository: samza Updated Branches: refs/heads/master bbcf14eba -> 3895a9070
minor fix on eventhubs size limit for event body and partition key Author: Hai Lu <h...@linkedin.com> Reviewers: Jagadish <jagad...@apache.org>, Srinivasulu Punuru<spun...@linkedin.com> Closes #470 from lhaiesp/master Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/3895a907 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/3895a907 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/3895a907 Branch: refs/heads/master Commit: 3895a90706a8b5db3e622a0bb7b581852b60f7fa Parents: bbcf14e Author: Hai Lu <h...@linkedin.com> Authored: Thu Apr 12 13:31:39 2018 -0700 Committer: Jagadish <jvenkatra...@linkedin.com> Committed: Thu Apr 12 13:31:39 2018 -0700 ---------------------------------------------------------------------- .../documentation/versioned/azure/eventhubs.md | 4 ++++ .../samza/system/eventhub/EventHubConfig.java | 6 ++++-- .../eventhub/producer/EventHubSystemProducer.java | 16 +++++++++++++--- 3 files changed, 21 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/3895a907/docs/learn/documentation/versioned/azure/eventhubs.md ---------------------------------------------------------------------- diff --git a/docs/learn/documentation/versioned/azure/eventhubs.md b/docs/learn/documentation/versioned/azure/eventhubs.md index 7d76be3..ba7f760 100644 --- a/docs/learn/documentation/versioned/azure/eventhubs.md +++ b/docs/learn/documentation/versioned/azure/eventhubs.md @@ -58,6 +58,10 @@ collector.send(envelope); Each [OutgoingMessageEnvelope](https://samza.apache.org/learn/documentation/latest/api/javadocs/org/apache/samza/system/OutgoingMessageEnvelope.html) is converted into an [EventData](https://docs.microsoft.com/en-us/java/api/com.microsoft.azure.eventhubs._event_data) instance whose body is set to the `message` in the envelope. 
Additionally, the `key` and the `produce timestamp` are set as properties in the EventData before sending it to EventHubs. +#### Size limit of partition key: + +Note that EventHubs has a limit on the length of the partition key (128 characters). In [EventHubSystemProducer](https://github.com/apache/samza/blob/master/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java), we truncate the partition key if its length exceeds the limit. + ### Advanced configuration: ##### Producer partitioning: http://git-wip-us.apache.org/repos/asf/samza/blob/3895a907/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java index 6639dd8..e40b3c2 100644 --- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java +++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java @@ -72,8 +72,10 @@ public class EventHubConfig extends MapConfig { public static final String CONFIG_CONSUMER_BUFFER_CAPACITY = "systems.%s.eventhubs.receive.queue.size"; public static final int DEFAULT_CONFIG_CONSUMER_BUFFER_CAPACITY = 100; - // By default we will skip messages larger than 1MB. - private static final int DEFAULT_MAX_MESSAGE_SIZE = 1024 * 1024; + // By default we want to skip messages larger than 1MB. Also allow some buffer (24KB) to account for the overhead of + // metadata and key. 
So the default max message size will be 1000 KB (instead of precisely 1MB) + private static final int MESSAGE_HEADER_OVERHEAD = 24 * 1024; + private static final int DEFAULT_MAX_MESSAGE_SIZE = 1024 * 1024 - MESSAGE_HEADER_OVERHEAD; private final Map<String, String> physcialToId = new HashMap<>(); http://git-wip-us.apache.org/repos/asf/samza/blob/3895a907/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java index 3639bbc..55a2ae0 100644 --- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java +++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java @@ -23,6 +23,7 @@ import com.microsoft.azure.eventhubs.EventData; import com.microsoft.azure.eventhubs.EventHubClient; import com.microsoft.azure.eventhubs.EventHubException; import com.microsoft.azure.eventhubs.PartitionSender; +import com.microsoft.azure.eventhubs.impl.ClientConstants; import com.microsoft.azure.eventhubs.impl.EventDataImpl; import java.nio.charset.Charset; import java.time.Duration; @@ -211,6 +212,8 @@ public class EventHubSystemProducer extends AsyncSystemProducer { } EventData eventData = createEventData(streamId, envelope); + // SAMZA-1654: waiting for the client library to expose the API to calculate the exact size of the AMQP message + // https://github.com/Azure/azure-event-hubs-java/issues/305 int eventDataLength = eventData.getBytes() == null ? 0 : eventData.getBytes().length; // If the maxMessageSize is less than zero, then it means there is no message size restriction. 
@@ -265,15 +268,22 @@ public class EventHubSystemProducer extends AsyncSystemProducer { } private String convertPartitionKeyToString(Object partitionKey) { + String partitionKeyStr; if (partitionKey instanceof String) { - return (String) partitionKey; + partitionKeyStr = (String) partitionKey; } else if (partitionKey instanceof Integer) { - return String.valueOf(partitionKey); + partitionKeyStr = String.valueOf(partitionKey); } else if (partitionKey instanceof byte[]) { - return new String((byte[]) partitionKey, Charset.defaultCharset()); + partitionKeyStr = new String((byte[]) partitionKey, Charset.defaultCharset()); } else { throw new SamzaException("Unsupported key type: " + partitionKey.getClass().toString()); } + if (partitionKeyStr != null && partitionKeyStr.length() > ClientConstants.MAX_PARTITION_KEY_LENGTH) { + LOG.debug("Length of partition key: {} exceeds limit: {}. Truncating.", partitionKeyStr.length(), + ClientConstants.MAX_PARTITION_KEY_LENGTH); + partitionKeyStr = partitionKeyStr.substring(0, ClientConstants.MAX_PARTITION_KEY_LENGTH); + } + return partitionKeyStr; } protected EventData createEventData(String streamId, OutgoingMessageEnvelope envelope) {