KAFKA-3136: Rename KafkaStreaming to KafkaStreams Author: Guozhang Wang <[email protected]>
Reviewers: Gwen Shapira Closes #800 from guozhangwang/KRename Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/21c6cfe5 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/21c6cfe5 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/21c6cfe5 Branch: refs/heads/trunk Commit: 21c6cfe50dbe818a392c28f48ce8891f7f99aaf6 Parents: 91ba074 Author: Guozhang Wang <[email protected]> Authored: Fri Jan 22 13:00:00 2016 -0800 Committer: Gwen Shapira <[email protected]> Committed: Fri Jan 22 13:00:00 2016 -0800 ---------------------------------------------------------------------- .../apache/kafka/streams/KafkaStreaming.java | 167 ------ .../org/apache/kafka/streams/KafkaStreams.java | 171 +++++++ .../java/org/apache/kafka/streams/KeyValue.java | 37 ++ .../apache/kafka/streams/StreamingConfig.java | 314 ------------ .../apache/kafka/streams/StreamingMetrics.java | 27 - .../org/apache/kafka/streams/StreamsConfig.java | 303 +++++++++++ .../apache/kafka/streams/StreamsMetrics.java | 27 + .../kafka/streams/examples/KStreamJob.java | 24 +- .../kafka/streams/examples/ProcessorJob.java | 34 +- .../apache/kafka/streams/kstream/KStream.java | 1 + .../apache/kafka/streams/kstream/KTable.java | 1 + .../apache/kafka/streams/kstream/KeyValue.java | 34 -- .../kstream/internals/KStreamAggregate.java | 2 +- .../kstream/internals/KStreamFlatMap.java | 2 +- .../streams/kstream/internals/KStreamImpl.java | 10 +- .../kstream/internals/KStreamKStreamJoin.java | 2 +- .../streams/kstream/internals/KStreamMap.java | 2 +- .../kstream/internals/KStreamReduce.java | 2 +- .../kstream/internals/KStreamTransform.java | 2 +- .../streams/kstream/internals/KTableImpl.java | 2 +- .../kstream/internals/KTableRepartitionMap.java | 2 +- .../internals/WindowedStreamPartitioner.java | 52 -- .../internals/WindowedStreamsPartitioner.java | 52 ++ .../streams/processor/ProcessorContext.java | 6 +- .../streams/processor/StreamPartitioner.java | 59 --- .../streams/processor/StreamsPartitioner.java | 59 +++ .../streams/processor/TopologyBuilder.java | 76 +-- .../processor/internals/AbstractTask.java | 6 +- .../KafkaStreamingPartitionAssignor.java | 483 ------------------ .../internals/ProcessorContextImpl.java | 12 +- .../processor/internals/RecordCollector.java | 4 +- .../streams/processor/internals/SinkNode.java | 6 +- .../processor/internals/StandbyContextImpl.java | 12 +- .../processor/internals/StandbyTask.java | 12 +- .../internals/StreamPartitionAssignor.java | 483 ++++++++++++++++++ .../streams/processor/internals/StreamTask.java | 16 +- .../processor/internals/StreamThread.java | 48 +- .../org/apache/kafka/streams/state/Entry.java | 42 -- .../kafka/streams/state/KeyValueIterator.java | 4 +- .../kafka/streams/state/KeyValueStore.java | 3 +- .../streams/state/WindowStoreIterator.java | 2 +- .../InMemoryKeyValueStoreSupplier.java | 12 +- .../InMemoryLRUCacheStoreSupplier.java | 12 +- .../state/internals/MeteredKeyValueStore.java | 14 +- .../state/internals/MeteredWindowStore.java | 6 +- .../streams/state/internals/RocksDBStore.java | 16 +- .../state/internals/RocksDBWindowStore.java | 9 +- .../kafka/streams/StreamingConfigTest.java | 75 --- .../apache/kafka/streams/StreamsConfigTest.java | 76 +++ .../kstream/internals/KStreamFlatMapTest.java | 2 +- .../internals/KStreamKTableLeftJoinTest.java | 2 +- .../kstream/internals/KStreamMapTest.java | 2 +- .../kstream/internals/KStreamTransformTest.java | 2 +- .../kstream/internals/KTableKTableJoinTest.java | 2 +- .../internals/KTableKTableLeftJoinTest.java | 2 +- .../internals/KTableKTableOuterJoinTest.java | 2 +- .../WindowedStreamPartitionerTest.java | 84 --- .../WindowedStreamsPartitionerTest.java | 84 +++ .../KafkaStreamingPartitionAssignorTest.java | 508 ------------------ .../internals/ProcessorTopologyTest.java | 27 +- .../processor/internals/StandbyTaskTest.java | 31 +- .../internals/StreamPartitionAssignorTest.java | 509 +++++++++++++++++++ .../processor/internals/StreamTaskTest.java | 27 +- .../processor/internals/StreamThreadTest.java | 37 +- .../streams/state/KeyValueStoreTestDriver.java | 45 +- .../internals/AbstractKeyValueStoreTest.java | 22 +- .../state/internals/RocksDBWindowStoreTest.java | 38 +- .../apache/kafka/test/KStreamTestDriver.java | 4 +- .../apache/kafka/test/MockProcessorContext.java | 14 +- .../apache/kafka/test/NoOpKeyValueMapper.java | 2 +- .../kafka/test/ProcessorTopologyTestDriver.java | 32 +- 71 files changed, 2133 insertions(+), 2168 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/KafkaStreaming.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreaming.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreaming.java deleted file mode 100644 index 0d99739..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreaming.java +++ /dev/null @@ -1,167 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams; - -import org.apache.kafka.common.metrics.JmxReporter; -import org.apache.kafka.common.metrics.MetricConfig; -import org.apache.kafka.common.metrics.Metrics; -import org.apache.kafka.common.metrics.MetricsReporter; -import org.apache.kafka.common.utils.SystemTime; -import org.apache.kafka.common.utils.Time; -import org.apache.kafka.streams.processor.TopologyBuilder; -import org.apache.kafka.streams.processor.internals.StreamThread; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.List; -import java.util.UUID; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * Kafka Streaming allows for performing continuous computation on input coming from one or more input topics and - * sends output to zero or more output topics. - * <p> - * This processing is defined by using the {@link TopologyBuilder} class or its superclass KStreamBuilder to specify - * the transformation. - * The {@link KafkaStreaming} instance will be responsible for the lifecycle of these processors. It will instantiate and - * start one or more of these processors to process the Kafka partitions assigned to this particular instance. - * <p> - * This streaming instance will co-ordinate with any other instances (whether in this same process, on other processes - * on this machine, or on remote machines). These processes will divide up the work so that all partitions are being - * consumed. If instances are added or die, the corresponding {@link StreamThread} instances will be shutdown or - * started in the appropriate processes to balance processing load. - * <p> - * Internally the {@link KafkaStreaming} instance contains a normal {@link org.apache.kafka.clients.producer.KafkaProducer KafkaProducer} - * and {@link org.apache.kafka.clients.consumer.KafkaConsumer KafkaConsumer} instance that is used for reading input and writing output. - * <p> - * A simple example might look like this: - * <pre> - * Map<String, Object> props = new HashMap<>(); - * props.put("bootstrap.servers", "localhost:4242"); - * props.put("key.deserializer", StringDeserializer.class); - * props.put("value.deserializer", StringDeserializer.class); - * props.put("key.serializer", StringSerializer.class); - * props.put("value.serializer", IntegerSerializer.class); - * props.put("timestamp.extractor", MyTimestampExtractor.class); - * StreamingConfig config = new StreamingConfig(props); - * - * KStreamBuilder builder = new KStreamBuilder(); - * builder.from("topic1").mapValue(value -> value.length()).to("topic2"); - * - * KafkaStreaming streaming = new KafkaStreaming(builder, config); - * streaming.start(); - * </pre> - * - */ -public class KafkaStreaming { - - private static final Logger log = LoggerFactory.getLogger(KafkaStreaming.class); - private static final AtomicInteger STREAMING_CLIENT_ID_SEQUENCE = new AtomicInteger(1); - private static final String JMX_PREFIX = "kafka.streams"; - - // container states - private static final int CREATED = 0; - private static final int RUNNING = 1; - private static final int STOPPED = 2; - private int state = CREATED; - - private final StreamThread[] threads; - - // processId is expected to be unique across JVMs and to be used - // in userData of the subscription request to allow assignor be aware - // of the co-location of stream thread's consumers. It is for internal - // usage only and should not be exposed to users at all. - private final UUID processId; - - public KafkaStreaming(TopologyBuilder builder, StreamingConfig config) throws Exception { - // create the metrics - Time time = new SystemTime(); - - this.processId = UUID.randomUUID(); - - String jobId = config.getString(StreamingConfig.JOB_ID_CONFIG); - if (jobId.length() <= 0) - jobId = "kafka-streams"; - - String clientId = config.getString(StreamingConfig.CLIENT_ID_CONFIG); - if (clientId.length() <= 0) - clientId = jobId + "-" + STREAMING_CLIENT_ID_SEQUENCE.getAndIncrement(); - - List<MetricsReporter> reporters = config.getConfiguredInstances(StreamingConfig.METRIC_REPORTER_CLASSES_CONFIG, - MetricsReporter.class); - reporters.add(new JmxReporter(JMX_PREFIX)); - - MetricConfig metricConfig = new MetricConfig().samples(config.getInt(StreamingConfig.METRICS_NUM_SAMPLES_CONFIG)) - .timeWindow(config.getLong(StreamingConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), - TimeUnit.MILLISECONDS); - - Metrics metrics = new Metrics(metricConfig, reporters, time); - - this.threads = new StreamThread[config.getInt(StreamingConfig.NUM_STREAM_THREADS_CONFIG)]; - for (int i = 0; i < this.threads.length; i++) { - this.threads[i] = new StreamThread(builder, config, jobId, clientId, processId, metrics, time); - } - } - - /** - * Start the stream process by starting all its threads - */ - public synchronized void start() { - log.debug("Starting Kafka Stream process"); - - if (state == CREATED) { - for (StreamThread thread : threads) - thread.start(); - - state = RUNNING; - - log.info("Started Kafka Stream process"); - } else { - throw new IllegalStateException("This process was already started."); - } - } - - /** - * Shutdown this stream process by signaling the threads to stop, - * wait for them to join and clean up the process instance. - */ - public synchronized void close() { - log.debug("Stopping Kafka Stream process"); - - if (state == RUNNING) { - // signal the threads to stop and wait - for (StreamThread thread : threads) - thread.close(); - - for (StreamThread thread : threads) { - try { - thread.join(); - } catch (InterruptedException ex) { - Thread.interrupted(); - } - } - - state = STOPPED; - - log.info("Stopped Kafka Stream process"); - } else { - throw new IllegalStateException("This process has not started yet."); - } - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java new file mode 100644 index 0000000..071cef6 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -0,0 +1,171 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams; + +import org.apache.kafka.common.metrics.JmxReporter; +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.MetricsReporter; +import org.apache.kafka.common.utils.SystemTime; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.streams.processor.TopologyBuilder; +import org.apache.kafka.streams.processor.internals.StreamThread; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Properties; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Kafka Streams allows for performing continuous computation on input coming from one or more input topics and + * sends output to zero or more output topics. + * <p> + * This processing is defined by using the {@link TopologyBuilder} class or its superclass KStreamBuilder to specify + * the transformation. + * The {@link KafkaStreams} instance will be responsible for the lifecycle of these processors. It will instantiate and + * start one or more of these processors to process the Kafka partitions assigned to this particular instance. + * <p> + * This {@link KafkaStreams} instance will co-ordinate with any other instances (whether in this same process, on other processes + * on this machine, or on remote machines). These processes will divide up the work so that all partitions are being + * consumed. If instances are added or die, the corresponding {@link StreamThread} instances will be shutdown or + * started in the appropriate processes to balance processing load. + * <p> + * Internally the {@link KafkaStreams} instance contains a normal {@link org.apache.kafka.clients.producer.KafkaProducer KafkaProducer} + * and {@link org.apache.kafka.clients.consumer.KafkaConsumer KafkaConsumer} instance that is used for reading input and writing output. + * <p> + * A simple example might look like this: + * <pre> + * Map<String, Object> props = new HashMap<>(); + * props.put("bootstrap.servers", "localhost:4242"); + * props.put("key.deserializer", StringDeserializer.class); + * props.put("value.deserializer", StringDeserializer.class); + * props.put("key.serializer", StringSerializer.class); + * props.put("value.serializer", IntegerSerializer.class); + * props.put("timestamp.extractor", MyTimestampExtractor.class); + * StreamsConfig config = new StreamsConfig(props); + * + * KStreamBuilder builder = new KStreamBuilder(); + * builder.from("topic1").mapValue(value -> value.length()).to("topic2"); + * + * KafkaStreams streams = new KafkaStreams(builder, config); + * streams.start(); + * </pre> + * + */ +public class KafkaStreams { + + private static final Logger log = LoggerFactory.getLogger(KafkaStreams.class); + private static final AtomicInteger STREAM_CLIENT_ID_SEQUENCE = new AtomicInteger(1); + private static final String JMX_PREFIX = "kafka.streams"; + + // container states + private static final int CREATED = 0; + private static final int RUNNING = 1; + private static final int STOPPED = 2; + private int state = CREATED; + + private final StreamThread[] threads; + + // processId is expected to be unique across JVMs and to be used + // in userData of the subscription request to allow assignor be aware + // of the co-location of stream thread's consumers. It is for internal + // usage only and should not be exposed to users at all. + private final UUID processId; + + public KafkaStreams(TopologyBuilder builder, Properties props) { + this(builder, new StreamsConfig(props)); + } + + public KafkaStreams(TopologyBuilder builder, StreamsConfig config) { + // create the metrics + Time time = new SystemTime(); + + this.processId = UUID.randomUUID(); + + // JobId is a required config and hence should always have value + String jobId = config.getString(StreamsConfig.JOB_ID_CONFIG); + + String clientId = config.getString(StreamsConfig.CLIENT_ID_CONFIG); + if (clientId.length() <= 0) + clientId = jobId + "-" + STREAM_CLIENT_ID_SEQUENCE.getAndIncrement(); + + List<MetricsReporter> reporters = config.getConfiguredInstances(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, + MetricsReporter.class); + reporters.add(new JmxReporter(JMX_PREFIX)); + + MetricConfig metricConfig = new MetricConfig().samples(config.getInt(StreamsConfig.METRICS_NUM_SAMPLES_CONFIG)) + .timeWindow(config.getLong(StreamsConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), + TimeUnit.MILLISECONDS); + + Metrics metrics = new Metrics(metricConfig, reporters, time); + + this.threads = new StreamThread[config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG)]; + for (int i = 0; i < this.threads.length; i++) { + this.threads[i] = new StreamThread(builder, config, jobId, clientId, processId, metrics, time); + } + } + + /** + * Start the stream process by starting all its threads + */ + public synchronized void start() { + log.debug("Starting Kafka Stream process"); + + if (state == CREATED) { + for (StreamThread thread : threads) + thread.start(); + + state = RUNNING; + + log.info("Started Kafka Stream process"); + } else { + throw new IllegalStateException("This process was already started."); + } + } + + /** + * Shutdown this stream process by signaling the threads to stop, + * wait for them to join and clean up the process instance. + */ + public synchronized void close() { + log.debug("Stopping Kafka Stream process"); + + if (state == RUNNING) { + // signal the threads to stop and wait + for (StreamThread thread : threads) + thread.close(); + + for (StreamThread thread : threads) { + try { + thread.join(); + } catch (InterruptedException ex) { + Thread.interrupted(); + } + } + + state = STOPPED; + + log.info("Stopped Kafka Stream process"); + } else { + throw new IllegalStateException("This process has not started yet."); + } + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/KeyValue.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/KeyValue.java b/streams/src/main/java/org/apache/kafka/streams/KeyValue.java new file mode 100644 index 0000000..472e677 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/KeyValue.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams; + +public class KeyValue<K, V> { + + public final K key; + public final V value; + + public KeyValue(K key, V value) { + this.key = key; + this.value = value; + } + + public static <K, V> KeyValue<K, V> pair(K key, V value) { + return new KeyValue<>(key, value); + } + + public String toString() { + return "KeyValue(" + key + ", " + value + ")"; + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java deleted file mode 100644 index e89d030..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java +++ /dev/null @@ -1,314 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams; - -import org.apache.kafka.clients.CommonClientConfigs; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.config.AbstractConfig; -import org.apache.kafka.common.config.ConfigDef; -import org.apache.kafka.common.config.ConfigDef.Importance; -import org.apache.kafka.common.config.ConfigDef.Type; -import org.apache.kafka.common.serialization.Deserializer; -import org.apache.kafka.common.serialization.Serializer; -import org.apache.kafka.streams.processor.DefaultPartitionGrouper; -import org.apache.kafka.streams.processor.internals.KafkaStreamingPartitionAssignor; -import org.apache.kafka.streams.processor.internals.StreamThread; - -import java.util.Map; - -import static org.apache.kafka.common.config.ConfigDef.Range.atLeast; - -public class StreamingConfig extends AbstractConfig { - - private static final ConfigDef CONFIG; - - /** <code>state.dir</code> */ - public static final String STATE_DIR_CONFIG = "state.dir"; - private static final String STATE_DIR_DOC = "Directory location for state store."; - - /** <code>zookeeper.connect<code/> */ - public static final String ZOOKEEPER_CONNECT_CONFIG = "zookeeper.connect"; - private static final String ZOOKEEPER_CONNECT_DOC = "Zookeeper connect string for Kafka topics management."; - - /** <code>commit.interval.ms</code> */ - public static final String COMMIT_INTERVAL_MS_CONFIG = "commit.interval.ms"; - private static final String COMMIT_INTERVAL_MS_DOC = "The frequency with which to save the position of the processor."; - - /** <code>poll.ms</code> */ - public static final String POLL_MS_CONFIG = "poll.ms"; - private static final String POLL_MS_DOC = "The amount of time in milliseconds to block waiting for input."; - - /** <code>num.stream.threads</code> */ - public static final String NUM_STREAM_THREADS_CONFIG = "num.stream.threads"; - private static final String NUM_STREAM_THREADS_DOC = "The number of threads to execute stream processing."; - - /** <code>num.stream.threads</code> */ - public static final String NUM_STANDBY_REPLICAS_CONFIG = "num.standby.replicas"; - private static final String NUM_STANDBY_REPLICAS_DOC = "The number of standby replicas for each task."; - - /** <code>buffered.records.per.partition</code> */ - public static final String BUFFERED_RECORDS_PER_PARTITION_CONFIG = "buffered.records.per.partition"; - private static final String BUFFERED_RECORDS_PER_PARTITION_DOC = "The maximum number of records to buffer per partition."; - - /** <code>state.cleanup.delay</code> */ - public static final String STATE_CLEANUP_DELAY_MS_CONFIG = "state.cleanup.delay.ms"; - private static final String STATE_CLEANUP_DELAY_MS_DOC = "The amount of time in milliseconds to wait before deleting state when a partition has migrated."; - - /** <code>total.records.to.process</code> */ - public static final String TOTAL_RECORDS_TO_PROCESS = "total.records.to.process"; - private static final String TOTAL_RECORDS_TO_DOC = "Exit after processing this many records."; - - /** <code>window.time.ms</code> */ - public static final String WINDOW_TIME_MS_CONFIG = "window.time.ms"; - private static final String WINDOW_TIME_MS_DOC = "Setting this to a non-negative value will cause the processor to get called " - + "with this frequency even if there is no message."; - - /** <code>timestamp.extractor</code> */ - public static final String TIMESTAMP_EXTRACTOR_CLASS_CONFIG = "timestamp.extractor"; - private static final String TIMESTAMP_EXTRACTOR_CLASS_DOC = "Timestamp extractor class that implements the <code>TimestampExtractor</code> interface."; - - /** <code>partition.grouper</code> */ - public static final String PARTITION_GROUPER_CLASS_CONFIG = "partition.grouper"; - private static final String PARTITION_GROUPER_CLASS_DOC = "Partition grouper class that implements the <code>PartitionGrouper</code> interface."; - - /** <code>job.id</code> */ - public static final String JOB_ID_CONFIG = "job.id"; - public static final String JOB_ID_DOC = "An id string to identify for the stream job. It is used as 1) the default client-id prefix, 2) the group-id for membership management, 3) the changelog topic prefix."; - - /** <code>key.serializer</code> */ - public static final String KEY_SERIALIZER_CLASS_CONFIG = ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG; - - /** <code>value.serializer</code> */ - public static final String VALUE_SERIALIZER_CLASS_CONFIG = ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG; - - /** <code>key.deserializer</code> */ - public static final String KEY_DESERIALIZER_CLASS_CONFIG = ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; - - /** <code>value.deserializer</code> */ - public static final String VALUE_DESERIALIZER_CLASS_CONFIG = ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; - - /** <code>metrics.sample.window.ms</code> */ - public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG; - - /** <code>metrics.num.samples</code> */ - public static final String METRICS_NUM_SAMPLES_CONFIG = CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG; - - /** <code>metric.reporters</code> */ - public static final String METRIC_REPORTER_CLASSES_CONFIG = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG; - - /** <code>bootstrap.servers</code> */ - public static final String BOOTSTRAP_SERVERS_CONFIG = CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG; - - /** <code>client.id</code> */ - public static final String CLIENT_ID_CONFIG = CommonClientConfigs.CLIENT_ID_CONFIG; - - private static final String SYSTEM_TEMP_DIRECTORY = System.getProperty("java.io.tmpdir"); - - static { - CONFIG = new ConfigDef().define(JOB_ID_CONFIG, - Type.STRING, - "", - Importance.MEDIUM, - StreamingConfig.JOB_ID_DOC) - .define(CLIENT_ID_CONFIG, - Type.STRING, - "", - Importance.MEDIUM, - CommonClientConfigs.CLIENT_ID_DOC) - .define(ZOOKEEPER_CONNECT_CONFIG, - Type.STRING, - "", - Importance.HIGH, - StreamingConfig.ZOOKEEPER_CONNECT_DOC) - .define(STATE_DIR_CONFIG, - Type.STRING, - SYSTEM_TEMP_DIRECTORY, - Importance.MEDIUM, - STATE_DIR_DOC) - .define(COMMIT_INTERVAL_MS_CONFIG, - Type.LONG, - 30000, - Importance.HIGH, - COMMIT_INTERVAL_MS_DOC) - .define(POLL_MS_CONFIG, - Type.LONG, - 100, - Importance.LOW, - POLL_MS_DOC) - .define(NUM_STREAM_THREADS_CONFIG, - Type.INT, - 1, - Importance.LOW, - NUM_STREAM_THREADS_DOC) - .define(NUM_STANDBY_REPLICAS_CONFIG, - Type.INT, - 0, - Importance.LOW, - NUM_STANDBY_REPLICAS_DOC) - .define(BUFFERED_RECORDS_PER_PARTITION_CONFIG, - Type.INT, - 1000, - Importance.LOW, - BUFFERED_RECORDS_PER_PARTITION_DOC) - .define(STATE_CLEANUP_DELAY_MS_CONFIG, - Type.LONG, - 60000, - Importance.LOW, - STATE_CLEANUP_DELAY_MS_DOC) - .define(TOTAL_RECORDS_TO_PROCESS, - Type.LONG, - -1L, - Importance.LOW, - TOTAL_RECORDS_TO_DOC) - .define(WINDOW_TIME_MS_CONFIG, - Type.LONG, - -1L, - Importance.MEDIUM, - WINDOW_TIME_MS_DOC) - .define(KEY_SERIALIZER_CLASS_CONFIG, - Type.CLASS, - Importance.HIGH, - ProducerConfig.KEY_SERIALIZER_CLASS_DOC) - .define(VALUE_SERIALIZER_CLASS_CONFIG, - Type.CLASS, - Importance.HIGH, - ProducerConfig.VALUE_SERIALIZER_CLASS_DOC) - .define(KEY_DESERIALIZER_CLASS_CONFIG, - Type.CLASS, - Importance.HIGH, - ConsumerConfig.KEY_DESERIALIZER_CLASS_DOC) - .define(VALUE_DESERIALIZER_CLASS_CONFIG, - Type.CLASS, - Importance.HIGH, - ConsumerConfig.VALUE_DESERIALIZER_CLASS_DOC) - .define(TIMESTAMP_EXTRACTOR_CLASS_CONFIG, - Type.CLASS, - Importance.HIGH, - TIMESTAMP_EXTRACTOR_CLASS_DOC) - .define(PARTITION_GROUPER_CLASS_CONFIG, - Type.CLASS, - DefaultPartitionGrouper.class, - Importance.HIGH, - PARTITION_GROUPER_CLASS_DOC) - .define(BOOTSTRAP_SERVERS_CONFIG, - Type.STRING, - Importance.HIGH, - CommonClientConfigs.BOOSTRAP_SERVERS_DOC) - .define(METRIC_REPORTER_CLASSES_CONFIG, - Type.LIST, - "", - Importance.LOW, - CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC) - .define(METRICS_SAMPLE_WINDOW_MS_CONFIG, - Type.LONG, - 30000, - atLeast(0), - Importance.LOW, - CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_DOC) - .define(METRICS_NUM_SAMPLES_CONFIG, - Type.INT, - 2, - atLeast(1), - Importance.LOW, - CommonClientConfigs.METRICS_NUM_SAMPLES_DOC); - } - - public static class InternalConfig { - public static final String STREAM_THREAD_INSTANCE = "__stream.thread.instance__"; - } - - public StreamingConfig(Map<?, ?> props) { - super(CONFIG, props); - } - - public Map<String, Object> getConsumerConfigs(StreamThread streamThread, String groupId, String clientId) { - Map<String, Object> props = getBaseConsumerConfigs(); - - props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); - props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-consumer"); - props.put(StreamingConfig.NUM_STANDBY_REPLICAS_CONFIG, getInt(StreamingConfig.NUM_STANDBY_REPLICAS_CONFIG)); - props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, KafkaStreamingPartitionAssignor.class.getName()); - - props.put(StreamingConfig.InternalConfig.STREAM_THREAD_INSTANCE, streamThread); - - return props; - } - - public Map<String, Object> getRestoreConsumerConfigs(String clientId) { - Map<String, Object> props = getBaseConsumerConfigs(); - - // no need to set group id for a restore consumer - props.remove(ConsumerConfig.GROUP_ID_CONFIG); - - props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-restore-consumer"); - - return props; - } - - private Map<String, Object> getBaseConsumerConfigs() { - Map<String, Object> props = this.originals(); - - // set consumer default property values - props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); - - // remove properties that are not required for consumers - props.remove(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG); - props.remove(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG); - props.remove(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG); - props.remove(StreamingConfig.NUM_STANDBY_REPLICAS_CONFIG); - - return props; - } - - public Map<String, Object> getProducerConfigs(String clientId) { - Map<String, Object> props = this.originals(); - - // set producer default property values - props.put(ProducerConfig.LINGER_MS_CONFIG, "100"); - - // remove properties that are not required for producers - props.remove(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG); - props.remove(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG); - props.remove(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG); - - props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-producer"); - - return props; - } - - public Serializer keySerializer() { - return getConfiguredInstance(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, Serializer.class); - } - - public Serializer valueSerializer() { - return getConfiguredInstance(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, Serializer.class); - } - - public Deserializer keyDeserializer() { - return getConfiguredInstance(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, Deserializer.class); - } - - public Deserializer valueDeserializer() { - return getConfiguredInstance(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Deserializer.class); - } - - public static void main(String[] args) { - System.out.println(CONFIG.toHtmlTable()); - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/StreamingMetrics.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamingMetrics.java b/streams/src/main/java/org/apache/kafka/streams/StreamingMetrics.java deleted file mode 100644 index ebf80b3..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/StreamingMetrics.java +++ /dev/null @@ -1,27 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams; - -import org.apache.kafka.common.metrics.Sensor; - -public interface StreamingMetrics { - - Sensor addLatencySensor(String scopeName, String entityName, String operationName, String... tags); - - void recordLatency(Sensor sensor, long startNs, long endNs); -} http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java new file mode 100644 index 0000000..3843b1d --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -0,0 +1,303 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigDef.Importance; +import org.apache.kafka.common.config.ConfigDef.Type; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.streams.processor.DefaultPartitionGrouper; +import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor; +import org.apache.kafka.streams.processor.internals.StreamThread; + +import java.util.Map; + +import static org.apache.kafka.common.config.ConfigDef.Range.atLeast; + +public class StreamsConfig extends AbstractConfig { + + private static final ConfigDef CONFIG; + + /** <code>state.dir</code> */ + public static final String STATE_DIR_CONFIG = "state.dir"; + private static final String STATE_DIR_DOC = "Directory location for state store."; + + /** <code>zookeeper.connect<code/> */ + public static final String ZOOKEEPER_CONNECT_CONFIG = "zookeeper.connect"; + private static final String ZOOKEEPER_CONNECT_DOC = "Zookeeper connect string for Kafka topics management."; + + /** <code>commit.interval.ms</code> */ + public static final String COMMIT_INTERVAL_MS_CONFIG = "commit.interval.ms"; + private static final String COMMIT_INTERVAL_MS_DOC = "The frequency with which to save the position of the processor."; + + /** <code>poll.ms</code> */ + public static final String POLL_MS_CONFIG = "poll.ms"; + private static final String POLL_MS_DOC = "The amount of time in milliseconds to block waiting for input."; + + /** <code>num.stream.threads</code> */ + public static final String NUM_STREAM_THREADS_CONFIG = "num.stream.threads"; + private static final String NUM_STREAM_THREADS_DOC = "The number of threads to execute stream processing."; + + /** <code>num.stream.threads</code> */ + public static final String NUM_STANDBY_REPLICAS_CONFIG = "num.standby.replicas"; + private static final String NUM_STANDBY_REPLICAS_DOC = "The number of standby replicas for each task."; + + /** <code>buffered.records.per.partition</code> */ + public static final String BUFFERED_RECORDS_PER_PARTITION_CONFIG = "buffered.records.per.partition"; + private static final String BUFFERED_RECORDS_PER_PARTITION_DOC = "The maximum number of records to buffer per partition."; + + /** <code>state.cleanup.delay</code> */ + public static final String STATE_CLEANUP_DELAY_MS_CONFIG = "state.cleanup.delay.ms"; + private static final String STATE_CLEANUP_DELAY_MS_DOC = "The amount of time in milliseconds to wait before deleting state when a partition has migrated."; + + /** <code>total.records.to.process</code> */ + public static final String TOTAL_RECORDS_TO_PROCESS = "total.records.to.process"; + private static final String TOTAL_RECORDS_TO_DOC = "Exit after processing this many records."; + + /** <code>timestamp.extractor</code> */ + public static final String TIMESTAMP_EXTRACTOR_CLASS_CONFIG = "timestamp.extractor"; + private static final String TIMESTAMP_EXTRACTOR_CLASS_DOC = "Timestamp extractor class that implements the <code>TimestampExtractor</code> interface."; + + /** <code>partition.grouper</code> */ + public static final String PARTITION_GROUPER_CLASS_CONFIG = "partition.grouper"; + private static final String PARTITION_GROUPER_CLASS_DOC = "Partition grouper class that implements the <code>PartitionGrouper</code> interface."; + + /** <code>job.id</code> */ + public static final String JOB_ID_CONFIG = "job.id"; + public static final String JOB_ID_DOC = "An id string to identify for the stream job. It is used as 1) the default client-id prefix, 2) the group-id for membership management, 3) the changelog topic prefix."; + + /** <code>key.serializer</code> */ + public static final String KEY_SERIALIZER_CLASS_CONFIG = ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG; + + /** <code>value.serializer</code> */ + public static final String VALUE_SERIALIZER_CLASS_CONFIG = ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG; + + /** <code>key.deserializer</code> */ + public static final String KEY_DESERIALIZER_CLASS_CONFIG = ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; + + /** <code>value.deserializer</code> */ + public static final String VALUE_DESERIALIZER_CLASS_CONFIG = ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; + + /** <code>metrics.sample.window.ms</code> */ + public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG; + + /** <code>metrics.num.samples</code> */ + public static final String METRICS_NUM_SAMPLES_CONFIG = CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG; + + /** <code>metric.reporters</code> */ + public static final String METRIC_REPORTER_CLASSES_CONFIG = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG; + + /** <code>bootstrap.servers</code> */ + public static final String BOOTSTRAP_SERVERS_CONFIG = CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG; + + /** <code>client.id</code> */ + public static final String CLIENT_ID_CONFIG = CommonClientConfigs.CLIENT_ID_CONFIG; + + private static final String SYSTEM_TEMP_DIRECTORY = System.getProperty("java.io.tmpdir"); + + static { + CONFIG = new ConfigDef().define(JOB_ID_CONFIG, // required with no default value + Type.STRING, + Importance.HIGH, + StreamsConfig.JOB_ID_DOC) + .define(BOOTSTRAP_SERVERS_CONFIG, // required with no default value + Type.STRING, + Importance.HIGH, + CommonClientConfigs.BOOSTRAP_SERVERS_DOC) + .define(CLIENT_ID_CONFIG, + Type.STRING, + "", + Importance.HIGH, + CommonClientConfigs.CLIENT_ID_DOC) + .define(ZOOKEEPER_CONNECT_CONFIG, + Type.STRING, + "", + Importance.HIGH, + StreamsConfig.ZOOKEEPER_CONNECT_DOC) + .define(STATE_DIR_CONFIG, + Type.STRING, + SYSTEM_TEMP_DIRECTORY, + Importance.HIGH, + STATE_DIR_DOC) + .define(KEY_SERIALIZER_CLASS_CONFIG, // required with no default value + Type.CLASS, + Importance.HIGH, + ProducerConfig.KEY_SERIALIZER_CLASS_DOC) + .define(VALUE_SERIALIZER_CLASS_CONFIG, // required with no default value + Type.CLASS, + Importance.HIGH, + ProducerConfig.VALUE_SERIALIZER_CLASS_DOC) + .define(KEY_DESERIALIZER_CLASS_CONFIG, // required with no default value + Type.CLASS, + Importance.HIGH, + ConsumerConfig.KEY_DESERIALIZER_CLASS_DOC) + .define(VALUE_DESERIALIZER_CLASS_CONFIG, // required with no default value + Type.CLASS, + Importance.HIGH, + ConsumerConfig.VALUE_DESERIALIZER_CLASS_DOC) + .define(TIMESTAMP_EXTRACTOR_CLASS_CONFIG, + Type.CLASS, + Importance.MEDIUM, + TIMESTAMP_EXTRACTOR_CLASS_DOC) + .define(PARTITION_GROUPER_CLASS_CONFIG, + Type.CLASS, + DefaultPartitionGrouper.class, + Importance.MEDIUM, + PARTITION_GROUPER_CLASS_DOC) + .define(COMMIT_INTERVAL_MS_CONFIG, + Type.LONG, + 30000, + Importance.LOW, + COMMIT_INTERVAL_MS_DOC) + .define(POLL_MS_CONFIG, + Type.LONG, + 100, + Importance.LOW, + POLL_MS_DOC) + .define(NUM_STREAM_THREADS_CONFIG, + Type.INT, + 1, + Importance.LOW, + NUM_STREAM_THREADS_DOC) + .define(NUM_STANDBY_REPLICAS_CONFIG, + Type.INT, + 0, + Importance.LOW, + NUM_STANDBY_REPLICAS_DOC) + .define(BUFFERED_RECORDS_PER_PARTITION_CONFIG, + Type.INT, + 1000, + Importance.LOW, + BUFFERED_RECORDS_PER_PARTITION_DOC) + .define(STATE_CLEANUP_DELAY_MS_CONFIG, + Type.LONG, + 60000, + Importance.LOW, + STATE_CLEANUP_DELAY_MS_DOC) + .define(TOTAL_RECORDS_TO_PROCESS, + Type.LONG, + -1L, + Importance.LOW, + TOTAL_RECORDS_TO_DOC) + .define(METRIC_REPORTER_CLASSES_CONFIG, + Type.LIST, + "", + Importance.LOW, + CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC) + .define(METRICS_SAMPLE_WINDOW_MS_CONFIG, + Type.LONG, + 30000, + atLeast(0), + Importance.LOW, + CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_DOC) + .define(METRICS_NUM_SAMPLES_CONFIG, + Type.INT, + 2, + atLeast(1), + Importance.LOW, + CommonClientConfigs.METRICS_NUM_SAMPLES_DOC); + } + + public static class InternalConfig { + public static final String STREAM_THREAD_INSTANCE = "__stream.thread.instance__"; + } + + public StreamsConfig(Map<?, ?> props) { + super(CONFIG, props); + } + + public Map<String, Object> getConsumerConfigs(StreamThread streamThread, String groupId, String clientId) { + Map<String, Object> props = getBaseConsumerConfigs(); + + props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); + props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-consumer"); + props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, getInt(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG)); + props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, StreamPartitionAssignor.class.getName()); + + props.put(StreamsConfig.InternalConfig.STREAM_THREAD_INSTANCE, streamThread); + + return props; + } + + public Map<String, Object> getRestoreConsumerConfigs(String clientId) { + Map<String, Object> props = getBaseConsumerConfigs(); + + // no need to set group id for a restore consumer + props.remove(ConsumerConfig.GROUP_ID_CONFIG); + + props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-restore-consumer"); + + return props; + } + + private Map<String, Object> getBaseConsumerConfigs() { + Map<String, Object> props = this.originals(); + + // set consumer default property values + props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); + + // remove properties that are not required for consumers + props.remove(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG); + props.remove(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG); + props.remove(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG); + props.remove(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG); + + return props; + } + + public Map<String, Object> getProducerConfigs(String clientId) { + Map<String, Object> props = this.originals(); + + // set producer default property values + props.put(ProducerConfig.LINGER_MS_CONFIG, "100"); + + // remove properties that are not required for producers + props.remove(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG); + props.remove(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG); + props.remove(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG); + + props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-producer"); + + return props; + } + + public Serializer keySerializer() { + return getConfiguredInstance(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, Serializer.class); + } + + public Serializer valueSerializer() { + return getConfiguredInstance(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, Serializer.class); + } + + public Deserializer keyDeserializer() { + return getConfiguredInstance(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, Deserializer.class); + } + + public Deserializer valueDeserializer() { + return getConfiguredInstance(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Deserializer.class); + } + + public static void main(String[] args) { + System.out.println(CONFIG.toHtmlTable()); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/StreamsMetrics.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsMetrics.java b/streams/src/main/java/org/apache/kafka/streams/StreamsMetrics.java new file mode 100644 index 0000000..a151392 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsMetrics.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams; + +import org.apache.kafka.common.metrics.Sensor; + +public interface StreamsMetrics { + + Sensor addLatencySensor(String scopeName, String entityName, String operationName, String... tags); + + void recordLatency(Sensor sensor, long startNs, long endNs); +} http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java b/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java index 88a8955..a234395 100644 --- a/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java +++ b/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java @@ -20,11 +20,11 @@ package org.apache.kafka.streams.examples; import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.KStreamBuilder; -import org.apache.kafka.streams.StreamingConfig; -import org.apache.kafka.streams.KafkaStreaming; import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.streams.kstream.KeyValue; +import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.KeyValueMapper; import org.apache.kafka.streams.kstream.Predicate; @@ -34,14 +34,14 @@ public class KStreamJob { public static void main(String[] args) throws Exception { Properties props = new Properties(); - props.put(StreamingConfig.JOB_ID_CONFIG, "example-kstream"); - props.put(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); - props.put(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - props.put(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); - props.put(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - props.put(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - props.put(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class); - StreamingConfig config = new StreamingConfig(props); + props.put(StreamsConfig.JOB_ID_CONFIG, "example-kstream"); + props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); + props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class); + StreamsConfig config = new StreamsConfig(props); KStreamBuilder builder = new KStreamBuilder(); @@ -78,7 +78,7 @@ public class KStreamJob { streams[0].to("topic2"); streams[1].to("topic3"); - KafkaStreaming kstream = new KafkaStreaming(builder, config); + KafkaStreams kstream = new KafkaStreams(builder, config); kstream.start(); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java b/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java index 2d0b79f..e17c16b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java +++ b/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java @@ -21,13 +21,13 @@ import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.streams.KafkaStreaming; -import org.apache.kafka.streams.StreamingConfig; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.ProcessorSupplier; import org.apache.kafka.streams.processor.TopologyBuilder; -import org.apache.kafka.streams.state.Entry; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.Stores; @@ -70,11 +70,11 @@ public class ProcessorJob { KeyValueIterator<String, Integer> iter = this.kvStore.all(); while (iter.hasNext()) { - Entry<String, Integer> entry = iter.next(); + KeyValue<String, Integer> entry = iter.next(); - System.out.println("[" + entry.key() + ", " + entry.value() + "]"); + System.out.println("[" + entry.key + ", " + entry.value + "]"); - context.forward(entry.key(), entry.value()); + context.forward(entry.key, entry.value); } iter.close(); @@ -90,15 +90,15 @@ public class ProcessorJob { public static void main(String[] args) throws Exception { Properties props = new Properties(); - props.put(StreamingConfig.JOB_ID_CONFIG, "example-processor"); - props.put(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); - props.put(StreamingConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181"); - props.put(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - props.put(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); - props.put(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - props.put(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); - props.put(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class); - StreamingConfig config = new StreamingConfig(props); + props.put(StreamsConfig.JOB_ID_CONFIG, "example-processor"); + props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181"); + props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); + props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); + props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class); + StreamsConfig config = new StreamsConfig(props); TopologyBuilder builder = new TopologyBuilder(); @@ -109,7 +109,7 @@ public class ProcessorJob { builder.addSink("SINK", "topic-sink", new StringSerializer(), new IntegerSerializer(), "PROCESS"); - KafkaStreaming streaming = new KafkaStreaming(builder, config); - streaming.start(); + KafkaStreams streams = new KafkaStreams(builder, config); + streams.start(); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java index dfed661..26f04f0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java @@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.ProcessorSupplier; http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java index 87298d1..feb28ab 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java @@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.streams.KeyValue; /** * KTable is an abstraction of a change log stream. http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValue.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValue.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValue.java deleted file mode 100644 index f633f6e..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValue.java +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.kstream; - -public class KeyValue<K, V> { - - public final K key; - public final V value; - - public KeyValue(K key, V value) { - this.key = key; - this.value = value; - } - - public static <K, V> KeyValue<K, V> pair(K key, V value) { - return new KeyValue<>(key, value); - } - -} http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java index 91bfa9e..26002f5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java @@ -18,7 +18,7 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.streams.kstream.Aggregator; -import org.apache.kafka.streams.kstream.KeyValue; +import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.Window; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.Windows; http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java index daef8b1..ff7f9ed 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java @@ -17,7 +17,7 @@ package org.apache.kafka.streams.kstream.internals; -import org.apache.kafka.streams.kstream.KeyValue; +import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.KeyValueMapper; import org.apache.kafka.streams.processor.AbstractProcessor; import org.apache.kafka.streams.processor.Processor; http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index ce89220..98e50c3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -23,7 +23,7 @@ import org.apache.kafka.streams.kstream.Aggregator; import org.apache.kafka.streams.kstream.JoinWindows; import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.KTable; -import org.apache.kafka.streams.kstream.KeyValue; +import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.Reducer; import org.apache.kafka.streams.kstream.TransformerSupplier; import org.apache.kafka.streams.kstream.ValueJoiner; @@ -36,7 +36,7 @@ import org.apache.kafka.streams.kstream.Window; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.Windows; import org.apache.kafka.streams.processor.ProcessorSupplier; -import org.apache.kafka.streams.processor.StreamPartitioner; +import org.apache.kafka.streams.processor.StreamsPartitioner; import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier; import org.apache.kafka.streams.state.Serdes; @@ -217,14 +217,14 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V @Override public void to(String topic, Serializer<K> keySerializer, Serializer<V> valSerializer) { String name = topology.newName(SINK_NAME); - StreamPartitioner<K, V> streamPartitioner = null; + StreamsPartitioner<K, V> streamsPartitioner = null; if (keySerializer != null && keySerializer instanceof WindowedSerializer) { WindowedSerializer<Object> windowedSerializer = (WindowedSerializer<Object>) keySerializer; - streamPartitioner = (StreamPartitioner<K, V>) new WindowedStreamPartitioner<Object, V>(windowedSerializer); + streamsPartitioner = (StreamsPartitioner<K, V>) new WindowedStreamsPartitioner<Object, V>(windowedSerializer); } - topology.addSink(name, topic, keySerializer, valSerializer, streamPartitioner, this.name); + topology.addSink(name, topic, keySerializer, valSerializer, streamsPartitioner, this.name); } @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java index 01e3325..a4ac9b3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java @@ -17,7 +17,7 @@ package org.apache.kafka.streams.kstream.internals; -import org.apache.kafka.streams.kstream.KeyValue; +import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.streams.processor.AbstractProcessor; import org.apache.kafka.streams.processor.Processor; http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java index 57f1431..a40449b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java @@ -19,7 +19,7 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.streams.processor.AbstractProcessor; import org.apache.kafka.streams.processor.Processor; -import org.apache.kafka.streams.kstream.KeyValue; +import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.KeyValueMapper; import org.apache.kafka.streams.processor.ProcessorSupplier; http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java index 7d6eb27..c484c7b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java @@ -17,7 +17,7 @@ package org.apache.kafka.streams.kstream.internals; -import org.apache.kafka.streams.kstream.KeyValue; +import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.Reducer; import org.apache.kafka.streams.kstream.Window; import org.apache.kafka.streams.kstream.Windowed; http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java index a9d8f97..4299c66 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java @@ -17,7 +17,7 @@ package org.apache.kafka.streams.kstream.internals; -import org.apache.kafka.streams.kstream.KeyValue; +import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.Transformer; import org.apache.kafka.streams.kstream.TransformerSupplier; import org.apache.kafka.streams.processor.Processor; http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java index 8ee557c..d046090 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java @@ -23,7 +23,7 @@ import org.apache.kafka.streams.kstream.Aggregator; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.KTable; -import org.apache.kafka.streams.kstream.KeyValue; +import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.KeyValueMapper; import org.apache.kafka.streams.kstream.Predicate; import org.apache.kafka.streams.kstream.Reducer; http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java index 12fcc17..499f721 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java @@ -18,7 +18,7 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.KafkaException; -import org.apache.kafka.streams.kstream.KeyValue; +import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.KeyValueMapper; import org.apache.kafka.streams.processor.AbstractProcessor; import org.apache.kafka.streams.processor.Processor; http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java deleted file mode 100644 index 10e69cc..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java +++ /dev/null @@ -1,52 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.streams.kstream.internals; - -import org.apache.kafka.common.utils.Utils; -import org.apache.kafka.streams.kstream.Windowed; -import org.apache.kafka.streams.processor.StreamPartitioner; - -public class WindowedStreamPartitioner<K, V> implements StreamPartitioner<Windowed<K>, V> { - - private final WindowedSerializer<K> serializer; - - public WindowedStreamPartitioner(WindowedSerializer<K> serializer) { - this.serializer = serializer; - } - - /** - * WindowedStreamPartitioner determines the partition number for a message with the given windowed key and value - * and the current number of partitions. The partition number id determined by the original key of the windowed key - * using the same logic as DefaultPartitioner so that the topic is partitioned by the original key. - * - * @param windowedKey the key of the message - * @param value the value of the message - * @param numPartitions the total number of partitions - * @return an integer between 0 and {@code numPartitions-1}, or {@code null} if the default partitioning logic should be used - */ - public Integer partition(Windowed<K> windowedKey, V value, int numPartitions) { - byte[] keyBytes = serializer.serializeBaseKey(null, windowedKey); - - // hash the keyBytes to choose a partition - return toPositive(Utils.murmur2(keyBytes)) % numPartitions; - } - - private static int toPositive(int number) { - return number & 0x7fffffff; - } - -} http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamsPartitioner.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamsPartitioner.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamsPartitioner.java new file mode 100644 index 0000000..ff1fa2c --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamsPartitioner.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.processor.StreamsPartitioner; + +public class WindowedStreamsPartitioner<K, V> implements StreamsPartitioner<Windowed<K>, V> { + + private final WindowedSerializer<K> serializer; + + public WindowedStreamsPartitioner(WindowedSerializer<K> serializer) { + this.serializer = serializer; + } + + /** + * WindowedStreamsPartitioner determines the partition number for a message with the given windowed key and value + * and the current number of partitions. The partition number id determined by the original key of the windowed key + * using the same logic as DefaultPartitioner so that the topic is partitioned by the original key. + * + * @param windowedKey the key of the message + * @param value the value of the message + * @param numPartitions the total number of partitions + * @return an integer between 0 and {@code numPartitions-1}, or {@code null} if the default partitioning logic should be used + */ + public Integer partition(Windowed<K> windowedKey, V value, int numPartitions) { + byte[] keyBytes = serializer.serializeBaseKey(null, windowedKey); + + // hash the keyBytes to choose a partition + return toPositive(Utils.murmur2(keyBytes)) % numPartitions; + } + + private static int toPositive(int number) { + return number & 0x7fffffff; + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java index fa19ed7..41e2235 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java @@ -19,7 +19,7 @@ package org.apache.kafka.streams.processor; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serializer; -import org.apache.kafka.streams.StreamingMetrics; +import org.apache.kafka.streams.StreamsMetrics; import java.io.File; @@ -70,9 +70,9 @@ public interface ProcessorContext { /** * Returns Metrics instance * - * @return StreamingMetrics + * @return StreamsMetrics */ - StreamingMetrics metrics(); + StreamsMetrics metrics(); /** * Registers and possibly restores the specified storage engine.
