http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/main/java/org/apache/kafka/streams/state/InMemoryKeyValueStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/InMemoryKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/InMemoryKeyValueStore.java new file mode 100644 index 0000000..e9aaa20 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/state/InMemoryKeyValueStore.java @@ -0,0 +1,145 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.state; + +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.common.utils.SystemTime; +import org.apache.kafka.common.utils.Time; + +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.TreeMap; + +/** + * An in-memory key-value store based on a TreeMap + * + * @param <K> The key type + * @param <V> The value type + */ +public class InMemoryKeyValueStore<K, V> extends MeteredKeyValueStore<K, V> { + + public InMemoryKeyValueStore(String name, ProcessorContext context) { + this(name, context, new SystemTime()); + } + + public InMemoryKeyValueStore(String name, ProcessorContext context, Time time) { + super(name, new MemoryStore<K, V>(name, context), context, "kafka-streams", time); + } + + private static class MemoryStore<K, V> implements KeyValueStore<K, V> { + + private final String name; + private final NavigableMap<K, V> map; + private final ProcessorContext context; + + @SuppressWarnings("unchecked") + public MemoryStore(String name, ProcessorContext context) { + super(); + this.name = name; + this.map = new TreeMap<>(); + this.context = context; + } + + @Override + public String name() { + return this.name; + } + + @Override + public boolean persistent() { + return false; + } + + @Override + public V get(K key) { + return this.map.get(key); + } + + @Override + public void put(K key, V value) { + this.map.put(key, value); + } + + @Override + public void putAll(List<Entry<K, V>> entries) { + for (Entry<K, V> entry : entries) + put(entry.key(), entry.value()); + } + + @Override + public V delete(K key) { + return this.map.remove(key); + } + + @Override + public KeyValueIterator<K, V> range(K from, K to) { + return new MemoryStoreIterator<K, V>(this.map.subMap(from, true, to, false).entrySet().iterator()); + } + + @Override + public KeyValueIterator<K, V> all() { + return new MemoryStoreIterator<K, V>(this.map.entrySet().iterator()); + } + + @Override + public void flush() { + // do-nothing since it is in-memory + } + + public void restore() { + // this should not happen since it is in-memory, hence no state to load from disk + throw new IllegalStateException("This should not happen"); + } + + @Override + public void close() { + // do-nothing + } + + private static class MemoryStoreIterator<K, V> implements KeyValueIterator<K, V> { + private final Iterator<Map.Entry<K, V>> iter; + + public MemoryStoreIterator(Iterator<Map.Entry<K, V>> iter) { + this.iter = iter; + } + + @Override + public boolean hasNext() { + return iter.hasNext(); + } + + @Override + public Entry<K, V> next() { + Map.Entry<K, V> entry = iter.next(); + return new Entry<>(entry.getKey(), entry.getValue()); + } + + @Override + public void remove() { + iter.remove(); + } + + @Override + public void close() { + } + + } + } +}
http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java new file mode 100644 index 0000000..0fbd4ae --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.kafka.streams.state; + +import java.io.Closeable; +import java.util.Iterator; + +public interface KeyValueIterator<K, V> extends Iterator<Entry<K, V>>, Closeable { + + @Override + public void close(); +} http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java new file mode 100644 index 0000000..e4faed1 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.kafka.streams.state; + +import org.apache.kafka.streams.processor.StateStore; + +import java.util.List; + +/** + * A key-value store that supports put/get/delete and range queries. + * + * @param <K> The key type + * @param <V> The value type + */ +public interface KeyValueStore<K, V> extends StateStore { + + /** + * Get the value corresponding to this key + * + * @param key The key to fetch + * @return The value or null if no value is found. + * @throws NullPointerException If null is used for key. + */ + abstract public V get(K key); + + /** + * Update the value associated with this key + * + * @param key They key to associate the value to + * @param value The value + * @throws NullPointerException If null is used for key or value. + */ + abstract public void put(K key, V value); + + /** + * Update all the given key/value pairs + * + * @param entries A list of entries to put into the store. + * @throws NullPointerException If null is used for any key or value. + */ + abstract public void putAll(List<Entry<K, V>> entries); + + /** + * Delete the value from the store (if there is one) + * + * @param key The key + * @return The old value or null if there is no such key. + * @throws NullPointerException If null is used for key. + */ + abstract public V delete(K key); + + /** + * Get an iterator over a given range of keys. This iterator MUST be closed after use. + * + * @param from The first key that could be in the range + * @param to The last key that could be in the range + * @return The iterator for this range. + * @throws NullPointerException If null is used for from or to. + */ + abstract public KeyValueIterator<K, V> range(K from, K to); + + /** + * Return an iterator over all keys in the database. This iterator MUST be closed after use. + * + * @return An iterator of all key/value pairs in the store. + */ + abstract public KeyValueIterator<K, V> all(); + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java new file mode 100644 index 0000000..018f1c6 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java @@ -0,0 +1,273 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.state; + +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.RestoreFunc; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.metrics.MeasurableStat; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.stats.Avg; +import org.apache.kafka.common.metrics.stats.Count; +import org.apache.kafka.common.metrics.stats.Max; +import org.apache.kafka.common.metrics.stats.Rate; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.streams.processor.internals.ProcessorContextImpl; +import org.apache.kafka.streams.processor.internals.RecordCollector; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> { + + protected final KeyValueStore<K, V> inner; + + private final Time time; + private final String group; + private final Sensor putTime; + private final Sensor getTime; + private final Sensor deleteTime; + private final Sensor putAllTime; + private final Sensor allTime; + private final Sensor rangeTime; + private final Sensor flushTime; + private final Sensor restoreTime; + private final Metrics metrics; + + private final String topic; + private final int partition; + private final Set<K> dirty; + private final int maxDirty; + private final ProcessorContext context; + + // always wrap the logged store with the metered store + public MeteredKeyValueStore(final String name, final KeyValueStore<K, V> inner, ProcessorContext context, String group, Time time) { + this.inner = inner; + + this.time = time; + this.group = group; + this.metrics = context.metrics(); + this.putTime = createSensor(name, "put"); + this.getTime = createSensor(name, "get"); + this.deleteTime = createSensor(name, "delete"); + this.putAllTime = createSensor(name, "put-all"); + this.allTime = createSensor(name, "all"); + this.rangeTime = createSensor(name, "range"); + this.flushTime = createSensor(name, "flush"); + this.restoreTime = createSensor(name, "restore"); + + this.topic = name; + this.partition = context.id(); + + this.context = context; + + this.dirty = new HashSet<K>(); + this.maxDirty = 100; // TODO: this needs to be configurable + + // register and possibly restore the state from the logs + long startNs = time.nanoseconds(); + try { + final Deserializer<K> keyDeserializer = (Deserializer<K>) context.keyDeserializer(); + final Deserializer<V> valDeserializer = (Deserializer<V>) context.valueDeserializer(); + + context.register(this, new RestoreFunc() { + @Override + public void apply(byte[] key, byte[] value) { + inner.put(keyDeserializer.deserialize(topic, key), + valDeserializer.deserialize(topic, value)); + } + }); + } finally { + recordLatency(this.restoreTime, startNs, time.nanoseconds()); + } + } + + private Sensor createSensor(String storeName, String operation) { + Sensor parent = metrics.sensor(operation); + addLatencyMetrics(parent, operation); + Sensor sensor = metrics.sensor(storeName + "- " + operation, parent); + addLatencyMetrics(sensor, operation, "store-name", storeName); + return sensor; + } + + private void addLatencyMetrics(Sensor sensor, String opName, String... kvs) { + maybeAddMetric(sensor, new MetricName(opName + "-avg-latency-ms", group, "The average latency in milliseconds of the key-value store operation.", kvs), new Avg()); + maybeAddMetric(sensor, new MetricName(opName + "-max-latency-ms", group, "The max latency in milliseconds of the key-value store operation.", kvs), new Max()); + maybeAddMetric(sensor, new MetricName(opName + "-qps", group, "The average number of occurance of the given key-value store operation per second.", kvs), new Rate(new Count())); + } + + private void maybeAddMetric(Sensor sensor, MetricName name, MeasurableStat stat) { + if (!metrics.metrics().containsKey(name)) + sensor.add(name, stat); + } + + @Override + public String name() { + return inner.name(); + } + + @Override + public boolean persistent() { + return inner.persistent(); + } + + @Override + public V get(K key) { + long startNs = time.nanoseconds(); + try { + return this.inner.get(key); + } finally { + recordLatency(this.getTime, startNs, time.nanoseconds()); + } + } + + @Override + public void put(K key, V value) { + long startNs = time.nanoseconds(); + try { + this.inner.put(key, value); + + this.dirty.add(key); + if (this.dirty.size() > this.maxDirty) + logChange(); + } finally { + recordLatency(this.putTime, startNs, time.nanoseconds()); + } + } + + @Override + public void putAll(List<Entry<K, V>> entries) { + long startNs = time.nanoseconds(); + try { + this.inner.putAll(entries); + + for (Entry<K, V> entry : entries) { + this.dirty.add(entry.key()); + } + + if (this.dirty.size() > this.maxDirty) + logChange(); + } finally { + recordLatency(this.putAllTime, startNs, time.nanoseconds()); + } + } + + @Override + public V delete(K key) { + long startNs = time.nanoseconds(); + try { + V value = this.inner.delete(key); + + this.dirty.add(key); + if (this.dirty.size() > this.maxDirty) + logChange(); + + return value; + } finally { + recordLatency(this.deleteTime, startNs, time.nanoseconds()); + } + } + + @Override + public KeyValueIterator<K, V> range(K from, K to) { + return new MeteredKeyValueIterator<K, V>(this.inner.range(from, to), this.rangeTime); + } + + @Override + public KeyValueIterator<K, V> all() { + return new MeteredKeyValueIterator<K, V>(this.inner.all(), this.allTime); + } + + @Override + public void close() { + inner.close(); + } + + @Override + public void flush() { + long startNs = time.nanoseconds(); + try { + this.inner.flush(); + logChange(); + } finally { + recordLatency(this.flushTime, startNs, time.nanoseconds()); + } + } + + private void logChange() { + RecordCollector collector = ((ProcessorContextImpl) context).recordCollector(); + Serializer<K> keySerializer = (Serializer<K>) context.keySerializer(); + Serializer<V> valueSerializer = (Serializer<V>) context.valueSerializer(); + + if (collector != null) { + for (K k : this.dirty) { + V v = this.inner.get(k); + collector.send(new ProducerRecord<>(this.topic, this.partition, k, v), keySerializer, valueSerializer); + } + this.dirty.clear(); + } + } + + private void recordLatency(Sensor sensor, long startNs, long endNs) { + sensor.record((endNs - startNs) / 1000000, endNs); + } + + private class MeteredKeyValueIterator<K1, V1> implements KeyValueIterator<K1, V1> { + + private final KeyValueIterator<K1, V1> iter; + private final Sensor sensor; + private final long startNs; + + public MeteredKeyValueIterator(KeyValueIterator<K1, V1> iter, Sensor sensor) { + this.iter = iter; + this.sensor = sensor; + this.startNs = time.nanoseconds(); + } + + @Override + public boolean hasNext() { + return iter.hasNext(); + } + + @Override + public Entry<K1, V1> next() { + return iter.next(); + } + + @Override + public void remove() { + iter.remove(); + } + + @Override + public void close() { + try { + iter.close(); + } finally { + recordLatency(this.sensor, this.startNs, time.nanoseconds()); + } + } + + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/main/java/org/apache/kafka/streams/state/OffsetCheckpoint.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/OffsetCheckpoint.java b/streams/src/main/java/org/apache/kafka/streams/state/OffsetCheckpoint.java new file mode 100644 index 0000000..e04de68 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/state/OffsetCheckpoint.java @@ -0,0 +1,172 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.state; + +import org.apache.kafka.common.TopicPartition; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.EOFException; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.FileReader; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +/** + * This class saves out a map of topic/partition=>offsets to a file. The format of the file is UTF-8 text containing the following: + * <pre> + * <version> + * <n> + * <topic_name_1> <partition_1> <offset_1> + * . + * . + * . + * <topic_name_n> <partition_n> <offset_n> + * </pre> + * The first line contains a number designating the format version (currently 0), the get line contains + * a number giving the total number of offsets. Each successive line gives a topic/partition/offset triple + * separated by spaces. + */ +public class OffsetCheckpoint { + + private static final int VERSION = 0; + + private final File file; + private final Object lock; + + public OffsetCheckpoint(File file) throws IOException { + new File(file + ".tmp").delete(); // try to delete any existing temp files for cleanliness + this.file = file; + this.lock = new Object(); + } + + public void write(Map<TopicPartition, Long> offsets) throws IOException { + synchronized (lock) { + // write to temp file and then swap with the existing file + File temp = new File(file.getAbsolutePath() + ".tmp"); + + FileOutputStream fileOutputStream = new FileOutputStream(temp); + BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(fileOutputStream)); + try { + writeIntLine(writer, VERSION); + writeIntLine(writer, offsets.size()); + + // write the entries + for (Map.Entry<TopicPartition, Long> entry : offsets.entrySet()) + writeEntry(writer, entry.getKey(), entry.getValue()); + + // flush the buffer and then fsync the underlying file + writer.flush(); + fileOutputStream.getFD().sync(); + } finally { + writer.close(); + } + + // swap new offset checkpoint file with previous one + if (!temp.renameTo(file)) { + // renameTo() fails on Windows if the destination file exists. + file.delete(); + if (!temp.renameTo(file)) + throw new IOException(String.format("File rename from %s to %s failed.", + temp.getAbsolutePath(), + file.getAbsolutePath())); + } + } + } + + private void writeIntLine(BufferedWriter writer, int number) throws IOException { + writer.write(Integer.toString(number)); + writer.newLine(); + } + + private void writeEntry(BufferedWriter writer, TopicPartition part, long offset) throws IOException { + writer.write(part.topic()); + writer.write(' '); + writer.write(Integer.toString(part.partition())); + writer.write(' '); + writer.write(Long.toString(offset)); + writer.newLine(); + } + + public Map<TopicPartition, Long> read() throws IOException { + synchronized (lock) { + BufferedReader reader = null; + try { + reader = new BufferedReader(new FileReader(file)); + } catch (FileNotFoundException e) { + return Collections.emptyMap(); + } + + try { + int version = readInt(reader); + switch (version) { + case 0: + int expectedSize = readInt(reader); + Map<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>(); + String line = reader.readLine(); + while (line != null) { + String[] pieces = line.split("\\s+"); + if (pieces.length != 3) + throw new IOException(String.format("Malformed line in offset checkpoint file: '%s'.", + line)); + + String topic = pieces[0]; + int partition = Integer.parseInt(pieces[1]); + long offset = Long.parseLong(pieces[2]); + offsets.put(new TopicPartition(topic, partition), offset); + line = reader.readLine(); + } + if (offsets.size() != expectedSize) + throw new IOException(String.format("Expected %d entries but found only %d", + expectedSize, + offsets.size())); + return offsets; + + default: + throw new IllegalArgumentException("Unknown offset checkpoint version: " + version); + } + } finally { + if (reader != null) + reader.close(); + } + } + } + + private int readInt(BufferedReader reader) throws IOException { + String line = reader.readLine(); + if (line == null) + throw new EOFException("File ended prematurely."); + int val = Integer.parseInt(line); + return val; + } + + public void delete() throws IOException { + file.delete(); + } + + @Override + public String toString() { + return this.file.getAbsolutePath(); + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStore.java new file mode 100644 index 0000000..e0962a2 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStore.java @@ -0,0 +1,276 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.state; + +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.utils.SystemTime; + +import org.apache.kafka.common.utils.Time; +import org.rocksdb.BlockBasedTableConfig; +import org.rocksdb.CompactionStyle; +import org.rocksdb.CompressionType; +import org.rocksdb.FlushOptions; +import org.rocksdb.Options; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.rocksdb.RocksIterator; +import org.rocksdb.WriteOptions; + +import java.io.File; +import java.util.Comparator; +import java.util.List; +import java.util.NoSuchElementException; + +public class RocksDBKeyValueStore extends MeteredKeyValueStore<byte[], byte[]> { + + public RocksDBKeyValueStore(String name, ProcessorContext context) { + this(name, context, new SystemTime()); + } + + public RocksDBKeyValueStore(String name, ProcessorContext context, Time time) { + super(name, new RocksDBStore(name, context), context, "kafka-streams", time); + } + + private static class RocksDBStore implements KeyValueStore<byte[], byte[]> { + + private static final int TTL_NOT_USED = -1; + + // TODO: these values should be configurable + private static final long WRITE_BUFFER_SIZE = 32 * 1024 * 1024L; + private static final long BLOCK_CACHE_SIZE = 100 * 1024 * 1024L; + private static final long BLOCK_SIZE = 4096L; + private static final int TTL_SECONDS = TTL_NOT_USED; + private static final int MAX_WRITE_BUFFERS = 3; + private static final CompressionType COMPRESSION_TYPE = CompressionType.NO_COMPRESSION; + private static final CompactionStyle COMPACTION_STYLE = CompactionStyle.UNIVERSAL; + private static final String DB_FILE_DIR = "rocksdb"; + + private final String topic; + private final int partition; + private final ProcessorContext context; + + private final Options options; + private final WriteOptions wOptions; + private final FlushOptions fOptions; + + private final String dbName; + private final String dirName; + + private RocksDB db; + + @SuppressWarnings("unchecked") + public RocksDBStore(String name, ProcessorContext context) { + this.topic = name; + this.partition = context.id(); + this.context = context; + + // initialize the rocksdb options + BlockBasedTableConfig tableConfig = new BlockBasedTableConfig(); + tableConfig.setBlockCacheSize(BLOCK_CACHE_SIZE); + tableConfig.setBlockSize(BLOCK_SIZE); + + options = new Options(); + options.setTableFormatConfig(tableConfig); + options.setWriteBufferSize(WRITE_BUFFER_SIZE); + options.setCompressionType(COMPRESSION_TYPE); + options.setCompactionStyle(COMPACTION_STYLE); + options.setMaxWriteBufferNumber(MAX_WRITE_BUFFERS); + options.setCreateIfMissing(true); + options.setErrorIfExists(false); + + wOptions = new WriteOptions(); + wOptions.setDisableWAL(true); + + fOptions = new FlushOptions(); + fOptions.setWaitForFlush(true); + + dbName = this.topic + "." + this.partition; + dirName = this.context.stateDir() + File.separator + DB_FILE_DIR; + + db = openDB(new File(dirName, dbName), this.options, TTL_SECONDS); + } + + private RocksDB openDB(File dir, Options options, int ttl) { + try { + if (ttl == TTL_NOT_USED) { + return RocksDB.open(options, dir.toString()); + } else { + throw new KafkaException("Change log is not supported for store " + this.topic + " since it is TTL based."); + // TODO: support TTL with change log? + // return TtlDB.open(options, dir.toString(), ttl, false); + } + } catch (RocksDBException e) { + // TODO: this needs to be handled more accurately + throw new KafkaException("Error opening store " + this.topic + " at location " + dir.toString(), e); + } + } + + @Override + public String name() { + return this.topic; + } + + @Override + public boolean persistent() { + return false; + } + + @Override + public byte[] get(byte[] key) { + try { + return this.db.get(key); + } catch (RocksDBException e) { + // TODO: this needs to be handled more accurately + throw new KafkaException("Error while executing get " + key.toString() + " from store " + this.topic, e); + } + } + + @Override + public void put(byte[] key, byte[] value) { + try { + if (value == null) { + db.remove(wOptions, key); + } else { + db.put(wOptions, key, value); + } + } catch (RocksDBException e) { + // TODO: this needs to be handled more accurately + throw new KafkaException("Error while executing put " + key.toString() + " from store " + this.topic, e); + } + } + + @Override + public void putAll(List<Entry<byte[], byte[]>> entries) { + for (Entry<byte[], byte[]> entry : entries) + put(entry.key(), entry.value()); + } + + @Override + public byte[] delete(byte[] key) { + byte[] value = get(key); + put(key, null); + return value; + } + + @Override + public KeyValueIterator<byte[], byte[]> range(byte[] from, byte[] to) { + return new RocksDBRangeIterator(db.newIterator(), from, to); + } + + @Override + public KeyValueIterator<byte[], byte[]> all() { + RocksIterator innerIter = db.newIterator(); + innerIter.seekToFirst(); + return new RocksDbIterator(innerIter); + } + + @Override + public void flush() { + try { + db.flush(fOptions); + } catch (RocksDBException e) { + // TODO: this needs to be handled more accurately + throw new KafkaException("Error while executing flush from store " + this.topic, e); + } + } + + @Override + public void close() { + flush(); + db.close(); + } + + private static class RocksDbIterator implements KeyValueIterator<byte[], byte[]> { + private final RocksIterator iter; + + public RocksDbIterator(RocksIterator iter) { + this.iter = iter; + } + + protected byte[] peekKey() { + return this.getEntry().key(); + } + + protected Entry<byte[], byte[]> getEntry() { + return new Entry<>(iter.key(), iter.value()); + } + + @Override + public boolean hasNext() { + return iter.isValid(); + } + + @Override + public Entry<byte[], byte[]> next() { + if (!hasNext()) + throw new NoSuchElementException(); + + Entry<byte[], byte[]> entry = this.getEntry(); + iter.next(); + + return entry; + } + + @Override + public void remove() { + throw new UnsupportedOperationException("RocksDB iterator does not support remove"); + } + + @Override + public void close() { + } + + } + + private static class LexicographicComparator implements Comparator<byte[]> { + + @Override + public int compare(byte[] left, byte[] right) { + for (int i = 0, j = 0; i < left.length && j < right.length; i++, j++) { + int leftByte = left[i] & 0xff; + int rightByte = right[j] & 0xff; + if (leftByte != rightByte) { + return leftByte - rightByte; + } + } + return left.length - right.length; + } + } + + private static class RocksDBRangeIterator extends RocksDbIterator { + // RocksDB's JNI interface does not expose getters/setters that allow the + // comparator to be pluggable, and the default is lexicographic, so it's + // safe to just force lexicographic comparator here for now. + private final Comparator<byte[]> comparator = new LexicographicComparator(); + byte[] to; + + public RocksDBRangeIterator(RocksIterator iter, byte[] from, byte[] to) { + super(iter); + iter.seek(from); + this.to = to; + } + + @Override + public boolean hasNext() { + return super.hasNext() && comparator.compare(super.peekKey(), this.to) < 0; + } + } + + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java new file mode 100644 index 0000000..49171e3 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream; + +import org.apache.kafka.streams.kstream.internals.KStreamImpl; +import org.apache.kafka.streams.processor.TopologyException; +import org.junit.Test; + +public class KStreamBuilderTest { + + @Test(expected = TopologyException.class) + public void testFrom() { + final KStreamBuilder builder = new KStreamBuilder(); + + builder.from("topic-1", "topic-2"); + + builder.addSource(KStreamImpl.SOURCE_NAME + KStreamImpl.INDEX.decrementAndGet(), "topic-3"); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/test/java/org/apache/kafka/streams/kstream/internals/FilteredIteratorTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/FilteredIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/FilteredIteratorTest.java new file mode 100644 index 0000000..405c7c9 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/FilteredIteratorTest.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream.internals; + +import static org.junit.Assert.assertEquals; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; + +public class FilteredIteratorTest { + + @Test + public void testFiltering() { + List<Integer> list = Arrays.asList(3, 1, 4, 1, 5, 9, 2, 6, 5, 3, 5); + + Iterator<String> filtered = new FilteredIterator<String, Integer>(list.iterator()) { + protected String filter(Integer i) { + if (i % 3 == 0) return i.toString(); + return null; + } + }; + + List<String> expected = Arrays.asList("3", "9", "6", "3"); + List<String> result = new ArrayList<String>(); + + while (filtered.hasNext()) { + result.add(filtered.next()); + } + + assertEquals(expected, result); + } + + @Test + public void testEmptySource() { + List<Integer> list = new ArrayList<Integer>(); + + Iterator<String> filtered = new FilteredIterator<String, Integer>(list.iterator()) { + protected String filter(Integer i) { + if (i % 3 == 0) return i.toString(); + return null; + } + }; + + List<String> expected = new ArrayList<String>(); + List<String> result = new ArrayList<String>(); + + while (filtered.hasNext()) { + result.add(filtered.next()); + } + + assertEquals(expected, result); + } + + @Test + public void testNoMatch() { + List<Integer> list = Arrays.asList(3, 1, 4, 1, 5, 9, 2, 6, 5, 3, 5); + + Iterator<String> filtered = new FilteredIterator<String, Integer>(list.iterator()) { + protected String filter(Integer i) { + if (i % 7 == 0) return i.toString(); + return null; + } + }; + + List<String> expected = new ArrayList<String>(); + List<String> result = new ArrayList<String>(); + + while (filtered.hasNext()) { + result.add(filtered.next()); + } + + assertEquals(expected, result); + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java new file mode 100644 index 0000000..c18ddfe --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KStreamBuilder; +import org.apache.kafka.streams.kstream.Predicate; +import org.apache.kafka.test.KStreamTestDriver; +import org.apache.kafka.test.MockProcessorDef; +import org.junit.Test; + +import java.lang.reflect.Array; + +import static org.junit.Assert.assertEquals; + +public class KStreamBranchTest { + + private String topicName = "topic"; + + private IntegerDeserializer keyDeserializer = new IntegerDeserializer(); + private StringDeserializer valDeserializer = new StringDeserializer(); + + @SuppressWarnings("unchecked") + @Test + public void testKStreamBranch() { + KStreamBuilder builder = new KStreamBuilder(); + + Predicate<Integer, String> isEven = new Predicate<Integer, String>() { + @Override + public boolean apply(Integer key, String value) { + return (key % 2) == 0; + } + }; + Predicate<Integer, String> isMultipleOfThree = new Predicate<Integer, String>() { + @Override + public boolean apply(Integer key, String value) { + return (key % 3) == 0; + } + }; + Predicate<Integer, String> isOdd = new Predicate<Integer, String>() { + @Override + public boolean apply(Integer key, String value) { + return (key % 2) != 0; + } + }; + + final int[] expectedKeys = new int[]{1, 2, 3, 4, 5, 6}; + + KStream<Integer, String> stream; + KStream<Integer, String>[] branches; + MockProcessorDef<Integer, String>[] processors; + + stream = builder.from(keyDeserializer, valDeserializer, topicName); + branches = stream.branch(isEven, isMultipleOfThree, isOdd); + + assertEquals(3, branches.length); + + processors = (MockProcessorDef<Integer, String>[]) Array.newInstance(MockProcessorDef.class, branches.length); + for (int i = 0; i < branches.length; i++) { + processors[i] = new MockProcessorDef<>(); + branches[i].process(processors[i]); + } + + KStreamTestDriver driver = new KStreamTestDriver(builder); + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topicName, expectedKeys[i], "V" + expectedKeys[i]); + } + + assertEquals(3, processors[0].processed.size()); + assertEquals(1, processors[1].processed.size()); + assertEquals(2, processors[2].processed.size()); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java new file mode 100644 index 0000000..b80e1e2 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KStreamBuilder; +import org.apache.kafka.streams.kstream.Predicate; +import org.apache.kafka.test.KStreamTestDriver; +import org.apache.kafka.test.MockProcessorDef; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class KStreamFilterTest { + + private String topicName = "topic"; + + private IntegerDeserializer keyDeserializer = new IntegerDeserializer(); + private StringDeserializer valDeserializer = new StringDeserializer(); + + private Predicate<Integer, String> isMultipleOfThree = new Predicate<Integer, String>() { + @Override + public boolean apply(Integer key, String value) { + return (key % 3) == 0; + } + }; + + @Test + public void testFilter() { + KStreamBuilder builder = new KStreamBuilder(); + final int[] expectedKeys = new int[]{1, 2, 3, 4, 5, 6, 7}; + + KStream<Integer, String> stream; + MockProcessorDef<Integer, String> processor; + + processor = new MockProcessorDef<>(); + stream = builder.from(keyDeserializer, valDeserializer, topicName); + stream.filter(isMultipleOfThree).process(processor); + + KStreamTestDriver driver = new KStreamTestDriver(builder); + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topicName, expectedKeys[i], "V" + expectedKeys[i]); + } + + assertEquals(2, processor.processed.size()); + } + + @Test + public void testFilterOut() { + KStreamBuilder builder = new KStreamBuilder(); + final int[] expectedKeys = new int[]{1, 2, 3, 4, 5, 6, 7}; + + KStream<Integer, String> stream; + MockProcessorDef<Integer, String> processor; + + processor = new MockProcessorDef<>(); + stream = builder.from(keyDeserializer, valDeserializer, topicName); + stream.filterOut(isMultipleOfThree).process(processor); + + KStreamTestDriver driver = new KStreamTestDriver(builder); + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topicName, expectedKeys[i], "V" + expectedKeys[i]); + } + + assertEquals(5, processor.processed.size()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java new file mode 100644 index 0000000..e87223e --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KStreamBuilder; +import org.apache.kafka.streams.kstream.KeyValue; +import org.apache.kafka.streams.kstream.KeyValueMapper; +import org.apache.kafka.test.KStreamTestDriver; +import org.apache.kafka.test.MockProcessorDef; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +import java.util.ArrayList; + +public class KStreamFlatMapTest { + + private String topicName = "topic"; + + private IntegerDeserializer keyDeserializer = new IntegerDeserializer(); + private StringDeserializer valDeserializer = new StringDeserializer(); + + @Test + public void testFlatMap() { + KStreamBuilder builder = new KStreamBuilder(); + + KeyValueMapper<Integer, String, Iterable<KeyValue<String, String>>> mapper = + new KeyValueMapper<Integer, String, Iterable<KeyValue<String, String>>>() { + @Override + public Iterable<KeyValue<String, String>> apply(Integer key, String value) { + ArrayList<KeyValue<String, String>> result = new ArrayList<>(); + for (int i = 0; i < key; i++) { + result.add(KeyValue.pair(Integer.toString(key * 10 + i), value)); + } + return result; + } + }; + + final int[] expectedKeys = {0, 1, 2, 3}; + + KStream<Integer, String> stream; + MockProcessorDef<String, String> processor; + + processor = new MockProcessorDef<>(); + stream = builder.from(keyDeserializer, valDeserializer, topicName); + stream.flatMap(mapper).process(processor); + + KStreamTestDriver driver = new KStreamTestDriver(builder); + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topicName, expectedKeys[i], "V" + expectedKeys[i]); + } + + assertEquals(6, processor.processed.size()); + + String[] expected = {"10:V1", "20:V2", "21:V2", "30:V3", "31:V3", "32:V3"}; + + for (int i = 0; i < expected.length; i++) { + assertEquals(expected[i], processor.processed.get(i)); + } + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java new file mode 100644 index 0000000..09dda65 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KStreamBuilder; +import org.apache.kafka.streams.kstream.ValueMapper; +import org.apache.kafka.test.KStreamTestDriver; +import org.apache.kafka.test.MockProcessorDef; +import org.junit.Test; + +import java.util.ArrayList; + +import static org.junit.Assert.assertEquals; + +public class KStreamFlatMapValuesTest { + + private String topicName = "topic"; + + private IntegerDeserializer keyDeserializer = new IntegerDeserializer(); + private StringDeserializer valDeserializer = new StringDeserializer(); + + @Test + public void testFlatMapValues() { + KStreamBuilder builder = new KStreamBuilder(); + + ValueMapper<String, Iterable<String>> mapper = + new ValueMapper<String, Iterable<String>>() { + @Override + public Iterable<String> apply(String value) { + ArrayList<String> result = new ArrayList<String>(); + result.add(value.toLowerCase()); + result.add(value); + return result; + } + }; + + final int[] expectedKeys = {0, 1, 2, 3}; + + KStream<Integer, String> stream; + MockProcessorDef<Integer, String> processor; + + processor = new MockProcessorDef<>(); + stream = builder.from(keyDeserializer, valDeserializer, topicName); + stream.flatMapValues(mapper).process(processor); + + KStreamTestDriver driver = new KStreamTestDriver(builder); + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topicName, expectedKeys[i], "V" + expectedKeys[i]); + } + + assertEquals(8, processor.processed.size()); + + String[] expected = {"0:v0", "0:V0", "1:v1", "1:V1", "2:v2", "2:V2", "3:v3", "3:V3"}; + + for (int i = 0; i < expected.length; i++) { + assertEquals(expected[i], processor.processed.get(i)); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java new file mode 100644 index 0000000..0660ddd --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KStreamBuilder; +import org.apache.kafka.streams.kstream.Predicate; +import org.apache.kafka.streams.kstream.ValueJoiner; +import org.apache.kafka.streams.kstream.ValueMapper; +import org.apache.kafka.test.MockProcessorDef; +import org.apache.kafka.test.UnlimitedWindowDef; +import org.junit.Test; + +import java.util.Collections; + +import static org.junit.Assert.assertEquals; + + +public class KStreamImplTest { + + @Test + public void testNumProcesses() { + final Deserializer<String> deserializer = new StringDeserializer(); + final KStreamBuilder builder = new KStreamBuilder(); + + KStream<String, String> source1 = builder.from(deserializer, deserializer, "topic-1", "topic-2"); + + KStream<String, String> source2 = builder.from(deserializer, deserializer, "topic-3", "topic-4"); + + KStream<String, String> stream1 = + source1.filter(new Predicate<String, String>() { + @Override + public boolean apply(String key, String value) { + return true; + } + }).filterOut(new Predicate<String, String>() { + @Override + public boolean apply(String key, String value) { + return false; + } + }); + + KStream<String, Integer> stream2 = stream1.mapValues(new ValueMapper<String, Integer>() { + @Override + public Integer apply(String value) { + return new Integer(value); + } + }); + + KStream<String, Integer> stream3 = source2.flatMapValues(new ValueMapper<String, Iterable<Integer>>() { + @Override + public Iterable<Integer> apply(String value) { + return Collections.singletonList(new Integer(value)); + } + }); + + KStream<String, Integer>[] streams2 = stream2.branch( + new Predicate<String, Integer>() { + @Override + public boolean apply(String key, Integer value) { + return (value % 2) == 0; + } + }, + new Predicate<String, Integer>() { + @Override + public boolean apply(String key, Integer value) { + return true; + } + } + ); + + KStream<String, Integer>[] streams3 = stream3.branch( + new Predicate<String, Integer>() { + @Override + public boolean apply(String key, Integer value) { + return (value % 2) == 0; + } + }, + new Predicate<String, Integer>() { + @Override + public boolean apply(String key, Integer value) { + return true; + } + } + ); + + KStream<String, Integer> stream4 = streams2[0].with(new UnlimitedWindowDef<String, Integer>("window")) + .join(streams3[0].with(new UnlimitedWindowDef<String, Integer>("window")), new ValueJoiner<Integer, Integer, Integer>() { + @Override + public Integer apply(Integer value1, Integer value2) { + return value1 + value2; + } + }); + + KStream<String, Integer> stream5 = streams2[1].with(new UnlimitedWindowDef<String, Integer>("window")) + .join(streams3[1].with(new UnlimitedWindowDef<String, Integer>("window")), new ValueJoiner<Integer, Integer, Integer>() { + @Override + public Integer apply(Integer value1, Integer value2) { + return value1 + value2; + } + }); + + stream4.to("topic-5"); + + stream5.through("topic-6").process(new MockProcessorDef<>()).to("topic-7"); + + assertEquals(2 + // sources + 2 + // stream1 + 1 + // stream2 + 1 + // stream3 + 1 + 2 + // streams2 + 1 + 2 + // streams3 + 2 + 3 + // stream4 + 2 + 3 + // stream5 + 1 + // to + 2 + // through + 1 + // process + 1, // to + builder.build().processors().size()); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamJoinTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamJoinTest.java new file mode 100644 index 0000000..7dea8e0 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamJoinTest.java @@ -0,0 +1,164 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KStreamBuilder; +import org.apache.kafka.streams.kstream.KStreamWindowed; +import org.apache.kafka.streams.kstream.KeyValue; +import org.apache.kafka.streams.kstream.KeyValueMapper; +import org.apache.kafka.streams.kstream.ValueJoiner; +import org.apache.kafka.streams.kstream.ValueMapper; +import org.apache.kafka.test.KStreamTestDriver; +import org.apache.kafka.test.MockProcessorDef; +import org.apache.kafka.test.UnlimitedWindowDef; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class KStreamJoinTest { + + private String topic1 = "topic1"; + private String topic2 = "topic2"; + + private IntegerDeserializer keyDeserializer = new IntegerDeserializer(); + private StringDeserializer valDeserializer = new StringDeserializer(); + + private ValueJoiner<String, String, String> joiner = new ValueJoiner<String, String, String>() { + @Override + public String apply(String value1, String value2) { + return value1 + "+" + value2; + } + }; + + private ValueMapper<String, String> valueMapper = new ValueMapper<String, String>() { + @Override + public String apply(String value) { + return "#" + value; + } + }; + + private ValueMapper<String, Iterable<String>> valueMapper2 = new ValueMapper<String, Iterable<String>>() { + @Override + public Iterable<String> apply(String value) { + return (Iterable<String>) Utils.mkSet(value); + } + }; + + private KeyValueMapper<Integer, String, KeyValue<Integer, String>> keyValueMapper = + new KeyValueMapper<Integer, String, KeyValue<Integer, String>>() { + @Override + public KeyValue<Integer, String> apply(Integer key, String value) { + return KeyValue.pair(key, value); + } + }; + + KeyValueMapper<Integer, String, KeyValue<Integer, Iterable<String>>> keyValueMapper2 = + new KeyValueMapper<Integer, String, KeyValue<Integer, Iterable<String>>>() { + @Override + public KeyValue<Integer, Iterable<String>> apply(Integer key, String value) { + return KeyValue.pair(key, (Iterable<String>) Utils.mkSet(value)); + } + }; + + + @Test + public void testJoin() { + KStreamBuilder builder = new KStreamBuilder(); + + final int[] expectedKeys = new int[]{0, 1, 2, 3}; + + KStream<Integer, String> stream1; + KStream<Integer, String> stream2; + KStreamWindowed<Integer, String> windowed1; + KStreamWindowed<Integer, String> windowed2; + MockProcessorDef<Integer, String> processor; + String[] expected; + + processor = new MockProcessorDef<>(); + stream1 = builder.from(keyDeserializer, valDeserializer, topic1); + stream2 = builder.from(keyDeserializer, valDeserializer, topic2); + windowed1 = stream1.with(new UnlimitedWindowDef<Integer, String>("window1")); + windowed2 = stream2.with(new UnlimitedWindowDef<Integer, String>("window2")); + + windowed1.join(windowed2, joiner).process(processor); + + KStreamTestDriver driver = new KStreamTestDriver(builder); + driver.setTime(0L); + + // push two items to the main stream. the other stream's window is empty + + for (int i = 0; i < 2; i++) { + driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); + } + + assertEquals(0, processor.processed.size()); + + // push two items to the other stream. the main stream's window has two items + + for (int i = 0; i < 2; i++) { + driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]); + } + + assertEquals(2, processor.processed.size()); + + expected = new String[]{"0:X0+Y0", "1:X1+Y1"}; + + for (int i = 0; i < expected.length; i++) { + assertEquals(expected[i], processor.processed.get(i)); + } + + processor.processed.clear(); + + // push all items to the main stream. this should produce two items. + + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); + } + + assertEquals(2, processor.processed.size()); + + expected = new String[]{"0:X0+Y0", "1:X1+Y1"}; + + for (int i = 0; i < expected.length; i++) { + assertEquals(expected[i], processor.processed.get(i)); + } + + processor.processed.clear(); + + // there will be previous two items + all items in the main stream's window, thus two are duplicates. + + // push all items to the other stream. this should produce 6 items + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]); + } + + assertEquals(6, processor.processed.size()); + + expected = new String[]{"0:X0+Y0", "0:X0+Y0", "1:X1+Y1", "1:X1+Y1", "2:X2+Y2", "3:X3+Y3"}; + + for (int i = 0; i < expected.length; i++) { + assertEquals(expected[i], processor.processed.get(i)); + } + } + + // TODO: test for joinability +} http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java new file mode 100644 index 0000000..bec524f --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KStreamBuilder; +import org.apache.kafka.streams.kstream.KeyValue; +import org.apache.kafka.streams.kstream.KeyValueMapper; +import org.apache.kafka.test.KStreamTestDriver; +import org.apache.kafka.test.MockProcessorDef; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class KStreamMapTest { + + private String topicName = "topic"; + + private IntegerDeserializer keyDeserializer = new IntegerDeserializer(); + private StringDeserializer valDeserializer = new StringDeserializer(); + + @Test + public void testMap() { + KStreamBuilder builder = new KStreamBuilder(); + + KeyValueMapper<Integer, String, KeyValue<String, Integer>> mapper = + new KeyValueMapper<Integer, String, KeyValue<String, Integer>>() { + @Override + public KeyValue<String, Integer> apply(Integer key, String value) { + return KeyValue.pair(value, key); + } + }; + + final int[] expectedKeys = new int[]{0, 1, 2, 3}; + + KStream<Integer, String> stream; + MockProcessorDef<String, Integer> processor; + + processor = new MockProcessorDef<>(); + stream = builder.from(keyDeserializer, valDeserializer, topicName); + stream.map(mapper).process(processor); + + KStreamTestDriver driver = new KStreamTestDriver(builder); + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topicName, expectedKeys[i], "V" + expectedKeys[i]); + } + + assertEquals(4, processor.processed.size()); + + String[] expected = new String[]{"V0:0", "V1:1", "V2:2", "V3:3"}; + + for (int i = 0; i < expected.length; i++) { + assertEquals(expected[i], processor.processed.get(i)); + } + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java new file mode 100644 index 0000000..b6507fe --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KStreamBuilder; +import org.apache.kafka.streams.kstream.ValueMapper; +import org.apache.kafka.test.KStreamTestDriver; +import org.apache.kafka.test.MockProcessorDef; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class KStreamMapValuesTest { + + private String topicName = "topic"; + + private IntegerDeserializer keyDeserializer = new IntegerDeserializer(); + private StringDeserializer valDeserializer = new StringDeserializer(); + + @Test + public void testFlatMapValues() { + KStreamBuilder builder = new KStreamBuilder(); + + ValueMapper<String, Integer> mapper = + new ValueMapper<String, Integer>() { + @Override + public Integer apply(String value) { + return value.length(); + } + }; + + final int[] expectedKeys = {1, 10, 100, 1000}; + + KStream<Integer, String> stream; + MockProcessorDef<Integer, Integer> processor = new MockProcessorDef<>(); + stream = builder.from(keyDeserializer, valDeserializer, topicName); + stream.mapValues(mapper).process(processor); + + KStreamTestDriver driver = new KStreamTestDriver(builder); + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topicName, expectedKeys[i], Integer.toString(expectedKeys[i])); + } + + assertEquals(4, processor.processed.size()); + + String[] expected = {"1:1", "10:2", "100:3", "1000:4"}; + + for (int i = 0; i < expected.length; i++) { + assertEquals(expected[i], processor.processed.get(i)); + } + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedTest.java new file mode 100644 index 0000000..48a9fc3 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedTest.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KStreamBuilder; +import org.apache.kafka.streams.kstream.Window; +import org.apache.kafka.streams.kstream.WindowDef; +import org.apache.kafka.test.KStreamTestDriver; +import org.apache.kafka.test.UnlimitedWindowDef; +import org.junit.Test; + +import java.util.Iterator; + +import static org.junit.Assert.assertEquals; + +public class KStreamWindowedTest { + + private String topicName = "topic"; + private String windowName = "MyWindow"; + + private IntegerDeserializer keyDeserializer = new IntegerDeserializer(); + private StringDeserializer valDeserializer = new StringDeserializer(); + + @Test + public void testWindowedStream() { + KStreamBuilder builder = new KStreamBuilder(); + + final int[] expectedKeys = new int[]{0, 1, 2, 3}; + + KStream<Integer, String> stream; + WindowDef<Integer, String> windowDef; + + windowDef = new UnlimitedWindowDef<>(windowName); + stream = builder.from(keyDeserializer, valDeserializer, topicName); + stream.with(windowDef); + + KStreamTestDriver driver = new KStreamTestDriver(builder); + Window<Integer, String> window = (Window<Integer, String>) driver.getStateStore(windowName); + driver.setTime(0L); + + // two items in the window + + for (int i = 0; i < 2; i++) { + driver.process(topicName, expectedKeys[i], "V" + expectedKeys[i]); + } + + assertEquals(1, countItem(window.find(0, 0L))); + assertEquals(1, countItem(window.find(1, 0L))); + assertEquals(0, countItem(window.find(2, 0L))); + assertEquals(0, countItem(window.find(3, 0L))); + + // previous two items + all items, thus two are duplicates, in the window + + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topicName, expectedKeys[i], "Y" + expectedKeys[i]); + } + + assertEquals(2, countItem(window.find(0, 0L))); + assertEquals(2, countItem(window.find(1, 0L))); + assertEquals(1, countItem(window.find(2, 0L))); + assertEquals(1, countItem(window.find(3, 0L))); + } + + + private <T> int countItem(Iterator<T> iter) { + int i = 0; + while (iter.hasNext()) { + i++; + iter.next(); + } + return i; + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java new file mode 100644 index 0000000..57a78ff --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.processor; + +import static org.junit.Assert.assertEquals; + +import org.apache.kafka.test.MockProcessorDef; +import org.junit.Test; + +public class TopologyBuilderTest { + + @Test(expected = TopologyException.class) + public void testAddSourceWithSameName() { + final TopologyBuilder builder = new TopologyBuilder(); + + builder.addSource("source", "topic-1"); + builder.addSource("source", "topic-2"); + } + + @Test(expected = TopologyException.class) + public void testAddSourceWithSameTopic() { + final TopologyBuilder builder = new TopologyBuilder(); + + builder.addSource("source", "topic-1"); + builder.addSource("source-2", "topic-1"); + } + + @Test(expected = TopologyException.class) + public void testAddProcessorWithSameName() { + final TopologyBuilder builder = new TopologyBuilder(); + + builder.addSource("source", "topic-1"); + builder.addProcessor("processor", new MockProcessorDef(), "source"); + builder.addProcessor("processor", new MockProcessorDef(), "source"); + } + + @Test(expected = TopologyException.class) + public void testAddProcessorWithWrongParent() { + final TopologyBuilder builder = new TopologyBuilder(); + + builder.addProcessor("processor", new MockProcessorDef(), "source"); + } + + @Test(expected = TopologyException.class) + public void testAddProcessorWithSelfParent() { + final TopologyBuilder builder = new TopologyBuilder(); + + builder.addProcessor("processor", new MockProcessorDef(), "processor"); + } + + @Test(expected = TopologyException.class) + public void testAddSinkWithSameName() { + final TopologyBuilder builder = new TopologyBuilder(); + + builder.addSource("source", "topic-1"); + builder.addSink("sink", "topic-2", "source"); + builder.addSink("sink", "topic-3", "source"); + } + + @Test(expected = TopologyException.class) + public void testAddSinkWithWrongParent() { + final TopologyBuilder builder = new TopologyBuilder(); + + builder.addSink("sink", "topic-2", "source"); + } + + @Test(expected = TopologyException.class) + public void testAddSinkWithSelfParent() { + final TopologyBuilder builder = new TopologyBuilder(); + + builder.addSink("sink", "topic-2", "sink"); + } + + @Test + public void testSourceTopics() { + final TopologyBuilder builder = new TopologyBuilder(); + + builder.addSource("source-1", "topic-1"); + builder.addSource("source-2", "topic-2"); + builder.addSource("source-3", "topic-3"); + + assertEquals(builder.sourceTopics().size(), 3); + } +}
