http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 578357a..e5d0922 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -39,8 +39,8 @@ import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; -import org.apache.kafka.streams.StreamingConfig; -import org.apache.kafka.streams.StreamingMetrics; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.StreamsMetrics; import org.apache.kafka.streams.processor.PartitionGrouper; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.TopologyBuilder; @@ -67,14 +67,14 @@ import java.util.concurrent.atomic.AtomicInteger; public class StreamThread extends Thread { private static final Logger log = LoggerFactory.getLogger(StreamThread.class); - private static final AtomicInteger STREAMING_THREAD_ID_SEQUENCE = new AtomicInteger(1); + private static final AtomicInteger STREAM_THREAD_ID_SEQUENCE = new AtomicInteger(1); public final PartitionGrouper partitionGrouper; public final String jobId; public final String clientId; public final UUID processId; - protected final StreamingConfig config; + protected final StreamsConfig config; protected final TopologyBuilder builder; protected final Set<String> sourceTopics; protected final Producer<byte[], byte[]> producer; @@ -93,9 +93,9 @@ public class StreamThread extends Thread { private final long cleanTimeMs; private final long commitTimeMs; private final long totalRecordsToProcess; - private final StreamingMetricsImpl sensors; + private final StreamsMetricsImpl sensors; - private KafkaStreamingPartitionAssignor partitionAssignor = null; + private StreamPartitionAssignor partitionAssignor = null; private long lastClean; private long lastCommit; @@ -122,17 +122,17 @@ public class StreamThread extends Thread { }; public StreamThread(TopologyBuilder builder, - StreamingConfig config, + StreamsConfig config, String jobId, String clientId, UUID processId, Metrics metrics, - Time time) throws Exception { + Time time) { this(builder, config, null , null, null, jobId, clientId, processId, metrics, time); } StreamThread(TopologyBuilder builder, - StreamingConfig config, + StreamsConfig config, Producer<byte[], byte[]> producer, Consumer<byte[], byte[]> consumer, Consumer<byte[], byte[]> restoreConsumer, @@ -140,8 +140,8 @@ public class StreamThread extends Thread { String clientId, UUID processId, Metrics metrics, - Time time) throws Exception { - super("StreamThread-" + STREAMING_THREAD_ID_SEQUENCE.getAndIncrement()); + Time time) { + super("StreamThread-" + STREAM_THREAD_ID_SEQUENCE.getAndIncrement()); this.jobId = jobId; this.config = config; @@ -149,7 +149,7 @@ public class StreamThread extends Thread { this.sourceTopics = builder.sourceTopics(); this.clientId = clientId; this.processId = processId; - this.partitionGrouper = config.getConfiguredInstance(StreamingConfig.PARTITION_GROUPER_CLASS_CONFIG, PartitionGrouper.class); + this.partitionGrouper = config.getConfiguredInstance(StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG, PartitionGrouper.class); // set the producer and consumer clients this.producer = (producer != null) ? producer : createProducer(); @@ -167,24 +167,24 @@ public class StreamThread extends Thread { this.standbyRecords = new HashMap<>(); // read in task specific config values - this.stateDir = new File(this.config.getString(StreamingConfig.STATE_DIR_CONFIG)); + this.stateDir = new File(this.config.getString(StreamsConfig.STATE_DIR_CONFIG)); this.stateDir.mkdir(); - this.pollTimeMs = config.getLong(StreamingConfig.POLL_MS_CONFIG); - this.commitTimeMs = config.getLong(StreamingConfig.COMMIT_INTERVAL_MS_CONFIG); - this.cleanTimeMs = config.getLong(StreamingConfig.STATE_CLEANUP_DELAY_MS_CONFIG); - this.totalRecordsToProcess = config.getLong(StreamingConfig.TOTAL_RECORDS_TO_PROCESS); + this.pollTimeMs = config.getLong(StreamsConfig.POLL_MS_CONFIG); + this.commitTimeMs = config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG); + this.cleanTimeMs = config.getLong(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG); + this.totalRecordsToProcess = config.getLong(StreamsConfig.TOTAL_RECORDS_TO_PROCESS); this.lastClean = Long.MAX_VALUE; // the cleaning cycle won't start until partition assignment this.lastCommit = time.milliseconds(); this.recordsProcessed = 0; this.time = time; - this.sensors = new StreamingMetricsImpl(metrics); + this.sensors = new StreamsMetricsImpl(metrics); this.running = new AtomicBoolean(true); } - public void partitionAssignor(KafkaStreamingPartitionAssignor partitionAssignor) { + public void partitionAssignor(StreamPartitionAssignor partitionAssignor) { this.partitionAssignor = partitionAssignor; } @@ -227,7 +227,7 @@ public class StreamThread extends Thread { } /** - * Shutdown this streaming thread. + * Shutdown this stream thread. */ public void close() { running.set(false); @@ -673,7 +673,7 @@ public class StreamThread extends Thread { } } - private class StreamingMetricsImpl implements StreamingMetrics { + private class StreamsMetricsImpl implements StreamsMetrics { final Metrics metrics; final String metricGrpName; final Map<String, String> metricTags; @@ -685,10 +685,10 @@ public class StreamThread extends Thread { final Sensor taskCreationSensor; final Sensor taskDestructionSensor; - public StreamingMetricsImpl(Metrics metrics) { + public StreamsMetricsImpl(Metrics metrics) { this.metrics = metrics; - this.metricGrpName = "streaming-metrics"; + this.metricGrpName = "stream-metrics"; this.metricTags = new LinkedHashMap<>(); this.metricTags.put("client-id", clientId + "-" + getName()); @@ -734,7 +734,7 @@ public class StreamThread extends Thread { for (int i = 0; i < tags.length; i += 2) tagMap.put(tags[i], tags[i + 1]); - String metricGroupName = "streaming-" + scopeName + "-metrics"; + String metricGroupName = "stream-" + scopeName + "-metrics"; // first add the global operation metrics if not yet, with the global tags only Sensor parent = metrics.sensor(scopeName + "-" + operationName);
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/state/Entry.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Entry.java b/streams/src/main/java/org/apache/kafka/streams/state/Entry.java deleted file mode 100644 index 183b691..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/state/Entry.java +++ /dev/null @@ -1,42 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.state; - -public class Entry<K, V> { - - private final K key; - private final V value; - - public Entry(K key, V value) { - this.key = key; - this.value = value; - } - - public K key() { - return key; - } - - public V value() { - return value; - } - - public String toString() { - return "Entry(" + key() + ", " + value() + ")"; - } - -} http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java index 0fbd4ae..bd118a2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java @@ -19,10 +19,12 @@ package org.apache.kafka.streams.state; +import org.apache.kafka.streams.KeyValue; + import java.io.Closeable; import java.util.Iterator; -public interface KeyValueIterator<K, V> extends Iterator<Entry<K, V>>, Closeable { +public interface KeyValueIterator<K, V> extends Iterator<KeyValue<K, V>>, Closeable { @Override public void close(); http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java index e4faed1..d448044 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java @@ -19,6 +19,7 @@ package org.apache.kafka.streams.state; +import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.StateStore; import java.util.List; @@ -55,7 +56,7 @@ public interface KeyValueStore<K, V> extends StateStore { * @param entries A list of entries to put into the store. * @throws NullPointerException If null is used for any key or value. */ - abstract public void putAll(List<Entry<K, V>> entries); + abstract public void putAll(List<KeyValue<K, V>> entries); /** * Delete the value from the store (if there is one) http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java index 55d1ac3..08cd049 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java @@ -19,7 +19,7 @@ package org.apache.kafka.streams.state; -import org.apache.kafka.streams.kstream.KeyValue; +import org.apache.kafka.streams.KeyValue; import java.util.Iterator; http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java index 286db1b..4856b09 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java @@ -18,10 +18,10 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateStoreSupplier; -import org.apache.kafka.streams.state.Entry; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.Serdes; @@ -97,9 +97,9 @@ public class InMemoryKeyValueStoreSupplier<K, V> implements StateStoreSupplier { } @Override - public void putAll(List<Entry<K, V>> entries) { - for (Entry<K, V> entry : entries) - put(entry.key(), entry.value()); + public void putAll(List<KeyValue<K, V>> entries) { + for (KeyValue<K, V> entry : entries) + put(entry.key, entry.value); } @Override @@ -140,9 +140,9 @@ public class InMemoryKeyValueStoreSupplier<K, V> implements StateStoreSupplier { } @Override - public Entry<K, V> next() { + public KeyValue<K, V> next() { Map.Entry<K, V> entry = iter.next(); - return new Entry<>(entry.getKey(), entry.getValue()); + return new KeyValue<>(entry.getKey(), entry.getValue()); } @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java index 6a38423..22ee3f7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java @@ -17,10 +17,10 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateStoreSupplier; -import org.apache.kafka.streams.state.Entry; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.Serdes; @@ -131,9 +131,9 @@ public class InMemoryLRUCacheStoreSupplier<K, V> implements StateStoreSupplier { } @Override - public void putAll(List<Entry<K, V>> entries) { - for (Entry<K, V> entry : entries) - put(entry.key(), entry.value()); + public void putAll(List<KeyValue<K, V>> entries) { + for (KeyValue<K, V> entry : entries) + put(entry.key, entry.value); } @Override @@ -179,9 +179,9 @@ public class InMemoryLRUCacheStoreSupplier<K, V> implements StateStoreSupplier { } @Override - public Entry<K, V> next() { + public KeyValue<K, V> next() { lastKey = keys.next(); - return new Entry<>(lastKey, entries.get(lastKey)); + return new KeyValue<>(lastKey, entries.get(lastKey)); } @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java index 21f73b0..d5fe44a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java @@ -18,13 +18,13 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.utils.SystemTime; -import org.apache.kafka.streams.StreamingMetrics; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsMetrics; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateRestoreCallback; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.utils.Time; -import org.apache.kafka.streams.state.Entry; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.Serdes; @@ -47,7 +47,7 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> { private Sensor rangeTime; private Sensor flushTime; private Sensor restoreTime; - private StreamingMetrics metrics; + private StreamsMetrics metrics; private boolean loggingEnabled = true; private StoreChangeLogger<K, V> changeLogger = null; @@ -141,14 +141,14 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> { } @Override - public void putAll(List<Entry<K, V>> entries) { + public void putAll(List<KeyValue<K, V>> entries) { long startNs = time.nanoseconds(); try { this.inner.putAll(entries); if (loggingEnabled) { - for (Entry<K, V> entry : entries) { - K key = entry.key(); + for (KeyValue<K, V> entry : entries) { + K key = entry.key; changeLogger.add(key); } changeLogger.maybeLogChange(this.getter); @@ -231,7 +231,7 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> { } @Override - public Entry<K1, V1> next() { + public KeyValue<K1, V1> next() { return iter.next(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java index 821927d..862c322 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java @@ -20,8 +20,8 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.utils.SystemTime; import org.apache.kafka.common.utils.Time; -import org.apache.kafka.streams.StreamingMetrics; -import org.apache.kafka.streams.kstream.KeyValue; +import org.apache.kafka.streams.StreamsMetrics; +import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateRestoreCallback; import org.apache.kafka.streams.state.Serdes; @@ -40,7 +40,7 @@ public class MeteredWindowStore<K, V> implements WindowStore<K, V> { private Sensor rangeTime; private Sensor flushTime; private Sensor restoreTime; - private StreamingMetrics metrics; + private StreamsMetrics metrics; private boolean loggingEnabled = true; private StoreChangeLogger<byte[], byte[]> changeLogger = null; http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java index 8a600f9..6c77ab2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java @@ -18,8 +18,8 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.KafkaException; +import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.ProcessorContext; -import org.apache.kafka.streams.state.Entry; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.Serdes; @@ -147,9 +147,9 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> { } @Override - public void putAll(List<Entry<K, V>> entries) { - for (Entry<K, V> entry : entries) - put(entry.key(), entry.value()); + public void putAll(List<KeyValue<K, V>> entries) { + for (KeyValue<K, V> entry : entries) + put(entry.key, entry.value); } @Override @@ -200,8 +200,8 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> { return iter.key(); } - protected Entry<K, V> getEntry() { - return new Entry<>(serdes.keyFrom(iter.key()), serdes.valueFrom(iter.value())); + protected KeyValue<K, V> getKeyValue() { + return new KeyValue<>(serdes.keyFrom(iter.key()), serdes.valueFrom(iter.value())); } @Override @@ -210,11 +210,11 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> { } @Override - public Entry<K, V> next() { + public KeyValue<K, V> next() { if (!hasNext()) throw new NoSuchElementException(); - Entry<K, V> entry = this.getEntry(); + KeyValue<K, V> entry = this.getKeyValue(); iter.next(); return entry; } http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java index 933ed91..d854c92 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java @@ -20,9 +20,8 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.utils.Utils; -import org.apache.kafka.streams.kstream.KeyValue; +import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.ProcessorContext; -import org.apache.kafka.streams.state.Entry; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.Serdes; import org.apache.kafka.streams.state.WindowStore; @@ -86,10 +85,10 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> { if (index >= iterators.length) throw new NoSuchElementException(); - Entry<byte[], byte[]> entry = iterators[index].next(); + KeyValue<byte[], byte[]> kv = iterators[index].next(); - return new KeyValue<>(WindowStoreUtil.timestampFromBinaryKey(entry.key()), - serdes.valueFrom(entry.value())); + return new KeyValue<>(WindowStoreUtil.timestampFromBinaryKey(kv.key), + serdes.valueFrom(kv.value)); } @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/streams/StreamingConfigTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamingConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamingConfigTest.java deleted file mode 100644 index 3b3fc9b..0000000 --- a/streams/src/test/java/org/apache/kafka/streams/StreamingConfigTest.java +++ /dev/null @@ -1,75 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams; - -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.common.serialization.IntegerDeserializer; -import org.apache.kafka.common.serialization.IntegerSerializer; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.streams.examples.WallclockTimestampExtractor; -import org.apache.kafka.streams.processor.internals.StreamThread; -import org.junit.Before; -import org.junit.Test; - -import java.util.Map; -import java.util.Properties; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; - - - -public class StreamingConfigTest { - - private Properties props = new Properties(); - private StreamingConfig streamingConfig; - private StreamThread streamThreadPlaceHolder; - - - @Before - public void setUp() { - props.put(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); - props.put(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - props.put(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); - props.put(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - props.put(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); - props.put(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class); - streamingConfig = new StreamingConfig(props); - } - - @Test - public void testGetProducerConfigs() throws Exception { - Map<String, Object> returnedProps = streamingConfig.getProducerConfigs("client"); - assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), "client-producer"); - } - - @Test - public void testGetConsumerConfigs() throws Exception { - Map<String, Object> returnedProps = streamingConfig.getConsumerConfigs(streamThreadPlaceHolder, "example-job", "client"); - assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), "client-consumer"); - assertEquals(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG), "example-job"); - - } - - @Test - public void testGetRestoreConsumerConfigs() throws Exception { - Map<String, Object> returnedProps = streamingConfig.getRestoreConsumerConfigs("client"); - assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), "client-restore-consumer"); - assertNull(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG)); - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java new file mode 100644 index 0000000..777fae5 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.examples.WallclockTimestampExtractor; +import org.apache.kafka.streams.processor.internals.StreamThread; +import org.junit.Before; +import org.junit.Test; + +import java.util.Map; +import java.util.Properties; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + + + +public class StreamsConfigTest { + + private Properties props = new Properties(); + private StreamsConfig streamsConfig; + private StreamThread streamThreadPlaceHolder; + + + @Before + public void setUp() { + props.put(StreamsConfig.JOB_ID_CONFIG, "streams-config-test"); + props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); + props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); + props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class); + streamsConfig = new StreamsConfig(props); + } + + @Test + public void testGetProducerConfigs() throws Exception { + Map<String, Object> returnedProps = streamsConfig.getProducerConfigs("client"); + assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), "client-producer"); + } + + @Test + public void testGetConsumerConfigs() throws Exception { + Map<String, Object> returnedProps = streamsConfig.getConsumerConfigs(streamThreadPlaceHolder, "example-job", "client"); + assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), "client-consumer"); + assertEquals(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG), "example-job"); + + } + + @Test + public void testGetRestoreConsumerConfigs() throws Exception { + Map<String, Object> returnedProps = streamsConfig.getRestoreConsumerConfigs("client"); + assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), "client-restore-consumer"); + assertNull(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG)); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java index a55fd30..693f58e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java @@ -21,7 +21,7 @@ import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KStreamBuilder; -import org.apache.kafka.streams.kstream.KeyValue; +import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.KeyValueMapper; import org.apache.kafka.test.KStreamTestDriver; import org.apache.kafka.test.MockProcessorSupplier; http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java index 880adce..f226cee 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java @@ -26,7 +26,7 @@ import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.KTable; -import org.apache.kafka.streams.kstream.KeyValue; +import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.KeyValueMapper; import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.test.KStreamTestDriver; http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java index 0f7cb6a..73c517b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java @@ -21,7 +21,7 @@ import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KStreamBuilder; -import org.apache.kafka.streams.kstream.KeyValue; +import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.KeyValueMapper; import org.apache.kafka.test.KStreamTestDriver; import org.apache.kafka.test.MockProcessorSupplier; http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java index 0b7b1e7..426259f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java @@ -20,7 +20,7 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KStreamBuilder; -import org.apache.kafka.streams.kstream.KeyValue; +import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.Transformer; import org.apache.kafka.streams.kstream.TransformerSupplier; import org.apache.kafka.streams.processor.ProcessorContext; http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java index aa09e74..12bfb9c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java @@ -24,7 +24,7 @@ import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.KTable; -import org.apache.kafka.streams.kstream.KeyValue; +import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.test.KStreamTestDriver; import org.apache.kafka.test.MockProcessorSupplier; http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java index 1527f17..e3cf22b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java @@ -24,7 +24,7 @@ import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.KTable; -import org.apache.kafka.streams.kstream.KeyValue; +import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.KeyValueMapper; import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.test.KStreamTestDriver; http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java index 67b83f5..feabc08 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java @@ -24,7 +24,7 @@ import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.KTable; -import org.apache.kafka.streams.kstream.KeyValue; +import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.test.KStreamTestDriver; import org.apache.kafka.test.MockProcessorSupplier; http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java deleted file mode 100644 index 1b8cbb8..0000000 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java +++ /dev/null @@ -1,84 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.kstream.internals; - -import org.apache.kafka.clients.producer.internals.DefaultPartitioner; -import org.apache.kafka.common.Cluster; -import org.apache.kafka.common.Node; -import org.apache.kafka.common.PartitionInfo; -import org.apache.kafka.common.serialization.IntegerSerializer; -import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.streams.kstream.Windowed; -import org.junit.Test; - -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.Random; - -import static org.junit.Assert.assertEquals; - -public class WindowedStreamPartitionerTest { - - private String topicName = "topic"; - - private IntegerSerializer keySerializer = new IntegerSerializer(); - private StringSerializer valSerializer = new StringSerializer(); - - private List<PartitionInfo> infos = Arrays.asList( - new PartitionInfo(topicName, 0, Node.noNode(), new Node[0], new Node[0]), - new PartitionInfo(topicName, 1, Node.noNode(), new Node[0], new Node[0]), - new PartitionInfo(topicName, 2, Node.noNode(), new Node[0], new Node[0]), - new PartitionInfo(topicName, 3, Node.noNode(), new Node[0], new Node[0]), - new PartitionInfo(topicName, 4, Node.noNode(), new Node[0], new Node[0]), - new PartitionInfo(topicName, 5, Node.noNode(), new Node[0], new Node[0]) - ); - - private Cluster cluster = new Cluster(Arrays.asList(Node.noNode()), infos, Collections.<String>emptySet()); - - @Test - public void testCopartitioning() { - - Random rand = new Random(); - - DefaultPartitioner defaultPartitioner = new DefaultPartitioner(); - - WindowedSerializer<Integer> windowedSerializer = new WindowedSerializer<>(keySerializer); - WindowedStreamPartitioner<Integer, String> streamPartitioner = new WindowedStreamPartitioner<>(windowedSerializer); - - for (int k = 0; k < 10; k++) { - Integer key = rand.nextInt(); - byte[] keyBytes = keySerializer.serialize(topicName, key); - - String value = key.toString(); - byte[] valueBytes = valSerializer.serialize(topicName, value); - - Integer expected = defaultPartitioner.partition("topic", key, keyBytes, value, valueBytes, cluster); - - for (int w = 0; w < 10; w++) { - HoppingWindow window = new HoppingWindow(10 * w, 20 * w); - - Windowed<Integer> windowedKey = new Windowed<>(key, window); - Integer actual = streamPartitioner.partition(windowedKey, value, infos.size()); - - assertEquals(expected, actual); - } - } - } - -} http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamsPartitionerTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamsPartitionerTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamsPartitionerTest.java new file mode 100644 index 0000000..18494fd --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamsPartitionerTest.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.clients.producer.internals.DefaultPartitioner; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.kstream.Windowed; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Random; + +import static org.junit.Assert.assertEquals; + +public class WindowedStreamsPartitionerTest { + + private String topicName = "topic"; + + private IntegerSerializer keySerializer = new IntegerSerializer(); + private StringSerializer valSerializer = new StringSerializer(); + + private List<PartitionInfo> infos = Arrays.asList( + new PartitionInfo(topicName, 0, Node.noNode(), new Node[0], new Node[0]), + new PartitionInfo(topicName, 1, Node.noNode(), new Node[0], new Node[0]), + new PartitionInfo(topicName, 2, Node.noNode(), new Node[0], new Node[0]), + new PartitionInfo(topicName, 3, Node.noNode(), new Node[0], new Node[0]), + new PartitionInfo(topicName, 4, Node.noNode(), new Node[0], new Node[0]), + new PartitionInfo(topicName, 5, Node.noNode(), new Node[0], new Node[0]) + ); + + private Cluster cluster = new Cluster(Arrays.asList(Node.noNode()), infos, Collections.<String>emptySet()); + + @Test + public void testCopartitioning() { + + Random rand = new Random(); + + DefaultPartitioner defaultPartitioner = new DefaultPartitioner(); + + WindowedSerializer<Integer> windowedSerializer = new WindowedSerializer<>(keySerializer); + WindowedStreamsPartitioner<Integer, String> streamPartitioner = new WindowedStreamsPartitioner<>(windowedSerializer); + + for (int k = 0; k < 10; k++) { + Integer key = rand.nextInt(); + byte[] keyBytes = keySerializer.serialize(topicName, key); + + String value = key.toString(); + byte[] valueBytes = valSerializer.serialize(topicName, value); + + Integer expected = defaultPartitioner.partition("topic", key, keyBytes, value, valueBytes, cluster); + + for (int w = 0; w < 10; w++) { + HoppingWindow window = new HoppingWindow(10 * w, 20 * w); + + Windowed<Integer> windowedKey = new Windowed<>(key, window); + Integer actual = streamPartitioner.partition(windowedKey, value, infos.size()); + + assertEquals(expected, actual); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignorTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignorTest.java deleted file mode 100644 index 92d7b6a..0000000 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/KafkaStreamingPartitionAssignorTest.java +++ /dev/null @@ -1,508 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.processor.internals; - -import org.apache.kafka.clients.consumer.MockConsumer; -import org.apache.kafka.clients.consumer.OffsetResetStrategy; -import org.apache.kafka.clients.consumer.internals.PartitionAssignor; -import org.apache.kafka.clients.producer.MockProducer; -import org.apache.kafka.common.Cluster; -import org.apache.kafka.common.Node; -import org.apache.kafka.common.PartitionInfo; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.metrics.Metrics; -import org.apache.kafka.common.serialization.ByteArraySerializer; -import org.apache.kafka.common.utils.SystemTime; -import org.apache.kafka.common.utils.Utils; -import org.apache.kafka.streams.StreamingConfig; -import org.apache.kafka.streams.processor.TaskId; -import org.apache.kafka.streams.processor.TopologyBuilder; -import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo; -import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo; -import org.apache.kafka.test.MockProcessorSupplier; -import org.apache.kafka.test.MockStateStoreSupplier; -import org.junit.Test; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.Set; -import java.util.UUID; - -import static org.junit.Assert.assertEquals; - -public class KafkaStreamingPartitionAssignorTest { - - private TopicPartition t1p0 = new TopicPartition("topic1", 0); - private TopicPartition t1p1 = new TopicPartition("topic1", 1); - private TopicPartition t1p2 = new TopicPartition("topic1", 2); - private TopicPartition t2p0 = new TopicPartition("topic2", 0); - private TopicPartition t2p1 = new TopicPartition("topic2", 1); - private TopicPartition t2p2 = new TopicPartition("topic2", 2); - private TopicPartition t3p0 = new TopicPartition("topic3", 0); - private TopicPartition t3p1 = new TopicPartition("topic3", 1); - private TopicPartition t3p2 = new TopicPartition("topic3", 2); - private TopicPartition t3p3 = new TopicPartition("topic3", 3); - - private Set<String> allTopics = Utils.mkSet("topic1", "topic2"); - - private List<PartitionInfo> infos = Arrays.asList( - new PartitionInfo("topic1", 0, Node.noNode(), new Node[0], new Node[0]), - new PartitionInfo("topic1", 1, Node.noNode(), new Node[0], new Node[0]), - new PartitionInfo("topic1", 2, Node.noNode(), new Node[0], new Node[0]), - new PartitionInfo("topic2", 0, Node.noNode(), new Node[0], new Node[0]), - new PartitionInfo("topic2", 1, Node.noNode(), new Node[0], new Node[0]), - new PartitionInfo("topic2", 2, Node.noNode(), new Node[0], new Node[0]), - new PartitionInfo("topic3", 0, Node.noNode(), new Node[0], new Node[0]), - new PartitionInfo("topic3", 1, Node.noNode(), new Node[0], new Node[0]), - new PartitionInfo("topic3", 2, Node.noNode(), new Node[0], new Node[0]), - new PartitionInfo("topic3", 3, Node.noNode(), new Node[0], new Node[0]) - ); - - private Cluster metadata = new Cluster(Arrays.asList(Node.noNode()), infos, Collections.<String>emptySet()); - - private final TaskId task0 = new TaskId(0, 0); - private final TaskId task1 = new TaskId(0, 1); - private final TaskId task2 = new TaskId(0, 2); - private final TaskId task3 = new TaskId(0, 3); - - private Properties configProps() { - return new Properties() { - { - setProperty(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); - setProperty(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); - setProperty(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); - setProperty(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); - setProperty(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.test.MockTimestampExtractor"); - setProperty(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171"); - setProperty(StreamingConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3"); - } - }; - } - - private ByteArraySerializer serializer = new ByteArraySerializer(); - - @SuppressWarnings("unchecked") - @Test - public void testSubscription() throws Exception { - StreamingConfig config = new StreamingConfig(configProps()); - - MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer, serializer); - MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); - MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST); - - TopologyBuilder builder = new TopologyBuilder(); - builder.addSource("source1", "topic1"); - builder.addSource("source2", "topic2"); - builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2"); - - final Set<TaskId> prevTasks = Utils.mkSet( - new TaskId(0, 1), new TaskId(1, 1), new TaskId(2, 1)); - final Set<TaskId> cachedTasks = Utils.mkSet( - new TaskId(0, 1), new TaskId(1, 1), new TaskId(2, 1), - new TaskId(0, 2), new TaskId(1, 2), new TaskId(2, 2)); - - String clientId = "client-id"; - UUID processId = UUID.randomUUID(); - StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", clientId, processId, new Metrics(), new SystemTime()) { - @Override - public Set<TaskId> prevTasks() { - return prevTasks; - } - @Override - public Set<TaskId> cachedTasks() { - return cachedTasks; - } - }; - - KafkaStreamingPartitionAssignor partitionAssignor = new KafkaStreamingPartitionAssignor(); - partitionAssignor.configure(config.getConsumerConfigs(thread, "test", clientId)); - - PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("topic1", "topic2")); - - Collections.sort(subscription.topics()); - assertEquals(Utils.mkList("topic1", "topic2"), subscription.topics()); - - Set<TaskId> standbyTasks = new HashSet<>(cachedTasks); - standbyTasks.removeAll(prevTasks); - - SubscriptionInfo info = new SubscriptionInfo(processId, prevTasks, standbyTasks); - assertEquals(info.encode(), subscription.userData()); - } - - @Test - public void testAssignBasic() throws Exception { - StreamingConfig config = new StreamingConfig(configProps()); - - MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer, serializer); - MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); - MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST); - - TopologyBuilder builder = new TopologyBuilder(); - builder.addSource("source1", "topic1"); - builder.addSource("source2", "topic2"); - builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2"); - List<String> topics = Utils.mkList("topic1", "topic2"); - Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2); - - final Set<TaskId> prevTasks10 = Utils.mkSet(task0); - final Set<TaskId> prevTasks11 = Utils.mkSet(task1); - final Set<TaskId> prevTasks20 = Utils.mkSet(task2); - final Set<TaskId> standbyTasks10 = Utils.mkSet(task1); - final Set<TaskId> standbyTasks11 = Utils.mkSet(task2); - final Set<TaskId> standbyTasks20 = Utils.mkSet(task0); - - UUID uuid1 = UUID.randomUUID(); - UUID uuid2 = UUID.randomUUID(); - String client1 = "client1"; - String client2 = "client2"; - - StreamThread thread10 = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", client1, uuid1, new Metrics(), new SystemTime()); - - KafkaStreamingPartitionAssignor partitionAssignor = new KafkaStreamingPartitionAssignor(); - partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1)); - - Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>(); - subscriptions.put("consumer10", - new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, standbyTasks10).encode())); - subscriptions.put("consumer11", - new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks11, standbyTasks11).encode())); - subscriptions.put("consumer20", - new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks20, standbyTasks20).encode())); - - Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions); - - // check assigned partitions - assertEquals(Utils.mkSet(Utils.mkSet(t1p0, t2p0), Utils.mkSet(t1p1, t2p1)), - Utils.mkSet(new HashSet<>(assignments.get("consumer10").partitions()), new HashSet<>(assignments.get("consumer11").partitions()))); - assertEquals(Utils.mkSet(t1p2, t2p2), new HashSet<>(assignments.get("consumer20").partitions())); - - // check assignment info - - Set<TaskId> allActiveTasks = new HashSet<>(); - - // the first consumer - AssignmentInfo info10 = checkAssignment(assignments.get("consumer10")); - allActiveTasks.addAll(info10.activeTasks); - - // the second consumer - AssignmentInfo info11 = checkAssignment(assignments.get("consumer11")); - allActiveTasks.addAll(info11.activeTasks); - - assertEquals(Utils.mkSet(task0, task1), allActiveTasks); - - // the third consumer - AssignmentInfo info20 = checkAssignment(assignments.get("consumer20")); - allActiveTasks.addAll(info20.activeTasks); - - assertEquals(3, allActiveTasks.size()); - assertEquals(allTasks, new HashSet<>(allActiveTasks)); - - assertEquals(3, allActiveTasks.size()); - assertEquals(allTasks, allActiveTasks); - } - - @Test - public void testAssignWithNewTasks() throws Exception { - StreamingConfig config = new StreamingConfig(configProps()); - - MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer, serializer); - MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); - MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST); - - TopologyBuilder builder = new TopologyBuilder(); - builder.addSource("source1", "topic1"); - builder.addSource("source2", "topic2"); - builder.addSource("source3", "topic3"); - builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2", "source3"); - List<String> topics = Utils.mkList("topic1", "topic2", "topic3"); - Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2, task3); - - // assuming that previous tasks do not have topic3 - final Set<TaskId> prevTasks10 = Utils.mkSet(task0); - final Set<TaskId> prevTasks11 = Utils.mkSet(task1); - final Set<TaskId> prevTasks20 = Utils.mkSet(task2); - - UUID uuid1 = UUID.randomUUID(); - UUID uuid2 = UUID.randomUUID(); - String client1 = "client1"; - String client2 = "client2"; - - StreamThread thread10 = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", client1, uuid1, new Metrics(), new SystemTime()); - - KafkaStreamingPartitionAssignor partitionAssignor = new KafkaStreamingPartitionAssignor(); - partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1)); - - Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>(); - subscriptions.put("consumer10", - new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, Collections.<TaskId>emptySet()).encode())); - subscriptions.put("consumer11", - new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks11, Collections.<TaskId>emptySet()).encode())); - subscriptions.put("consumer20", - new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks20, Collections.<TaskId>emptySet()).encode())); - - Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions); - - // check assigned partitions: since there is no previous task for topic 3 it will be assigned randomly so we cannot check exact match - // also note that previously assigned partitions / tasks may not stay on the previous host since we may assign the new task first and - // then later ones will be re-assigned to other hosts due to load balancing - Set<TaskId> allActiveTasks = new HashSet<>(); - Set<TopicPartition> allPartitions = new HashSet<>(); - AssignmentInfo info; - - info = AssignmentInfo.decode(assignments.get("consumer10").userData()); - allActiveTasks.addAll(info.activeTasks); - allPartitions.addAll(assignments.get("consumer10").partitions()); - - info = AssignmentInfo.decode(assignments.get("consumer11").userData()); - allActiveTasks.addAll(info.activeTasks); - allPartitions.addAll(assignments.get("consumer11").partitions()); - - info = AssignmentInfo.decode(assignments.get("consumer20").userData()); - allActiveTasks.addAll(info.activeTasks); - allPartitions.addAll(assignments.get("consumer20").partitions()); - - assertEquals(allTasks, allActiveTasks); - assertEquals(Utils.mkSet(t1p0, t1p1, t1p2, t2p0, t2p1, t2p2, t3p0, t3p1, t3p2, t3p3), allPartitions); - } - - @Test - public void testAssignWithStates() throws Exception { - StreamingConfig config = new StreamingConfig(configProps()); - - MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer, serializer); - MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); - MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST); - - TopologyBuilder builder = new TopologyBuilder(); - - builder.addSource("source1", "topic1"); - builder.addSource("source2", "topic2"); - - builder.addProcessor("processor-1", new MockProcessorSupplier(), "source1"); - builder.addStateStore(new MockStateStoreSupplier("store1", false), "processor-1"); - - builder.addProcessor("processor-2", new MockProcessorSupplier(), "source2"); - builder.addStateStore(new MockStateStoreSupplier("store2", false), "processor-2"); - builder.addStateStore(new MockStateStoreSupplier("store3", false), "processor-2"); - - List<String> topics = Utils.mkList("topic1", "topic2"); - - TaskId task00 = new TaskId(0, 0); - TaskId task01 = new TaskId(0, 1); - TaskId task02 = new TaskId(0, 2); - TaskId task10 = new TaskId(1, 0); - TaskId task11 = new TaskId(1, 1); - TaskId task12 = new TaskId(1, 2); - - UUID uuid1 = UUID.randomUUID(); - UUID uuid2 = UUID.randomUUID(); - String client1 = "client1"; - String client2 = "client2"; - - StreamThread thread10 = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", client1, uuid1, new Metrics(), new SystemTime()); - - KafkaStreamingPartitionAssignor partitionAssignor = new KafkaStreamingPartitionAssignor(); - partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1)); - - Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>(); - subscriptions.put("consumer10", - new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet()).encode())); - subscriptions.put("consumer11", - new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet()).encode())); - subscriptions.put("consumer20", - new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet()).encode())); - - Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions); - - // check assigned partition size: since there is no previous task and there are two sub-topologies the assignment is random so we cannot check exact match - assertEquals(2, assignments.get("consumer10").partitions().size()); - assertEquals(2, assignments.get("consumer11").partitions().size()); - assertEquals(2, assignments.get("consumer20").partitions().size()); - - assertEquals(2, AssignmentInfo.decode(assignments.get("consumer10").userData()).activeTasks.size()); - assertEquals(2, AssignmentInfo.decode(assignments.get("consumer11").userData()).activeTasks.size()); - assertEquals(2, AssignmentInfo.decode(assignments.get("consumer20").userData()).activeTasks.size()); - - // check tasks for state topics - assertEquals(Utils.mkSet(task00, task01, task02), partitionAssignor.tasksForState("store1")); - assertEquals(Utils.mkSet(task10, task11, task12), partitionAssignor.tasksForState("store2")); - assertEquals(Utils.mkSet(task10, task11, task12), partitionAssignor.tasksForState("store3")); - } - - @Test - public void testAssignWithStandbyReplicas() throws Exception { - Properties props = configProps(); - props.setProperty(StreamingConfig.NUM_STANDBY_REPLICAS_CONFIG, "1"); - StreamingConfig config = new StreamingConfig(props); - - MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer, serializer); - MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); - MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST); - - TopologyBuilder builder = new TopologyBuilder(); - builder.addSource("source1", "topic1"); - builder.addSource("source2", "topic2"); - builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2"); - List<String> topics = Utils.mkList("topic1", "topic2"); - Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2); - - - final Set<TaskId> prevTasks10 = Utils.mkSet(task0); - final Set<TaskId> prevTasks11 = Utils.mkSet(task1); - final Set<TaskId> prevTasks20 = Utils.mkSet(task2); - final Set<TaskId> standbyTasks10 = Utils.mkSet(task1); - final Set<TaskId> standbyTasks11 = Utils.mkSet(task2); - final Set<TaskId> standbyTasks20 = Utils.mkSet(task0); - - UUID uuid1 = UUID.randomUUID(); - UUID uuid2 = UUID.randomUUID(); - String client1 = "client1"; - String client2 = "client2"; - - StreamThread thread10 = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", client1, uuid1, new Metrics(), new SystemTime()); - - KafkaStreamingPartitionAssignor partitionAssignor = new KafkaStreamingPartitionAssignor(); - partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1)); - - Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>(); - subscriptions.put("consumer10", - new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, standbyTasks10).encode())); - subscriptions.put("consumer11", - new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks11, standbyTasks11).encode())); - subscriptions.put("consumer20", - new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks20, standbyTasks20).encode())); - - Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions); - - Set<TaskId> allActiveTasks = new HashSet<>(); - Set<TaskId> allStandbyTasks = new HashSet<>(); - - // the first consumer - AssignmentInfo info10 = checkAssignment(assignments.get("consumer10")); - allActiveTasks.addAll(info10.activeTasks); - allStandbyTasks.addAll(info10.standbyTasks.keySet()); - - // the second consumer - AssignmentInfo info11 = checkAssignment(assignments.get("consumer11")); - allActiveTasks.addAll(info11.activeTasks); - allStandbyTasks.addAll(info11.standbyTasks.keySet()); - - // check active tasks assigned to the first client - assertEquals(Utils.mkSet(task0, task1), new HashSet<>(allActiveTasks)); - assertEquals(Utils.mkSet(task2), new HashSet<>(allStandbyTasks)); - - // the third consumer - AssignmentInfo info20 = checkAssignment(assignments.get("consumer20")); - allActiveTasks.addAll(info20.activeTasks); - allStandbyTasks.addAll(info20.standbyTasks.keySet()); - - // all task ids are in the active tasks and also in the standby tasks - - assertEquals(3, allActiveTasks.size()); - assertEquals(allTasks, allActiveTasks); - - assertEquals(3, allStandbyTasks.size()); - assertEquals(allTasks, allStandbyTasks); - } - - private AssignmentInfo checkAssignment(PartitionAssignor.Assignment assignment) { - - // This assumed 1) DefaultPartitionGrouper is used, and 2) there is a only one topic group. - - AssignmentInfo info = AssignmentInfo.decode(assignment.userData()); - - // check if the number of assigned partitions == the size of active task id list - assertEquals(assignment.partitions().size(), info.activeTasks.size()); - - // check if active tasks are consistent - List<TaskId> activeTasks = new ArrayList<>(); - Set<String> activeTopics = new HashSet<>(); - for (TopicPartition partition : assignment.partitions()) { - // since default grouper, taskid.partition == partition.partition() - activeTasks.add(new TaskId(0, partition.partition())); - activeTopics.add(partition.topic()); - } - assertEquals(activeTasks, info.activeTasks); - - // check if active partitions cover all topics - assertEquals(allTopics, activeTopics); - - // check if standby tasks are consistent - Set<String> standbyTopics = new HashSet<>(); - for (Map.Entry<TaskId, Set<TopicPartition>> entry : info.standbyTasks.entrySet()) { - TaskId id = entry.getKey(); - Set<TopicPartition> partitions = entry.getValue(); - for (TopicPartition partition : partitions) { - // since default grouper, taskid.partition == partition.partition() - assertEquals(id.partition, partition.partition()); - - standbyTopics.add(partition.topic()); - } - } - - if (info.standbyTasks.size() > 0) - // check if standby partitions cover all topics - assertEquals(allTopics, standbyTopics); - - return info; - } - - @Test - public void testOnAssignment() throws Exception { - StreamingConfig config = new StreamingConfig(configProps()); - - MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer, serializer); - MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); - MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST); - - TopicPartition t2p3 = new TopicPartition("topic2", 3); - - TopologyBuilder builder = new TopologyBuilder(); - builder.addSource("source1", "topic1"); - builder.addSource("source2", "topic2"); - builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2"); - - UUID uuid = UUID.randomUUID(); - String client1 = "client1"; - - StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", client1, uuid, new Metrics(), new SystemTime()); - - KafkaStreamingPartitionAssignor partitionAssignor = new KafkaStreamingPartitionAssignor(); - partitionAssignor.configure(config.getConsumerConfigs(thread, "test", client1)); - - List<TaskId> activeTaskList = Utils.mkList(task0, task3); - Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>(); - standbyTasks.put(task1, Utils.mkSet(new TopicPartition("t1", 0))); - standbyTasks.put(task2, Utils.mkSet(new TopicPartition("t2", 0))); - - AssignmentInfo info = new AssignmentInfo(activeTaskList, standbyTasks); - PartitionAssignor.Assignment assignment = new PartitionAssignor.Assignment(Utils.mkList(t1p0, t2p3), info.encode()); - partitionAssignor.onAssignment(assignment); - - assertEquals(Utils.mkSet(task0), partitionAssignor.tasksForPartition(t1p0)); - assertEquals(Utils.mkSet(task3), partitionAssignor.tasksForPartition(t2p3)); - assertEquals(standbyTasks, partitionAssignor.standbyTasks()); - } - -} http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java index f2ef2ea..60bd309 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java @@ -27,12 +27,12 @@ import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.streams.StreamingConfig; +import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.processor.AbstractProcessor; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.ProcessorSupplier; -import org.apache.kafka.streams.processor.StreamPartitioner; +import org.apache.kafka.streams.processor.StreamsPartitioner; import org.apache.kafka.streams.processor.TimestampExtractor; import org.apache.kafka.streams.processor.TopologyBuilder; import org.apache.kafka.streams.state.KeyValueIterator; @@ -60,21 +60,22 @@ public class ProcessorTopologyTest { private static long timestamp = 1000L; private ProcessorTopologyTestDriver driver; - private StreamingConfig config; + private StreamsConfig config; @Before public void setup() { // Create a new directory in which we'll put all of the state for this test, enabling running tests in parallel ... File localState = StateUtils.tempDir(); Properties props = new Properties(); - props.setProperty(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091"); - props.setProperty(StreamingConfig.STATE_DIR_CONFIG, localState.getAbsolutePath()); - props.setProperty(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, CustomTimestampExtractor.class.getName()); - props.setProperty(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); - props.setProperty(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); - props.setProperty(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); - props.setProperty(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); - this.config = new StreamingConfig(props); + props.setProperty(StreamsConfig.JOB_ID_CONFIG, "processor-topology-test"); + props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091"); + props.setProperty(StreamsConfig.STATE_DIR_CONFIG, localState.getAbsolutePath()); + props.setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, CustomTimestampExtractor.class.getName()); + props.setProperty(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + props.setProperty(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + props.setProperty(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + props.setProperty(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + this.config = new StreamsConfig(props); } @After @@ -193,8 +194,8 @@ public class ProcessorTopologyTest { assertNull(driver.readOutput(topic)); } - protected <K, V> StreamPartitioner<K, V> constantPartitioner(final Integer partition) { - return new StreamPartitioner<K, V>() { + protected <K, V> StreamsPartitioner<K, V> constantPartitioner(final Integer partition) { + return new StreamsPartitioner<K, V>() { @Override public Integer partition(K key, V value, int numPartitions) { return partition; http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java index 85a8a15..fd604b6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java @@ -27,7 +27,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.Utils; -import org.apache.kafka.streams.StreamingConfig; +import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.processor.StateStoreSupplier; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.state.internals.OffsetCheckpoint; @@ -85,17 +85,18 @@ public class StandbyTaskTest { ) ); - private StreamingConfig createConfig(final File baseDir) throws Exception { - return new StreamingConfig(new Properties() { + private StreamsConfig createConfig(final File baseDir) throws Exception { + return new StreamsConfig(new Properties() { { - setProperty(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); - setProperty(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); - setProperty(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); - setProperty(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); - setProperty(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.test.MockTimestampExtractor"); - setProperty(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171"); - setProperty(StreamingConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3"); - setProperty(StreamingConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath()); + setProperty(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); + setProperty(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); + setProperty(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); + setProperty(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); + setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.test.MockTimestampExtractor"); + setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171"); + setProperty(StreamsConfig.JOB_ID_CONFIG, "standby-task-test"); + setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3"); + setProperty(StreamsConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath()); } }); } @@ -130,7 +131,7 @@ public class StandbyTaskTest { public void testStorePartitions() throws Exception { File baseDir = Files.createTempDirectory("test").toFile(); try { - StreamingConfig config = createConfig(baseDir); + StreamsConfig config = createConfig(baseDir); StandbyTask task = new StandbyTask(taskId, jobId, topicPartitions, topology, consumer, restoreStateConsumer, config, null); assertEquals(Utils.mkSet(partition2), new HashSet<>(task.changeLogPartitions())); @@ -145,7 +146,7 @@ public class StandbyTaskTest { public void testUpdateNonPersistentStore() throws Exception { File baseDir = Files.createTempDirectory("test").toFile(); try { - StreamingConfig config = createConfig(baseDir); + StreamsConfig config = createConfig(baseDir); StandbyTask task = new StandbyTask(taskId, jobId, topicPartitions, topology, consumer, restoreStateConsumer, config, null); restoreStateConsumer.assign(new ArrayList<>(task.changeLogPartitions())); @@ -164,7 +165,7 @@ public class StandbyTaskTest { public void testUpdate() throws Exception { File baseDir = Files.createTempDirectory("test").toFile(); try { - StreamingConfig config = createConfig(baseDir); + StreamsConfig config = createConfig(baseDir); StandbyTask task = new StandbyTask(taskId, jobId, topicPartitions, topology, consumer, restoreStateConsumer, config, null); restoreStateConsumer.assign(new ArrayList<>(task.changeLogPartitions())); @@ -227,7 +228,7 @@ public class StandbyTaskTest { new PartitionInfo("ktable1", 2, Node.noNode(), new Node[0], new Node[0]) )); - StreamingConfig config = createConfig(baseDir); + StreamsConfig config = createConfig(baseDir); StandbyTask task = new StandbyTask(taskId, jobId, ktablePartitions, ktableTopology, consumer, restoreStateConsumer, config, null); restoreStateConsumer.assign(new ArrayList<>(task.changeLogPartitions()));
