Repository: samza
Updated Branches:
  refs/heads/master 4d6ff989b -> 8b8526682


SAMZA-1956: Update value only descriptor serde

Changed the KVSerde to only value Serde  for the Eventhubs input and output 
descriptors.
Since the key is always a `String`, the key serde should always be `NoOpSerde` 
and will lead to an error otherwise since the Samza 
`serializers.SerdeManager.scala` expectes a `byte[]`

Author: Daniel Chen <dch...@linkedin.com>

Reviewers: Prateek Maheshwari <pmaheshw...@apache.org>

Closes #733 from dxichen/eventhubs-example-cleanup


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/8b852668
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/8b852668
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/8b852668

Branch: refs/heads/master
Commit: 8b85266829e65ef46ab15d6356e3fa87cd00684f
Parents: 4d6ff98
Author: Daniel Chen <dch...@linkedin.com>
Authored: Tue Oct 16 18:56:44 2018 -0700
Committer: Prateek Maheshwari <pmaheshw...@apache.org>
Committed: Tue Oct 16 18:56:44 2018 -0700

----------------------------------------------------------------------
 .../descriptors/EventHubsInputDescriptor.java   |  8 ++++--
 .../descriptors/EventHubsOutputDescriptor.java  |  8 ++++--
 .../descriptors/EventHubsSystemDescriptor.java  | 29 ++++++++++----------
 .../TestEventHubsInputDescriptor.java           | 26 ++++++++++++++----
 .../TestEventHubsOutputDescriptor.java          | 26 ++++++++++++++----
 .../TestEventHubsSystemDescriptor.java          | 26 ++++++------------
 6 files changed, 73 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/8b852668/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsInputDescriptor.java
----------------------------------------------------------------------
diff --git 
a/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsInputDescriptor.java
 
b/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsInputDescriptor.java
index 462cc05..df22269 100644
--- 
a/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsInputDescriptor.java
+++ 
b/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsInputDescriptor.java
@@ -25,6 +25,8 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.samza.config.ConfigException;
 import org.apache.samza.system.descriptors.InputDescriptor;
 import org.apache.samza.system.descriptors.SystemDescriptor;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.NoOpSerde;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.system.eventhub.EventHubConfig;
 
@@ -52,12 +54,12 @@ public class EventHubsInputDescriptor<StreamMessageType>
    * @param streamId id of the stream
    * @param namespace namespace for the Event Hubs entity to consume from, not 
null
    * @param entityPath entity path for the Event Hubs entity to consume from, 
not null
-   * @param serde serde for messages in the stream
+   * @param valueSerde serde the values in the messages in the stream
    * @param systemDescriptor system descriptor this stream descriptor was 
obtained from
    */
-  EventHubsInputDescriptor(String streamId, String namespace, String 
entityPath, Serde serde,
+  EventHubsInputDescriptor(String streamId, String namespace, String 
entityPath, Serde valueSerde,
       SystemDescriptor systemDescriptor) {
-    super(streamId, serde, systemDescriptor, null);
+    super(streamId, KVSerde.of(new NoOpSerde<>(), valueSerde), 
systemDescriptor, null);
     this.namespace = StringUtils.stripToNull(namespace);
     this.entityPath = StringUtils.stripToNull(entityPath);
     if (this.namespace == null || this.entityPath == null) {

http://git-wip-us.apache.org/repos/asf/samza/blob/8b852668/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsOutputDescriptor.java
----------------------------------------------------------------------
diff --git 
a/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsOutputDescriptor.java
 
b/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsOutputDescriptor.java
index ddbf79c..95f7e42 100644
--- 
a/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsOutputDescriptor.java
+++ 
b/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsOutputDescriptor.java
@@ -25,6 +25,8 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.samza.config.ConfigException;
 import org.apache.samza.system.descriptors.OutputDescriptor;
 import org.apache.samza.system.descriptors.SystemDescriptor;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.NoOpSerde;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.system.eventhub.EventHubConfig;
 
@@ -50,12 +52,12 @@ public class EventHubsOutputDescriptor<StreamMessageType>
    * @param streamId id of the stream
    * @param namespace namespace for the Event Hubs entity to produce to, not 
null
    * @param entityPath entity path for the Event Hubs entity to produce to, 
not null
-   * @param serde serde for messages in the stream
+   * @param valueSerde serde the values in the messages in the stream
    * @param systemDescriptor system descriptor this stream descriptor was 
obtained from
    */
-  EventHubsOutputDescriptor(String streamId, String namespace, String 
entityPath, Serde serde,
+  EventHubsOutputDescriptor(String streamId, String namespace, String 
entityPath, Serde valueSerde,
       SystemDescriptor systemDescriptor) {
-    super(streamId, serde, systemDescriptor);
+    super(streamId, KVSerde.of(new NoOpSerde<>(), valueSerde), 
systemDescriptor);
     this.namespace = StringUtils.stripToNull(namespace);
     this.entityPath = StringUtils.stripToNull(entityPath);
     if (this.namespace == null || this.entityPath == null) {

http://git-wip-us.apache.org/repos/asf/samza/blob/8b852668/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsSystemDescriptor.java
----------------------------------------------------------------------
diff --git 
a/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsSystemDescriptor.java
 
b/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsSystemDescriptor.java
index 80bdfae..feffd87 100644
--- 
a/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsSystemDescriptor.java
+++ 
b/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsSystemDescriptor.java
@@ -23,6 +23,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import org.apache.samza.operators.KV;
 import org.apache.samza.system.descriptors.SystemDescriptor;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.system.eventhub.EventHubConfig;
@@ -57,40 +58,40 @@ public class EventHubsSystemDescriptor extends 
SystemDescriptor<EventHubsSystemD
 
   /**
    * Gets an {@link EventHubsInputDescriptor} for the input stream of this 
system. The stream has the provided
-   * namespace and entity name of the associated Event Hubs entity and the 
provided stream level serde.
+   * namespace and entity name of the associated Event Hubs entity and the 
provided stream level value serde.
    * <p>
-   * The type of messages in the stream is the type of the provided stream 
level serde.
+   * The message in the stream will have {@link String} keys and {@code 
ValueType} values.
    *
    * @param streamId id of the input stream
    * @param namespace namespace of the Event Hubs entity to consume from
    * @param entityPath entity path of the Event Hubs entity to consume from
-   * @param serde stream level serde for the input stream
-   * @param <StreamMessageType> type of messages in this stream
+   * @param valueSerde stream level serde for the values in the messages in 
the input stream
+   * @param <ValueType> type of the value in the messages in this stream
    * @return an {@link EventHubsInputDescriptor} for the Event Hubs input 
stream
    */
-  public <StreamMessageType> EventHubsInputDescriptor<StreamMessageType> 
getInputDescriptor(String streamId, String namespace,
-      String entityPath, Serde<StreamMessageType> serde) {
+  public <ValueType> EventHubsInputDescriptor<KV<String, ValueType>> 
getInputDescriptor(String streamId, String namespace,
+      String entityPath, Serde<ValueType> valueSerde) {
     streamIds.add(streamId);
-    return new EventHubsInputDescriptor<>(streamId, namespace, entityPath, 
serde, this);
+    return new EventHubsInputDescriptor<>(streamId, namespace, entityPath, 
valueSerde, this);
   }
 
   /**
    * Gets an {@link EventHubsOutputDescriptor} for the output stream of this 
system. The stream has the provided
-   * namespace and entity name of the associated Event Hubs entity and the 
provided stream level serde.
+   * namespace and entity name of the associated Event Hubs entity and the 
provided stream level value serde.
    * <p>
-   * The type of messages in the stream is the type of the provided stream 
level serde.
+   * The message in the stream will have {@link String} keys and {@code 
ValueType} values.
    *
    * @param streamId id of the output stream
    * @param namespace namespace of the Event Hubs entity to produce to
    * @param entityPath entity path of the Event Hubs entity to produce to
-   * @param serde stream level serde for the output stream
-   * @param <StreamMessageType> type of the messages in this stream
+   * @param valueSerde stream level serde for the values in the messages to 
the output stream
+   * @param <ValueType> type of the value in the messages in this stream
    * @return an {@link EventHubsOutputDescriptor} for the Event Hubs output 
stream
    */
-  public <StreamMessageType> EventHubsOutputDescriptor<StreamMessageType> 
getOutputDescriptor(String streamId, String namespace,
-      String entityPath, Serde<StreamMessageType> serde) {
+  public <ValueType> EventHubsOutputDescriptor<KV<String, ValueType>> 
getOutputDescriptor(String streamId, String namespace,
+      String entityPath, Serde<ValueType> valueSerde) {
     streamIds.add(streamId);
-    return new EventHubsOutputDescriptor<>(streamId, namespace, entityPath, 
serde, this);
+    return new EventHubsOutputDescriptor<>(streamId, namespace, entityPath, 
valueSerde, this);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/8b852668/samza-azure/src/test/java/org/apache/samza/system/eventhub/descriptors/TestEventHubsInputDescriptor.java
----------------------------------------------------------------------
diff --git 
a/samza-azure/src/test/java/org/apache/samza/system/eventhub/descriptors/TestEventHubsInputDescriptor.java
 
b/samza-azure/src/test/java/org/apache/samza/system/eventhub/descriptors/TestEventHubsInputDescriptor.java
index 1e6b368..c633ccc 100644
--- 
a/samza-azure/src/test/java/org/apache/samza/system/eventhub/descriptors/TestEventHubsInputDescriptor.java
+++ 
b/samza-azure/src/test/java/org/apache/samza/system/eventhub/descriptors/TestEventHubsInputDescriptor.java
@@ -21,14 +21,15 @@ package org.apache.samza.system.eventhub.descriptors;
 import java.util.Map;
 import org.apache.samza.config.ConfigException;
 import org.apache.samza.operators.KV;
-import org.apache.samza.serializers.IntegerSerde;
 import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.NoOpSerde;
 import org.apache.samza.serializers.StringSerde;
 import org.apache.samza.system.eventhub.EventHubConfig;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 
@@ -40,8 +41,8 @@ public class TestEventHubsInputDescriptor {
 
     EventHubsSystemDescriptor systemDescriptor = new 
EventHubsSystemDescriptor(systemName);
 
-    EventHubsInputDescriptor<KV<String, Integer>> inputDescriptor = 
systemDescriptor
-        .getInputDescriptor(streamId, "entity-namespace", "entity3", 
KVSerde.of(new StringSerde(), new IntegerSerde()))
+    EventHubsInputDescriptor<KV<String, String>> inputDescriptor = 
systemDescriptor
+        .getInputDescriptor(streamId, "entity-namespace", "entity3", new 
StringSerde())
         .withSasKeyName("secretkey")
         .withSasKey("sasToken-123")
         .withConsumerGroup("$notdefault");
@@ -62,8 +63,8 @@ public class TestEventHubsInputDescriptor {
 
     EventHubsSystemDescriptor systemDescriptor = new 
EventHubsSystemDescriptor(systemName);
 
-    EventHubsInputDescriptor<KV<String, Integer>> inputDescriptor = 
systemDescriptor
-        .getInputDescriptor(streamId, "entity-namespace", "entity3", 
KVSerde.of(new StringSerde(), new IntegerSerde()));
+    EventHubsInputDescriptor<KV<String, String>> inputDescriptor = 
systemDescriptor
+        .getInputDescriptor(streamId, "entity-namespace", "entity3", new 
StringSerde());
 
     Map<String, String> generatedConfigs = inputDescriptor.toConfig();
     assertEquals("eventHub", 
generatedConfigs.get("streams.input-stream.samza.system"));
@@ -82,11 +83,24 @@ public class TestEventHubsInputDescriptor {
 
     EventHubsSystemDescriptor systemDescriptor = new 
EventHubsSystemDescriptor(systemName);
     try {
-      systemDescriptor.getInputDescriptor(streamId, null, null, KVSerde.of(new 
StringSerde(), new IntegerSerde()));
+      systemDescriptor.getInputDescriptor(streamId, null, null, new 
StringSerde());
       fail("Should have thrown Config Exception");
     } catch (ConfigException exception) {
       assertEquals(String.format("Missing namespace and entity path Event Hubs 
input descriptor in " //
           + "system: {%s}, stream: {%s}", systemName, streamId), 
exception.getMessage());
     }
   }
+
+  @Test
+  public void testStreamDescriptorContainsKVserde() {
+    String systemName = "eventHub";
+    String streamId = "input-stream";
+
+    EventHubsSystemDescriptor systemDescriptor = new 
EventHubsSystemDescriptor(systemName);
+    EventHubsInputDescriptor<KV<String, String>> outputDescriptor = 
systemDescriptor
+        .getInputDescriptor(streamId, "entity-namespace", "entity3", new 
StringSerde());
+    assertTrue(outputDescriptor.getSerde() instanceof KVSerde);
+    assertTrue(((KVSerde) outputDescriptor.getSerde()).getKeySerde() 
instanceof NoOpSerde);
+    assertTrue(((KVSerde) outputDescriptor.getSerde()).getValueSerde() 
instanceof StringSerde);
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/8b852668/samza-azure/src/test/java/org/apache/samza/system/eventhub/descriptors/TestEventHubsOutputDescriptor.java
----------------------------------------------------------------------
diff --git 
a/samza-azure/src/test/java/org/apache/samza/system/eventhub/descriptors/TestEventHubsOutputDescriptor.java
 
b/samza-azure/src/test/java/org/apache/samza/system/eventhub/descriptors/TestEventHubsOutputDescriptor.java
index fa8ae56..f214aa3 100644
--- 
a/samza-azure/src/test/java/org/apache/samza/system/eventhub/descriptors/TestEventHubsOutputDescriptor.java
+++ 
b/samza-azure/src/test/java/org/apache/samza/system/eventhub/descriptors/TestEventHubsOutputDescriptor.java
@@ -21,14 +21,15 @@ package org.apache.samza.system.eventhub.descriptors;
 import java.util.Map;
 import org.apache.samza.config.ConfigException;
 import org.apache.samza.operators.KV;
-import org.apache.samza.serializers.IntegerSerde;
 import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.NoOpSerde;
 import org.apache.samza.serializers.StringSerde;
 import org.apache.samza.system.eventhub.EventHubConfig;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 public class TestEventHubsOutputDescriptor {
@@ -39,8 +40,8 @@ public class TestEventHubsOutputDescriptor {
 
     EventHubsSystemDescriptor systemDescriptor = new 
EventHubsSystemDescriptor(systemName);
 
-    EventHubsOutputDescriptor<KV<String, Integer>> outputDescriptor = 
systemDescriptor
-        .getOutputDescriptor(streamId, "entity-namespace", "entity3", 
KVSerde.of(new StringSerde(), new IntegerSerde()))
+    EventHubsOutputDescriptor<KV<String, String>> outputDescriptor = 
systemDescriptor
+        .getOutputDescriptor(streamId, "entity-namespace", "entity3", new 
StringSerde())
         .withSasKeyName("secretkey")
         .withSasKey("sasToken-123");
 
@@ -59,8 +60,8 @@ public class TestEventHubsOutputDescriptor {
 
     EventHubsSystemDescriptor systemDescriptor = new 
EventHubsSystemDescriptor(systemName);
 
-    EventHubsOutputDescriptor<KV<String, Integer>> outputDescriptor = 
systemDescriptor
-        .getOutputDescriptor(streamId, "entity-namespace", "entity3", 
KVSerde.of(new StringSerde(), new IntegerSerde()));
+    EventHubsOutputDescriptor<KV<String, String>> outputDescriptor = 
systemDescriptor
+        .getOutputDescriptor(streamId, "entity-namespace", "entity3", new 
StringSerde());
 
     Map<String, String> generatedConfigs = outputDescriptor.toConfig();
     assertEquals("eventHub", 
generatedConfigs.get("streams.output-stream.samza.system"));
@@ -79,11 +80,24 @@ public class TestEventHubsOutputDescriptor {
 
     EventHubsSystemDescriptor systemDescriptor = new 
EventHubsSystemDescriptor(systemName);
     try {
-      systemDescriptor.getOutputDescriptor(streamId, null, null, 
KVSerde.of(new StringSerde(), new IntegerSerde()));
+      systemDescriptor.getOutputDescriptor(streamId, null, null, new 
StringSerde());
       fail("Should have thrown Config Exception");
     } catch (ConfigException exception) {
       assertEquals(String.format("Missing namespace and entity path Event Hubs 
output descriptor in " //
           + "system: {%s}, stream: {%s}", systemName, streamId), 
exception.getMessage());
     }
   }
+
+  @Test
+  public void testStreamDescriptorContainsKVserde() {
+    String systemName = "eventHub";
+    String streamId = "output-stream";
+
+    EventHubsSystemDescriptor systemDescriptor = new 
EventHubsSystemDescriptor(systemName);
+    EventHubsOutputDescriptor<KV<String, String>> outputDescriptor = 
systemDescriptor
+        .getOutputDescriptor(streamId, "entity-namespace", "entity3", new 
StringSerde());
+    assertTrue(outputDescriptor.getSerde() instanceof KVSerde);
+    assertTrue(((KVSerde) outputDescriptor.getSerde()).getKeySerde() 
instanceof NoOpSerde);
+    assertTrue(((KVSerde) outputDescriptor.getSerde()).getValueSerde() 
instanceof StringSerde);
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/8b852668/samza-azure/src/test/java/org/apache/samza/system/eventhub/descriptors/TestEventHubsSystemDescriptor.java
----------------------------------------------------------------------
diff --git 
a/samza-azure/src/test/java/org/apache/samza/system/eventhub/descriptors/TestEventHubsSystemDescriptor.java
 
b/samza-azure/src/test/java/org/apache/samza/system/eventhub/descriptors/TestEventHubsSystemDescriptor.java
index 7f73bd9..84c8589 100644
--- 
a/samza-azure/src/test/java/org/apache/samza/system/eventhub/descriptors/TestEventHubsSystemDescriptor.java
+++ 
b/samza-azure/src/test/java/org/apache/samza/system/eventhub/descriptors/TestEventHubsSystemDescriptor.java
@@ -19,8 +19,6 @@
 package org.apache.samza.system.eventhub.descriptors;
 
 import java.util.Map;
-import org.apache.samza.serializers.IntegerSerde;
-import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.StringSerde;
 import org.apache.samza.system.eventhub.EventHubConfig;
 import 
org.apache.samza.system.eventhub.producer.EventHubSystemProducer.PartitioningMethod;
@@ -47,14 +45,10 @@ public class TestEventHubsSystemDescriptor {
         .withRuntimeInfoTimeout(60000)
         .withSendKeys(false);
 
-    systemDescriptor.getInputDescriptor(streamId1, "entity-namespace1", 
"entity1", KVSerde.of(new StringSerde(),
-        new IntegerSerde()));
-    systemDescriptor.getInputDescriptor(streamId2, "entity-namespace2", 
"entity2", KVSerde.of(new StringSerde(),
-            new IntegerSerde()));
-    systemDescriptor.getOutputDescriptor(streamId3, "entity-namespace3", 
"entity3", KVSerde.of(new StringSerde(),
-            new IntegerSerde()));
-    systemDescriptor.getOutputDescriptor(streamId4, "entity-namespace4", 
"entity4", KVSerde.of(new StringSerde(),
-            new IntegerSerde()));
+    systemDescriptor.getInputDescriptor(streamId1, "entity-namespace1", 
"entity1", new StringSerde());
+    systemDescriptor.getInputDescriptor(streamId2, "entity-namespace2", 
"entity2", new StringSerde());
+    systemDescriptor.getOutputDescriptor(streamId3, "entity-namespace3", 
"entity3", new StringSerde());
+    systemDescriptor.getOutputDescriptor(streamId4, "entity-namespace4", 
"entity4", new StringSerde());
 
     Map<String, String> generatedConfigs = systemDescriptor.toConfig();
     assertEquals("org.apache.samza.system.eventhub.EventHubSystemFactory", 
generatedConfigs.get(String.format("systems.%s.samza.factory", systemName)));
@@ -96,14 +90,10 @@ public class TestEventHubsSystemDescriptor {
 
     EventHubsSystemDescriptor systemDescriptor = new 
EventHubsSystemDescriptor(systemName);
 
-    systemDescriptor.getInputDescriptor(streamId1, "entity-namespace1", 
"entity1", KVSerde.of(new StringSerde(),
-        new IntegerSerde()));
-    systemDescriptor.getInputDescriptor(streamId2, "entity-namespace2", 
"entity2", KVSerde.of(new StringSerde(),
-        new IntegerSerde()));
-    systemDescriptor.getOutputDescriptor(streamId3, "entity-namespace3", 
"entity3", KVSerde.of(new StringSerde(),
-        new IntegerSerde()));
-    systemDescriptor.getOutputDescriptor(streamId4, "entity-namespace4", 
"entity4", KVSerde.of(new StringSerde(),
-        new IntegerSerde()));
+    systemDescriptor.getInputDescriptor(streamId1, "entity-namespace1", 
"entity1", new StringSerde());
+    systemDescriptor.getInputDescriptor(streamId2, "entity-namespace2", 
"entity2", new StringSerde());
+    systemDescriptor.getOutputDescriptor(streamId3, "entity-namespace3", 
"entity3", new StringSerde());
+    systemDescriptor.getOutputDescriptor(streamId4, "entity-namespace4", 
"entity4", new StringSerde());
 
     Map<String, String> generatedConfigs = systemDescriptor.toConfig();
     assertEquals("org.apache.samza.system.eventhub.EventHubSystemFactory", 
generatedConfigs.get(String.format("systems.%s.samza.factory", systemName)));

Reply via email to