http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaInputDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaInputDescriptor.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaInputDescriptor.java new file mode 100644 index 0000000..d3fe252 --- /dev/null +++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaInputDescriptor.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.system.kafka; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import org.apache.commons.lang3.StringUtils; +import org.apache.samza.operators.descriptors.base.stream.InputDescriptor; +import org.apache.samza.operators.descriptors.base.system.SystemDescriptor; +import org.apache.samza.operators.functions.InputTransformer; +import org.apache.samza.serializers.Serde; + +/** + * A descriptor for a Kafka input stream. + * <p> + * An instance of this descriptor may be obtained from an appropriately configured {@link KafkaSystemDescriptor}. + * <p> + * Stream properties configured using a descriptor override corresponding properties provided in configuration. + * + * @param <StreamMessageType> type of messages in this stream. + */ +public class KafkaInputDescriptor<StreamMessageType> + extends InputDescriptor<StreamMessageType, KafkaInputDescriptor<StreamMessageType>> { + private static final String CONSUMER_AUTO_OFFSET_RESET_CONFIG_KEY = "systems.%s.streams.%s.consumer.auto.offset.reset"; + private static final String CONSUMER_FETCH_MESSAGE_MAX_BYTES_CONFIG_KEY = "systems.%s.streams.%s.consumer.fetch.message.max.bytes"; + + private Optional<String> consumerAutoOffsetResetOptional = Optional.empty(); + private Optional<Long> consumerFetchMessageMaxBytesOptional = Optional.empty(); + + KafkaInputDescriptor(String streamId, SystemDescriptor systemDescriptor, Serde serde, InputTransformer transformer) { + super(streamId, serde, systemDescriptor, transformer); + } + + /** + * This setting determines what happens if a consumer attempts to read an offset for this topic that is outside of + * the current valid range. This could happen if the topic does not exist, or if a checkpoint is older than the + * maximum message history retained by the brokers. This property is not to be confused with + * {@link InputDescriptor#withOffsetDefault}, which determines what happens if there is no checkpoint.
+ * <p> + * The following are valid values for auto.offset.reset: + * <ul> + * <li>smallest: Start consuming at the smallest (oldest) offset available on the broker + * (process as much message history as available). + * <li>largest: Start consuming at the largest (newest) offset available on the broker + * (skip any messages published while the job was not running). + * <li>anything else: Throw an exception and refuse to start up the job. + * </ul> + * <p> + * Note: This property may be set at a system level using {@link KafkaSystemDescriptor#withConsumerAutoOffsetReset} + * + * @param consumerAutoOffsetReset consumer auto offset reset policy for the input + * @return this input descriptor + */ + public KafkaInputDescriptor<StreamMessageType> withConsumerAutoOffsetReset(String consumerAutoOffsetReset) { + this.consumerAutoOffsetResetOptional = Optional.of(StringUtils.stripToNull(consumerAutoOffsetReset)); + return this; + } + + /** + * The number of bytes of messages to attempt to fetch for each partition of this topic in each fetch request. + * These bytes will be read into memory for each partition, so this helps control the memory used by the consumer. + * The fetch request size must be at least as large as the maximum message size the server allows or else it is + * possible for the producer to send messages larger than the consumer can fetch. + * <p> + * Note: This property may be set at a system level using {@link KafkaSystemDescriptor#withConsumerFetchMessageMaxBytes} + * + * @param fetchMessageMaxBytes number of bytes of messages to attempt to fetch for each topic-partition + * in each fetch request + * @return this input descriptor + */ + public KafkaInputDescriptor<StreamMessageType> withConsumerFetchMessageMaxBytes(long fetchMessageMaxBytes) { + this.consumerFetchMessageMaxBytesOptional = Optional.of(fetchMessageMaxBytes); + return this; + } + + @Override + public Map<String, String> toConfig() { + HashMap<String, String> configs = new HashMap<>(super.toConfig()); + // Note: Kafka configuration needs the topic's physical name, not the stream-id. + // We won't have that here if the user only specified it in configs, or if it got rewritten + // by the planner to something different from what's in this descriptor. + String streamName = getPhysicalName().orElse(getStreamId()); + String systemName = getSystemName(); + + consumerAutoOffsetResetOptional.ifPresent(autoOffsetReset -> + configs.put(String.format(CONSUMER_AUTO_OFFSET_RESET_CONFIG_KEY, systemName, streamName), autoOffsetReset)); + consumerFetchMessageMaxBytesOptional.ifPresent(fetchMessageMaxBytes -> + configs.put(String.format(CONSUMER_FETCH_MESSAGE_MAX_BYTES_CONFIG_KEY, systemName, streamName), Long.toString(fetchMessageMaxBytes))); + return configs; + } +}
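To make the generated configuration concrete, here is a minimal sketch of how this descriptor is typically used (the system name, stream name, and serde below are hypothetical, not part of this patch):

package org.apache.samza.system.kafka;

import java.util.Map;
import org.apache.samza.serializers.StringSerde;

public class KafkaInputDescriptorSketch {
  public static void main(String[] args) {
    // Input descriptors are obtained from a system descriptor; stream-level
    // settings like these override the system-level equivalents.
    KafkaInputDescriptor<String> pageViews = new KafkaSystemDescriptor("kafka")
        .getInputDescriptor("page-views", new StringSerde())
        .withConsumerAutoOffsetReset("smallest")
        .withConsumerFetchMessageMaxBytes(2 * 1024 * 1024);

    // toConfig() keys use the physical name if one was set, otherwise the stream-id:
    //   systems.kafka.streams.page-views.consumer.auto.offset.reset = smallest
    //   systems.kafka.streams.page-views.consumer.fetch.message.max.bytes = 2097152
    Map<String, String> configs = pageViews.toConfig();
    configs.forEach((key, value) -> System.out.println(key + " = " + value));
  }
}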
http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaOutputDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaOutputDescriptor.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaOutputDescriptor.java new file mode 100644 index 0000000..1142276 --- /dev/null +++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaOutputDescriptor.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.system.kafka; + +import org.apache.samza.operators.descriptors.base.stream.OutputDescriptor; +import org.apache.samza.operators.descriptors.base.system.SystemDescriptor; +import org.apache.samza.serializers.Serde; + +/** + * A descriptor for a Kafka output stream. + * <p> + * An instance of this descriptor may be obtained from an appropriately configured {@link KafkaSystemDescriptor}. + * <p> + * Stream properties configured using a descriptor override corresponding properties provided in configuration. + * + * @param <StreamMessageType> type of messages in this stream. + */ +public class KafkaOutputDescriptor<StreamMessageType> + extends OutputDescriptor<StreamMessageType, KafkaOutputDescriptor<StreamMessageType>> { + KafkaOutputDescriptor(String streamId, SystemDescriptor systemDescriptor, Serde<StreamMessageType> serde) { + super(streamId, serde, systemDescriptor); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemDescriptor.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemDescriptor.java new file mode 100644 index 0000000..6fa8915 --- /dev/null +++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemDescriptor.java @@ -0,0 +1,251 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.system.kafka; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import org.apache.commons.lang3.StringUtils; +import org.apache.samza.config.KafkaConfig; +import org.apache.samza.operators.descriptors.base.stream.InputDescriptor; +import org.apache.samza.operators.descriptors.base.system.OutputDescriptorProvider; +import org.apache.samza.operators.descriptors.base.system.SimpleInputDescriptorProvider; +import org.apache.samza.operators.descriptors.base.system.SystemDescriptor; +import org.apache.samza.serializers.Serde; + + +/** + * A descriptor for a Kafka system. + * <p> + * System properties configured using a descriptor override corresponding properties provided in configuration. + */ +@SuppressWarnings("unchecked") +public class KafkaSystemDescriptor extends SystemDescriptor<KafkaSystemDescriptor> + implements SimpleInputDescriptorProvider, OutputDescriptorProvider { + private static final String FACTORY_CLASS_NAME = KafkaSystemFactory.class.getName(); + private static final String CONSUMER_ZK_CONNECT_CONFIG_KEY = "systems.%s.consumer.zookeeper.connect"; + private static final String CONSUMER_AUTO_OFFSET_RESET_CONFIG_KEY = "systems.%s.consumer.auto.offset.reset"; + private static final String CONSUMER_FETCH_THRESHOLD_CONFIG_KEY = KafkaConfig.CONSUMER_FETCH_THRESHOLD(); + private static final String CONSUMER_FETCH_THRESHOLD_BYTES_CONFIG_KEY = KafkaConfig.CONSUMER_FETCH_THRESHOLD_BYTES(); + private static final String CONSUMER_FETCH_MESSAGE_MAX_BYTES_KEY = "systems.%s.consumer.fetch.message.max.bytes"; + private static final String CONSUMER_CONFIGS_CONFIG_KEY = "systems.%s.consumer.%s"; + private static final String PRODUCER_BOOTSTRAP_SERVERS_CONFIG_KEY = "systems.%s.producer.bootstrap.servers"; + private static final String PRODUCER_CONFIGS_CONFIG_KEY = "systems.%s.producer.%s"; + + private List<String> consumerZkConnect = Collections.emptyList(); + private Optional<String> consumerAutoOffsetResetOptional = Optional.empty(); + private Optional<Integer> consumerFetchThresholdOptional = Optional.empty(); + private Optional<Long> consumerFetchThresholdBytesOptional = Optional.empty(); + private Optional<Long> consumerFetchMessageMaxBytesOptional = Optional.empty(); + private Map<String, String> consumerConfigs = Collections.emptyMap(); + private List<String> producerBootstrapServers = Collections.emptyList(); + private Map<String, String> producerConfigs = Collections.emptyMap(); + + /** + * Constructs a {@link KafkaSystemDescriptor} instance with no system level serde. + * Serdes must be provided explicitly at stream level when getting input or output descriptors. 
+ * + * @param systemName name of this system + */ + public KafkaSystemDescriptor(String systemName) { + super(systemName, FACTORY_CLASS_NAME, null, null); + } + + /** + * {@inheritDoc} + */ + @Override + public <StreamMessageType> KafkaInputDescriptor<StreamMessageType> getInputDescriptor(String streamId, Serde<StreamMessageType> serde) { + return new KafkaInputDescriptor<>(streamId, this, serde, null); + } + + /** + * {@inheritDoc} + */ + @Override + public <StreamMessageType> KafkaOutputDescriptor<StreamMessageType> getOutputDescriptor(String streamId, Serde<StreamMessageType> serde) { + return new KafkaOutputDescriptor<>(streamId, this, serde); + } + + /** + * The hostname and port of one or more Zookeeper nodes where information about the Kafka cluster can be found. + * This is given as a list of hostname:port pairs, such as + * {@code ImmutableList.of("zk1.example.com:2181", "zk2.example.com:2181", "zk3.example.com:2181")}. + * If the cluster information is at some sub-path of the Zookeeper namespace, you need to include the path at the + * end of the list of hostnames, for example: + * {@code ImmutableList.of("zk1.example.com:2181", "zk2.example.com:2181/clusters/my-kafka")}. + * + * @param consumerZkConnect Zookeeper connection information for the system + * @return this system descriptor + */ + public KafkaSystemDescriptor withConsumerZkConnect(List<String> consumerZkConnect) { + this.consumerZkConnect = consumerZkConnect; + return this; + } + + /** + * This setting determines what happens if a consumer attempts to read an offset that is outside of the current + * valid range. This could happen if the topic does not exist, or if a checkpoint is older than the maximum message + * history retained by the brokers. This property is not to be confused with {@link InputDescriptor#withOffsetDefault}, + * which determines what happens if there is no checkpoint. + * <p> + * The following are valid values for auto.offset.reset: + * <ul> + * <li>smallest: Start consuming at the smallest (oldest) offset available on the broker + * (process as much message history as available). + * <li>largest: Start consuming at the largest (newest) offset available on the broker + * (skip any messages published while the job was not running). + * <li>anything else: Throw an exception and refuse to start up the job. + * </ul> + * <p> + * Note: This property may be set at a topic level using {@link KafkaInputDescriptor#withConsumerAutoOffsetReset} + * + * @param consumerAutoOffsetReset consumer auto offset reset policy for the system + * @return this system descriptor + */ + public KafkaSystemDescriptor withConsumerAutoOffsetReset(String consumerAutoOffsetReset) { + this.consumerAutoOffsetResetOptional = Optional.of(StringUtils.stripToNull(consumerAutoOffsetReset)); + return this; + } + + /** + * When consuming streams from Kafka, a Samza container maintains an in-memory buffer for incoming messages in + * order to increase throughput (the stream task can continue processing buffered messages while new messages are + * fetched from Kafka). This parameter determines the number of messages we aim to buffer across all stream partitions + * consumed by a container (50000 by default). For example, if a container consumes 50 partitions, it will by default try to buffer 1000 + * messages per partition. When the number of buffered messages falls below that threshold, Samza + * fetches more messages from the Kafka broker to replenish the buffer.
Increasing this parameter can increase + * a job's processing throughput, but also increases the amount of memory used. + * + * @param fetchThreshold number of incoming messages to buffer in-memory + * @return this system descriptor + */ + public KafkaSystemDescriptor withSamzaFetchThreshold(int fetchThreshold) { + this.consumerFetchThresholdOptional = Optional.of(fetchThreshold); + return this; + } + + /** + * When consuming streams from Kafka, a Samza container maintains an in-memory buffer for incoming messages in + * order to increase throughput (the stream task can continue processing buffered messages while new messages are + * fetched from Kafka). This parameter determines the total size, in bytes, of messages we aim to buffer across all stream + * partitions consumed by a container, i.e. how many bytes to use for the buffered prefetched messages for the + * job as a whole; the byte threshold for a single system/stream/partition is computed from this total. + * Because entire messages are fetched, this byte limit is a soft one: the actual usage can be + * the limit plus the size of the largest message in the partition for a given stream. If the value of this property + * is > 0 then it takes precedence over systems.system-name.samza.fetch.threshold. + * <p> + * For example, if fetchThresholdBytes is set to 100000 bytes, and there are 50 SystemStreamPartitions registered, + * then the per-partition threshold is (100000 / 2) / 50 = 1000 bytes. As this is a soft limit, the actual usage + * can be 1000 bytes plus the size of the largest message. As soon as a SystemStreamPartition's buffered bytes drop + * below 1000, a fetch request will be executed to get more data for it. Increasing this parameter will decrease + * the latency between when a queue is drained of messages and when new messages are enqueued, but also leads + * to an increase in memory usage since more messages will be held in memory. The default value is -1, + * which means this is not used. + * + * @param fetchThresholdBytes number of bytes for incoming messages to buffer in-memory + * @return this system descriptor + */ + public KafkaSystemDescriptor withSamzaFetchThresholdBytes(long fetchThresholdBytes) { + this.consumerFetchThresholdBytesOptional = Optional.of(fetchThresholdBytes); + return this; + } + + /** + * The number of bytes of messages to attempt to fetch for each topic-partition in each fetch request. + * These bytes will be read into memory for each partition, so this helps control the memory used by the consumer. + * The fetch request size must be at least as large as the maximum message size the server allows or else it is + * possible for the producer to send messages larger than the consumer can fetch. + * <p> + * Note: This property may be set at a topic level using {@link KafkaInputDescriptor#withConsumerFetchMessageMaxBytes} + * + * @param fetchMessageMaxBytes number of bytes of messages to attempt to fetch for each topic-partition + * in each fetch request + * @return this system descriptor + */ + public KafkaSystemDescriptor withConsumerFetchMessageMaxBytes(long fetchMessageMaxBytes) { + this.consumerFetchMessageMaxBytesOptional = Optional.of(fetchMessageMaxBytes); + return this; + } + + /** + * Any Kafka consumer configuration can be included here. For example, to change the socket timeout, + * you can set socket.timeout.ms. (There is no need to configure group.id or client.id, as they are automatically + * configured by Samza.
Also, there is no need to set auto.commit.enable because Samza has its own + * checkpointing mechanism.) + * + * @param consumerConfigs additional consumer configuration + * @return this system descriptor + */ + public KafkaSystemDescriptor withConsumerConfigs(Map<String, String> consumerConfigs) { + this.consumerConfigs = consumerConfigs; + return this; + } + + /** + * A list of network endpoints where the Kafka brokers are running. This is given as a list of hostname:port pairs, + * for example {@code ImmutableList.of("kafka1.example.com:9092", "kafka2.example.com:9092", "kafka3.example.com:9092")}. + * It's not necessary to list every single Kafka node in the cluster: Samza uses this property in order to discover + * which topics and partitions are hosted on which broker. This property is needed even if you are only consuming + * from Kafka, and not writing to it, because Samza uses it to discover metadata about streams being consumed. + * + * @param producerBootstrapServers network endpoints where the Kafka brokers are running + * @return this system descriptor + */ + public KafkaSystemDescriptor withProducerBootstrapServers(List<String> producerBootstrapServers) { + this.producerBootstrapServers = producerBootstrapServers; + return this; + } + + /** + * Any Kafka producer configuration can be included here. For example, to change the request timeout, + * you can set timeout.ms. (There is no need to configure client.id as it is automatically configured by Samza.) + * + * @param producerConfigs additional producer configuration + * @return this system descriptor + */ + public KafkaSystemDescriptor withProducerConfigs(Map<String, String> producerConfigs) { + this.producerConfigs = producerConfigs; + return this; + } + + @Override + public Map<String, String> toConfig() { + Map<String, String> configs = new HashMap<>(super.toConfig()); + if (!consumerZkConnect.isEmpty()) { + configs.put(String.format(CONSUMER_ZK_CONNECT_CONFIG_KEY, getSystemName()), String.join(",", consumerZkConnect)); + } + consumerAutoOffsetResetOptional.ifPresent(consumerAutoOffsetReset -> + configs.put(String.format(CONSUMER_AUTO_OFFSET_RESET_CONFIG_KEY, getSystemName()), consumerAutoOffsetReset)); + consumerFetchThresholdOptional.ifPresent(consumerFetchThreshold -> + configs.put(String.format(CONSUMER_FETCH_THRESHOLD_CONFIG_KEY, getSystemName()), Integer.toString(consumerFetchThreshold))); + consumerFetchThresholdBytesOptional.ifPresent(consumerFetchThresholdBytes -> + configs.put(String.format(CONSUMER_FETCH_THRESHOLD_BYTES_CONFIG_KEY, getSystemName()), Long.toString(consumerFetchThresholdBytes))); + consumerFetchMessageMaxBytesOptional.ifPresent(consumerFetchMessageMaxBytes -> + configs.put(String.format(CONSUMER_FETCH_MESSAGE_MAX_BYTES_KEY, getSystemName()), Long.toString(consumerFetchMessageMaxBytes))); + consumerConfigs.forEach((key, value) -> configs.put(String.format(CONSUMER_CONFIGS_CONFIG_KEY, getSystemName(), key), value)); + if (!producerBootstrapServers.isEmpty()) { + configs.put(String.format(PRODUCER_BOOTSTRAP_SERVERS_CONFIG_KEY, getSystemName()), String.join(",", producerBootstrapServers)); + } + producerConfigs.forEach((key, value) -> configs.put(String.format(PRODUCER_CONFIGS_CONFIG_KEY, getSystemName(), key), value)); + return configs; + } +} \ No newline at end of file
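To show how the system- and stream-scoped descriptors fit together in an application, here is a minimal sketch (endpoints, names, and serdes are hypothetical, not part of this patch):

package org.apache.samza.system.kafka;

import com.google.common.collect.ImmutableList;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.Config;
import org.apache.samza.operators.StreamGraph;
import org.apache.samza.serializers.StringSerde;

public class KafkaDescriptorUsageSketch implements StreamApplication {
  @Override
  public void init(StreamGraph graph, Config config) {
    // System-level properties apply to every stream obtained from this descriptor.
    KafkaSystemDescriptor kafka = new KafkaSystemDescriptor("kafka")
        .withConsumerZkConnect(ImmutableList.of("localhost:2181"))
        .withProducerBootstrapServers(ImmutableList.of("localhost:9092"))
        .withConsumerAutoOffsetReset("largest");

    // Serdes are required here because this system descriptor has no system-level serde.
    KafkaInputDescriptor<String> input = kafka.getInputDescriptor("input-topic", new StringSerde());
    KafkaOutputDescriptor<String> output = kafka.getOutputDescriptor("output-topic", new StringSerde());

    // Descriptor-derived properties override any equivalent entries in config.
    graph.getInputStream(input).sendTo(graph.getOutputStream(output));
  }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaInputDescriptor.java ---------------------------------------------------------------------- diff --git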
a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaInputDescriptor.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaInputDescriptor.java new file mode 100644 index 0000000..ac90cf0 --- /dev/null +++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaInputDescriptor.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.system.kafka; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; + +import java.util.Map; +import org.apache.samza.SamzaException; +import org.apache.samza.operators.KV; +import org.apache.samza.operators.descriptors.GenericSystemDescriptor; +import org.apache.samza.serializers.IntegerSerde; +import org.apache.samza.serializers.KVSerde; +import org.apache.samza.serializers.StringSerde; +import org.apache.samza.system.SystemStreamMetadata; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class TestKafkaInputDescriptor { + @Test + public void testISDConfigsWithOverrides() { + KafkaSystemDescriptor sd = new KafkaSystemDescriptor("kafka"); + + KafkaInputDescriptor<KV<String, Integer>> isd = + sd.getInputDescriptor("input-stream", KVSerde.of(new StringSerde(), new IntegerSerde())) + .withPhysicalName("physical-name") + .withConsumerAutoOffsetReset("largest") + .withConsumerFetchMessageMaxBytes(1024 * 1024); + + Map<String, String> generatedConfigs = isd.toConfig(); + assertEquals("kafka", generatedConfigs.get("streams.input-stream.samza.system")); + assertEquals("physical-name", generatedConfigs.get("streams.input-stream.samza.physical.name")); + assertEquals("largest", generatedConfigs.get("systems.kafka.streams.physical-name.consumer.auto.offset.reset")); + assertEquals("1048576", generatedConfigs.get("systems.kafka.streams.physical-name.consumer.fetch.message.max.bytes")); + } + + @Test + public void testISDConfigsWithDefaults() { + KafkaSystemDescriptor sd = new KafkaSystemDescriptor("kafka") + .withConsumerZkConnect(ImmutableList.of("localhost:123")) + .withProducerBootstrapServers(ImmutableList.of("localhost:567", "localhost:890")); + + KafkaInputDescriptor<KV<String, Integer>> isd = + sd.getInputDescriptor("input-stream", KVSerde.of(new StringSerde(), new IntegerSerde())); + + Map<String, String> generatedConfigs = isd.toConfig(); + assertEquals("kafka", generatedConfigs.get("streams.input-stream.samza.system")); + assertEquals(1, generatedConfigs.size()); // verify that there are no other configs + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemDescriptor.java
---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemDescriptor.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemDescriptor.java new file mode 100644 index 0000000..e1ddac0 --- /dev/null +++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemDescriptor.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.system.kafka; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; + +import java.util.Map; +import org.apache.samza.system.SystemStreamMetadata; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class TestKafkaSystemDescriptor { + @Test + public void testSDConfigsWithOverrides() { + KafkaSystemDescriptor sd = + new KafkaSystemDescriptor("kafka") + .withConsumerZkConnect(ImmutableList.of("localhost:1234")) + .withProducerBootstrapServers(ImmutableList.of("localhost:567", "localhost:890")) + .withDefaultStreamOffsetDefault(SystemStreamMetadata.OffsetType.OLDEST) + .withConsumerAutoOffsetReset("smallest") + .withConsumerFetchMessageMaxBytes(1024*1024) + .withSamzaFetchThreshold(10000) + .withSamzaFetchThresholdBytes(1024 * 1024) + .withConsumerConfigs(ImmutableMap.of("custom-consumer-config-key", "custom-consumer-config-value")) + .withProducerConfigs(ImmutableMap.of("custom-producer-config-key", "custom-producer-config-value")) + .withDefaultStreamConfigs(ImmutableMap.of("custom-stream-config-key", "custom-stream-config-value")); + + Map<String, String> generatedConfigs = sd.toConfig(); + assertEquals("org.apache.samza.system.kafka.KafkaSystemFactory", generatedConfigs.get("systems.kafka.samza.factory")); + assertEquals("localhost:1234", generatedConfigs.get("systems.kafka.consumer.zookeeper.connect")); + assertEquals("localhost:567,localhost:890", generatedConfigs.get("systems.kafka.producer.bootstrap.servers")); + assertEquals("smallest", generatedConfigs.get("systems.kafka.consumer.auto.offset.reset")); + assertEquals("1048576", generatedConfigs.get("systems.kafka.consumer.fetch.message.max.bytes")); + assertEquals("10000", generatedConfigs.get("systems.kafka.samza.fetch.threshold")); + assertEquals("1048576", generatedConfigs.get("systems.kafka.samza.fetch.threshold.bytes")); + assertEquals("custom-consumer-config-value", generatedConfigs.get("systems.kafka.consumer.custom-consumer-config-key")); + assertEquals("custom-producer-config-value", generatedConfigs.get("systems.kafka.producer.custom-producer-config-key")); + assertEquals("custom-stream-config-value", generatedConfigs.get("systems.kafka.default.stream.custom-stream-config-key")); + 
assertEquals("oldest", generatedConfigs.get("systems.kafka.default.stream.samza.offset.default")); + assertEquals(11, generatedConfigs.size()); + } + + @Test + public void testSDConfigsWithoutOverrides() { + KafkaSystemDescriptor sd = new KafkaSystemDescriptor("kafka"); + + Map<String, String> generatedConfigs = sd.toConfig(); + assertEquals("org.apache.samza.system.kafka.KafkaSystemFactory", generatedConfigs.get("systems.kafka.samza.factory")); + assertEquals(1, generatedConfigs.size()); // verify that there are no other configs + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableProvider.java ---------------------------------------------------------------------- diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableProvider.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableProvider.java index 16da035..07f4f55 100644 --- a/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableProvider.java +++ b/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableProvider.java @@ -21,6 +21,7 @@ package org.apache.samza.storage.kv; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.regex.Pattern; import org.apache.commons.lang3.StringUtils; import org.apache.samza.SamzaException; @@ -31,7 +32,6 @@ import org.apache.samza.config.JobConfig; import org.apache.samza.config.MapConfig; import org.apache.samza.config.StorageConfig; import org.apache.samza.container.SamzaContainerContext; -import org.apache.samza.operators.StreamGraphSpec; import org.apache.samza.table.ReadableTable; import org.apache.samza.table.Table; import org.apache.samza.table.TableSpec; @@ -50,6 +50,7 @@ import com.google.common.base.Preconditions; * stores. 
*/ abstract public class BaseLocalStoreBackedTableProvider extends BaseTableProvider { + public static final Pattern SYSTEM_STREAM_NAME_PATTERN = Pattern.compile("[\\d\\w-_.]+"); protected KeyValueStore kvStore; @@ -101,8 +102,8 @@ abstract public class BaseLocalStoreBackedTableProvide List<String> sideInputs = tableSpec.getSideInputs(); if (sideInputs != null && !sideInputs.isEmpty()) { - sideInputs.forEach(si -> Preconditions.checkState(StreamGraphSpec.isValidStreamId(si), String.format( - "Side input stream %s doesn't confirm to pattern %s", si, StreamGraphSpec.STREAM_ID_PATTERN))); + sideInputs.forEach(si -> Preconditions.checkState(isValidSystemStreamName(si), String.format( + "Side input stream %s doesn't conform to pattern %s", si, SYSTEM_STREAM_NAME_PATTERN))); String formattedSideInputs = String.join(",", sideInputs); storeConfig.put(String.format(JavaStorageConfig.SIDE_INPUTS, tableSpec.getId()), formattedSideInputs); storeConfig.put(String.format(JavaStorageConfig.SIDE_INPUTS_PROCESSOR_SERIALIZED_INSTANCE, tableSpec.getId()), @@ -121,8 +122,8 @@ abstract public class BaseLocalStoreBackedTableProvide tableSpec.getId()); } - Preconditions.checkState(StreamGraphSpec.isValidStreamId(changelogStream), String.format( - "Changelog stream %s doesn't confirm to pattern %s", changelogStream, StreamGraphSpec.STREAM_ID_PATTERN)); + Preconditions.checkState(isValidSystemStreamName(changelogStream), String.format( + "Changelog stream %s doesn't conform to pattern %s", changelogStream, SYSTEM_STREAM_NAME_PATTERN)); storeConfig.put(String.format(StorageConfig.CHANGELOG_STREAM(), tableSpec.getId()), changelogStream); String changelogReplicationFactor = tableSpec.getConfig().get( @@ -140,4 +141,8 @@ abstract public class BaseLocalStoreBackedTableProvide public void close() { logger.info("Shutting down table provider for table " + tableSpec.getId()); } + + private boolean isValidSystemStreamName(String name) { + return StringUtils.isNotBlank(name) && SYSTEM_STREAM_NAME_PATTERN.matcher(name).matches(); + } }
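For reference, a small sketch of what the new SYSTEM_STREAM_NAME_PATTERN accepts (illustrative snippet, not part of this patch):

package org.apache.samza.storage.kv;

public class StreamNamePatternSketch {
  public static void main(String[] args) {
    // The pattern accepts one or more word characters, digits, '-', '_' or '.';
    // isValidSystemStreamName additionally rejects blank names.
    System.out.println(BaseLocalStoreBackedTableProvider.SYSTEM_STREAM_NAME_PATTERN
        .matcher("kafka.samza-sql_changelog.v1").matches()); // true
    System.out.println(BaseLocalStoreBackedTableProvider.SYSTEM_STREAM_NAME_PATTERN
        .matcher("my stream").matches()); // false: whitespace is not allowed
  }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java index 1db3000..c422130 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java @@ -37,8 +37,12 @@ import org.apache.samza.operators.KV; import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.MessageStreamImpl; import org.apache.samza.operators.StreamGraph; +import org.apache.samza.operators.descriptors.GenericOutputDescriptor; +import org.apache.samza.operators.descriptors.DelegatingSystemDescriptor; import org.apache.samza.operators.functions.MapFunction; import org.apache.samza.operators.TableDescriptor; +import org.apache.samza.serializers.KVSerde; +import org.apache.samza.serializers.NoOpSerde; import org.apache.samza.sql.data.SamzaSqlExecutionContext; import org.apache.samza.sql.data.SamzaSqlRelMessage; import org.apache.samza.sql.interfaces.SamzaRelConverter; @@ -151,7 +155,11 @@ public class QueryTranslator { Optional<TableDescriptor> tableDescriptor = sinkConfig.getTableDescriptor(); if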
(!tableDescriptor.isPresent()) { - outputStream.sendTo(streamGraph.getOutputStream(sinkConfig.getStreamName())); + KVSerde<Object, Object> noOpKVSerde = KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()); + String systemName = sinkConfig.getSystemName(); + DelegatingSystemDescriptor sd = context.getSystemDescriptors().computeIfAbsent(systemName, DelegatingSystemDescriptor::new); + GenericOutputDescriptor<KV<Object, Object>> osd = sd.getOutputDescriptor(sinkConfig.getStreamName(), noOpKVSerde); + outputStream.sendTo(streamGraph.getOutputStream(osd)); } else { Table outputTable = streamGraph.getTable(tableDescriptor.get()); if (outputTable == null) { http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java index fa3d9d3..46e0840 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java @@ -27,7 +27,11 @@ import org.apache.samza.config.Config; import org.apache.samza.operators.KV; import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.StreamGraph; +import org.apache.samza.operators.descriptors.GenericInputDescriptor; +import org.apache.samza.operators.descriptors.DelegatingSystemDescriptor; import org.apache.samza.operators.functions.MapFunction; +import org.apache.samza.serializers.KVSerde; +import org.apache.samza.serializers.NoOpSerde; import org.apache.samza.sql.data.SamzaSqlRelMessage; import org.apache.samza.sql.interfaces.SamzaRelConverter; import org.apache.samza.task.TaskContext; @@ -74,9 +78,14 @@ class ScanTranslator { String sourceName = SqlIOConfig.getSourceFromSourceParts(tableNameParts); Validate.isTrue(relMsgConverters.containsKey(sourceName), String.format("Unknown source %s", sourceName)); - final String streamName = systemStreamConfig.get(sourceName).getStreamName(); + SqlIOConfig sqlIOConfig = systemStreamConfig.get(sourceName); + final String systemName = sqlIOConfig.getSystemName(); + final String streamName = sqlIOConfig.getStreamName(); - MessageStream<KV<Object, Object>> inputStream = streamGraph.getInputStream(streamName); + KVSerde<Object, Object> noOpKVSerde = KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()); + DelegatingSystemDescriptor sd = context.getSystemDescriptors().computeIfAbsent(systemName, DelegatingSystemDescriptor::new); + GenericInputDescriptor<KV<Object, Object>> isd = sd.getInputDescriptor(streamName, noOpKVSerde); + MessageStream<KV<Object, Object>> inputStream = streamGraph.getInputStream(isd); MessageStream<SamzaSqlRelMessage> samzaSqlRelMessageStream = inputStream.map(new ScanMapFunction(sourceName)); context.registerMessageStream(tableScan.getId(), samzaSqlRelMessageStream); http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorContext.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorContext.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorContext.java index fcebeef..e622d55 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorContext.java +++ 
b/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorContext.java @@ -35,6 +35,7 @@ import org.apache.calcite.rex.RexNode; import org.apache.calcite.schema.SchemaPlus; import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.StreamGraph; +import org.apache.samza.operators.descriptors.DelegatingSystemDescriptor; import org.apache.samza.sql.data.RexToJavaCompiler; import org.apache.samza.sql.data.SamzaSqlExecutionContext; import org.apache.samza.sql.interfaces.SamzaRelConverter; @@ -50,8 +51,9 @@ public class TranslatorContext implements Cloneable { private final StreamGraph streamGraph; private final RexToJavaCompiler compiler; private final Map<String, SamzaRelConverter> relSamzaConverters; - private final Map<Integer, MessageStream> messsageStreams; + private final Map<Integer, MessageStream> messageStreams; private final Map<Integer, RelNode> relNodes; + private final Map<String, DelegatingSystemDescriptor> systemDescriptors; /** * The internal variables that are not shared among all cloned {@link TranslatorContext} @@ -123,10 +125,11 @@ public class TranslatorContext implements Cloneable { this.streamGraph = other.streamGraph; this.compiler = other.compiler; this.relSamzaConverters = other.relSamzaConverters; - this.messsageStreams = other.messsageStreams; + this.messageStreams = other.messageStreams; this.relNodes = other.relNodes; this.executionContext = other.executionContext.clone(); this.dataContext = new DataContextImpl(); + this.systemDescriptors = other.systemDescriptors; } /** @@ -142,8 +145,9 @@ public class TranslatorContext implements Cloneable { this.executionContext = executionContext; this.dataContext = new DataContextImpl(); this.relSamzaConverters = converters; - this.messsageStreams = new HashMap<>(); + this.messageStreams = new HashMap<>(); this.relNodes = new HashMap<>(); + this.systemDescriptors = new HashMap<>(); } /** @@ -184,7 +188,7 @@ public class TranslatorContext implements Cloneable { * @param stream the stream */ void registerMessageStream(int id, MessageStream stream) { - messsageStreams.put(id, stream); + messageStreams.put(id, stream); } /** @@ -194,7 +198,7 @@ public class TranslatorContext implements Cloneable { * @return the message stream */ MessageStream getMessageStream(int id) { - return messsageStreams.get(id); + return messageStreams.get(id); } void registerRelNode(int id, RelNode relNode) { @@ -209,6 +213,10 @@ public class TranslatorContext implements Cloneable { return this.relSamzaConverters.get(source); } + Map<String, DelegatingSystemDescriptor> getSystemDescriptors() { + return this.systemDescriptors; + } + /** * This method helps to create a per task instance of translator context * http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java index 2de4856..7395a3d 100644 --- a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java +++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java @@ -147,7 +147,7 @@ public class TestJoinTranslator extends TranslatorTestBase { when(mockOutputStream.isKeyed()).thenReturn(true); IntermediateMessageStreamImpl mockPartitionedStream = new IntermediateMessageStreamImpl(mockGraph, mockInputOp, 
mockOutputStream); - when(mockGraph.getIntermediateStream(any(String.class), any(Serde.class))).thenReturn(mockPartitionedStream); + when(mockGraph.getIntermediateStream(any(String.class), any(Serde.class), eq(false))).thenReturn(mockPartitionedStream); doAnswer(this.getRegisterMessageStreamAnswer()).when(mockContext).registerMessageStream(eq(3), any(MessageStream.class)); RexToJavaCompiler mockCompiler = mock(RexToJavaCompiler.class); http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java index ede7995..1776067 100644 --- a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java +++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java @@ -52,7 +52,7 @@ public class TestQueryTranslator { Assert.assertTrue(originContext.getStreamGraph() == clonedContext.getStreamGraph()); Assert.assertTrue(originContext.getExpressionCompiler() == clonedContext.getExpressionCompiler()); Assert.assertTrue(Whitebox.getInternalState(originContext, "relSamzaConverters") == Whitebox.getInternalState(clonedContext, "relSamzaConverters")); - Assert.assertTrue(Whitebox.getInternalState(originContext, "messsageStreams") == Whitebox.getInternalState(clonedContext, "messsageStreams")); + Assert.assertTrue(Whitebox.getInternalState(originContext, "messageStreams") == Whitebox.getInternalState(clonedContext, "messageStreams")); Assert.assertTrue(Whitebox.getInternalState(originContext, "relNodes") == Whitebox.getInternalState(clonedContext, "relNodes")); Assert.assertNotEquals(originContext.getDataContext(), clonedContext.getDataContext()); validateClonedExecutionContext(originContext.getExecutionContext(), clonedContext.getExecutionContext()); @@ -137,8 +137,7 @@ public class TestQueryTranslator { SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config)); QueryTranslator translator = new QueryTranslator(samzaSqlApplicationConfig); SamzaSqlQueryParser.QueryInfo queryInfo = samzaSqlApplicationConfig.getQueryInfo().get(0); - StreamGraphSpec - graphSpec = new StreamGraphSpec(samzaConfig); + StreamGraphSpec graphSpec = new StreamGraphSpec(samzaConfig); translator.translate(queryInfo, graphSpec); OperatorSpecGraph specGraph = graphSpec.getOperatorSpecGraph(); http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-test/src/main/java/org/apache/samza/example/AppWithGlobalConfigExample.java ---------------------------------------------------------------------- diff --git a/samza-test/src/main/java/org/apache/samza/example/AppWithGlobalConfigExample.java b/samza-test/src/main/java/org/apache/samza/example/AppWithGlobalConfigExample.java index c029eb4..d7f805d 100644 --- a/samza-test/src/main/java/org/apache/samza/example/AppWithGlobalConfigExample.java +++ b/samza-test/src/main/java/org/apache/samza/example/AppWithGlobalConfigExample.java @@ -31,6 +31,9 @@ import org.apache.samza.runtime.LocalApplicationRunner; import org.apache.samza.serializers.JsonSerdeV2; import org.apache.samza.serializers.KVSerde; import org.apache.samza.serializers.StringSerde; +import org.apache.samza.system.kafka.KafkaInputDescriptor; +import org.apache.samza.system.kafka.KafkaOutputDescriptor; +import 
org.apache.samza.system.kafka.KafkaSystemDescriptor; import org.apache.samza.util.CommandLine; @@ -51,13 +54,21 @@ public class AppWithGlobalConfigExample implements StreamApplication { @Override public void init(StreamGraph graph, Config config) { - graph.getInputStream("myPageViewEevent", KVSerde.of(new StringSerde("UTF-8"), new JsonSerdeV2<>(PageViewEvent.class))) - .map(KV::getValue) + KafkaSystemDescriptor trackingSystem = new KafkaSystemDescriptor("tracking"); + + KafkaInputDescriptor<PageViewEvent> inputStreamDescriptor = + trackingSystem.getInputDescriptor("pageViewEvent", new JsonSerdeV2<>(PageViewEvent.class)); + + KafkaOutputDescriptor<KV<String, PageViewCount>> outputStreamDescriptor = + trackingSystem.getOutputDescriptor("pageViewEventPerMember", + KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageViewCount.class))); + + graph.getInputStream(inputStreamDescriptor) .window(Windows.<PageViewEvent, String, Integer>keyedTumblingWindow(m -> m.memberId, Duration.ofSeconds(10), () -> 0, (m, c) -> c + 1, null, null) .setEarlyTrigger(Triggers.repeat(Triggers.count(5))) .setAccumulationMode(AccumulationMode.DISCARDING), "w1") .map(m -> KV.of(m.getKey().getKey(), new PageViewCount(m))) - .sendTo(graph.getOutputStream("pageViewEventPerMemberStream", KVSerde.of(new StringSerde("UTF-8"), new JsonSerdeV2<>(PageViewCount.class)))); + .sendTo(graph.getOutputStream(outputStreamDescriptor)); } class PageViewEvent { http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-test/src/main/java/org/apache/samza/example/BroadcastExample.java ---------------------------------------------------------------------- diff --git a/samza-test/src/main/java/org/apache/samza/example/BroadcastExample.java b/samza-test/src/main/java/org/apache/samza/example/BroadcastExample.java index 9ca4f35..1c1b4be 100644 --- a/samza-test/src/main/java/org/apache/samza/example/BroadcastExample.java +++ b/samza-test/src/main/java/org/apache/samza/example/BroadcastExample.java @@ -22,12 +22,15 @@ package org.apache.samza.example; import org.apache.samza.application.StreamApplication; import org.apache.samza.config.Config; import org.apache.samza.operators.KV; +import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.StreamGraph; import org.apache.samza.runtime.LocalApplicationRunner; import org.apache.samza.serializers.JsonSerdeV2; import org.apache.samza.serializers.KVSerde; -import org.apache.samza.operators.MessageStream; import org.apache.samza.serializers.StringSerde; +import org.apache.samza.system.kafka.KafkaInputDescriptor; +import org.apache.samza.system.kafka.KafkaOutputDescriptor; +import org.apache.samza.system.kafka.KafkaSystemDescriptor; import org.apache.samza.util.CommandLine; @@ -50,12 +53,21 @@ public class BroadcastExample implements StreamApplication { @Override public void init(StreamGraph graph, Config config) { - KVSerde<String, PageViewEvent> pgeMsgSerde = KVSerde.of(new StringSerde("UTF-8"), new JsonSerdeV2<>(PageViewEvent.class)); - MessageStream<KV<String, PageViewEvent>> inputStream = graph.getInputStream("pageViewEventStream", pgeMsgSerde); + KVSerde<String, PageViewEvent> serde = KVSerde.of(new StringSerde("UTF-8"), new JsonSerdeV2<>(PageViewEvent.class)); + KafkaSystemDescriptor trackingSystem = new KafkaSystemDescriptor("tracking"); + KafkaInputDescriptor<KV<String, PageViewEvent>> pageViewEvent = + trackingSystem.getInputDescriptor("pageViewEvent", serde); + KafkaOutputDescriptor<KV<String, PageViewEvent>> outStream1 = + 
trackingSystem.getOutputDescriptor("outStream1", serde); + KafkaOutputDescriptor<KV<String, PageViewEvent>> outStream2 = + trackingSystem.getOutputDescriptor("outStream2", serde); + KafkaOutputDescriptor<KV<String, PageViewEvent>> outStream3 = + trackingSystem.getOutputDescriptor("outStream3", serde); - inputStream.filter(m -> m.key.equals("key1")).sendTo(graph.getOutputStream("outStream1", pgeMsgSerde)); - inputStream.filter(m -> m.key.equals("key2")).sendTo(graph.getOutputStream("outStream2", pgeMsgSerde)); - inputStream.filter(m -> m.key.equals("key3")).sendTo(graph.getOutputStream("outStream3", pgeMsgSerde)); + MessageStream<KV<String, PageViewEvent>> inputStream = graph.getInputStream(pageViewEvent); + inputStream.filter(m -> m.key.equals("key1")).sendTo(graph.getOutputStream(outStream1)); + inputStream.filter(m -> m.key.equals("key2")).sendTo(graph.getOutputStream(outStream2)); + inputStream.filter(m -> m.key.equals("key3")).sendTo(graph.getOutputStream(outStream3)); } class PageViewEvent { http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-test/src/main/java/org/apache/samza/example/KeyValueStoreExample.java ---------------------------------------------------------------------- diff --git a/samza-test/src/main/java/org/apache/samza/example/KeyValueStoreExample.java b/samza-test/src/main/java/org/apache/samza/example/KeyValueStoreExample.java index 9edaabe..4d3307b 100644 --- a/samza-test/src/main/java/org/apache/samza/example/KeyValueStoreExample.java +++ b/samza-test/src/main/java/org/apache/samza/example/KeyValueStoreExample.java @@ -19,6 +19,10 @@ package org.apache.samza.example; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.TimeUnit; import org.apache.samza.application.StreamApplication; import org.apache.samza.config.Config; import org.apache.samza.operators.KV; @@ -31,14 +35,12 @@ import org.apache.samza.serializers.JsonSerdeV2; import org.apache.samza.serializers.KVSerde; import org.apache.samza.serializers.StringSerde; import org.apache.samza.storage.kv.KeyValueStore; +import org.apache.samza.system.kafka.KafkaInputDescriptor; +import org.apache.samza.system.kafka.KafkaOutputDescriptor; +import org.apache.samza.system.kafka.KafkaSystemDescriptor; import org.apache.samza.task.TaskContext; import org.apache.samza.util.CommandLine; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.concurrent.TimeUnit; - /** * Example code using {@link KeyValueStore} to implement event-time window @@ -58,12 +60,19 @@ public class KeyValueStoreExample implements StreamApplication { @Override public void init(StreamGraph graph, Config config) { - MessageStream<PageViewEvent> pageViewEvents = - graph.getInputStream("pageViewEventStream", new JsonSerdeV2<>(PageViewEvent.class)); - OutputStream<KV<String, StatsOutput>> pageViewEventPerMember = - graph.getOutputStream("pageViewEventPerMember", + KafkaSystemDescriptor trackingSystem = new KafkaSystemDescriptor("tracking"); + + KafkaInputDescriptor<PageViewEvent> inputStreamDescriptor = + trackingSystem.getInputDescriptor("pageViewEvent", new JsonSerdeV2<>(PageViewEvent.class)); + + KafkaOutputDescriptor<KV<String, StatsOutput>> outputStreamDescriptor = + trackingSystem.getOutputDescriptor("pageViewEventPerMember", KVSerde.of(new StringSerde(), new JsonSerdeV2<>(StatsOutput.class))); + graph.setDefaultSystem(trackingSystem); + MessageStream<PageViewEvent> pageViewEvents = graph.getInputStream(inputStreamDescriptor); + 
OutputStream<KV<String, StatsOutput>> pageViewEventPerMember = graph.getOutputStream(outputStreamDescriptor); + pageViewEvents .partitionBy(pve -> pve.memberId, pve -> pve, KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageViewEvent.class)), "partitionBy") http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-test/src/main/java/org/apache/samza/example/MergeExample.java ---------------------------------------------------------------------- diff --git a/samza-test/src/main/java/org/apache/samza/example/MergeExample.java b/samza-test/src/main/java/org/apache/samza/example/MergeExample.java index ff983a4..33d60d6 100644 --- a/samza-test/src/main/java/org/apache/samza/example/MergeExample.java +++ b/samza-test/src/main/java/org/apache/samza/example/MergeExample.java @@ -20,14 +20,19 @@ package org.apache.samza.example; import com.google.common.collect.ImmutableList; + import org.apache.samza.application.StreamApplication; import org.apache.samza.config.Config; +import org.apache.samza.operators.KV; +import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.StreamGraph; import org.apache.samza.runtime.LocalApplicationRunner; import org.apache.samza.serializers.JsonSerdeV2; import org.apache.samza.serializers.KVSerde; -import org.apache.samza.operators.MessageStream; import org.apache.samza.serializers.StringSerde; +import org.apache.samza.system.kafka.KafkaInputDescriptor; +import org.apache.samza.system.kafka.KafkaOutputDescriptor; +import org.apache.samza.system.kafka.KafkaSystemDescriptor; import org.apache.samza.util.CommandLine; public class MergeExample implements StreamApplication { @@ -45,14 +50,22 @@ public class MergeExample implements StreamApplication { @Override public void init(StreamGraph graph, Config config) { + KVSerde<String, PageViewEvent> serde = KVSerde.of(new StringSerde("UTF-8"), new JsonSerdeV2<>(PageViewEvent.class)); + KafkaSystemDescriptor trackingSystem = new KafkaSystemDescriptor("tracking"); - KVSerde<String, PageViewEvent> - pgeMsgSerde = KVSerde.of(new StringSerde("UTF-8"), new JsonSerdeV2<>(PageViewEvent.class)); + KafkaInputDescriptor<KV<String, PageViewEvent>> isd1 = + trackingSystem.getInputDescriptor("pageViewStream1", serde); + KafkaInputDescriptor<KV<String, PageViewEvent>> isd2 = + trackingSystem.getInputDescriptor("pageViewStream2", serde); + KafkaInputDescriptor<KV<String, PageViewEvent>> isd3 = + trackingSystem.getInputDescriptor("pageViewStream3", serde); - MessageStream.mergeAll(ImmutableList.of(graph.getInputStream("viewStream1", pgeMsgSerde), - graph.getInputStream("viewStream2", pgeMsgSerde), graph.getInputStream("viewStream3", pgeMsgSerde))) - .sendTo(graph.getOutputStream("mergedStream", pgeMsgSerde)); + KafkaOutputDescriptor<KV<String, PageViewEvent>> osd = + trackingSystem.getOutputDescriptor("mergedStream", serde); + MessageStream + .mergeAll(ImmutableList.of(graph.getInputStream(isd1), graph.getInputStream(isd2), graph.getInputStream(isd3))) + .sendTo(graph.getOutputStream(osd)); } class PageViewEvent { http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-test/src/main/java/org/apache/samza/example/OrderShipmentJoinExample.java ---------------------------------------------------------------------- diff --git a/samza-test/src/main/java/org/apache/samza/example/OrderShipmentJoinExample.java b/samza-test/src/main/java/org/apache/samza/example/OrderShipmentJoinExample.java index 1c0bc25..34b5fc6 100644 --- a/samza-test/src/main/java/org/apache/samza/example/OrderShipmentJoinExample.java +++ 
b/samza-test/src/main/java/org/apache/samza/example/OrderShipmentJoinExample.java @@ -18,21 +18,21 @@ */ package org.apache.samza.example; +import java.time.Duration; import org.apache.samza.application.StreamApplication; import org.apache.samza.config.Config; import org.apache.samza.operators.KV; -import org.apache.samza.operators.MessageStream; -import org.apache.samza.operators.OutputStream; import org.apache.samza.operators.StreamGraph; import org.apache.samza.operators.functions.JoinFunction; import org.apache.samza.runtime.LocalApplicationRunner; import org.apache.samza.serializers.JsonSerdeV2; import org.apache.samza.serializers.KVSerde; import org.apache.samza.serializers.StringSerde; +import org.apache.samza.system.kafka.KafkaInputDescriptor; +import org.apache.samza.system.kafka.KafkaOutputDescriptor; +import org.apache.samza.system.kafka.KafkaSystemDescriptor; import org.apache.samza.util.CommandLine; -import java.time.Duration; - /** * Simple 2-way stream-to-stream join example @@ -52,21 +52,22 @@ public class OrderShipmentJoinExample implements StreamApplication { @Override public void init(StreamGraph graph, Config config) { - - MessageStream<OrderRecord> orders = - graph.getInputStream("orders", new JsonSerdeV2<>(OrderRecord.class)); - MessageStream<ShipmentRecord> shipments = - graph.getInputStream("shipments", new JsonSerdeV2<>(ShipmentRecord.class)); - OutputStream<KV<String, FulfilledOrderRecord>> fulfilledOrders = - graph.getOutputStream("fulfilledOrders", + KafkaSystemDescriptor trackingSystem = new KafkaSystemDescriptor("tracking"); + + KafkaInputDescriptor<OrderRecord> orderStreamDescriptor = + trackingSystem.getInputDescriptor("orders", new JsonSerdeV2<>(OrderRecord.class)); + KafkaInputDescriptor<ShipmentRecord> shipmentStreamDescriptor = + trackingSystem.getInputDescriptor("shipments", new JsonSerdeV2<>(ShipmentRecord.class)); + KafkaOutputDescriptor<KV<String, FulfilledOrderRecord>> fulfilledOrdersStreamDescriptor = + trackingSystem.getOutputDescriptor("fulfilledOrders", KVSerde.of(new StringSerde(), new JsonSerdeV2<>(FulfilledOrderRecord.class))); - orders - .join(shipments, new MyJoinFunction(), + graph.getInputStream(orderStreamDescriptor) + .join(graph.getInputStream(shipmentStreamDescriptor), new MyJoinFunction(), new StringSerde(), new JsonSerdeV2<>(OrderRecord.class), new JsonSerdeV2<>(ShipmentRecord.class), Duration.ofMinutes(1), "join") .map(fulFilledOrder -> KV.of(fulFilledOrder.orderId, fulFilledOrder)) - .sendTo(fulfilledOrders); + .sendTo(graph.getOutputStream(fulfilledOrdersStreamDescriptor)); } http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-test/src/main/java/org/apache/samza/example/PageViewCounterExample.java ---------------------------------------------------------------------- diff --git a/samza-test/src/main/java/org/apache/samza/example/PageViewCounterExample.java b/samza-test/src/main/java/org/apache/samza/example/PageViewCounterExample.java index 2581506..dc5eb74 100644 --- a/samza-test/src/main/java/org/apache/samza/example/PageViewCounterExample.java +++ b/samza-test/src/main/java/org/apache/samza/example/PageViewCounterExample.java @@ -35,6 +35,9 @@ import org.apache.samza.runtime.LocalApplicationRunner; import org.apache.samza.serializers.JsonSerdeV2; import org.apache.samza.serializers.KVSerde; import org.apache.samza.serializers.StringSerde; +import org.apache.samza.system.kafka.KafkaInputDescriptor; +import org.apache.samza.system.kafka.KafkaOutputDescriptor; +import 
http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-test/src/main/java/org/apache/samza/example/PageViewCounterExample.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/example/PageViewCounterExample.java b/samza-test/src/main/java/org/apache/samza/example/PageViewCounterExample.java
index 2581506..dc5eb74 100644
--- a/samza-test/src/main/java/org/apache/samza/example/PageViewCounterExample.java
+++ b/samza-test/src/main/java/org/apache/samza/example/PageViewCounterExample.java
@@ -35,6 +35,9 @@ import org.apache.samza.runtime.LocalApplicationRunner;
 import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.system.kafka.KafkaInputDescriptor;
+import org.apache.samza.system.kafka.KafkaOutputDescriptor;
+import org.apache.samza.system.kafka.KafkaSystemDescriptor;
 import org.apache.samza.util.CommandLine;
 
@@ -56,13 +59,18 @@ public class PageViewCounterExample implements StreamApplication {
 
   @Override
   public void init(StreamGraph graph, Config config) {
+    KafkaSystemDescriptor trackingSystem = new KafkaSystemDescriptor("tracking");
 
-    MessageStream<PageViewEvent> pageViewEvents = null;
-    pageViewEvents = graph.getInputStream("pageViewEventStream", new JsonSerdeV2<>(PageViewEvent.class));
-    OutputStream<KV<String, PageViewCount>> pageViewEventPerMemberStream =
-        graph.getOutputStream("pageViewEventPerMemberStream",
+    KafkaInputDescriptor<PageViewEvent> inputStreamDescriptor =
+        trackingSystem.getInputDescriptor("pageViewEvent", new JsonSerdeV2<>(PageViewEvent.class));
+
+    KafkaOutputDescriptor<KV<String, PageViewCount>> outputStreamDescriptor =
+        trackingSystem.getOutputDescriptor("pageViewEventPerMember",
             KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageViewCount.class)));
 
+    MessageStream<PageViewEvent> pageViewEvents = graph.getInputStream(inputStreamDescriptor);
+    OutputStream<KV<String, PageViewCount>> pageViewEventPerMemberStream = graph.getOutputStream(outputStreamDescriptor);
+
     SupplierFunction<Integer> initialValue = () -> 0;
     FoldLeftFunction<PageViewEvent, Integer> foldLeftFn = (m, c) -> c + 1;
     pageViewEvents
@@ -71,7 +79,6 @@ public class PageViewCounterExample implements StreamApplication {
             .setAccumulationMode(AccumulationMode.DISCARDING), "tumblingWindow")
         .map(windowPane -> KV.of(windowPane.getKey().getKey(), new PageViewCount(windowPane)))
         .sendTo(pageViewEventPerMemberStream);
-
   }
 
   class PageViewEvent {
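The second hunk above shows only the tail of the window call. For context, a sketch of how the full chain plausibly reads; the key function, window duration, and early-trigger line are assumed rather than taken from the diff. AccumulationMode.DISCARDING clears the pane's accumulated state after each emission, so early firings are not double-counted:

    SupplierFunction<Integer> initialValue = () -> 0;
    FoldLeftFunction<PageViewEvent, Integer> foldLeftFn = (m, c) -> c + 1;  // one more view seen
    pageViewEvents
        .window(Windows.keyedTumblingWindow(
                m -> m.memberId,                 // assumed key: count per member
                Duration.ofSeconds(10),          // assumed fixed, non-overlapping window
                initialValue, foldLeftFn,
                null, null)                      // serdes may be null when supplied elsewhere
            .setEarlyTrigger(Triggers.repeat(Triggers.count(5)))   // assumed early trigger
            .setAccumulationMode(AccumulationMode.DISCARDING), "tumblingWindow")
        .map(windowPane -> KV.of(windowPane.getKey().getKey(), new PageViewCount(windowPane)))
        .sendTo(pageViewEventPerMemberStream);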
trackingSystem.getInputDescriptor("pageViewEvent", new JsonSerdeV2<>(PageViewEvent.class)); + + KafkaOutputDescriptor<KV<String, MyStreamOutput>> outputStreamDescriptor = + trackingSystem.getOutputDescriptor("pageViewEventPerMember", KVSerde.of(new StringSerde(), new JsonSerdeV2<>(MyStreamOutput.class))); + graph.setDefaultSystem(trackingSystem); + MessageStream<PageViewEvent> pageViewEvents = graph.getInputStream(inputStreamDescriptor); + OutputStream<KV<String, MyStreamOutput>> pageViewEventPerMember = graph.getOutputStream(outputStreamDescriptor); + pageViewEvents .partitionBy(pve -> pve.memberId, pve -> pve, KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageViewEvent.class)), "partitionBy") - .window(Windows.keyedTumblingWindow(KV::getKey, Duration.ofMinutes(5), () -> 0, (m, c) -> c + 1, null, null), - "window") + .window(Windows.keyedTumblingWindow( + KV::getKey, Duration.ofMinutes(5), () -> 0, (m, c) -> c + 1, null, null), "window") .map(windowPane -> KV.of(windowPane.getKey().getKey(), new MyStreamOutput(windowPane))) .sendTo(pageViewEventPerMember); http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-test/src/main/java/org/apache/samza/example/WindowExample.java ---------------------------------------------------------------------- diff --git a/samza-test/src/main/java/org/apache/samza/example/WindowExample.java b/samza-test/src/main/java/org/apache/samza/example/WindowExample.java index 4950695..cbc1e8e 100644 --- a/samza-test/src/main/java/org/apache/samza/example/WindowExample.java +++ b/samza-test/src/main/java/org/apache/samza/example/WindowExample.java @@ -19,6 +19,7 @@ package org.apache.samza.example; +import java.time.Duration; import org.apache.samza.application.StreamApplication; import org.apache.samza.config.Config; import org.apache.samza.operators.MessageStream; @@ -26,16 +27,17 @@ import org.apache.samza.operators.OutputStream; import org.apache.samza.operators.StreamGraph; import org.apache.samza.operators.functions.FoldLeftFunction; import org.apache.samza.operators.functions.SupplierFunction; -import org.apache.samza.operators.windows.WindowPane; import org.apache.samza.operators.triggers.Triggers; +import org.apache.samza.operators.windows.WindowPane; import org.apache.samza.operators.windows.Windows; import org.apache.samza.runtime.LocalApplicationRunner; import org.apache.samza.serializers.IntegerSerde; import org.apache.samza.serializers.JsonSerdeV2; +import org.apache.samza.system.kafka.KafkaInputDescriptor; +import org.apache.samza.system.kafka.KafkaOutputDescriptor; +import org.apache.samza.system.kafka.KafkaSystemDescriptor; import org.apache.samza.util.CommandLine; -import java.time.Duration; - /** * Example implementation of a simple user-defined task w/ a window operator. 
@@ -56,11 +58,18 @@ public class WindowExample implements StreamApplication {
 
   @Override
   public void init(StreamGraph graph, Config config) {
+    KafkaSystemDescriptor trackingSystem = new KafkaSystemDescriptor("tracking");
+
+    KafkaInputDescriptor<PageViewEvent> inputStreamDescriptor =
+        trackingSystem.getInputDescriptor("pageViewEvent", new JsonSerdeV2<>(PageViewEvent.class));
+    KafkaOutputDescriptor<Integer> outputStreamDescriptor =
+        trackingSystem.getOutputDescriptor("pageViewEventPerMember", new IntegerSerde());
+
+    MessageStream<PageViewEvent> inputStream = graph.getInputStream(inputStreamDescriptor);
+    OutputStream<Integer> outputStream = graph.getOutputStream(outputStreamDescriptor);
 
     SupplierFunction<Integer> initialValue = () -> 0;
     FoldLeftFunction<PageViewEvent, Integer> counter = (m, c) -> c == null ? 1 : c + 1;
-    MessageStream<PageViewEvent> inputStream = graph.getInputStream("inputStream", new JsonSerdeV2<PageViewEvent>());
-    OutputStream<Integer> outputStream = graph.getOutputStream("outputStream", new IntegerSerde());
 
     // create a tumbling window that outputs the number of message collected every 10 minutes.
     // also emit early results if either the number of messages collected reaches 30000, or if no new messages arrive

http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
index 3c45967..5ca497a 100644
--- a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
+++ b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
@@ -138,7 +138,7 @@ public class TestRunner {
    */
   private void registerSystem(String systemName) {
     if (!systems.containsKey(systemName)) {
-      systems.put(systemName, CollectionStreamSystemSpec.create(systemName));
+      systems.put(systemName, CollectionStreamSystemSpec.create(systemName, JOB_NAME));
       configs.putAll(systems.get(systemName).getSystemConfigs());
     }
   }
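The comment at the end of the WindowExample hunk describes early emission, but the trigger line itself is outside the hunk. A sketch of a configuration matching that description, assuming the Triggers factory methods visible in the file's imports; the lull duration is an assumed value:

    inputStream
        .window(Windows.tumblingWindow(Duration.ofMinutes(10), initialValue, counter, new IntegerSerde())
            .setEarlyTrigger(Triggers.repeat(                  // re-arm after each early firing
                Triggers.any(
                    Triggers.count(30000),                     // 30000 messages collected, or
                    Triggers.timeSinceLastMessage(Duration.ofMinutes(1))))),  // assumed lull duration
            "window")
        .map(WindowPane::getMessage)   // the pane's message is the accumulated count
        .sendTo(outputStream);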
http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-test/src/main/java/org/apache/samza/test/framework/system/CollectionStreamSystemSpec.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/system/CollectionStreamSystemSpec.java b/samza-test/src/main/java/org/apache/samza/test/framework/system/CollectionStreamSystemSpec.java
index c005c41..5658f61 100644
--- a/samza-test/src/main/java/org/apache/samza/test/framework/system/CollectionStreamSystemSpec.java
+++ b/samza-test/src/main/java/org/apache/samza/test/framework/system/CollectionStreamSystemSpec.java
@@ -22,6 +22,7 @@ package org.apache.samza.test.framework.system;
 import com.google.common.base.Preconditions;
 import java.util.HashMap;
 import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.samza.system.inmemory.InMemorySystemFactory;
 
@@ -32,11 +33,17 @@ import org.apache.samza.system.inmemory.InMemorySystemFactory;
 * Following system level configs are set by default
 * <ol>
 *   <li>"systems.%s.default.stream.samza.offset.default" = "oldest"</li>
+*   <li>"jobs.job-name.systems.%s.default.stream.samza.offset.default" = "oldest"</li>
 *   <li>"systems.%s.samza.factory" = {@link InMemorySystemFactory}</li>
+*   <li>"jobs.job-name.systems.%s.samza.factory" = {@link InMemorySystemFactory}</li>
 * </ol>
-*
+* The "systems.*" configs are required since the planner uses the system to get metadata about streams during
+* planning. The "jobs.job-name.systems.*" configs are required since configs generated from user provided
+* system/stream descriptors override configs originally supplied to the planner. Configs in the "jobs.job-name.*"
+* scope have the highest precedence.
 */
 public class CollectionStreamSystemSpec {
+  private static final String CONFIG_OVERRIDE_PREFIX = "jobs.%s."; // prefix to override configs generated by the planner
   private static final String SYSTEM_FACTORY = "systems.%s.samza.factory";
   private static final String SYSTEM_OFFSET = "systems.%s.default.stream.samza.offset.default";
 
@@ -51,11 +58,13 @@ public class CollectionStreamSystemSpec {
    * <p>
    * @param systemName represents unique name of the system
    */
-  private CollectionStreamSystemSpec(String systemName) {
+  private CollectionStreamSystemSpec(String systemName, String jobName) {
     this.systemName = systemName;
     systemConfigs = new HashMap<String, String>();
     systemConfigs.put(String.format(SYSTEM_FACTORY, systemName), InMemorySystemFactory.class.getName());
+    systemConfigs.put(String.format(CONFIG_OVERRIDE_PREFIX + SYSTEM_FACTORY, jobName, systemName), InMemorySystemFactory.class.getName());
     systemConfigs.put(String.format(SYSTEM_OFFSET, systemName), "oldest");
+    systemConfigs.put(String.format(CONFIG_OVERRIDE_PREFIX + SYSTEM_OFFSET, jobName, systemName), "oldest");
   }
 
   public String getSystemName() {
@@ -67,13 +76,15 @@ public class CollectionStreamSystemSpec {
   }
 
   /**
-   * Creates a {@link CollectionStreamSystemSpec} with name {@code name}
-   * @param name represents name of the {@link CollectionStreamSystemSpec}
+   * Creates a {@link CollectionStreamSystemSpec} with name {@code systemName}
+   * @param systemName represents name of the {@link CollectionStreamSystemSpec}
+   * @param jobName name of the job
    * @return an instance of {@link CollectionStreamSystemSpec}
    */
-  public static CollectionStreamSystemSpec create(String name) {
-    Preconditions.checkState(name != null);
-    return new CollectionStreamSystemSpec(name);
+  public static CollectionStreamSystemSpec create(String systemName, String jobName) {
+    Preconditions.checkState(StringUtils.isNotBlank(systemName));
+    Preconditions.checkState(StringUtils.isNotBlank(jobName));
+    return new CollectionStreamSystemSpec(systemName, jobName);
   }
 }
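To make the precedence concrete, here is what the constructor above puts into systemConfigs, spelled out for hypothetical names systemName = "test" and jobName = "samza-test":

    Map<String, String> expected = new HashMap<>();
    // Base scope: read by the planner when it fetches stream metadata.
    expected.put("systems.test.samza.factory", InMemorySystemFactory.class.getName());
    expected.put("systems.test.default.stream.samza.offset.default", "oldest");
    // Job scope: survives descriptor-generated configs, since "jobs.<job-name>.*" wins.
    expected.put("jobs.samza-test.systems.test.samza.factory", InMemorySystemFactory.class.getName());
    expected.put("jobs.samza-test.systems.test.default.stream.samza.offset.default", "oldest");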
http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-test/src/main/java/org/apache/samza/test/integration/TestStandaloneIntegrationApplication.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/integration/TestStandaloneIntegrationApplication.java b/samza-test/src/main/java/org/apache/samza/test/integration/TestStandaloneIntegrationApplication.java
index f6e3d5f..af20fd7 100644
--- a/samza-test/src/main/java/org/apache/samza/test/integration/TestStandaloneIntegrationApplication.java
+++ b/samza-test/src/main/java/org/apache/samza/test/integration/TestStandaloneIntegrationApplication.java
@@ -20,7 +20,13 @@ package org.apache.samza.test.integration;
 
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
+import org.apache.samza.operators.KV;
 import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.system.kafka.KafkaInputDescriptor;
+import org.apache.samza.system.kafka.KafkaOutputDescriptor;
+import org.apache.samza.system.kafka.KafkaSystemDescriptor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -29,14 +35,21 @@ import org.slf4j.LoggerFactory;
 * Acts as a pass through filter for all the events from an input stream.
 */
 public class TestStandaloneIntegrationApplication implements StreamApplication {
-
   private static final Logger LOGGER = LoggerFactory.getLogger(TestStandaloneIntegrationApplication.class);
 
   @Override
   public void init(StreamGraph graph, Config config) {
-    String inputStream = config.get("input.stream.name");
+    String systemName = "testSystemName";
+    String inputStreamName = config.get("input.stream.name");
     String outputStreamName = "standaloneIntegrationTestKafkaOutputTopic";
-    LOGGER.info("Publishing message to: {}.", outputStreamName);
-    graph.getInputStream(inputStream).sendTo(graph.getOutputStream(outputStreamName));
+    LOGGER.info("Publishing message from: {} to: {}.", inputStreamName, outputStreamName);
+    KafkaSystemDescriptor kafkaSystemDescriptor = new KafkaSystemDescriptor(systemName);
+
+    KVSerde<Object, Object> noOpSerde = KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>());
+    KafkaInputDescriptor<KV<Object, Object>> isd =
+        kafkaSystemDescriptor.getInputDescriptor(inputStreamName, noOpSerde);
+    KafkaOutputDescriptor<KV<Object, Object>> osd =
+        kafkaSystemDescriptor.getOutputDescriptor(outputStreamName, noOpSerde);
+    graph.getInputStream(isd).sendTo(graph.getOutputStream(osd));
   }
 }
http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
index 3301af8..b86c6af 100644
--- a/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
@@ -31,8 +31,12 @@ import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.TaskConfig;
 import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory;
 import org.apache.samza.operators.KV;
+import org.apache.samza.operators.descriptors.GenericInputDescriptor;
+import org.apache.samza.operators.descriptors.DelegatingSystemDescriptor;
 import org.apache.samza.operators.functions.MapFunction;
 import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.NoOpSerde;
 import org.apache.samza.standalone.PassthroughCoordinationUtilsFactory;
 import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
 import org.apache.samza.test.controlmessages.TestData.PageView;
@@ -40,8 +44,8 @@ import org.apache.samza.test.controlmessages.TestData.PageViewJsonSerdeFactory;
 import org.apache.samza.test.harness.AbstractIntegrationTestHarness;
 import org.apache.samza.test.util.ArraySystemFactory;
 import org.apache.samza.test.util.Base64Serializer;
-
 import org.junit.Test;
+
 import static org.junit.Assert.assertEquals;
 
 /**
@@ -92,7 +96,10 @@ public class EndOfStreamIntegrationTest extends AbstractIntegrationTestHarness {
     final LocalApplicationRunner runner = new LocalApplicationRunner(new MapConfig(configs));
 
     final StreamApplication app = (streamGraph, cfg) -> {
-      streamGraph.<KV<String, PageView>>getInputStream("PageView")
+      DelegatingSystemDescriptor sd = new DelegatingSystemDescriptor("test");
+      GenericInputDescriptor<KV<String, PageView>> isd =
+          sd.getInputDescriptor("PageView", KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()));
+      streamGraph.getInputStream(isd)
        .map(Values.create())
        .partitionBy(pv -> pv.getMemberId(), pv -> pv, "p1")
        .sink((m, collector, coordinator) -> {

http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
index d4dc4ed..5336595 100644
--- a/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
@@ -19,6 +19,8 @@
 
 package org.apache.samza.test.controlmessages;
 
+import scala.collection.JavaConverters;
+
 import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -38,6 +40,8 @@ import org.apache.samza.container.TaskName;
 import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory;
 import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.operators.KV;
+import org.apache.samza.operators.descriptors.GenericInputDescriptor;
+import org.apache.samza.operators.descriptors.DelegatingSystemDescriptor;
 import org.apache.samza.operators.impl.InputOperatorImpl;
 import org.apache.samza.operators.impl.OperatorImpl;
 import org.apache.samza.operators.impl.OperatorImplGraph;
@@ -48,6 +52,8 @@ import org.apache.samza.processor.TestStreamProcessorUtil;
 import org.apache.samza.runtime.LocalApplicationRunner;
 import org.apache.samza.runtime.TestLocalApplicationRunner;
 import org.apache.samza.serializers.IntegerSerdeFactory;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.NoOpSerde;
 import org.apache.samza.serializers.StringSerdeFactory;
 import org.apache.samza.standalone.PassthroughCoordinationUtilsFactory;
 import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
@@ -66,7 +72,6 @@ import org.apache.samza.test.harness.AbstractIntegrationTestHarness;
 import org.apache.samza.test.util.SimpleSystemAdmin;
 import org.apache.samza.test.util.TestStreamConsumer;
 import org.junit.Test;
-import scala.collection.JavaConverters;
 
 import static org.junit.Assert.assertEquals;
 
@@ -143,7 +148,10 @@ public class WatermarkIntegrationTest extends AbstractIntegrationTestHarness {
     List<PageView> received = new ArrayList<>();
 
     final StreamApplication app = (streamGraph, cfg) -> {
-      streamGraph.<KV<String, PageView>>getInputStream("PageView")
+      DelegatingSystemDescriptor sd = new DelegatingSystemDescriptor("test");
+      GenericInputDescriptor<KV<String, PageView>> isd =
+          sd.getInputDescriptor("PageView", KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()));
+      streamGraph.getInputStream(isd)
        .map(EndOfStreamIntegrationTest.Values.create())
        .partitionBy(pv -> pv.getMemberId(), pv -> pv, "p1")
        .sink((m, collector, coordinator) -> {
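DelegatingSystemDescriptor, used in both tests above, is the descriptor to reach for when the system's factory and properties are configured elsewhere (here, by the test harness config); it contributes only the system name, while the GenericInputDescriptor it hands out carries the stream name and serde. A minimal sketch, with the downstream logic being a hypothetical placeholder:

    // The "test" system's factory comes from the harness config, not from the descriptor.
    DelegatingSystemDescriptor sd = new DelegatingSystemDescriptor("test");
    GenericInputDescriptor<KV<String, PageView>> isd =
        sd.getInputDescriptor("PageView", KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()));
    streamGraph.getInputStream(isd)
        .map(KV::getValue);   // hypothetical downstream logic: unwrap the value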
http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-test/src/test/java/org/apache/samza/test/framework/BroadcastAssertApp.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/BroadcastAssertApp.java b/samza-test/src/test/java/org/apache/samza/test/framework/BroadcastAssertApp.java
index ec52aa4..aca6c40 100644
--- a/samza-test/src/test/java/org/apache/samza/test/framework/BroadcastAssertApp.java
+++ b/samza-test/src/test/java/org/apache/samza/test/framework/BroadcastAssertApp.java
@@ -19,17 +19,18 @@
 
 package org.apache.samza.test.framework;
 
+import java.util.Arrays;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.StreamGraph;
 import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.system.kafka.KafkaInputDescriptor;
+import org.apache.samza.system.kafka.KafkaSystemDescriptor;
 import org.apache.samza.test.operator.data.PageView;
 
-import java.util.Arrays;
-
 public class BroadcastAssertApp implements StreamApplication {
-
+  public static final String SYSTEM = "kafka";
   public static final String INPUT_TOPIC_NAME_PROP = "inputTopicName";
 
@@ -38,8 +39,10 @@ public class BroadcastAssertApp implements StreamApplication {
     String inputTopic = config.get(INPUT_TOPIC_NAME_PROP);
 
     final JsonSerdeV2<PageView> serde = new JsonSerdeV2<>(PageView.class);
+    KafkaSystemDescriptor ksd = new KafkaSystemDescriptor(SYSTEM);
+    KafkaInputDescriptor<PageView> isd = ksd.getInputDescriptor(inputTopic, serde);
     final MessageStream<PageView> broadcastPageViews = graph
-        .getInputStream(inputTopic, serde)
+        .getInputStream(isd)
        .broadcast(serde, "pv");
 
    /**
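A condensed sketch of the broadcast pattern in the diff above: broadcast(...) materializes the stream once into an intermediate broadcast stream so that every downstream branch, in every task instance, sees each message. The branch bodies here are illustrative:

    MessageStream<PageView> broadcastPageViews = graph
        .getInputStream(isd)
        .broadcast(serde, "pv");   // "pv" names the intermediate broadcast stream

    // Multiple branches can consume the same broadcast messages independently.
    broadcastPageViews.filter(pv -> pv != null);  // illustrative branch 1
    broadcastPageViews.map(pv -> pv);             // illustrative branch 2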
http://git-wip-us.apache.org/repos/asf/samza/blob/2a71baf7/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java
index ba4c985..6fdf887 100644
--- a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java
@@ -27,27 +27,40 @@ import java.util.Random;
 import org.apache.samza.SamzaException;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.operators.KV;
+import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.functions.MapFunction;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.NoOpSerde;
 import org.apache.samza.system.OutgoingMessageEnvelope;
 import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.kafka.KafkaInputDescriptor;
+import org.apache.samza.system.kafka.KafkaSystemDescriptor;
 import org.apache.samza.test.controlmessages.TestData;
 import org.apache.samza.test.framework.stream.CollectionStream;
-import static org.apache.samza.test.controlmessages.TestData.PageView;
 import org.junit.Assert;
 import org.junit.Test;
 
+import static org.apache.samza.test.controlmessages.TestData.PageView;
+
 public class StreamApplicationIntegrationTest {
 
   final StreamApplication pageViewFilter = (streamGraph, cfg) -> {
-    streamGraph.<KV<String, TestData.PageView>>getInputStream("PageView").map(
-        StreamApplicationIntegrationTest.Values.create()).filter(pv -> pv.getPageKey().equals("inbox"));
+    KafkaSystemDescriptor ksd = new KafkaSystemDescriptor("test");
+    KafkaInputDescriptor<KV<String, PageView>> isd =
+        ksd.getInputDescriptor("PageView", KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()));
+    MessageStream<KV<String, TestData.PageView>> inputStream = streamGraph.getInputStream(isd);
+    inputStream.map(StreamApplicationIntegrationTest.Values.create()).filter(pv -> pv.getPageKey().equals("inbox"));
   };
 
-  final StreamApplication pageViewParition = (streamGraph, cfg) -> {
-    streamGraph.<KV<String, PageView>>getInputStream("PageView")
+  final StreamApplication pageViewRepartition = (streamGraph, cfg) -> {
+    KafkaSystemDescriptor ksd = new KafkaSystemDescriptor("test");
+    KafkaInputDescriptor<KV<String, PageView>> isd =
+        ksd.getInputDescriptor("PageView", KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()));
+    MessageStream<KV<String, TestData.PageView>> inputStream = streamGraph.getInputStream(isd);
+    inputStream
        .map(Values.create())
-        .partitionBy(pv -> pv.getMemberId(), pv -> pv, "p1")
+        .partitionBy(PageView::getMemberId, pv -> pv, "p1")
        .sink((m, collector, coordinator) -> {
           collector.send(new OutgoingMessageEnvelope(new SystemStream("test", "Output"), m.getKey(), m.getKey(),
@@ -74,7 +87,7 @@ public class StreamApplicationIntegrationTest {
     CollectionStream output = CollectionStream.empty("test", "Output", 10);
 
     TestRunner
-        .of(pageViewParition)
+        .of(pageViewRepartition)
        .addInputStream(input)
        .addOutputStream(output)
        .addOverrideConfig("job.default.system", "test")
@@ -99,7 +112,7 @@ public class StreamApplicationIntegrationTest {
     CollectionStream output = CollectionStream.empty("test", "Output", 10);
 
     TestRunner
-        .of(pageViewParition)
+        .of(pageViewRepartition)
        .addInputStream(input)
        .addOutputStream(output)
        .run(Duration.ofMillis(1000));
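Putting the renamed pageViewRepartition app and the test framework together, a hypothetical end-to-end sketch; the CollectionStream.of(...) factory for seeding input data is an assumption alongside the empty(...) call shown in the diff:

    // Hypothetical fixtures; CollectionStream.of(...) is assumed, not shown in this diff.
    List<TestData.PageView> pageviews = Arrays.asList(/* test PageView fixtures */);
    CollectionStream input = CollectionStream.of("test", "PageView", pageviews);
    CollectionStream output = CollectionStream.empty("test", "Output", 10);

    TestRunner
        .of(pageViewRepartition)                          // app under test
        .addInputStream(input)                            // seeds the in-memory "PageView" stream
        .addOutputStream(output)                          // captures writes to "Output"
        .addOverrideConfig("job.default.system", "test")  // intermediate streams go to "test"
        .run(Duration.ofMillis(1000));                    // bounded run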