http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
index 6823e6d..00089ab 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
@@ -20,7 +20,7 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
-import org.apache.kafka.streams.state.Serdes;
+import org.apache.kafka.streams.state.StateSerdes;
 
 /**
  * A {@link org.apache.kafka.streams.state.KeyValueStore} that stores all entries in a local RocksDB database.
@@ -36,10 +36,10 @@ public class RocksDBWindowStoreSupplier<K, V> implements StateStoreSupplier {
     private final long retentionPeriod;
     private final boolean retainDuplicates;
     private final int numSegments;
-    private final Serdes<K, V> serdes;
+    private final StateSerdes<K, V> serdes;
     private final Time time;
 
-    public RocksDBWindowStoreSupplier(String name, long retentionPeriod, int numSegments, boolean retainDuplicates, Serdes<K, V> serdes, Time time) {
+    public RocksDBWindowStoreSupplier(String name, long retentionPeriod, int numSegments, boolean retainDuplicates, StateSerdes<K, V> serdes, Time time) {
         this.name = name;
         this.retentionPeriod = retentionPeriod;
         this.retainDuplicates = retainDuplicates;
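The rename above frees the Serdes name for the public factory class in org.apache.kafka.common.serialization that the test changes below switch to. As a minimal sketch (not part of the patch), assuming only the factory methods and inner serde classes that appear elsewhere in this commit, callers obtain Serde instances like this:

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;

public class SerdeFactorySketch {
    public static void main(String[] args) {
        // Static factory methods used by the updated tests below.
        Serde<String> stringSerde = Serdes.String();
        Serde<Integer> intSerde = Serdes.Integer();
        // The equivalent concrete inner classes, also used in this commit.
        Serde<String> stringSerde2 = new Serdes.StringSerde();
        Serde<Integer> intSerde2 = new Serdes.IntegerSerde();
        System.out.println(stringSerde + " " + intSerde + " " + stringSerde2 + " " + intSerde2);
    }
}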
http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
index 4229f94..a439117 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
@@ -22,7 +22,7 @@ import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
-import org.apache.kafka.streams.state.Serdes;
+import org.apache.kafka.streams.state.StateSerdes;
 
 import java.util.HashSet;
 import java.util.Set;
@@ -36,7 +36,7 @@ public class StoreChangeLogger<K, V> {
     // TODO: these values should be configurable
     protected static final int DEFAULT_WRITE_BATCH_SIZE = 100;
 
-    protected final Serdes<K, V> serialization;
+    protected final StateSerdes<K, V> serialization;
 
     private final String topic;
     private final int partition;
@@ -47,16 +47,16 @@ public class StoreChangeLogger<K, V> {
     protected Set<K> dirty;
     protected Set<K> removed;
 
-    public StoreChangeLogger(String storeName, ProcessorContext context, Serdes<K, V> serialization) {
+    public StoreChangeLogger(String storeName, ProcessorContext context, StateSerdes<K, V> serialization) {
         this(storeName, context, serialization, DEFAULT_WRITE_BATCH_SIZE, DEFAULT_WRITE_BATCH_SIZE);
     }
 
-    public StoreChangeLogger(String storeName, ProcessorContext context, Serdes<K, V> serialization, int maxDirty, int maxRemoved) {
+    public StoreChangeLogger(String storeName, ProcessorContext context, StateSerdes<K, V> serialization, int maxDirty, int maxRemoved) {
         this(storeName, context, context.taskId().partition, serialization, maxDirty, maxRemoved);
         init();
     }
 
-    protected StoreChangeLogger(String storeName, ProcessorContext context, int partition, Serdes<K, V> serialization, int maxDirty, int maxRemoved) {
+    protected StoreChangeLogger(String storeName, ProcessorContext context, int partition, StateSerdes<K, V> serialization, int maxDirty, int maxRemoved) {
         this.topic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName);
         this.context = context;
         this.partition = partition;

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index 83ebe48..0dacde7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -18,11 +18,6 @@
 package org.apache.kafka.streams;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.processor.internals.WallclockTimestampExtractor;
 import org.apache.kafka.streams.processor.internals.StreamThread;
 import org.junit.Before;
 import org.junit.Test;
@@ -43,11 +38,6 @@ public class StreamsConfigTest {
     public void setUp() {
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-config-test");
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-        props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
-        props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
-        props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
         streamsConfig = new StreamsConfig(props);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
index 88366fa..e04a273 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
@@ -17,8 +17,7 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.Predicate;
@@ -34,9 +33,6 @@ public class KStreamBranchTest {
 
     private String topicName = "topic";
 
-    private IntegerDeserializer keyDeserializer = new IntegerDeserializer();
-    private StringDeserializer valDeserializer = new StringDeserializer();
-
     @SuppressWarnings("unchecked")
     @Test
     public void testKStreamBranch() {
@@ -67,7 +63,7 @@ public class KStreamBranchTest {
         KStream<Integer, String>[] branches;
         MockProcessorSupplier<Integer, String>[] processors;
 
-        stream = builder.stream(keyDeserializer, valDeserializer, topicName);
+        stream = builder.stream(Serdes.Integer(), Serdes.String(), topicName);
         branches = stream.branch(isEven, isMultipleOfThree, isOdd);
 
         assertEquals(3, branches.length);

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
index 3bad041..ecf1115 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
@@ -17,8 +17,7 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.Predicate;
@@ -33,9 +32,6 @@ public class KStreamFilterTest {
 
     private String topicName = "topic";
 
-    private IntegerDeserializer keyDeserializer = new IntegerDeserializer();
-    private StringDeserializer valDeserializer = new StringDeserializer();
-
     private Predicate<Integer, String> isMultipleOfThree = new Predicate<Integer, String>() {
         @Override
         public boolean test(Integer key, String value) {
@@ -52,7 +48,7 @@ public class KStreamFilterTest {
         MockProcessorSupplier<Integer, String> processor;
 
         processor = new MockProcessorSupplier<>();
-        stream = builder.stream(keyDeserializer, valDeserializer, topicName);
+        stream = builder.stream(Serdes.Integer(), Serdes.String(), topicName);
         stream.filter(isMultipleOfThree).process(processor);
 
         KStreamTestDriver driver = new KStreamTestDriver(builder);
@@ -72,7 +68,7 @@ public class KStreamFilterTest {
         MockProcessorSupplier<Integer, String> processor;
 
         processor = new MockProcessorSupplier<>();
-        stream = builder.stream(keyDeserializer, valDeserializer, topicName);
+        stream = builder.stream(Serdes.Integer(), Serdes.String(), topicName);
         stream.filterOut(isMultipleOfThree).process(processor);
 
         KStreamTestDriver driver = new KStreamTestDriver(builder);

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
index 693f58e..bc85757 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
@@ -17,8 +17,7 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.KeyValue;
@@ -35,9 +34,6 @@ public class KStreamFlatMapTest {
 
     private String topicName = "topic";
 
-    private IntegerDeserializer keyDeserializer = new IntegerDeserializer();
-    private StringDeserializer valDeserializer = new StringDeserializer();
-
     @Test
     public void testFlatMap() {
         KStreamBuilder builder = new KStreamBuilder();
@@ -60,7 +56,7 @@ public class KStreamFlatMapTest {
         MockProcessorSupplier<String, String> processor;
 
         processor = new MockProcessorSupplier<>();
-        stream = builder.stream(keyDeserializer, valDeserializer, topicName);
+        stream = builder.stream(Serdes.Integer(), Serdes.String(), topicName);
         stream.flatMap(mapper).process(processor);
 
         KStreamTestDriver driver = new KStreamTestDriver(builder);

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
index eef7933..a904cb1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
@@ -17,8 +17,7 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.ValueMapper;
@@ -34,9 +33,6 @@ public class KStreamFlatMapValuesTest {
 
     private String topicName = "topic";
 
-    private IntegerDeserializer keyDeserializer = new IntegerDeserializer();
-    private StringDeserializer valDeserializer = new StringDeserializer();
-
     @Test
     public void testFlatMapValues() {
         KStreamBuilder builder = new KStreamBuilder();
@@ -58,7 +54,7 @@ public class KStreamFlatMapValuesTest {
         MockProcessorSupplier<Integer, String> processor;
 
         processor = new MockProcessorSupplier<>();
-        stream = builder.stream(keyDeserializer, valDeserializer, topicName);
+        stream = builder.stream(Serdes.Integer(), Serdes.String(), topicName);
         stream.flatMapValues(mapper).process(processor);
 
         KStreamTestDriver driver = new KStreamTestDriver(builder);

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index 3d3a9e3..38182bc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -17,12 +17,8 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
@@ -39,18 +35,16 @@ import static org.junit.Assert.assertEquals;
 
 public class KStreamImplTest {
 
+    final private Serde<String> stringSerde = Serdes.String();
+    final private Serde<Integer> intSerde = Serdes.Integer();
+
     @Test
     public void testNumProcesses() {
-        final Serializer<String> stringSerializer = new StringSerializer();
-        final Deserializer<String> stringDeserializer = new StringDeserializer();
-        final Serializer<Integer> integerSerializer = new IntegerSerializer();
-        final Deserializer<Integer> integerDeserializer = new IntegerDeserializer();
-
         final KStreamBuilder builder = new KStreamBuilder();
 
-        KStream<String, String> source1 = builder.stream(stringDeserializer, stringDeserializer, "topic-1", "topic-2");
+        KStream<String, String> source1 = builder.stream(stringSerde, stringSerde, "topic-1", "topic-2");
 
-        KStream<String, String> source2 = builder.stream(stringDeserializer, stringDeserializer, "topic-3", "topic-4");
+        KStream<String, String> source2 = builder.stream(stringSerde, stringSerde, "topic-3", "topic-4");
 
         KStream<String, String> stream1 = source1.filter(new Predicate<String, String>() {
@@ -114,14 +108,14 @@ public class KStreamImplTest {
             public Integer apply(Integer value1, Integer value2) {
                 return value1 + value2;
             }
-        }, JoinWindows.of("join-0"), stringSerializer, integerSerializer, integerSerializer, stringDeserializer, integerDeserializer, integerDeserializer);
+        }, JoinWindows.of("join-0"), stringSerde, intSerde, intSerde);
 
         KStream<String, Integer> stream5 = streams2[1].join(streams3[1], new ValueJoiner<Integer, Integer, Integer>() {
             @Override
             public Integer apply(Integer value1, Integer value2) {
                 return value1 + value2;
             }
-        }, JoinWindows.of("join-1"), stringSerializer, integerSerializer, integerSerializer, stringDeserializer, integerDeserializer, integerDeserializer);
+        }, JoinWindows.of("join-1"), stringSerde, intSerde, intSerde);
 
         stream4.to("topic-5");

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
index e763fd2..d24ab15 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
@@ -17,10 +17,8 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.KStream;
@@ -44,10 +42,8 @@ public class KStreamKStreamJoinTest {
     private String topic1 = "topic1";
     private String topic2 = "topic2";
 
-    private IntegerSerializer keySerializer = new IntegerSerializer();
-    private StringSerializer valSerializer = new StringSerializer();
-    private IntegerDeserializer keyDeserializer = new IntegerDeserializer();
-    private StringDeserializer valDeserializer = new StringDeserializer();
+    final private Serde<Integer> intSerde = Serdes.Integer();
+    final private Serde<String> stringSerde = Serdes.String();
 
     private ValueJoiner<String, String, String> joiner = new ValueJoiner<String, String, String>() {
         @Override
@@ -71,10 +67,9 @@ public class KStreamKStreamJoinTest {
         MockProcessorSupplier<Integer, String> processor;
 
         processor = new MockProcessorSupplier<>();
-        stream1 = builder.stream(keyDeserializer, valDeserializer, topic1);
-        stream2 = builder.stream(keyDeserializer, valDeserializer, topic2);
-        joined = stream1.join(stream2, joiner, JoinWindows.of("test").within(100),
-                keySerializer, valSerializer, valSerializer, keyDeserializer, valDeserializer, valDeserializer);
+        stream1 = builder.stream(intSerde, stringSerde, topic1);
+        stream2 = builder.stream(intSerde, stringSerde, topic2);
+        joined = stream1.join(stream2, joiner, JoinWindows.of("test").within(100), intSerde, stringSerde, stringSerde);
         joined.process(processor);
 
         Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
@@ -177,10 +172,9 @@ public class KStreamKStreamJoinTest {
         MockProcessorSupplier<Integer, String> processor;
 
         processor = new MockProcessorSupplier<>();
-        stream1 = builder.stream(keyDeserializer, valDeserializer, topic1);
-        stream2 = builder.stream(keyDeserializer, valDeserializer, topic2);
-        joined = stream1.outerJoin(stream2, joiner, JoinWindows.of("test").within(100),
-                keySerializer, valSerializer, valSerializer, keyDeserializer, valDeserializer, valDeserializer);
+        stream1 = builder.stream(intSerde, stringSerde, topic1);
+        stream2 = builder.stream(intSerde, stringSerde, topic2);
+        joined = stream1.outerJoin(stream2, joiner, JoinWindows.of("test").within(100), intSerde, stringSerde, stringSerde);
        joined.process(processor);
 
         Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
@@ -285,10 +279,9 @@ public class KStreamKStreamJoinTest {
         MockProcessorSupplier<Integer, String> processor;
 
         processor = new MockProcessorSupplier<>();
-        stream1 = builder.stream(keyDeserializer, valDeserializer, topic1);
-        stream2 = builder.stream(keyDeserializer, valDeserializer, topic2);
-        joined = stream1.join(stream2, joiner, JoinWindows.of("test").within(100),
-                keySerializer, valSerializer, valSerializer, keyDeserializer, valDeserializer, valDeserializer);
+        stream1 = builder.stream(intSerde, stringSerde, topic1);
+        stream2 = builder.stream(intSerde, stringSerde, topic2);
+        joined = stream1.join(stream2, joiner, JoinWindows.of("test").within(100), intSerde, stringSerde, stringSerde);
         joined.process(processor);
 
         Collection<Set<String>> copartitionGroups = builder.copartitionGroups();

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
index 8c6e43b..166e8ba 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
@@ -17,10 +17,8 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.KStream;
@@ -44,10 +42,8 @@ public class KStreamKStreamLeftJoinTest {
     private String topic1 = "topic1";
     private String topic2 = "topic2";
 
-    private IntegerSerializer keySerializer = new IntegerSerializer();
-    private StringSerializer valSerializer = new StringSerializer();
-    private IntegerDeserializer keyDeserializer = new IntegerDeserializer();
-    private StringDeserializer valDeserializer = new StringDeserializer();
+    final private Serde<Integer> intSerde = Serdes.Integer();
+    final private Serde<String> stringSerde = Serdes.String();
 
     private ValueJoiner<String, String, String> joiner = new ValueJoiner<String, String, String>() {
         @Override
@@ -71,10 +67,9 @@ public class KStreamKStreamLeftJoinTest {
         MockProcessorSupplier<Integer, String> processor;
 
         processor = new MockProcessorSupplier<>();
-        stream1 = builder.stream(keyDeserializer, valDeserializer, topic1);
-        stream2 = builder.stream(keyDeserializer, valDeserializer, topic2);
-        joined = stream1.leftJoin(stream2, joiner, JoinWindows.of("test").within(100),
-                keySerializer, valSerializer, keyDeserializer, valDeserializer);
+        stream1 = builder.stream(intSerde, stringSerde, topic1);
+        stream2 = builder.stream(intSerde, stringSerde, topic2);
+        joined = stream1.leftJoin(stream2, joiner, JoinWindows.of("test").within(100), intSerde, stringSerde);
         joined.process(processor);
 
         Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
@@ -157,10 +152,9 @@ public class KStreamKStreamLeftJoinTest {
         MockProcessorSupplier<Integer, String> processor;
 
         processor = new MockProcessorSupplier<>();
-        stream1 = builder.stream(keyDeserializer, valDeserializer, topic1);
-        stream2 = builder.stream(keyDeserializer, valDeserializer, topic2);
-        joined = stream1.leftJoin(stream2, joiner, JoinWindows.of("test").within(100),
-                keySerializer, valSerializer, keyDeserializer, valDeserializer);
+        stream1 = builder.stream(intSerde, stringSerde, topic1);
+        stream2 = builder.stream(intSerde, stringSerde, topic2);
+        joined = stream1.leftJoin(stream2, joiner, JoinWindows.of("test").within(100), intSerde, stringSerde);
         joined.process(processor);
 
         Collection<Set<String>> copartitionGroups = builder.copartitionGroups();

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
index f226cee..8e672a2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
@@ -18,10 +18,8 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
@@ -47,10 +45,8 @@ public class KStreamKTableLeftJoinTest {
     private String topic1 = "topic1";
     private String topic2 = "topic2";
 
-    private IntegerSerializer keySerializer = new IntegerSerializer();
-    private StringSerializer valSerializer = new StringSerializer();
-    private IntegerDeserializer keyDeserializer = new IntegerDeserializer();
-    private StringDeserializer valDeserializer = new StringDeserializer();
+    final private Serde<Integer> intSerde = new Serdes.IntegerSerde();
+    final private Serde<String> stringSerde = new Serdes.StringSerde();
 
     private ValueJoiner<String, String, String> joiner = new ValueJoiner<String, String, String>() {
         @Override
@@ -81,8 +77,8 @@ public class KStreamKTableLeftJoinTest {
         MockProcessorSupplier<Integer, String> processor;
 
         processor = new MockProcessorSupplier<>();
-        stream = builder.stream(keyDeserializer, valDeserializer, topic1);
-        table = builder.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic2);
+        stream = builder.stream(intSerde, stringSerde, topic1);
+        table = builder.table(intSerde, stringSerde, topic2);
         stream.leftJoin(table, joiner).process(processor);
 
         Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
@@ -162,8 +158,8 @@ public class KStreamKTableLeftJoinTest {
         MockProcessorSupplier<Integer, String> processor;
 
         processor = new MockProcessorSupplier<>();
-        stream = builder.stream(keyDeserializer, valDeserializer, topic1).map(keyValueMapper);
-        table = builder.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic2);
+        stream = builder.stream(intSerde, stringSerde, topic1).map(keyValueMapper);
+        table = builder.table(intSerde, stringSerde, topic2);
 
         stream.leftJoin(table, joiner).process(processor);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
index 73c517b..68fa656 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
@@ -17,8 +17,8 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.KeyValue;
@@ -33,8 +33,8 @@ public class KStreamMapTest {
 
     private String topicName = "topic";
 
-    private IntegerDeserializer keyDeserializer = new IntegerDeserializer();
-    private StringDeserializer valDeserializer = new StringDeserializer();
+    final private Serde<Integer> intSerde = Serdes.Integer();
+    final private Serde<String> stringSerde = Serdes.String();
 
     @Test
     public void testMap() {
@@ -50,11 +50,10 @@ public class KStreamMapTest {
 
         final int[] expectedKeys = new int[]{0, 1, 2, 3};
 
-        KStream<Integer, String> stream;
+        KStream<Integer, String> stream = builder.stream(intSerde, stringSerde, topicName);
         MockProcessorSupplier<String, Integer> processor;
 
         processor = new MockProcessorSupplier<>();
-        stream = builder.stream(keyDeserializer, valDeserializer, topicName);
         stream.map(mapper).process(processor);
 
         KStreamTestDriver driver = new KStreamTestDriver(builder);

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
index 68fd285..e671aab 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
@@ -17,8 +17,8 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.ValueMapper;
@@ -32,8 +32,8 @@ public class KStreamMapValuesTest {
 
     private String topicName = "topic";
 
-    private IntegerDeserializer keyDeserializer = new IntegerDeserializer();
-    private StringDeserializer valDeserializer = new StringDeserializer();
+    final private Serde<Integer> intSerde = Serdes.Integer();
+    final private Serde<String> stringSerde = Serdes.String();
 
     @Test
     public void testFlatMapValues() {
@@ -51,7 +51,7 @@ public class KStreamMapValuesTest {
         KStream<Integer, String> stream;
         MockProcessorSupplier<Integer, Integer> processor = new MockProcessorSupplier<>();
 
-        stream = builder.stream(keyDeserializer, valDeserializer, topicName);
+        stream = builder.stream(intSerde, stringSerde, topicName);
         stream.mapValues(mapper).process(processor);
 
         KStreamTestDriver driver = new KStreamTestDriver(builder);

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
index 426259f..4244de5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
@@ -17,7 +17,8 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.KeyValue;
@@ -34,8 +35,7 @@ public class KStreamTransformTest {
 
     private String topicName = "topic";
 
-    private IntegerDeserializer keyDeserializer = new IntegerDeserializer();
-    private IntegerDeserializer valDeserializer = new IntegerDeserializer();
+    final private Serde<Integer> intSerde = Serdes.Integer();
 
     @Test
     public void testTransform() {
@@ -71,9 +71,8 @@ public class KStreamTransformTest {
 
         final int[] expectedKeys = {1, 10, 100, 1000};
 
-        KStream<Integer, Integer> stream;
         MockProcessorSupplier<Integer, Integer> processor = new MockProcessorSupplier<>();
-        stream = builder.stream(keyDeserializer, valDeserializer, topicName);
+        KStream<Integer, Integer> stream = builder.stream(intSerde, intSerde, topicName);
         stream.transform(transformerSupplier).process(processor);
 
         KStreamTestDriver driver = new KStreamTestDriver(builder);
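The join test updates above show the main signature change in this commit: where the old operators took a serializer and a deserializer for the key and for each value (six extra arguments on a windowed join), the new ones take a single serde per type (three). A minimal sketch of the new shape, assuming only the overloads exercised by these tests; the topic names and the joiner are illustrative:

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.ValueJoiner;

public class SerdeJoinSketch {
    public static void main(String[] args) {
        Serde<Integer> intSerde = Serdes.Integer();
        Serde<String> stringSerde = Serdes.String();

        KStreamBuilder builder = new KStreamBuilder();
        // Sources now take (keySerde, valueSerde, topics...) instead of separate deserializers.
        KStream<Integer, String> left = builder.stream(intSerde, stringSerde, "left-topic");
        KStream<Integer, String> right = builder.stream(intSerde, stringSerde, "right-topic");

        // Windowed join: key serde, this-side value serde, other-side value serde.
        KStream<Integer, String> joined = left.join(right,
                new ValueJoiner<String, String, String>() {
                    @Override
                    public String apply(String value1, String value2) {
                        return value1 + "+" + value2;
                    }
                },
                JoinWindows.of("join-window").within(100),
                intSerde, stringSerde, stringSerde);

        // Write the result back out.
        joined.to("output-topic");
    }
}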
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java index 7def9db..52abdf7 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java @@ -17,7 +17,8 @@ package org.apache.kafka.streams.kstream.internals; -import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.ValueTransformer; @@ -33,8 +34,7 @@ public class KStreamTransformValuesTest { private String topicName = "topic"; - private IntegerDeserializer keyDeserializer = new IntegerDeserializer(); - private IntegerDeserializer valDeserializer = new IntegerDeserializer(); + final private Serde<Integer> intSerde = Serdes.Integer(); @Test public void testTransform() { @@ -72,7 +72,7 @@ public class KStreamTransformValuesTest { KStream<Integer, Integer> stream; MockProcessorSupplier<Integer, Integer> processor = new MockProcessorSupplier<>(); - stream = builder.stream(keyDeserializer, valDeserializer, topicName); + stream = builder.stream(intSerde, intSerde, topicName); stream.transformValues(valueTransformerSupplier).process(processor); KStreamTestDriver driver = new KStreamTestDriver(builder); http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java index 9e0745a..e19510f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java @@ -17,10 +17,8 @@ package org.apache.kafka.streams.kstream.internals; -import org.apache.kafka.common.serialization.Deserializer; -import org.apache.kafka.common.serialization.Serializer; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.kstream.Aggregator; import org.apache.kafka.streams.kstream.HoppingWindows; @@ -41,8 +39,7 @@ import static org.junit.Assert.assertEquals; public class KStreamWindowAggregateTest { - private final Serializer<String> strSerializer = new StringSerializer(); - private final Deserializer<String> strDeserializer = new StringDeserializer(); + final private Serde<String> strSerde = new Serdes.StringSerde(); private class StringAdd implements Aggregator<String, String, String> { @@ -68,13 +65,11 @@ public class KStreamWindowAggregateTest { final KStreamBuilder builder = new KStreamBuilder(); String topic1 = "topic1"; - KStream<String, String> stream1 = builder.stream(strDeserializer, strDeserializer, topic1); + KStream<String, String> stream1 = 
builder.stream(strSerde, strSerde, topic1); KTable<Windowed<String>, String> table2 = stream1.aggregateByKey(new StringInit(), new StringAdd(), HoppingWindows.of("topic1-Canonized").with(10L).every(5L), - strSerializer, - strSerializer, - strDeserializer, - strDeserializer); + strSerde, + strSerde); MockProcessorSupplier<Windowed<String>, String> proc2 = new MockProcessorSupplier<>(); table2.toStream().process(proc2); @@ -147,24 +142,20 @@ public class KStreamWindowAggregateTest { String topic1 = "topic1"; String topic2 = "topic2"; - KStream<String, String> stream1 = builder.stream(strDeserializer, strDeserializer, topic1); + KStream<String, String> stream1 = builder.stream(strSerde, strSerde, topic1); KTable<Windowed<String>, String> table1 = stream1.aggregateByKey(new StringInit(), new StringAdd(), HoppingWindows.of("topic1-Canonized").with(10L).every(5L), - strSerializer, - strSerializer, - strDeserializer, - strDeserializer); + strSerde, + strSerde); MockProcessorSupplier<Windowed<String>, String> proc1 = new MockProcessorSupplier<>(); table1.toStream().process(proc1); - KStream<String, String> stream2 = builder.stream(strDeserializer, strDeserializer, topic2); + KStream<String, String> stream2 = builder.stream(strSerde, strSerde, topic2); KTable<Windowed<String>, String> table2 = stream2.aggregateByKey(new StringInit(), new StringAdd(), HoppingWindows.of("topic2-Canonized").with(10L).every(5L), - strSerializer, - strSerializer, - strDeserializer, - strDeserializer); + strSerde, + strSerde); MockProcessorSupplier<Windowed<String>, String> proc2 = new MockProcessorSupplier<>(); table2.toStream().process(proc2); http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java index ec85ed7..fc01e5e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java @@ -17,10 +17,8 @@ package org.apache.kafka.streams.kstream.internals; -import org.apache.kafka.common.serialization.Deserializer; -import org.apache.kafka.common.serialization.Serializer; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.kstream.Aggregator; import org.apache.kafka.streams.kstream.Initializer; @@ -38,8 +36,7 @@ import static org.junit.Assert.assertEquals; public class KTableAggregateTest { - private final Serializer<String> strSerializer = new StringSerializer(); - private final Deserializer<String> strDeserializer = new StringDeserializer(); + final private Serde<String> stringSerde = new Serdes.StringSerde(); private class StringAdd implements Aggregator<String, String, String> { @@ -74,15 +71,12 @@ public class KTableAggregateTest { final KStreamBuilder builder = new KStreamBuilder(); String topic1 = "topic1"; - KTable<String, String> table1 = builder.table(strSerializer, strSerializer, strDeserializer, strDeserializer, topic1); - KTable<String, String> table2 = table1.<String, 
String, String>aggregate(new StringInit(), new StringAdd(), new StringRemove(), + KTable<String, String> table1 = builder.table(stringSerde, stringSerde, topic1); + KTable<String, String> table2 = table1.aggregate(new StringInit(), new StringAdd(), new StringRemove(), new NoOpKeyValueMapper<String, String>(), - strSerializer, - strSerializer, - strSerializer, - strDeserializer, - strDeserializer, - strDeserializer, + stringSerde, + stringSerde, + stringSerde, "topic1-Canonized"); MockProcessorSupplier<String, String> proc2 = new MockProcessorSupplier<>(); http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java index c43bea0..5491ea3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java @@ -17,12 +17,8 @@ package org.apache.kafka.streams.kstream.internals; -import org.apache.kafka.common.serialization.Deserializer; -import org.apache.kafka.common.serialization.IntegerDeserializer; -import org.apache.kafka.common.serialization.IntegerSerializer; -import org.apache.kafka.common.serialization.Serializer; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.KTable; @@ -40,10 +36,8 @@ import static org.junit.Assert.assertNull; public class KTableFilterTest { - private final Serializer<String> strSerializer = new StringSerializer(); - private final Deserializer<String> strDeserializer = new StringDeserializer(); - private final Serializer<Integer> intSerializer = new IntegerSerializer(); - private final Deserializer<Integer> intDeserializer = new IntegerDeserializer(); + final private Serde<Integer> intSerde = new Serdes.IntegerSerde(); + final private Serde<String> stringSerde = new Serdes.StringSerde(); @Test public void testKTable() { @@ -51,7 +45,7 @@ public class KTableFilterTest { String topic1 = "topic1"; - KTable<String, Integer> table1 = builder.table(strSerializer, intSerializer, strDeserializer, intDeserializer, topic1); + KTable<String, Integer> table1 = builder.table(stringSerde, intSerde, topic1); KTable<String, Integer> table2 = table1.filter(new Predicate<String, Integer>() { @Override @@ -93,7 +87,7 @@ public class KTableFilterTest { String topic1 = "topic1"; KTableImpl<String, Integer, Integer> table1 = - (KTableImpl<String, Integer, Integer>) builder.table(strSerializer, intSerializer, strDeserializer, intDeserializer, topic1); + (KTableImpl<String, Integer, Integer>) builder.table(stringSerde, intSerde, topic1); KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter( new Predicate<String, Integer>() { @Override @@ -112,7 +106,7 @@ public class KTableFilterTest { KTableValueGetterSupplier<String, Integer> getterSupplier2 = table2.valueGetterSupplier(); KTableValueGetterSupplier<String, Integer> getterSupplier3 = 
table3.valueGetterSupplier(); - KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null, null, null); + KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null); KTableValueGetter<String, Integer> getter2 = getterSupplier2.get(); KTableValueGetter<String, Integer> getter3 = getterSupplier3.get(); @@ -178,7 +172,7 @@ public class KTableFilterTest { String topic1 = "topic1"; KTableImpl<String, Integer, Integer> table1 = - (KTableImpl<String, Integer, Integer>) builder.table(strSerializer, intSerializer, strDeserializer, intDeserializer, topic1); + (KTableImpl<String, Integer, Integer>) builder.table(stringSerde, intSerde, topic1); KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter( new Predicate<String, Integer>() { @Override @@ -193,7 +187,7 @@ public class KTableFilterTest { builder.addProcessor("proc1", proc1, table1.name); builder.addProcessor("proc2", proc2, table2.name); - KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null, null, null); + KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null); driver.process(topic1, "A", 1); driver.process(topic1, "B", 1); @@ -233,7 +227,7 @@ public class KTableFilterTest { String topic1 = "topic1"; KTableImpl<String, Integer, Integer> table1 = - (KTableImpl<String, Integer, Integer>) builder.table(strSerializer, intSerializer, strDeserializer, intDeserializer, topic1); + (KTableImpl<String, Integer, Integer>) builder.table(stringSerde, intSerde, topic1); KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter( new Predicate<String, Integer>() { @Override @@ -250,7 +244,7 @@ public class KTableFilterTest { builder.addProcessor("proc1", proc1, table1.name); builder.addProcessor("proc2", proc2, table2.name); - KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null, null, null); + KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null); driver.process(topic1, "A", 1); driver.process(topic1, "B", 1); http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java index 2317c97..20c3a28 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java @@ -17,10 +17,8 @@ package org.apache.kafka.streams.kstream.internals; -import org.apache.kafka.common.serialization.Deserializer; -import org.apache.kafka.common.serialization.Serializer; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.KTable; @@ -40,16 +38,16 @@ import static org.junit.Assert.assertNull; public class KTableImplTest { + final private Serde<String> stringSerde = new Serdes.StringSerde(); + @Test public void testKTable() { - final Serializer<String> serializer = new StringSerializer(); - 
final Deserializer<String> deserializer = new StringDeserializer(); final KStreamBuilder builder = new KStreamBuilder(); String topic1 = "topic1"; String topic2 = "topic2"; - KTable<String, String> table1 = builder.table(serializer, serializer, deserializer, deserializer, topic1); + KTable<String, String> table1 = builder.table(stringSerde, stringSerde, topic1); MockProcessorSupplier<String, String> proc1 = new MockProcessorSupplier<>(); table1.toStream().process(proc1); @@ -74,7 +72,7 @@ public class KTableImplTest { MockProcessorSupplier<String, Integer> proc3 = new MockProcessorSupplier<>(); table3.toStream().process(proc3); - KTable<String, String> table4 = table1.through(topic2, serializer, serializer, deserializer, deserializer); + KTable<String, String> table4 = table1.through(topic2, stringSerde, stringSerde); MockProcessorSupplier<String, String> proc4 = new MockProcessorSupplier<>(); table4.toStream().process(proc4); @@ -96,15 +94,13 @@ public class KTableImplTest { public void testValueGetter() throws IOException { File stateDir = Files.createTempDirectory("test").toFile(); try { - final Serializer<String> serializer = new StringSerializer(); - final Deserializer<String> deserializer = new StringDeserializer(); final KStreamBuilder builder = new KStreamBuilder(); String topic1 = "topic1"; String topic2 = "topic2"; KTableImpl<String, String, String> table1 = - (KTableImpl<String, String, String>) builder.table(serializer, serializer, deserializer, deserializer, topic1); + (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1); KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues( new ValueMapper<String, Integer>() { @Override @@ -120,14 +116,14 @@ public class KTableImplTest { } }); KTableImpl<String, String, String> table4 = (KTableImpl<String, String, String>) - table1.through(topic2, serializer, serializer, deserializer, deserializer); + table1.through(topic2, stringSerde, stringSerde); KTableValueGetterSupplier<String, String> getterSupplier1 = table1.valueGetterSupplier(); KTableValueGetterSupplier<String, Integer> getterSupplier2 = table2.valueGetterSupplier(); KTableValueGetterSupplier<String, Integer> getterSupplier3 = table3.valueGetterSupplier(); KTableValueGetterSupplier<String, String> getterSupplier4 = table4.valueGetterSupplier(); - KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null, null, null); + KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null); // two state store should be created assertEquals(2, driver.allStateStores().size()); @@ -223,9 +219,6 @@ public class KTableImplTest { @Test public void testStateStore() throws IOException { - final Serializer<String> serializer = new StringSerializer(); - final Deserializer<String> deserializer = new StringDeserializer(); - String topic1 = "topic1"; String topic2 = "topic2"; @@ -234,9 +227,9 @@ public class KTableImplTest { KStreamBuilder builder = new KStreamBuilder(); KTableImpl<String, String, String> table1 = - (KTableImpl<String, String, String>) builder.table(serializer, serializer, deserializer, deserializer, topic1); + (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1); KTableImpl<String, String, String> table2 = - (KTableImpl<String, String, String>) builder.table(serializer, serializer, deserializer, deserializer, topic2); + (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic2); KTableImpl<String, String, Integer> 
table1Mapped = (KTableImpl<String, String, Integer>) table1.mapValues( new ValueMapper<String, Integer>() { @@ -253,7 +246,7 @@ public class KTableImplTest { } }); - KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null, null, null); + KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null); driver.setTime(0L); // no state store should be created @@ -267,9 +260,9 @@ public class KTableImplTest { KStreamBuilder builder = new KStreamBuilder(); KTableImpl<String, String, String> table1 = - (KTableImpl<String, String, String>) builder.table(serializer, serializer, deserializer, deserializer, topic1); + (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1); KTableImpl<String, String, String> table2 = - (KTableImpl<String, String, String>) builder.table(serializer, serializer, deserializer, deserializer, topic2); + (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic2); KTableImpl<String, String, Integer> table1Mapped = (KTableImpl<String, String, Integer>) table1.mapValues( new ValueMapper<String, Integer>() { @@ -293,7 +286,7 @@ public class KTableImplTest { } }); - KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null, null, null); + KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null); driver.setTime(0L); // two state store should be created http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java index 12bfb9c..5f30574 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java @@ -17,10 +17,8 @@ package org.apache.kafka.streams.kstream.internals; -import org.apache.kafka.common.serialization.IntegerDeserializer; -import org.apache.kafka.common.serialization.IntegerSerializer; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.KTable; @@ -47,10 +45,8 @@ public class KTableKTableJoinTest { private String topic1 = "topic1"; private String topic2 = "topic2"; - private IntegerSerializer keySerializer = new IntegerSerializer(); - private StringSerializer valSerializer = new StringSerializer(); - private IntegerDeserializer keyDeserializer = new IntegerDeserializer(); - private StringDeserializer valDeserializer = new StringDeserializer(); + final private Serde<Integer> intSerde = new Serdes.IntegerSerde(); + final private Serde<String> stringSerde = new Serdes.StringSerde(); private ValueJoiner<String, String, String> joiner = new ValueJoiner<String, String, String>() { @Override @@ -80,8 +76,8 @@ public class KTableKTableJoinTest { MockProcessorSupplier<Integer, String> processor; processor = new MockProcessorSupplier<>(); - table1 = builder.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic1); - 
table2 = builder.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic2); + table1 = builder.table(intSerde, stringSerde, topic1); + table2 = builder.table(intSerde, stringSerde, topic2); joined = table1.join(table2, joiner); joined.toStream().process(processor); @@ -179,8 +175,8 @@ public class KTableKTableJoinTest { KTable<Integer, String> joined; MockProcessorSupplier<Integer, String> proc; - table1 = builder.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic1); - table2 = builder.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic2); + table1 = builder.table(intSerde, stringSerde, topic1); + table2 = builder.table(intSerde, stringSerde, topic2); joined = table1.join(table2, joiner); proc = new MockProcessorSupplier<>(); @@ -267,8 +263,8 @@ public class KTableKTableJoinTest { KTable<Integer, String> joined; MockProcessorSupplier<Integer, String> proc; - table1 = builder.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic1); - table2 = builder.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic2); + table1 = builder.table(intSerde, stringSerde, topic1); + table2 = builder.table(intSerde, stringSerde, topic2); joined = table1.join(table2, joiner); ((KTableImpl<?, ?, ?>) joined).enableSendingOldValues(); http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java index e3cf22b..f92c5ca 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java @@ -17,10 +17,8 @@ package org.apache.kafka.streams.kstream.internals; -import org.apache.kafka.common.serialization.IntegerDeserializer; -import org.apache.kafka.common.serialization.IntegerSerializer; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.KTable; @@ -48,10 +46,8 @@ public class KTableKTableLeftJoinTest { private String topic1 = "topic1"; private String topic2 = "topic2"; - private IntegerSerializer keySerializer = new IntegerSerializer(); - private StringSerializer valSerializer = new StringSerializer(); - private IntegerDeserializer keyDeserializer = new IntegerDeserializer(); - private StringDeserializer valDeserializer = new StringDeserializer(); + final private Serde<Integer> intSerde = new Serdes.IntegerSerde(); + final private Serde<String> stringSerde = new Serdes.StringSerde(); private ValueJoiner<String, String, String> joiner = new ValueJoiner<String, String, String>() { @Override @@ -83,15 +79,11 @@ public class KTableKTableLeftJoinTest { final int[] expectedKeys = new int[]{0, 1, 2, 3}; - KTable<Integer, String> table1; - KTable<Integer, String> table2; - KTable<Integer, String> joined; + KTable<Integer, String> table1 = builder.table(intSerde, stringSerde, topic1); + 
+        KTable<Integer, String> joined = table1.leftJoin(table2, joiner);
         MockProcessorSupplier<Integer, String> processor;
-        processor = new MockProcessorSupplier<>();
-        table1 = builder.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic1);
-        table2 = builder.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic2);
-        joined = table1.leftJoin(table2, joiner);
         joined.toStream().process(processor);
         Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
@@ -188,8 +180,8 @@ public class KTableKTableLeftJoinTest {
         KTable<Integer, String> joined;
         MockProcessorSupplier<Integer, String> proc;
-        table1 = builder.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic1);
-        table2 = builder.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic2);
+        table1 = builder.table(intSerde, stringSerde, topic1);
+        table2 = builder.table(intSerde, stringSerde, topic2);
         joined = table1.leftJoin(table2, joiner);
         proc = new MockProcessorSupplier<>();
@@ -276,8 +268,8 @@ public class KTableKTableLeftJoinTest {
         KTable<Integer, String> joined;
         MockProcessorSupplier<Integer, String> proc;
-        table1 = builder.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic1);
-        table2 = builder.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic2);
+        table1 = builder.table(intSerde, stringSerde, topic1);
+        table2 = builder.table(intSerde, stringSerde, topic2);
         joined = table1.leftJoin(table2, joiner);
         ((KTableImpl<?, ?, ?>) joined).enableSendingOldValues();

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
index feabc08..6cc77e0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
@@ -17,10 +17,8 @@ package org.apache.kafka.streams.kstream.internals;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
@@ -47,10 +45,8 @@ public class KTableKTableOuterJoinTest {
     private String topic1 = "topic1";
     private String topic2 = "topic2";
-    private IntegerSerializer keySerializer = new IntegerSerializer();
-    private StringSerializer valSerializer = new StringSerializer();
-    private IntegerDeserializer keyDeserializer = new IntegerDeserializer();
-    private StringDeserializer valDeserializer = new StringDeserializer();
+    final private Serde<Integer> intSerde = new Serdes.IntegerSerde();
+    final private Serde<String> stringSerde = new Serdes.StringSerde();
     private ValueJoiner<String, String, String> joiner = new ValueJoiner<String, String, String>() {
         @Override
@@ -80,8 +76,8 @@ public class KTableKTableOuterJoinTest {
         MockProcessorSupplier<Integer, String> processor;
         processor = new MockProcessorSupplier<>();
-        table1 = builder.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic1);
-        table2 = builder.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic2);
+        table1 = builder.table(intSerde, stringSerde, topic1);
+        table2 = builder.table(intSerde, stringSerde, topic2);
         joined = table1.outerJoin(table2, joiner);
         joined.toStream().process(processor);
@@ -188,8 +184,8 @@ public class KTableKTableOuterJoinTest {
         KTable<Integer, String> joined;
         MockProcessorSupplier<Integer, String> proc;
-        table1 = builder.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic1);
-        table2 = builder.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic2);
+        table1 = builder.table(intSerde, stringSerde, topic1);
+        table2 = builder.table(intSerde, stringSerde, topic2);
         joined = table1.outerJoin(table2, joiner);
         proc = new MockProcessorSupplier<>();
@@ -284,8 +280,8 @@ public class KTableKTableOuterJoinTest {
         KTable<Integer, String> joined;
         MockProcessorSupplier<Integer, String> proc;
-        table1 = builder.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic1);
-        table2 = builder.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic2);
+        table1 = builder.table(intSerde, stringSerde, topic1);
+        table2 = builder.table(intSerde, stringSerde, topic2);
         joined = table1.outerJoin(table2, joiner);
         ((KTableImpl<?, ?, ?>) joined).enableSendingOldValues();

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
index 58f1c2a..aa3daeb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
@@ -17,10 +17,8 @@ package org.apache.kafka.streams.kstream.internals;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
@@ -41,8 +39,7 @@ import static org.junit.Assert.assertTrue;
 public class KTableMapValuesTest {
-    private final Serializer<String> strSerializer = new StringSerializer();
-    private final Deserializer<String> strDeserializer = new StringDeserializer();
+    final private Serde<String> stringSerde = new Serdes.StringSerde();
     @Test
     public void testKTable() {
@@ -50,7 +47,7 @@ public class KTableMapValuesTest {
         String topic1 = "topic1";
-        KTable<String, String> table1 = builder.table(strSerializer, strSerializer, strDeserializer, strDeserializer, topic1);
+        KTable<String, String> table1 = builder.table(stringSerde, stringSerde, topic1);
         KTable<String, Integer> table2 = table1.mapValues(new ValueMapper<String, Integer>() {
             @Override
             public Integer apply(String value) {
@@ -75,15 +72,13 @@ public class KTableMapValuesTest {
     public void testValueGetter() throws IOException {
         File stateDir = Files.createTempDirectory("test").toFile();
         try {
-            final Serializer<String> serializer = new StringSerializer();
-            final Deserializer<String> deserializer = new StringDeserializer();
             final KStreamBuilder builder = new KStreamBuilder();
             String topic1 = "topic1";
             String topic2 = "topic2";
             KTableImpl<String, String, String> table1 =
-                    (KTableImpl<String, String, String>) builder.table(serializer, serializer, deserializer, deserializer, topic1);
+                    (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1);
             KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
                     new ValueMapper<String, Integer>() {
                         @Override
@@ -99,14 +94,14 @@ public class KTableMapValuesTest {
                         }
                     });
             KTableImpl<String, String, String> table4 = (KTableImpl<String, String, String>)
-                    table1.through(topic2, serializer, serializer, deserializer, deserializer);
+                    table1.through(topic2, stringSerde, stringSerde);
             KTableValueGetterSupplier<String, String> getterSupplier1 = table1.valueGetterSupplier();
             KTableValueGetterSupplier<String, Integer> getterSupplier2 = table2.valueGetterSupplier();
             KTableValueGetterSupplier<String, Integer> getterSupplier3 = table3.valueGetterSupplier();
             KTableValueGetterSupplier<String, String> getterSupplier4 = table4.valueGetterSupplier();
-            KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null, null, null);
+            KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null);
             KTableValueGetter<String, String> getter1 = getterSupplier1.get();
             getter1.init(driver.context());
@@ -201,14 +196,12 @@ public class KTableMapValuesTest {
     public void testNotSendingOldValue() throws IOException {
         File stateDir = Files.createTempDirectory("test").toFile();
         try {
-            final Serializer<String> serializer = new StringSerializer();
-            final Deserializer<String> deserializer = new StringDeserializer();
             final KStreamBuilder builder = new KStreamBuilder();
             String topic1 = "topic1";
             KTableImpl<String, String, String> table1 =
-                    (KTableImpl<String, String, String>) builder.table(serializer, serializer, deserializer, deserializer, topic1);
+                    (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1);
             KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
                     new ValueMapper<String, Integer>() {
                         @Override
@@ -221,7 +214,7 @@ public class KTableMapValuesTest {
             builder.addProcessor("proc", proc, table2.name);
-            KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null, null, null);
+            KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null);
             assertFalse(table1.sendingOldValueEnabled());
             assertFalse(table2.sendingOldValueEnabled());
@@ -254,14 +247,12 @@ public class KTableMapValuesTest {
     public void testSendingOldValue() throws IOException {
         File stateDir = Files.createTempDirectory("test").toFile();
         try {
-            final Serializer<String> serializer = new StringSerializer();
-            final Deserializer<String> deserializer = new StringDeserializer();
             final KStreamBuilder builder = new KStreamBuilder();
             String topic1 = "topic1";
             KTableImpl<String, String, String> table1 =
-                    (KTableImpl<String, String, String>) builder.table(serializer, serializer, deserializer, deserializer, topic1);
+                    (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1);
             KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
                     new ValueMapper<String, Integer>() {
                         @Override
@@ -276,7 +267,7 @@ public class KTableMapValuesTest {
             builder.addProcessor("proc", proc, table2.name);
-            KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null, null, null);
+            KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null);
             assertTrue(table1.sendingOldValueEnabled());
             assertTrue(table2.sendingOldValueEnabled());

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
index 187a6f2..51276f3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
@@ -17,10 +17,8 @@ package org.apache.kafka.streams.kstream.internals;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
@@ -38,8 +36,7 @@ import static org.junit.Assert.assertTrue;
 public class KTableSourceTest {
-    private final Serializer<String> strSerializer = new StringSerializer();
-    private final Deserializer<String> strDeserializer = new StringDeserializer();
+    final private Serde<String> stringSerde = new Serdes.StringSerde();
     @Test
     public void testKTable() {
@@ -47,7 +44,7 @@ public class KTableSourceTest {
         String topic1 = "topic1";
-        KTable<String, String> table1 = builder.table(strSerializer, strSerializer, strDeserializer, strDeserializer, topic1);
+        KTable<String, String> table1 = builder.table(stringSerde, stringSerde, topic1);
         MockProcessorSupplier<String, String> proc1 = new MockProcessorSupplier<>();
         table1.toStream().process(proc1);
@@ -72,12 +69,11 @@ public class KTableSourceTest {
         String topic1 = "topic1";
-        KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>)
-                builder.table(strSerializer, strSerializer, strDeserializer, strDeserializer, topic1);
+        KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1);
         KTableValueGetterSupplier<String, String> getterSupplier1 = table1.valueGetterSupplier();
-        KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null, null, null);
+        KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null);
         KTableValueGetter<String, String> getter1 = getterSupplier1.get();
         getter1.init(driver.context());
@@ -123,14 +119,13 @@ public class KTableSourceTest {
         String topic1 = "topic1";
-        KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>)
-                builder.table(strSerializer, strSerializer, strDeserializer, strDeserializer, topic1);
+        KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1);
         MockProcessorSupplier<String, Integer> proc1 = new MockProcessorSupplier<>();
         builder.addProcessor("proc1", proc1, table1.name);
-        KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null, null, null);
+        KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null);
         driver.process(topic1, "A", "01");
         driver.process(topic1, "B", "01");
@@ -165,8 +160,7 @@ public class KTableSourceTest {
         String topic1 = "topic1";
-        KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>)
-                builder.table(strSerializer, strSerializer, strDeserializer, strDeserializer, topic1);
+        KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1);
         table1.enableSendingOldValues();
@@ -176,7 +170,7 @@ public class KTableSourceTest {
         builder.addProcessor("proc1", proc1, table1.name);
-        KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null, null, null);
+        KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null);
         driver.process(topic1, "A", "01");
         driver.process(topic1, "B", "01");

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
index 1b8cbb8..7c6d5ec 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
@@ -37,8 +37,8 @@ public class WindowedStreamPartitionerTest {
     private String topicName = "topic";
-    private IntegerSerializer keySerializer = new IntegerSerializer();
-    private StringSerializer valSerializer = new StringSerializer();
+    private IntegerSerializer intSerializer = new IntegerSerializer();
+    private StringSerializer stringSerializer = new StringSerializer();
     private List<PartitionInfo> infos = Arrays.asList(
             new PartitionInfo(topicName, 0, Node.noNode(), new Node[0], new Node[0]),
@@ -58,15 +58,15 @@ public class WindowedStreamPartitionerTest {
         DefaultPartitioner defaultPartitioner = new DefaultPartitioner();
-        WindowedSerializer<Integer> windowedSerializer = new WindowedSerializer<>(keySerializer);
+        WindowedSerializer<Integer> windowedSerializer = new WindowedSerializer<>(intSerializer);
         WindowedStreamPartitioner<Integer, String> streamPartitioner = new WindowedStreamPartitioner<>(windowedSerializer);
         for (int k = 0; k < 10; k++) {
             Integer key = rand.nextInt();
-            byte[] keyBytes = keySerializer.serialize(topicName, key);
+            byte[] keyBytes = intSerializer.serialize(topicName, key);
             String value = key.toString();
-            byte[] valueBytes = valSerializer.serialize(topicName, value);
+            byte[] valueBytes = stringSerializer.serialize(topicName, value);
             Integer expected = defaultPartitioner.partition("topic", key, keyBytes, value, valueBytes, cluster);

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index 12210cc..ef08176 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertNull;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
@@ -70,11 +71,9 @@ public class ProcessorTopologyTest {
         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "processor-topology-test");
         props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
         props.setProperty(StreamsConfig.STATE_DIR_CONFIG, localState.getAbsolutePath());
+        props.setProperty(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        props.setProperty(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
         props.setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, CustomTimestampExtractor.class.getName());
-        props.setProperty(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
-        props.setProperty(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-        props.setProperty(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
-        props.setProperty(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
         this.config = new StreamsConfig(props);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 21bdaff..ea24441 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -33,6 +33,7 @@ import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
 import org.apache.kafka.test.MockStateStoreSupplier;
+import org.apache.kafka.test.MockTimestampExtractor;
 import org.junit.Before;
 import org.junit.Test;
@@ -89,15 +90,11 @@ public class StandbyTaskTest {
     private StreamsConfig createConfig(final File baseDir) throws Exception {
         return new StreamsConfig(new Properties() {
             {
-                setProperty(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
-                setProperty(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
-                setProperty(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
-                setProperty(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
-                setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.test.MockTimestampExtractor");
                 setProperty(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
                 setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
                 setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
                 setProperty(StreamsConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath());
+                setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class.getName());
             }
         });
     }

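The DSL changes above all follow one pattern: KStreamBuilder#table() and KTable#through() now take a Serde per key and value instead of separate serializer/deserializer pairs. A minimal sketch of the new construction, assuming only the KStreamBuilder/KTable API visible in the hunks above; the class name, topic name ("my-topic") and the value mapper body are illustrative, not part of the patch:

// Sketch only: shows the Serde-based builder calls used throughout the updated tests.
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.ValueMapper;

public class SerdeMigrationSketch {

    public static KStreamBuilder buildTopology() {
        // One Serde bundles the serializer and deserializer that used to be passed separately.
        final Serde<String> stringSerde = Serdes.String();

        KStreamBuilder builder = new KStreamBuilder();

        // Old API (removed by this patch):
        //   builder.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, "my-topic");
        // New API: one Serde each for key and value.
        KTable<String, String> table = builder.table(stringSerde, stringSerde, "my-topic");

        // Downstream operators are untouched; only the source/sink serde plumbing changed.
        KTable<String, Integer> lengths = table.mapValues(new ValueMapper<String, Integer>() {
            @Override
            public Integer apply(String value) {
                return value.length();
            }
        });

        return builder;
    }
}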
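On the configuration side, ProcessorTopologyTest and StandbyTaskTest show the matching StreamsConfig change: the four *_SERIALIZER_CLASS_CONFIG / *_DESERIALIZER_CLASS_CONFIG properties are replaced by KEY_SERDE_CLASS_CONFIG and VALUE_SERDE_CLASS_CONFIG. A small sketch under the same assumptions; the application id and bootstrap server below are placeholders:

// Sketch only: mirrors the property change made in the test configs above.
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;

public class SerdeConfigSketch {

    public static StreamsConfig createConfig() {
        Properties props = new Properties();
        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "serde-config-sketch");
        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Before this patch: KEY_SERIALIZER_CLASS_CONFIG, KEY_DESERIALIZER_CLASS_CONFIG,
        // VALUE_SERIALIZER_CLASS_CONFIG and VALUE_DESERIALIZER_CLASS_CONFIG were set separately.
        // After: one serde class each for keys and values.
        props.setProperty(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.setProperty(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        return new StreamsConfig(props);
    }
}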