Repository: samza
Updated Branches:
  refs/heads/master 482c6a227 -> da88096a0


http://git-wip-us.apache.org/repos/asf/samza/blob/da88096a/samza-azure/src/test/java/org/apache/samza/system/eventhub/TestEventHubsSystemDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/test/java/org/apache/samza/system/eventhub/TestEventHubsSystemDescriptor.java b/samza-azure/src/test/java/org/apache/samza/system/eventhub/TestEventHubsSystemDescriptor.java
new file mode 100644
index 0000000..33bb1ba
--- /dev/null
+++ b/samza-azure/src/test/java/org/apache/samza/system/eventhub/TestEventHubsSystemDescriptor.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system.eventhub;
+
+import java.util.Map;
+import org.apache.samza.serializers.IntegerSerde;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.StringSerde;
+import 
org.apache.samza.system.eventhub.producer.EventHubSystemProducer.PartitioningMethod;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class TestEventHubsSystemDescriptor {
+  @Test
+  public void testWithDescriptorOverrides() {
+    String systemName = "system-name";
+    String streamId1 = "input-stream1";
+    String streamId2 = "input-stream2";
+    String streamId3 = "output-stream1";
+    String streamId4 = "output-stream2";
+
+    EventHubsSystemDescriptor systemDescriptor = new 
EventHubsSystemDescriptor(systemName)
+        .withMaxEventCountPerPoll(1000)
+        .withNumClientThreads(5)
+        .withPartitioningMethod(PartitioningMethod.PARTITION_KEY_AS_PARTITION)
+        .withPrefetchCount(100)
+        .withReceiveQueueSize(500)
+        .withRuntimeInfoTimeout(60000)
+        .withSendKeys(false);
+
+    systemDescriptor.getInputDescriptor(streamId1, "entity-namespace1", 
"entity1", KVSerde.of(new StringSerde(),
+        new IntegerSerde()));
+    systemDescriptor.getInputDescriptor(streamId2, "entity-namespace2", 
"entity2", KVSerde.of(new StringSerde(),
+            new IntegerSerde()));
+    systemDescriptor.getOutputDescriptor(streamId3, "entity-namespace3", 
"entity3", KVSerde.of(new StringSerde(),
+            new IntegerSerde()));
+    systemDescriptor.getOutputDescriptor(streamId4, "entity-namespace4", 
"entity4", KVSerde.of(new StringSerde(),
+            new IntegerSerde()));
+
+    Map<String, String> generatedConfigs = systemDescriptor.toConfig();
+    assertEquals("org.apache.samza.system.eventhub.EventHubSystemFactory", 
generatedConfigs.get(String.format("systems.%s.samza.factory", systemName)));
+    assertEquals("1000", 
generatedConfigs.get(String.format(EventHubConfig.CONFIG_MAX_EVENT_COUNT_PER_POLL,
 systemName)));
+    assertEquals("5", 
generatedConfigs.get(String.format(EventHubConfig.CONFIG_SYSTEM_NUM_CLIENT_THREADS,
 systemName)));
+    assertEquals("PARTITION_KEY_AS_PARTITION", 
generatedConfigs.get(String.format(EventHubConfig.CONFIG_PRODUCER_PARTITION_METHOD,
 systemName)));
+    assertEquals("100", 
generatedConfigs.get(String.format(EventHubConfig.CONFIG_PREFETCH_COUNT, 
systemName)));
+    assertEquals("500", 
generatedConfigs.get(String.format(EventHubConfig.CONFIG_CONSUMER_BUFFER_CAPACITY,
 systemName)));
+    assertEquals("60000", 
generatedConfigs.get(String.format(EventHubConfig.CONFIG_FETCH_RUNTIME_INFO_TIMEOUT_MILLIS,
 systemName)));
+    assertEquals("false", 
generatedConfigs.get(String.format(EventHubConfig.CONFIG_SEND_KEY_IN_EVENT_PROPERTIES,
 systemName)));
+    assertEquals(streamId1 + "," + streamId2 + "," + streamId3 + "," + 
streamId4, 
generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_LIST, 
systemName)));
+  }
+
+  @Test
+  public void testWithoutDescriptorOverrides() {
+    String systemName = "eventHub";
+
+    EventHubsSystemDescriptor systemDescriptor = new 
EventHubsSystemDescriptor(systemName);
+
+    Map<String, String> generatedConfigs = systemDescriptor.toConfig();
+    assertEquals("org.apache.samza.system.eventhub.EventHubSystemFactory", 
generatedConfigs.get(String.format("systems.%s.samza.factory", systemName)));
+    
assertNull(generatedConfigs.get(String.format(EventHubConfig.CONFIG_MAX_EVENT_COUNT_PER_POLL,
 systemName)));
+    
assertNull(generatedConfigs.get(String.format(EventHubConfig.CONFIG_SYSTEM_NUM_CLIENT_THREADS,
 systemName)));
+    
assertNull(generatedConfigs.get(String.format(EventHubConfig.CONFIG_PRODUCER_PARTITION_METHOD,
 systemName)));
+    
assertNull(generatedConfigs.get(String.format(EventHubConfig.CONFIG_PREFETCH_COUNT,
 systemName)));
+    
assertNull(generatedConfigs.get(String.format(EventHubConfig.CONFIG_CONSUMER_BUFFER_CAPACITY,
 systemName)));
+    
assertNull(generatedConfigs.get(String.format(EventHubConfig.CONFIG_FETCH_RUNTIME_INFO_TIMEOUT_MILLIS,
 systemName)));
+    
assertNull(generatedConfigs.get(String.format(EventHubConfig.CONFIG_SEND_KEY_IN_EVENT_PROPERTIES,
 systemName)));
+    
assertNull(generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_LIST,
 systemName)));
+    assertEquals(1, generatedConfigs.size());
+  }
+  @Test
+  public void testWithInputOutputStreams() {
+    String systemName = "system-name";
+    String streamId1 = "input-stream1";
+    String streamId2 = "input-stream2";
+    String streamId3 = "output-stream1";
+    String streamId4 = "output-stream2";
+
+    EventHubsSystemDescriptor systemDescriptor = new 
EventHubsSystemDescriptor(systemName);
+
+    systemDescriptor.getInputDescriptor(streamId1, "entity-namespace1", 
"entity1", KVSerde.of(new StringSerde(),
+        new IntegerSerde()));
+    systemDescriptor.getInputDescriptor(streamId2, "entity-namespace2", 
"entity2", KVSerde.of(new StringSerde(),
+        new IntegerSerde()));
+    systemDescriptor.getOutputDescriptor(streamId3, "entity-namespace3", 
"entity3", KVSerde.of(new StringSerde(),
+        new IntegerSerde()));
+    systemDescriptor.getOutputDescriptor(streamId4, "entity-namespace4", 
"entity4", KVSerde.of(new StringSerde(),
+        new IntegerSerde()));
+
+    Map<String, String> generatedConfigs = systemDescriptor.toConfig();
+    assertEquals("org.apache.samza.system.eventhub.EventHubSystemFactory", 
generatedConfigs.get(String.format("systems.%s.samza.factory", systemName)));
+    assertEquals(streamId1 + "," + streamId2 + "," + streamId3 + "," + 
streamId4, 
generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_LIST, 
systemName)));
+    assertEquals(2, generatedConfigs.size());
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/da88096a/samza-core/src/main/java/org/apache/samza/operators/descriptors/DelegatingSystemDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/descriptors/DelegatingSystemDescriptor.java b/samza-core/src/main/java/org/apache/samza/operators/descriptors/DelegatingSystemDescriptor.java
index 16accae..6c4ae49 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/descriptors/DelegatingSystemDescriptor.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/descriptors/DelegatingSystemDescriptor.java
@@ -50,18 +50,12 @@ public final class DelegatingSystemDescriptor extends SystemDescriptor<Delegatin
     super(systemName, null, null, null);
   }
 
-  /**
-   * {@inheritDoc}
-   */
   @Override
   public <StreamMessageType> GenericInputDescriptor<StreamMessageType> 
getInputDescriptor(
       String streamId, Serde<StreamMessageType> serde) {
     return new GenericInputDescriptor<>(streamId, this, serde);
   }
 
-  /**
-   * {@inheritDoc}
-   */
   @Override
   public <StreamMessageType> GenericOutputDescriptor<StreamMessageType> 
getOutputDescriptor(
       String streamId, Serde<StreamMessageType> serde) {

http://git-wip-us.apache.org/repos/asf/samza/blob/da88096a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemDescriptor.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemDescriptor.java
index 6fa8915..e88ed70 100644
--- a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemDescriptor.java
+++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemDescriptor.java
@@ -69,17 +69,11 @@ public class KafkaSystemDescriptor extends SystemDescriptor<KafkaSystemDescripto
     super(systemName, FACTORY_CLASS_NAME, null, null);
   }
 
-  /**
-   * {@inheritDoc}
-   */
   @Override
   public <StreamMessageType> KafkaInputDescriptor<StreamMessageType> 
getInputDescriptor(String streamId, Serde<StreamMessageType> serde) {
     return new KafkaInputDescriptor<>(streamId, this, serde, null);
   }
 
-  /**
-   * {@inheritDoc}
-   */
   @Override
   public <StreamMessageType> KafkaOutputDescriptor<StreamMessageType> 
getOutputDescriptor(String streamId, Serde<StreamMessageType> serde) {
     return new KafkaOutputDescriptor<>(streamId, this, serde);

Reply via email to