Repository: samza Updated Branches: refs/heads/master 4d6ff989b -> 8b8526682
SAMZA-1956: Update value only descriptor serde Changed the Eventhubs input and output descriptors to take only a value Serde instead of a KVSerde. Since the key is always a `String`, the key serde must always be `NoOpSerde`; any other key serde leads to an error because Samza's `serializers.SerdeManager.scala` expects a `byte[]`. Author: Daniel Chen <dch...@linkedin.com> Reviewers: Prateek Maheshwari <pmaheshw...@apache.org> Closes #733 from dxichen/eventhubs-example-cleanup Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/8b852668 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/8b852668 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/8b852668 Branch: refs/heads/master Commit: 8b85266829e65ef46ab15d6356e3fa87cd00684f Parents: 4d6ff98 Author: Daniel Chen <dch...@linkedin.com> Authored: Tue Oct 16 18:56:44 2018 -0700 Committer: Prateek Maheshwari <pmaheshw...@apache.org> Committed: Tue Oct 16 18:56:44 2018 -0700 ---------------------------------------------------------------------- .../descriptors/EventHubsInputDescriptor.java | 8 ++++-- .../descriptors/EventHubsOutputDescriptor.java | 8 ++++-- .../descriptors/EventHubsSystemDescriptor.java | 29 ++++++++++---------- .../TestEventHubsInputDescriptor.java | 26 ++++++++++++++---- .../TestEventHubsOutputDescriptor.java | 26 ++++++++++++++---- .../TestEventHubsSystemDescriptor.java | 26 ++++++------------ 6 files changed, 73 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/8b852668/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsInputDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsInputDescriptor.java 
b/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsInputDescriptor.java index 462cc05..df22269 100644 --- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsInputDescriptor.java +++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsInputDescriptor.java @@ -25,6 +25,8 @@ import org.apache.commons.lang3.StringUtils; import org.apache.samza.config.ConfigException; import org.apache.samza.system.descriptors.InputDescriptor; import org.apache.samza.system.descriptors.SystemDescriptor; +import org.apache.samza.serializers.KVSerde; +import org.apache.samza.serializers.NoOpSerde; import org.apache.samza.serializers.Serde; import org.apache.samza.system.eventhub.EventHubConfig; @@ -52,12 +54,12 @@ public class EventHubsInputDescriptor<StreamMessageType> * @param streamId id of the stream * @param namespace namespace for the Event Hubs entity to consume from, not null * @param entityPath entity path for the Event Hubs entity to consume from, not null - * @param serde serde for messages in the stream + * @param valueSerde serde the values in the messages in the stream * @param systemDescriptor system descriptor this stream descriptor was obtained from */ - EventHubsInputDescriptor(String streamId, String namespace, String entityPath, Serde serde, + EventHubsInputDescriptor(String streamId, String namespace, String entityPath, Serde valueSerde, SystemDescriptor systemDescriptor) { - super(streamId, serde, systemDescriptor, null); + super(streamId, KVSerde.of(new NoOpSerde<>(), valueSerde), systemDescriptor, null); this.namespace = StringUtils.stripToNull(namespace); this.entityPath = StringUtils.stripToNull(entityPath); if (this.namespace == null || this.entityPath == null) { http://git-wip-us.apache.org/repos/asf/samza/blob/8b852668/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsOutputDescriptor.java 
---------------------------------------------------------------------- diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsOutputDescriptor.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsOutputDescriptor.java index ddbf79c..95f7e42 100644 --- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsOutputDescriptor.java +++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsOutputDescriptor.java @@ -25,6 +25,8 @@ import org.apache.commons.lang3.StringUtils; import org.apache.samza.config.ConfigException; import org.apache.samza.system.descriptors.OutputDescriptor; import org.apache.samza.system.descriptors.SystemDescriptor; +import org.apache.samza.serializers.KVSerde; +import org.apache.samza.serializers.NoOpSerde; import org.apache.samza.serializers.Serde; import org.apache.samza.system.eventhub.EventHubConfig; @@ -50,12 +52,12 @@ public class EventHubsOutputDescriptor<StreamMessageType> * @param streamId id of the stream * @param namespace namespace for the Event Hubs entity to produce to, not null * @param entityPath entity path for the Event Hubs entity to produce to, not null - * @param serde serde for messages in the stream + * @param valueSerde serde the values in the messages in the stream * @param systemDescriptor system descriptor this stream descriptor was obtained from */ - EventHubsOutputDescriptor(String streamId, String namespace, String entityPath, Serde serde, + EventHubsOutputDescriptor(String streamId, String namespace, String entityPath, Serde valueSerde, SystemDescriptor systemDescriptor) { - super(streamId, serde, systemDescriptor); + super(streamId, KVSerde.of(new NoOpSerde<>(), valueSerde), systemDescriptor); this.namespace = StringUtils.stripToNull(namespace); this.entityPath = StringUtils.stripToNull(entityPath); if (this.namespace == null || this.entityPath == null) { 
http://git-wip-us.apache.org/repos/asf/samza/blob/8b852668/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsSystemDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsSystemDescriptor.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsSystemDescriptor.java index 80bdfae..feffd87 100644 --- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsSystemDescriptor.java +++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsSystemDescriptor.java @@ -23,6 +23,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import org.apache.samza.operators.KV; import org.apache.samza.system.descriptors.SystemDescriptor; import org.apache.samza.serializers.Serde; import org.apache.samza.system.eventhub.EventHubConfig; @@ -57,40 +58,40 @@ public class EventHubsSystemDescriptor extends SystemDescriptor<EventHubsSystemD /** * Gets an {@link EventHubsInputDescriptor} for the input stream of this system. The stream has the provided - * namespace and entity name of the associated Event Hubs entity and the provided stream level serde. + * namespace and entity name of the associated Event Hubs entity and the provided stream level value serde. * <p> - * The type of messages in the stream is the type of the provided stream level serde. + * The message in the stream will have {@link String} keys and {@code ValueType} values. 
* * @param streamId id of the input stream * @param namespace namespace of the Event Hubs entity to consume from * @param entityPath entity path of the Event Hubs entity to consume from - * @param serde stream level serde for the input stream - * @param <StreamMessageType> type of messages in this stream + * @param valueSerde stream level serde for the values in the messages in the input stream + * @param <ValueType> type of the value in the messages in this stream * @return an {@link EventHubsInputDescriptor} for the Event Hubs input stream */ - public <StreamMessageType> EventHubsInputDescriptor<StreamMessageType> getInputDescriptor(String streamId, String namespace, - String entityPath, Serde<StreamMessageType> serde) { + public <ValueType> EventHubsInputDescriptor<KV<String, ValueType>> getInputDescriptor(String streamId, String namespace, + String entityPath, Serde<ValueType> valueSerde) { streamIds.add(streamId); - return new EventHubsInputDescriptor<>(streamId, namespace, entityPath, serde, this); + return new EventHubsInputDescriptor<>(streamId, namespace, entityPath, valueSerde, this); } /** * Gets an {@link EventHubsOutputDescriptor} for the output stream of this system. The stream has the provided - * namespace and entity name of the associated Event Hubs entity and the provided stream level serde. + * namespace and entity name of the associated Event Hubs entity and the provided stream level value serde. * <p> - * The type of messages in the stream is the type of the provided stream level serde. + * The message in the stream will have {@link String} keys and {@code ValueType} values. 
* * @param streamId id of the output stream * @param namespace namespace of the Event Hubs entity to produce to * @param entityPath entity path of the Event Hubs entity to produce to - * @param serde stream level serde for the output stream - * @param <StreamMessageType> type of the messages in this stream + * @param valueSerde stream level serde for the values in the messages to the output stream + * @param <ValueType> type of the value in the messages in this stream * @return an {@link EventHubsOutputDescriptor} for the Event Hubs output stream */ - public <StreamMessageType> EventHubsOutputDescriptor<StreamMessageType> getOutputDescriptor(String streamId, String namespace, - String entityPath, Serde<StreamMessageType> serde) { + public <ValueType> EventHubsOutputDescriptor<KV<String, ValueType>> getOutputDescriptor(String streamId, String namespace, + String entityPath, Serde<ValueType> valueSerde) { streamIds.add(streamId); - return new EventHubsOutputDescriptor<>(streamId, namespace, entityPath, serde, this); + return new EventHubsOutputDescriptor<>(streamId, namespace, entityPath, valueSerde, this); } /** http://git-wip-us.apache.org/repos/asf/samza/blob/8b852668/samza-azure/src/test/java/org/apache/samza/system/eventhub/descriptors/TestEventHubsInputDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/test/java/org/apache/samza/system/eventhub/descriptors/TestEventHubsInputDescriptor.java b/samza-azure/src/test/java/org/apache/samza/system/eventhub/descriptors/TestEventHubsInputDescriptor.java index 1e6b368..c633ccc 100644 --- a/samza-azure/src/test/java/org/apache/samza/system/eventhub/descriptors/TestEventHubsInputDescriptor.java +++ b/samza-azure/src/test/java/org/apache/samza/system/eventhub/descriptors/TestEventHubsInputDescriptor.java @@ -21,14 +21,15 @@ package org.apache.samza.system.eventhub.descriptors; import java.util.Map; import org.apache.samza.config.ConfigException; import 
org.apache.samza.operators.KV; -import org.apache.samza.serializers.IntegerSerde; import org.apache.samza.serializers.KVSerde; +import org.apache.samza.serializers.NoOpSerde; import org.apache.samza.serializers.StringSerde; import org.apache.samza.system.eventhub.EventHubConfig; import org.junit.Test; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -40,8 +41,8 @@ public class TestEventHubsInputDescriptor { EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor(systemName); - EventHubsInputDescriptor<KV<String, Integer>> inputDescriptor = systemDescriptor - .getInputDescriptor(streamId, "entity-namespace", "entity3", KVSerde.of(new StringSerde(), new IntegerSerde())) + EventHubsInputDescriptor<KV<String, String>> inputDescriptor = systemDescriptor + .getInputDescriptor(streamId, "entity-namespace", "entity3", new StringSerde()) .withSasKeyName("secretkey") .withSasKey("sasToken-123") .withConsumerGroup("$notdefault"); @@ -62,8 +63,8 @@ public class TestEventHubsInputDescriptor { EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor(systemName); - EventHubsInputDescriptor<KV<String, Integer>> inputDescriptor = systemDescriptor - .getInputDescriptor(streamId, "entity-namespace", "entity3", KVSerde.of(new StringSerde(), new IntegerSerde())); + EventHubsInputDescriptor<KV<String, String>> inputDescriptor = systemDescriptor + .getInputDescriptor(streamId, "entity-namespace", "entity3", new StringSerde()); Map<String, String> generatedConfigs = inputDescriptor.toConfig(); assertEquals("eventHub", generatedConfigs.get("streams.input-stream.samza.system")); @@ -82,11 +83,24 @@ public class TestEventHubsInputDescriptor { EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor(systemName); try { - systemDescriptor.getInputDescriptor(streamId, null, null, KVSerde.of(new StringSerde(), new 
IntegerSerde())); + systemDescriptor.getInputDescriptor(streamId, null, null, new StringSerde()); fail("Should have thrown Config Exception"); } catch (ConfigException exception) { assertEquals(String.format("Missing namespace and entity path Event Hubs input descriptor in " // + "system: {%s}, stream: {%s}", systemName, streamId), exception.getMessage()); } } + + @Test + public void testStreamDescriptorContainsKVserde() { + String systemName = "eventHub"; + String streamId = "input-stream"; + + EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor(systemName); + EventHubsInputDescriptor<KV<String, String>> outputDescriptor = systemDescriptor + .getInputDescriptor(streamId, "entity-namespace", "entity3", new StringSerde()); + assertTrue(outputDescriptor.getSerde() instanceof KVSerde); + assertTrue(((KVSerde) outputDescriptor.getSerde()).getKeySerde() instanceof NoOpSerde); + assertTrue(((KVSerde) outputDescriptor.getSerde()).getValueSerde() instanceof StringSerde); + } } http://git-wip-us.apache.org/repos/asf/samza/blob/8b852668/samza-azure/src/test/java/org/apache/samza/system/eventhub/descriptors/TestEventHubsOutputDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/test/java/org/apache/samza/system/eventhub/descriptors/TestEventHubsOutputDescriptor.java b/samza-azure/src/test/java/org/apache/samza/system/eventhub/descriptors/TestEventHubsOutputDescriptor.java index fa8ae56..f214aa3 100644 --- a/samza-azure/src/test/java/org/apache/samza/system/eventhub/descriptors/TestEventHubsOutputDescriptor.java +++ b/samza-azure/src/test/java/org/apache/samza/system/eventhub/descriptors/TestEventHubsOutputDescriptor.java @@ -21,14 +21,15 @@ package org.apache.samza.system.eventhub.descriptors; import java.util.Map; import org.apache.samza.config.ConfigException; import org.apache.samza.operators.KV; -import org.apache.samza.serializers.IntegerSerde; import 
org.apache.samza.serializers.KVSerde; +import org.apache.samza.serializers.NoOpSerde; import org.apache.samza.serializers.StringSerde; import org.apache.samza.system.eventhub.EventHubConfig; import org.junit.Test; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; public class TestEventHubsOutputDescriptor { @@ -39,8 +40,8 @@ public class TestEventHubsOutputDescriptor { EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor(systemName); - EventHubsOutputDescriptor<KV<String, Integer>> outputDescriptor = systemDescriptor - .getOutputDescriptor(streamId, "entity-namespace", "entity3", KVSerde.of(new StringSerde(), new IntegerSerde())) + EventHubsOutputDescriptor<KV<String, String>> outputDescriptor = systemDescriptor + .getOutputDescriptor(streamId, "entity-namespace", "entity3", new StringSerde()) .withSasKeyName("secretkey") .withSasKey("sasToken-123"); @@ -59,8 +60,8 @@ public class TestEventHubsOutputDescriptor { EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor(systemName); - EventHubsOutputDescriptor<KV<String, Integer>> outputDescriptor = systemDescriptor - .getOutputDescriptor(streamId, "entity-namespace", "entity3", KVSerde.of(new StringSerde(), new IntegerSerde())); + EventHubsOutputDescriptor<KV<String, String>> outputDescriptor = systemDescriptor + .getOutputDescriptor(streamId, "entity-namespace", "entity3", new StringSerde()); Map<String, String> generatedConfigs = outputDescriptor.toConfig(); assertEquals("eventHub", generatedConfigs.get("streams.output-stream.samza.system")); @@ -79,11 +80,24 @@ public class TestEventHubsOutputDescriptor { EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor(systemName); try { - systemDescriptor.getOutputDescriptor(streamId, null, null, KVSerde.of(new StringSerde(), new IntegerSerde())); + systemDescriptor.getOutputDescriptor(streamId, 
null, null, new StringSerde()); fail("Should have thrown Config Exception"); } catch (ConfigException exception) { assertEquals(String.format("Missing namespace and entity path Event Hubs output descriptor in " // + "system: {%s}, stream: {%s}", systemName, streamId), exception.getMessage()); } } + + @Test + public void testStreamDescriptorContainsKVserde() { + String systemName = "eventHub"; + String streamId = "output-stream"; + + EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor(systemName); + EventHubsOutputDescriptor<KV<String, String>> outputDescriptor = systemDescriptor + .getOutputDescriptor(streamId, "entity-namespace", "entity3", new StringSerde()); + assertTrue(outputDescriptor.getSerde() instanceof KVSerde); + assertTrue(((KVSerde) outputDescriptor.getSerde()).getKeySerde() instanceof NoOpSerde); + assertTrue(((KVSerde) outputDescriptor.getSerde()).getValueSerde() instanceof StringSerde); + } } http://git-wip-us.apache.org/repos/asf/samza/blob/8b852668/samza-azure/src/test/java/org/apache/samza/system/eventhub/descriptors/TestEventHubsSystemDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/test/java/org/apache/samza/system/eventhub/descriptors/TestEventHubsSystemDescriptor.java b/samza-azure/src/test/java/org/apache/samza/system/eventhub/descriptors/TestEventHubsSystemDescriptor.java index 7f73bd9..84c8589 100644 --- a/samza-azure/src/test/java/org/apache/samza/system/eventhub/descriptors/TestEventHubsSystemDescriptor.java +++ b/samza-azure/src/test/java/org/apache/samza/system/eventhub/descriptors/TestEventHubsSystemDescriptor.java @@ -19,8 +19,6 @@ package org.apache.samza.system.eventhub.descriptors; import java.util.Map; -import org.apache.samza.serializers.IntegerSerde; -import org.apache.samza.serializers.KVSerde; import org.apache.samza.serializers.StringSerde; import org.apache.samza.system.eventhub.EventHubConfig; import 
org.apache.samza.system.eventhub.producer.EventHubSystemProducer.PartitioningMethod; @@ -47,14 +45,10 @@ public class TestEventHubsSystemDescriptor { .withRuntimeInfoTimeout(60000) .withSendKeys(false); - systemDescriptor.getInputDescriptor(streamId1, "entity-namespace1", "entity1", KVSerde.of(new StringSerde(), - new IntegerSerde())); - systemDescriptor.getInputDescriptor(streamId2, "entity-namespace2", "entity2", KVSerde.of(new StringSerde(), - new IntegerSerde())); - systemDescriptor.getOutputDescriptor(streamId3, "entity-namespace3", "entity3", KVSerde.of(new StringSerde(), - new IntegerSerde())); - systemDescriptor.getOutputDescriptor(streamId4, "entity-namespace4", "entity4", KVSerde.of(new StringSerde(), - new IntegerSerde())); + systemDescriptor.getInputDescriptor(streamId1, "entity-namespace1", "entity1", new StringSerde()); + systemDescriptor.getInputDescriptor(streamId2, "entity-namespace2", "entity2", new StringSerde()); + systemDescriptor.getOutputDescriptor(streamId3, "entity-namespace3", "entity3", new StringSerde()); + systemDescriptor.getOutputDescriptor(streamId4, "entity-namespace4", "entity4", new StringSerde()); Map<String, String> generatedConfigs = systemDescriptor.toConfig(); assertEquals("org.apache.samza.system.eventhub.EventHubSystemFactory", generatedConfigs.get(String.format("systems.%s.samza.factory", systemName))); @@ -96,14 +90,10 @@ public class TestEventHubsSystemDescriptor { EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor(systemName); - systemDescriptor.getInputDescriptor(streamId1, "entity-namespace1", "entity1", KVSerde.of(new StringSerde(), - new IntegerSerde())); - systemDescriptor.getInputDescriptor(streamId2, "entity-namespace2", "entity2", KVSerde.of(new StringSerde(), - new IntegerSerde())); - systemDescriptor.getOutputDescriptor(streamId3, "entity-namespace3", "entity3", KVSerde.of(new StringSerde(), - new IntegerSerde())); - systemDescriptor.getOutputDescriptor(streamId4, "entity-namespace4", 
"entity4", KVSerde.of(new StringSerde(), - new IntegerSerde())); + systemDescriptor.getInputDescriptor(streamId1, "entity-namespace1", "entity1", new StringSerde()); + systemDescriptor.getInputDescriptor(streamId2, "entity-namespace2", "entity2", new StringSerde()); + systemDescriptor.getOutputDescriptor(streamId3, "entity-namespace3", "entity3", new StringSerde()); + systemDescriptor.getOutputDescriptor(streamId4, "entity-namespace4", "entity4", new StringSerde()); Map<String, String> generatedConfigs = systemDescriptor.toConfig(); assertEquals("org.apache.samza.system.eventhub.EventHubSystemFactory", generatedConfigs.get(String.format("systems.%s.samza.factory", systemName)));