Repository: samza Updated Branches: refs/heads/master 482c6a227 -> da88096a0
http://git-wip-us.apache.org/repos/asf/samza/blob/da88096a/samza-azure/src/test/java/org/apache/samza/system/eventhub/TestEventHubsSystemDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/test/java/org/apache/samza/system/eventhub/TestEventHubsSystemDescriptor.java b/samza-azure/src/test/java/org/apache/samza/system/eventhub/TestEventHubsSystemDescriptor.java new file mode 100644 index 0000000..33bb1ba --- /dev/null +++ b/samza-azure/src/test/java/org/apache/samza/system/eventhub/TestEventHubsSystemDescriptor.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.system.eventhub; + +import java.util.Map; +import org.apache.samza.serializers.IntegerSerde; +import org.apache.samza.serializers.KVSerde; +import org.apache.samza.serializers.StringSerde; +import org.apache.samza.system.eventhub.producer.EventHubSystemProducer.PartitioningMethod; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +/** Tests for the configuration map returned by {@code EventHubsSystemDescriptor.toConfig()}. */ public class TestEventHubsSystemDescriptor { + /** Sets every builder override and registers two input and two output descriptors, then checks that each override appears as a string under its {@code EventHubConfig} key (formatted with the system name) and that the stream list holds all four stream ids in the order they were requested. */ @Test + public void testWithDescriptorOverrides() { + String systemName = "system-name"; + String streamId1 = "input-stream1"; + String streamId2 = "input-stream2"; + String streamId3 = "output-stream1"; + String streamId4 = "output-stream2"; + + EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor(systemName) + .withMaxEventCountPerPoll(1000) + .withNumClientThreads(5) + .withPartitioningMethod(PartitioningMethod.PARTITION_KEY_AS_PARTITION) + .withPrefetchCount(100) + .withReceiveQueueSize(500) + .withRuntimeInfoTimeout(60000) + .withSendKeys(false); + + /* Return values are discarded on purpose: the test only checks what requesting the descriptors does to the system-level config. */ systemDescriptor.getInputDescriptor(streamId1, "entity-namespace1", "entity1", KVSerde.of(new StringSerde(), + new IntegerSerde())); + systemDescriptor.getInputDescriptor(streamId2, "entity-namespace2", "entity2", KVSerde.of(new StringSerde(), + new IntegerSerde())); + systemDescriptor.getOutputDescriptor(streamId3, "entity-namespace3", "entity3", KVSerde.of(new StringSerde(), + new IntegerSerde())); + systemDescriptor.getOutputDescriptor(streamId4, "entity-namespace4", "entity4", KVSerde.of(new StringSerde(), + new IntegerSerde())); + + Map<String, String> generatedConfigs = systemDescriptor.toConfig(); + assertEquals("org.apache.samza.system.eventhub.EventHubSystemFactory", generatedConfigs.get(String.format("systems.%s.samza.factory", systemName))); + assertEquals("1000", generatedConfigs.get(String.format(EventHubConfig.CONFIG_MAX_EVENT_COUNT_PER_POLL, systemName))); + assertEquals("5", 
generatedConfigs.get(String.format(EventHubConfig.CONFIG_SYSTEM_NUM_CLIENT_THREADS, systemName))); + assertEquals("PARTITION_KEY_AS_PARTITION", generatedConfigs.get(String.format(EventHubConfig.CONFIG_PRODUCER_PARTITION_METHOD, systemName))); + assertEquals("100", generatedConfigs.get(String.format(EventHubConfig.CONFIG_PREFETCH_COUNT, systemName))); + assertEquals("500", generatedConfigs.get(String.format(EventHubConfig.CONFIG_CONSUMER_BUFFER_CAPACITY, systemName))); + assertEquals("60000", generatedConfigs.get(String.format(EventHubConfig.CONFIG_FETCH_RUNTIME_INFO_TIMEOUT_MILLIS, systemName))); + assertEquals("false", generatedConfigs.get(String.format(EventHubConfig.CONFIG_SEND_KEY_IN_EVENT_PROPERTIES, systemName))); + assertEquals(streamId1 + "," + streamId2 + "," + streamId3 + "," + streamId4, generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_LIST, systemName))); + } + + /** Builds a descriptor with no overrides and no stream descriptors, then checks that the factory class is the only generated entry and every optional key is absent. */ @Test + public void testWithoutDescriptorOverrides() { + String systemName = "eventHub"; + + EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor(systemName); + + Map<String, String> generatedConfigs = systemDescriptor.toConfig(); + assertEquals("org.apache.samza.system.eventhub.EventHubSystemFactory", generatedConfigs.get(String.format("systems.%s.samza.factory", systemName))); + assertNull(generatedConfigs.get(String.format(EventHubConfig.CONFIG_MAX_EVENT_COUNT_PER_POLL, systemName))); + assertNull(generatedConfigs.get(String.format(EventHubConfig.CONFIG_SYSTEM_NUM_CLIENT_THREADS, systemName))); + assertNull(generatedConfigs.get(String.format(EventHubConfig.CONFIG_PRODUCER_PARTITION_METHOD, systemName))); + assertNull(generatedConfigs.get(String.format(EventHubConfig.CONFIG_PREFETCH_COUNT, systemName))); + assertNull(generatedConfigs.get(String.format(EventHubConfig.CONFIG_CONSUMER_BUFFER_CAPACITY, systemName))); + assertNull(generatedConfigs.get(String.format(EventHubConfig.CONFIG_FETCH_RUNTIME_INFO_TIMEOUT_MILLIS, systemName))); + 
assertNull(generatedConfigs.get(String.format(EventHubConfig.CONFIG_SEND_KEY_IN_EVENT_PROPERTIES, systemName))); + assertNull(generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_LIST, systemName))); + /* Exactly one entry expected: the factory class asserted above. */ assertEquals(1, generatedConfigs.size()); + } + /** Requests two input and two output descriptors without any overrides, then checks that the generated config holds exactly the factory class and the comma-separated stream list. */ @Test + public void testWithInputOutputStreams() { + String systemName = "system-name"; + String streamId1 = "input-stream1"; + String streamId2 = "input-stream2"; + String streamId3 = "output-stream1"; + String streamId4 = "output-stream2"; + + EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor(systemName); + + systemDescriptor.getInputDescriptor(streamId1, "entity-namespace1", "entity1", KVSerde.of(new StringSerde(), + new IntegerSerde())); + systemDescriptor.getInputDescriptor(streamId2, "entity-namespace2", "entity2", KVSerde.of(new StringSerde(), + new IntegerSerde())); + systemDescriptor.getOutputDescriptor(streamId3, "entity-namespace3", "entity3", KVSerde.of(new StringSerde(), + new IntegerSerde())); + systemDescriptor.getOutputDescriptor(streamId4, "entity-namespace4", "entity4", KVSerde.of(new StringSerde(), + new IntegerSerde())); + + Map<String, String> generatedConfigs = systemDescriptor.toConfig(); + assertEquals("org.apache.samza.system.eventhub.EventHubSystemFactory", generatedConfigs.get(String.format("systems.%s.samza.factory", systemName))); + assertEquals(streamId1 + "," + streamId2 + "," + streamId3 + "," + streamId4, generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_LIST, systemName))); + /* Exactly two entries expected: the factory class and the stream list asserted above. */ assertEquals(2, generatedConfigs.size()); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/da88096a/samza-core/src/main/java/org/apache/samza/operators/descriptors/DelegatingSystemDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/descriptors/DelegatingSystemDescriptor.java 
b/samza-core/src/main/java/org/apache/samza/operators/descriptors/DelegatingSystemDescriptor.java index 16accae..6c4ae49 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/descriptors/DelegatingSystemDescriptor.java +++ b/samza-core/src/main/java/org/apache/samza/operators/descriptors/DelegatingSystemDescriptor.java @@ -50,18 +50,12 @@ public final class DelegatingSystemDescriptor extends SystemDescriptor<Delegatin super(systemName, null, null, null); } - /** - * {@inheritDoc} - */ @Override public <StreamMessageType> GenericInputDescriptor<StreamMessageType> getInputDescriptor( String streamId, Serde<StreamMessageType> serde) { return new GenericInputDescriptor<>(streamId, this, serde); } - /** - * {@inheritDoc} - */ @Override public <StreamMessageType> GenericOutputDescriptor<StreamMessageType> getOutputDescriptor( String streamId, Serde<StreamMessageType> serde) { http://git-wip-us.apache.org/repos/asf/samza/blob/da88096a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemDescriptor.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemDescriptor.java index 6fa8915..e88ed70 100644 --- a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemDescriptor.java +++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemDescriptor.java @@ -69,17 +69,11 @@ public class KafkaSystemDescriptor extends SystemDescriptor<KafkaSystemDescripto super(systemName, FACTORY_CLASS_NAME, null, null); } - /** - * {@inheritDoc} - */ @Override public <StreamMessageType> KafkaInputDescriptor<StreamMessageType> getInputDescriptor(String streamId, Serde<StreamMessageType> serde) { return new KafkaInputDescriptor<>(streamId, this, serde, null); } - /** - * {@inheritDoc} - */ @Override public <StreamMessageType> KafkaOutputDescriptor<StreamMessageType> 
getOutputDescriptor(String streamId, Serde<StreamMessageType> serde) { return new KafkaOutputDescriptor<>(streamId, this, serde);