Repository: kafka Updated Branches: refs/heads/trunk 0d8cbbcb2 -> 5d0cd7667
http://git-wip-us.apache.org/repos/asf/kafka/blob/5d0cd766/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java index 00089ab..0407299 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java @@ -17,10 +17,10 @@ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateStoreSupplier; -import org.apache.kafka.streams.state.StateSerdes; /** * A {@link org.apache.kafka.streams.state.KeyValueStore} that stores all entries in a local RocksDB database. @@ -36,15 +36,21 @@ public class RocksDBWindowStoreSupplier<K, V> implements StateStoreSupplier { private final long retentionPeriod; private final boolean retainDuplicates; private final int numSegments; - private final StateSerdes<K, V> serdes; + private final Serde<K> keySerde; + private final Serde<V> valueSerde; private final Time time; - public RocksDBWindowStoreSupplier(String name, long retentionPeriod, int numSegments, boolean retainDuplicates, StateSerdes<K, V> serdes, Time time) { + public RocksDBWindowStoreSupplier(String name, long retentionPeriod, int numSegments, boolean retainDuplicates, Serde<K> keySerde, Serde<V> valueSerde) { + this(name, retentionPeriod, numSegments, retainDuplicates, keySerde, valueSerde, null); + } + + public RocksDBWindowStoreSupplier(String name, long retentionPeriod, int numSegments, boolean retainDuplicates, Serde<K> keySerde, Serde<V> valueSerde, Time time) { this.name = name; this.retentionPeriod = retentionPeriod; this.retainDuplicates = retainDuplicates; this.numSegments = numSegments; - this.serdes = serdes; + this.keySerde = keySerde; + this.valueSerde = valueSerde; this.time = time; } @@ -53,7 +59,7 @@ public class RocksDBWindowStoreSupplier<K, V> implements StateStoreSupplier { } public StateStore get() { - return new MeteredWindowStore<>(new RocksDBWindowStore<>(name, retentionPeriod, numSegments, retainDuplicates, serdes).enableLogging(), "rocksdb-window", time); + return new MeteredWindowStore<>(new RocksDBWindowStore<>(name, retentionPeriod, numSegments, retainDuplicates, keySerde, valueSerde).enableLogging(), "rocksdb-window", time); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/5d0cd766/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java index 20c3a28..6f49b6a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java @@ -72,7 +72,7 @@ public class KTableImplTest { MockProcessorSupplier<String, Integer> proc3 = new MockProcessorSupplier<>(); table3.toStream().process(proc3); - KTable<String, String> table4 = table1.through(topic2, stringSerde, stringSerde); + KTable<String, String> table4 = table1.through(stringSerde, stringSerde, topic2); MockProcessorSupplier<String, String> proc4 = new MockProcessorSupplier<>(); table4.toStream().process(proc4); @@ -116,7 +116,7 @@ public class KTableImplTest { } }); KTableImpl<String, String, String> table4 = (KTableImpl<String, String, String>) - table1.through(topic2, stringSerde, stringSerde); + table1.through(stringSerde, stringSerde, topic2); KTableValueGetterSupplier<String, String> getterSupplier1 = table1.valueGetterSupplier(); KTableValueGetterSupplier<String, Integer> getterSupplier2 = table2.valueGetterSupplier(); http://git-wip-us.apache.org/repos/asf/kafka/blob/5d0cd766/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java index aa3daeb..9ec1258 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java @@ -94,7 +94,7 @@ public class KTableMapValuesTest { } }); KTableImpl<String, String, String> table4 = (KTableImpl<String, String, String>) - table1.through(topic2, stringSerde, stringSerde); + table1.through(stringSerde, stringSerde, topic2); KTableValueGetterSupplier<String, String> getterSupplier1 = table1.valueGetterSupplier(); KTableValueGetterSupplier<String, Integer> getterSupplier2 = table2.valueGetterSupplier(); http://git-wip-us.apache.org/repos/asf/kafka/blob/5d0cd766/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java index ce4956c..0a02824 100644 --- a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java +++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java @@ -94,7 +94,7 @@ public class SmokeTestClient extends SmokeTestUtil { KStream<String, Integer> source = builder.stream(stringSerde, intSerde, "data"); - source.to("echo", stringSerde, intSerde); + source.to(stringSerde, intSerde, "echo"); KStream<String, Integer> data = source.filter(new Predicate<String, Integer>() { @Override @@ -123,7 +123,7 @@ public class SmokeTestClient extends SmokeTestUtil { intSerde ).toStream().map( new Unwindow<String, Integer>() - ).to("min", stringSerde, intSerde); + ).to(stringSerde, intSerde, "min"); KTable<String, Integer> minTable = builder.table(stringSerde, intSerde, "min"); minTable.toStream().process(SmokeTestUtil.<Integer>printProcessorSupplier("min")); @@ -146,7 +146,7 @@ public class SmokeTestClient extends SmokeTestUtil { intSerde ).toStream().map( new Unwindow<String, Integer>() - ).to("max", stringSerde, intSerde); + ).to(stringSerde, intSerde, "max"); KTable<String, Integer> maxTable = builder.table(stringSerde, intSerde, "max"); maxTable.toStream().process(SmokeTestUtil.<Integer>printProcessorSupplier("max")); @@ -169,7 +169,7 @@ public class SmokeTestClient extends SmokeTestUtil { longSerde ).toStream().map( new Unwindow<String, Long>() - ).to("sum", stringSerde, longSerde); + ).to(stringSerde, longSerde, "sum"); KTable<String, Long> sumTable = builder.table(stringSerde, longSerde, "sum"); @@ -181,7 +181,7 @@ public class SmokeTestClient extends SmokeTestUtil { stringSerde ).toStream().map( new Unwindow<String, Long>() - ).to("cnt", stringSerde, longSerde); + ).to(stringSerde, longSerde, "cnt"); KTable<String, Long> cntTable = builder.table(stringSerde, longSerde, "cnt"); cntTable.toStream().process(SmokeTestUtil.<Long>printProcessorSupplier("cnt")); @@ -193,7 +193,7 @@ public class SmokeTestClient extends SmokeTestUtil { return value1 - value2; } } - ).to("dif", stringSerde, intSerde); + ).to(stringSerde, intSerde, "dif"); // avg sumTable.join( @@ -203,7 +203,7 @@ public class SmokeTestClient extends SmokeTestUtil { return (double) value1 / (double) value2; } } - ).to("avg", stringSerde, doubleSerde); + ).to(stringSerde, doubleSerde, "avg"); // windowed count data.countByKey( @@ -216,7 +216,7 @@ public class SmokeTestClient extends SmokeTestUtil { return new KeyValue<>(key.value() + "@" + key.window().start(), value); } } - ).to("wcnt", stringSerde, longSerde); + ).to(stringSerde, longSerde, "wcnt"); // test repartition Agg agg = new Agg(); @@ -229,7 +229,7 @@ public class SmokeTestClient extends SmokeTestUtil { longSerde, longSerde, "cntByCnt" - ).to("tagg", stringSerde, longSerde); + ).to(stringSerde, longSerde, "tagg"); return new KafkaStreams(builder, props); } http://git-wip-us.apache.org/repos/asf/kafka/blob/5d0cd766/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java index ffc97c3..502870b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java @@ -57,11 +57,13 @@ public class RocksDBWindowStoreTest { private final long segmentSize = RocksDBWindowStore.MIN_SEGMENT_INTERVAL; private final long retentionPeriod = segmentSize * (numSegments - 1); private final long windowSize = 3; - private final StateSerdes<Integer, String> serdes = StateSerdes.withBuiltinTypes("", Integer.class, String.class); + private final Serde<Integer> intSerde = Serdes.Integer(); + private final Serde<String> stringSerde = Serdes.String(); + private final StateSerdes<Integer, String> serdes = new StateSerdes<>("", intSerde, stringSerde); @SuppressWarnings("unchecked") - protected <K, V> WindowStore<K, V> createWindowStore(ProcessorContext context, StateSerdes<K, V> serdes) { - StateStoreSupplier supplier = new RocksDBWindowStoreSupplier<>(windowName, retentionPeriod, numSegments, true, serdes, null); + protected <K, V> WindowStore<K, V> createWindowStore(ProcessorContext context) { + StateStoreSupplier supplier = new RocksDBWindowStoreSupplier<>(windowName, retentionPeriod, numSegments, true, intSerde, stringSerde); WindowStore<K, V> store = (WindowStore<K, V>) supplier.get(); store.init(context, store); @@ -89,7 +91,7 @@ public class RocksDBWindowStoreTest { byteArraySerde, byteArraySerde, recordCollector); - WindowStore<Integer, String> store = createWindowStore(context, serdes); + WindowStore<Integer, String> store = createWindowStore(context); try { long startTime = segmentSize - 4L; @@ -185,7 +187,7 @@ public class RocksDBWindowStoreTest { byteArraySerde, byteArraySerde, recordCollector); - WindowStore<Integer, String> store = createWindowStore(context, serdes); + WindowStore<Integer, String> store = createWindowStore(context); try { long startTime = segmentSize - 4L; @@ -281,7 +283,7 @@ public class RocksDBWindowStoreTest { byteArraySerde, byteArraySerde, recordCollector); - WindowStore<Integer, String> store = createWindowStore(context, serdes); + WindowStore<Integer, String> store = createWindowStore(context); try { long startTime = segmentSize - 4L; @@ -377,7 +379,7 @@ public class RocksDBWindowStoreTest { byteArraySerde, byteArraySerde, recordCollector); - WindowStore<Integer, String> store = createWindowStore(context, serdes); + WindowStore<Integer, String> store = createWindowStore(context); try { long startTime = segmentSize - 4L; @@ -436,7 +438,7 @@ public class RocksDBWindowStoreTest { byteArraySerde, byteArraySerde, recordCollector); - WindowStore<Integer, String> store = createWindowStore(context, serdes); + WindowStore<Integer, String> store = createWindowStore(context); RocksDBWindowStore<Integer, String> inner = (RocksDBWindowStore<Integer, String>) ((MeteredWindowStore<Integer, String>) store).inner(); try { @@ -553,7 +555,7 @@ public class RocksDBWindowStoreTest { byteArraySerde, byteArraySerde, recordCollector); - WindowStore<Integer, String> store = createWindowStore(context, serdes); + WindowStore<Integer, String> store = createWindowStore(context); try { context.setTime(startTime); store.put(0, "zero"); @@ -602,7 +604,7 @@ public class RocksDBWindowStoreTest { byteArraySerde, byteArraySerde, recordCollector); - WindowStore<Integer, String> store = createWindowStore(context, serdes); + WindowStore<Integer, String> store = createWindowStore(context); RocksDBWindowStore<Integer, String> inner = (RocksDBWindowStore<Integer, String>) ((MeteredWindowStore<Integer, String>) store).inner(); @@ -654,7 +656,7 @@ public class RocksDBWindowStoreTest { byteArraySerde, byteArraySerde, recordCollector); - WindowStore<Integer, String> store = createWindowStore(context, serdes); + WindowStore<Integer, String> store = createWindowStore(context); RocksDBWindowStore<Integer, String> inner = (RocksDBWindowStore<Integer, String>) ((MeteredWindowStore<Integer, String>) store).inner(); @@ -759,7 +761,7 @@ public class RocksDBWindowStoreTest { File storeDir = new File(baseDir, windowName); - WindowStore<Integer, String> store = createWindowStore(context, serdes); + WindowStore<Integer, String> store = createWindowStore(context); RocksDBWindowStore<Integer, String> inner = (RocksDBWindowStore<Integer, String>) ((MeteredWindowStore<Integer, String>) store).inner(); @@ -775,7 +777,7 @@ public class RocksDBWindowStoreTest { store.close(); } - store = createWindowStore(context, serdes); + store = createWindowStore(context); inner = (RocksDBWindowStore<Integer, String>) ((MeteredWindowStore<Integer, String>) store).inner(); try {
