KAFKA-3336: Unify Serializer and Deserializer into Serialization Author: Guozhang Wang <[email protected]>
Reviewers: Michael G. Noll, Ismael Juma Closes #1066 from guozhangwang/K3336 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/dea0719e Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/dea0719e Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/dea0719e Branch: refs/heads/trunk Commit: dea0719e99211684775780f5da8b93835d7a5dac Parents: f57dabb Author: Guozhang Wang <[email protected]> Authored: Thu Mar 17 15:41:59 2016 -0700 Committer: Guozhang Wang <[email protected]> Committed: Thu Mar 17 15:41:59 2016 -0700 ---------------------------------------------------------------------- .../serialization/DoubleDeserializer.java | 46 +++++ .../common/serialization/DoubleSerializer.java | 46 +++++ .../kafka/common/serialization/Serde.java | 26 +++ .../kafka/common/serialization/Serdes.java | 193 +++++++++++++++++++ .../common/serialization/SerializationTest.java | 115 ++++++++--- .../examples/pageview/PageViewTypedDemo.java | 39 ++-- .../examples/pageview/PageViewUntypedDemo.java | 20 +- .../kafka/streams/examples/pipe/PipeDemo.java | 9 +- .../examples/wordcount/WordCountDemo.java | 20 +- .../wordcount/WordCountProcessorDemo.java | 9 +- .../org/apache/kafka/streams/StreamsConfig.java | 75 +++---- .../apache/kafka/streams/kstream/KStream.java | 133 +++++-------- .../kafka/streams/kstream/KStreamBuilder.java | 38 ++-- .../apache/kafka/streams/kstream/KTable.java | 51 ++--- .../streams/kstream/internals/KStreamImpl.java | 134 +++++-------- .../streams/kstream/internals/KTableImpl.java | 94 ++++----- .../kstream/internals/KTableStoreSupplier.java | 13 +- .../streams/processor/ProcessorContext.java | 25 +-- .../streams/processor/TopologyBuilder.java | 24 +-- .../internals/ProcessorContextImpl.java | 35 +--- .../streams/processor/internals/SinkNode.java | 4 +- .../streams/processor/internals/SourceNode.java | 4 +- .../processor/internals/StandbyContextImpl.java | 33 +--- 
.../processor/internals/StreamThread.java | 2 - .../org/apache/kafka/streams/state/Serdes.java | 136 ------------- .../apache/kafka/streams/state/StateSerdes.java | 108 +++++++++++ .../org/apache/kafka/streams/state/Stores.java | 109 ++++++----- .../kafka/streams/state/WindowStoreUtils.java | 6 +- .../internals/InMemoryKeyValueLoggedStore.java | 6 +- .../InMemoryKeyValueStoreSupplier.java | 10 +- .../InMemoryLRUCacheStoreSupplier.java | 6 +- .../streams/state/internals/MemoryLRUCache.java | 6 +- .../internals/RocksDBKeyValueStoreSupplier.java | 6 +- .../streams/state/internals/RocksDBStore.java | 14 +- .../state/internals/RocksDBWindowStore.java | 12 +- .../internals/RocksDBWindowStoreSupplier.java | 6 +- .../state/internals/StoreChangeLogger.java | 10 +- .../apache/kafka/streams/StreamsConfigTest.java | 10 - .../kstream/internals/KStreamBranchTest.java | 8 +- .../kstream/internals/KStreamFilterTest.java | 10 +- .../kstream/internals/KStreamFlatMapTest.java | 8 +- .../internals/KStreamFlatMapValuesTest.java | 8 +- .../kstream/internals/KStreamImplTest.java | 24 +-- .../internals/KStreamKStreamJoinTest.java | 33 ++-- .../internals/KStreamKStreamLeftJoinTest.java | 26 +-- .../internals/KStreamKTableLeftJoinTest.java | 20 +- .../kstream/internals/KStreamMapTest.java | 11 +- .../kstream/internals/KStreamMapValuesTest.java | 10 +- .../kstream/internals/KStreamTransformTest.java | 9 +- .../internals/KStreamTransformValuesTest.java | 8 +- .../internals/KStreamWindowAggregateTest.java | 33 ++-- .../kstream/internals/KTableAggregateTest.java | 22 +-- .../kstream/internals/KTableFilterTest.java | 28 ++- .../kstream/internals/KTableImplTest.java | 37 ++-- .../kstream/internals/KTableKTableJoinTest.java | 24 +-- .../internals/KTableKTableLeftJoinTest.java | 30 ++- .../internals/KTableKTableOuterJoinTest.java | 24 +-- .../kstream/internals/KTableMapValuesTest.java | 31 ++- .../kstream/internals/KTableSourceTest.java | 26 +-- .../WindowedStreamPartitionerTest.java | 10 +- 
.../internals/ProcessorTopologyTest.java | 7 +- .../processor/internals/StandbyTaskTest.java | 7 +- .../internals/StreamPartitionAssignorTest.java | 7 +- .../processor/internals/StreamTaskTest.java | 7 +- .../processor/internals/StreamThreadTest.java | 7 +- .../streams/smoketest/SmokeTestClient.java | 69 +++---- .../streams/smoketest/SmokeTestDriver.java | 22 +-- .../kafka/streams/smoketest/SmokeTestUtil.java | 81 +------- .../streams/state/KeyValueStoreTestDriver.java | 72 +------ .../internals/InMemoryKeyValueStoreTest.java | 8 +- .../internals/InMemoryLRUCacheStoreTest.java | 8 +- .../internals/RocksDBKeyValueStoreTest.java | 8 +- .../state/internals/RocksDBWindowStoreTest.java | 49 +++-- .../state/internals/StoreChangeLoggerTest.java | 6 +- .../apache/kafka/test/KStreamTestDriver.java | 13 +- .../apache/kafka/test/MockProcessorContext.java | 55 ++---- 76 files changed, 1184 insertions(+), 1315 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/clients/src/main/java/org/apache/kafka/common/serialization/DoubleDeserializer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/DoubleDeserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/DoubleDeserializer.java new file mode 100644 index 0000000..ed4f323 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/serialization/DoubleDeserializer.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.kafka.common.serialization; + +import org.apache.kafka.common.errors.SerializationException; + +import java.util.Map; + +public class DoubleDeserializer implements Deserializer<Double> { + + @Override + public void configure(Map<String, ?> configs, boolean isKey) { + // nothing to do + } + + @Override + public Double deserialize(String topic, byte[] data) { + if (data == null) + return null; + if (data.length != 8) { + throw new SerializationException("Size of data received by Deserializer is not 8"); + } + + long value = 0; + for (byte b : data) { + value <<= 8; + value |= b & 0xFF; + } + return Double.longBitsToDouble(value); + } + + @Override + public void close() { + // nothing to do + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/clients/src/main/java/org/apache/kafka/common/serialization/DoubleSerializer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/DoubleSerializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/DoubleSerializer.java new file mode 100644 index 0000000..9d01342 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/serialization/DoubleSerializer.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. 
The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.kafka.common.serialization; + +import java.util.Map; + +public class DoubleSerializer implements Serializer<Double> { + + @Override + public void configure(Map<String, ?> configs, boolean isKey) { + // nothing to do + } + + @Override + public byte[] serialize(String topic, Double data) { + if (data == null) + return null; + + long bits = Double.doubleToLongBits(data); + return new byte[] { + (byte) (bits >>> 56), + (byte) (bits >>> 48), + (byte) (bits >>> 40), + (byte) (bits >>> 32), + (byte) (bits >>> 24), + (byte) (bits >>> 16), + (byte) (bits >>> 8), + (byte) bits + }; + } + + @Override + public void close() { + // nothing to do + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/clients/src/main/java/org/apache/kafka/common/serialization/Serde.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/Serde.java b/clients/src/main/java/org/apache/kafka/common/serialization/Serde.java new file mode 100644 index 0000000..cc7944e --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/serialization/Serde.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. 
See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package org.apache.kafka.common.serialization; + +/** + * The interface for wrapping a serializer and deserializer for the given data type. + * + * @param <T> + */ +public interface Serde<T> { + + Serializer<T> serializer(); + + Deserializer<T> deserializer(); +} http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java b/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java new file mode 100644 index 0000000..f27f74f --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java @@ -0,0 +1,193 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package org.apache.kafka.common.serialization; + +import java.nio.ByteBuffer; + +/** + * Factory for creating serializers / deserializers. + */ +public class Serdes { + + static public final class LongSerde implements Serde<Long> { + @Override + public Serializer<Long> serializer() { + return new LongSerializer(); + } + + @Override + public Deserializer<Long> deserializer() { + return new LongDeserializer(); + } + } + + static public final class IntegerSerde implements Serde<Integer> { + @Override + public Serializer<Integer> serializer() { + return new IntegerSerializer(); + } + + @Override + public Deserializer<Integer> deserializer() { + return new IntegerDeserializer(); + } + } + + static public final class DoubleSerde implements Serde<Double> { + @Override + public Serializer<Double> serializer() { + return new DoubleSerializer(); + } + + @Override + public Deserializer<Double> deserializer() { + return new DoubleDeserializer(); + } + } + + static public final class StringSerde implements Serde<String> { + @Override + public Serializer<String> serializer() { + return new StringSerializer(); + } + + @Override + public Deserializer<String> deserializer() { + return new StringDeserializer(); + } + } + + static public final class ByteBufferSerde implements Serde<ByteBuffer> { + @Override + public Serializer<ByteBuffer> serializer() { + return new ByteBufferSerializer(); + } + + @Override + public Deserializer<ByteBuffer> deserializer() { + return new ByteBufferDeserializer(); + } + } + + static public final class ByteArraySerde implements 
Serde<byte[]> { + @Override + public Serializer<byte[]> serializer() { + return new ByteArraySerializer(); + } + + @Override + public Deserializer<byte[]> deserializer() { + return new ByteArrayDeserializer(); + } + } + + @SuppressWarnings("unchecked") + static public <T> Serde<T> serdeFrom(Class<T> type) { + if (String.class.isAssignableFrom(type)) { + return (Serde<T>) String(); + } + + if (Integer.class.isAssignableFrom(type)) { + return (Serde<T>) Integer(); + } + + if (Long.class.isAssignableFrom(type)) { + return (Serde<T>) Long(); + } + + if (Double.class.isAssignableFrom(type)) { + return (Serde<T>) Double(); + } + + if (byte[].class.isAssignableFrom(type)) { + return (Serde<T>) ByteArray(); + } + + if (ByteBufferSerde.class.isAssignableFrom(type)) { + return (Serde<T>) ByteBuffer(); + } + + // TODO: we can also serializes objects of type T using generic Java serialization by default + throw new IllegalArgumentException("Unknown class for built-in serializer"); + } + + /** + * Construct a serde object from separate serializer and deserializer + * + * @param serializer must not be null. + * @param deserializer must not be null. + */ + static public <T> Serde<T> serdeFrom(final Serializer<T> serializer, final Deserializer<T> deserializer) { + if (serializer == null) { + throw new IllegalArgumentException("serializer must not be null"); + } + if (deserializer == null) { + throw new IllegalArgumentException("deserializer must not be null"); + } + + return new Serde<T>() { + @Override + public Serializer<T> serializer() { + return serializer; + } + + @Override + public Deserializer<T> deserializer() { + return deserializer; + } + }; + } + + /* + * A serde for nullable long type. + */ + static public Serde<Long> Long() { + return new LongSerde(); + } + + /* + * A serde for nullable int type. + */ + static public Serde<Integer> Integer() { + return new IntegerSerde(); + } + + /* + * A serde for nullable long type. 
+ */ + static public Serde<Double> Double() { + return new DoubleSerde(); + } + + /* + * A serde for nullable string type. + */ + static public Serde<String> String() { + return new StringSerde(); + } + + /* + * A serde for nullable byte array type. + */ + static public Serde<ByteBuffer> ByteBuffer() { + return new ByteBufferSerde(); + } + + /* + * A serde for nullable byte array type. + */ + static public Serde<byte[]> ByteArray() { + return new ByteArraySerde(); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java b/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java index 87d9e0a..e4cd678 100644 --- a/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java +++ b/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java @@ -24,34 +24,53 @@ import static org.junit.Assert.assertEquals; public class SerializationTest { - private static class SerDeser<T> { - final Serializer<T> serializer; - final Deserializer<T> deserializer; + final private String topic = "testTopic"; - public SerDeser(Serializer<T> serializer, Deserializer<T> deserializer) { - this.serializer = serializer; - this.deserializer = deserializer; - } + private class DummyClass { + + } + + @Test + public void testSerdeFrom() { + Serde<Long> thisSerde = Serdes.serdeFrom(Long.class); + Serde<Long> otherSerde = Serdes.Long(); + + Long value = 423412424L; + + assertEquals("Should get the original long after serialization and deserialization", + value, thisSerde.deserializer().deserialize(topic, otherSerde.serializer().serialize(topic, value))); + assertEquals("Should get the original long after serialization and deserialization", + value, 
otherSerde.deserializer().deserialize(topic, thisSerde.serializer().serialize(topic, value))); + } + + @Test(expected = IllegalArgumentException.class) + public void testSerdeFromUnknown() { + Serdes.serdeFrom(DummyClass.class); + } + + @Test(expected = IllegalArgumentException.class) + public void testSerdeFromNotNull() { + Serdes.serdeFrom(null, Serdes.Long().deserializer()); } @Test public void testStringSerializer() { String str = "my string"; - String mytopic = "testTopic"; + List<String> encodings = new ArrayList<String>(); encodings.add("UTF8"); encodings.add("UTF-16"); for (String encoding : encodings) { - SerDeser<String> serDeser = getStringSerDeser(encoding); - Serializer<String> serializer = serDeser.serializer; - Deserializer<String> deserializer = serDeser.deserializer; + Serde<String> serDeser = getStringSerde(encoding); + Serializer<String> serializer = serDeser.serializer(); + Deserializer<String> deserializer = serDeser.deserializer(); assertEquals("Should get the original string after serialization and deserialization with encoding " + encoding, - str, deserializer.deserialize(mytopic, serializer.serialize(mytopic, str))); + str, deserializer.deserialize(topic, serializer.serialize(topic, str))); assertEquals("Should support null in serialization and deserialization with encoding " + encoding, - null, deserializer.deserialize(mytopic, serializer.serialize(mytopic, null))); + null, deserializer.deserialize(topic, serializer.serialize(topic, null))); } } @@ -61,18 +80,61 @@ public class SerializationTest { 423412424, -41243432 }; - String mytopic = "testTopic"; - Serializer<Integer> serializer = new IntegerSerializer(); - Deserializer<Integer> deserializer = new IntegerDeserializer(); + Serializer<Integer> serializer = Serdes.Integer().serializer(); + Deserializer<Integer> deserializer = Serdes.Integer().deserializer(); for (Integer integer : integers) { assertEquals("Should get the original integer after serialization and deserialization", - 
integer, deserializer.deserialize(mytopic, serializer.serialize(mytopic, integer))); + integer, deserializer.deserialize(topic, serializer.serialize(topic, integer))); + } + + assertEquals("Should support null in serialization and deserialization", + null, deserializer.deserialize(topic, serializer.serialize(topic, null))); + + serializer.close(); + deserializer.close(); + } + + @Test + public void testLongSerializer() { + Long[] longs = new Long[]{ + 922337203685477580L, + -922337203685477581L + }; + + Serializer<Long> serializer = Serdes.Long().serializer(); + Deserializer<Long> deserializer = Serdes.Long().deserializer(); + + for (Long value : longs) { + assertEquals("Should get the original long after serialization and deserialization", + value, deserializer.deserialize(topic, serializer.serialize(topic, value))); + } + + assertEquals("Should support null in serialization and deserialization", + null, deserializer.deserialize(topic, serializer.serialize(topic, null))); + + serializer.close(); + deserializer.close(); + } + + @Test + public void testDoubleSerializer() { + Double[] doubles = new Double[]{ + 5678567.12312d, + -5678567.12341d + }; + + Serializer<Double> serializer = Serdes.Double().serializer(); + Deserializer<Double> deserializer = Serdes.Double().deserializer(); + + for (Double value : doubles) { + assertEquals("Should get the original double after serialization and deserialization", + value, deserializer.deserialize(topic, serializer.serialize(topic, value))); } assertEquals("Should support null in serialization and deserialization", - null, deserializer.deserialize(mytopic, serializer.serialize(mytopic, null))); + null, deserializer.deserialize(topic, serializer.serialize(topic, null))); serializer.close(); deserializer.close(); @@ -80,34 +142,33 @@ public class SerializationTest { @Test public void testByteBufferSerializer() { - String mytopic = "testTopic"; ByteBuffer buf = ByteBuffer.allocate(10); buf.put("my string".getBytes()); - 
Serializer<ByteBuffer> serializer = new ByteBufferSerializer(); - Deserializer<ByteBuffer> deserializer = new ByteBufferDeserializer(); + Serializer<ByteBuffer> serializer = Serdes.ByteBuffer().serializer(); + Deserializer<ByteBuffer> deserializer = Serdes.ByteBuffer().deserializer(); assertEquals("Should get the original ByteBuffer after serialization and deserialization", - buf, deserializer.deserialize(mytopic, serializer.serialize(mytopic, buf))); + buf, deserializer.deserialize(topic, serializer.serialize(topic, buf))); assertEquals("Should support null in serialization and deserialization", - null, deserializer.deserialize(mytopic, serializer.serialize(mytopic, null))); + null, deserializer.deserialize(topic, serializer.serialize(topic, null))); serializer.close(); deserializer.close(); } - private SerDeser<String> getStringSerDeser(String encoder) { + private Serde<String> getStringSerde(String encoder) { Map<String, Object> serializerConfigs = new HashMap<String, Object>(); serializerConfigs.put("key.serializer.encoding", encoder); - Serializer<String> serializer = new StringSerializer(); + Serializer<String> serializer = Serdes.String().serializer(); serializer.configure(serializerConfigs, true); Map<String, Object> deserializerConfigs = new HashMap<String, Object>(); deserializerConfigs.put("key.deserializer.encoding", encoder); - Deserializer<String> deserializer = new StringDeserializer(); + Deserializer<String> deserializer = Serdes.String().deserializer(); deserializer.configure(deserializerConfigs, true); - return new SerDeser<String>(serializer, deserializer); + return Serdes.serdeFrom(serializer, deserializer); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java ---------------------------------------------------------------------- diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java 
b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java index 4f9de29..15083b2 100644 --- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java @@ -18,9 +18,8 @@ package org.apache.kafka.streams.examples.pageview; import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serializer; -import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.HoppingWindows; @@ -83,10 +82,6 @@ public class PageViewTypedDemo { props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pageview-typed"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181"); - props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonPOJOSerializer.class); - props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonPOJODeserializer.class); props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, JsonTimestampExtractor.class); // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data @@ -94,35 +89,44 @@ public class PageViewTypedDemo { KStreamBuilder builder = new KStreamBuilder(); - final Serializer<String> stringSerializer = new StringSerializer(); - final Deserializer<String> stringDeserializer = new StringDeserializer(); - // TODO: the following can be removed with a serialization factory Map<String, Object> serdeProps = new 
HashMap<>(); + final Serializer<PageView> pageViewSerializer = new JsonPOJOSerializer<>(); + serdeProps.put("JsonPOJOClass", PageView.class); + pageViewSerializer.configure(serdeProps, false); + final Deserializer<PageView> pageViewDeserializer = new JsonPOJODeserializer<>(); serdeProps.put("JsonPOJOClass", PageView.class); pageViewDeserializer.configure(serdeProps, false); - final Deserializer<UserProfile> userProfileDeserializer = new JsonPOJODeserializer<>(); - serdeProps.put("JsonPOJOClass", UserProfile.class); - userProfileDeserializer.configure(serdeProps, false); - final Serializer<UserProfile> userProfileSerializer = new JsonPOJOSerializer<>(); serdeProps.put("JsonPOJOClass", UserProfile.class); userProfileSerializer.configure(serdeProps, false); + final Deserializer<UserProfile> userProfileDeserializer = new JsonPOJODeserializer<>(); + serdeProps.put("JsonPOJOClass", UserProfile.class); + userProfileDeserializer.configure(serdeProps, false); + final Serializer<WindowedPageViewByRegion> wPageViewByRegionSerializer = new JsonPOJOSerializer<>(); serdeProps.put("JsonPOJOClass", WindowedPageViewByRegion.class); wPageViewByRegionSerializer.configure(serdeProps, false); + final Deserializer<WindowedPageViewByRegion> wPageViewByRegionDeserializer = new JsonPOJODeserializer<>(); + serdeProps.put("JsonPOJOClass", WindowedPageViewByRegion.class); + wPageViewByRegionDeserializer.configure(serdeProps, false); + final Serializer<RegionCount> regionCountSerializer = new JsonPOJOSerializer<>(); serdeProps.put("JsonPOJOClass", RegionCount.class); regionCountSerializer.configure(serdeProps, false); - KStream<String, PageView> views = builder.stream(stringDeserializer, pageViewDeserializer, "streams-pageview-input"); + final Deserializer<RegionCount> regionCountDeserializer = new JsonPOJODeserializer<>(); + serdeProps.put("JsonPOJOClass", RegionCount.class); + regionCountDeserializer.configure(serdeProps, false); + + KStream<String, PageView> views = 
builder.stream(Serdes.String(), Serdes.serdeFrom(pageViewSerializer, pageViewDeserializer), "streams-pageview-input"); - KTable<String, UserProfile> users = builder.table(stringSerializer, userProfileSerializer, stringDeserializer, userProfileDeserializer, "streams-userprofile-input"); + KTable<String, UserProfile> users = builder.table(Serdes.String(), Serdes.serdeFrom(userProfileSerializer, userProfileDeserializer), "streams-userprofile-input"); KStream<WindowedPageViewByRegion, RegionCount> regionCount = views .leftJoin(users, new ValueJoiner<PageView, UserProfile, PageViewByRegion>() { @@ -146,8 +150,7 @@ public class PageViewTypedDemo { return new KeyValue<>(viewRegion.region, viewRegion); } }) - .countByKey(HoppingWindows.of("GeoPageViewsWindow").with(7 * 24 * 60 * 60 * 1000), - stringSerializer, stringDeserializer) + .countByKey(HoppingWindows.of("GeoPageViewsWindow").with(7 * 24 * 60 * 60 * 1000), Serdes.String()) // TODO: we can merge ths toStream().map(...) with a single toStream(...) 
.toStream() .map(new KeyValueMapper<Windowed<String>, Long, KeyValue<WindowedPageViewByRegion, RegionCount>>() { @@ -166,7 +169,7 @@ public class PageViewTypedDemo { }); // write to the result topic - regionCount.to("streams-pageviewstats-typed-output", wPageViewByRegionSerializer, regionCountSerializer); + regionCount.to("streams-pageviewstats-typed-output", Serdes.serdeFrom(wPageViewByRegionSerializer, wPageViewByRegionDeserializer), Serdes.serdeFrom(regionCountSerializer, regionCountDeserializer)); KafkaStreams streams = new KafkaStreams(builder, props); streams.start(); http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java ---------------------------------------------------------------------- diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java index 9377095..5b80f64 100644 --- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java @@ -21,9 +21,9 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.JsonNodeFactory; import com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serializer; -import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.connect.json.JsonSerializer; import org.apache.kafka.connect.json.JsonDeserializer; import org.apache.kafka.streams.KafkaStreams; @@ -59,10 +59,6 @@ public class 
PageViewUntypedDemo { props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pageview-untyped"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181"); - props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); - props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, JsonTimestampExtractor.class); // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data @@ -70,14 +66,13 @@ public class PageViewUntypedDemo { KStreamBuilder builder = new KStreamBuilder(); - final Serializer<String> stringSerializer = new StringSerializer(); - final Deserializer<String> stringDeserializer = new StringDeserializer(); final Serializer<JsonNode> jsonSerializer = new JsonSerializer(); final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer(); + final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer); - KStream<String, JsonNode> views = builder.stream(stringDeserializer, jsonDeserializer, "streams-pageview-input"); + KStream<String, JsonNode> views = builder.stream(Serdes.String(), jsonSerde, "streams-pageview-input"); - KTable<String, JsonNode> users = builder.table(stringSerializer, jsonSerializer, stringDeserializer, jsonDeserializer, "streams-userprofile-input"); + KTable<String, JsonNode> users = builder.table(Serdes.String(), jsonSerde, "streams-userprofile-input"); KTable<String, String> userRegions = users.mapValues(new ValueMapper<JsonNode, String>() { @Override @@ -103,8 +98,7 @@ public class PageViewUntypedDemo { return new KeyValue<>(viewRegion.get("region").textValue(), viewRegion); } }) - 
.countByKey(HoppingWindows.of("GeoPageViewsWindow").with(7 * 24 * 60 * 60 * 1000), - stringSerializer, stringDeserializer) + .countByKey(HoppingWindows.of("GeoPageViewsWindow").with(7 * 24 * 60 * 60 * 1000), Serdes.String()) // TODO: we can merge this toStream().map(...) with a single toStream(...) .toStream() .map(new KeyValueMapper<Windowed<String>, Long, KeyValue<JsonNode, JsonNode>>() { @@ -122,7 +116,7 @@ public class PageViewUntypedDemo { }); // write to the result topic - regionCount.to("streams-pageviewstats-untyped-output", jsonSerializer, jsonSerializer); + regionCount.to("streams-pageviewstats-untyped-output", jsonSerde, jsonSerde); KafkaStreams streams = new KafkaStreams(builder, props); streams.start(); http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java ---------------------------------------------------------------------- diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java index c37c68a..619f33d 100644 --- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java @@ -17,8 +17,7 @@ package org.apache.kafka.streams.examples.pipe; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsConfig; @@ -41,10 +40,8 @@ public class PipeDemo { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); - props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class); - props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data props.put(StreamsConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java ---------------------------------------------------------------------- diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java index 03d5142..ebd6050 100644 --- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java @@ -17,11 +17,7 @@ package org.apache.kafka.streams.examples.wordcount; -import org.apache.kafka.common.serialization.Deserializer; -import org.apache.kafka.common.serialization.LongSerializer; -import org.apache.kafka.common.serialization.Serializer; -import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsConfig; @@ -52,21 +48,13 @@ public class WordCountDemo { props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
"localhost:9092"); props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181"); - props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data props.put(StreamsConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); KStreamBuilder builder = new KStreamBuilder(); - final Serializer<String> stringSerializer = new StringSerializer(); - final Deserializer<String> stringDeserializer = new StringDeserializer(); - final Serializer<Long> longSerializer = new LongSerializer(); - - KStream<String, String> source = builder.stream("streams-file-input"); + KStream<String, String> source = builder.stream(Serdes.String(), Serdes.String(), "streams-file-input"); KTable<String, Long> counts = source .flatMapValues(new ValueMapper<String, Iterable<String>>() { @@ -80,9 +68,9 @@ public class WordCountDemo { return new KeyValue<String, String>(value, value); } }) - .countByKey(stringSerializer, stringDeserializer, "Counts"); + .countByKey(Serdes.String(), "Counts"); - counts.to("streams-wordcount-output", stringSerializer, longSerializer); + counts.to("streams-wordcount-output", Serdes.String(), Serdes.Long()); KafkaStreams streams = new KafkaStreams(builder, props); streams.start(); http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java ---------------------------------------------------------------------- diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java 
b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java index b651b3a..8457415 100644 --- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java @@ -17,8 +17,7 @@ package org.apache.kafka.streams.examples.wordcount; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.KafkaStreams; @@ -108,10 +107,8 @@ public class WordCountProcessorDemo { props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount-processor"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181"); - props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data props.put(StreamsConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java 
b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index 52fdbd4..4e989be 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -24,11 +24,12 @@ import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef.Importance; import org.apache.kafka.common.config.ConfigDef.Type; -import org.apache.kafka.common.serialization.Deserializer; -import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.processor.DefaultPartitionGrouper; import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor; import org.apache.kafka.streams.processor.internals.StreamThread; +import org.apache.kafka.streams.processor.internals.WallclockTimestampExtractor; import java.util.Map; @@ -91,17 +92,13 @@ public class StreamsConfig extends AbstractConfig { public static final String REPLICATION_FACTOR_CONFIG = "replication.factor"; public static final String REPLICATION_FACTOR_DOC = "The replication factor for change log topics and repartition topics created by the stream processing application."; - /** <code>key.serializer</code> */ - public static final String KEY_SERIALIZER_CLASS_CONFIG = ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG; - - /** <code>value.serializer</code> */ - public static final String VALUE_SERIALIZER_CLASS_CONFIG = ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG; - - /** <code>key.deserializer</code> */ - public static final String KEY_DESERIALIZER_CLASS_CONFIG = ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; + /** <code>key.serde</code> */ + public static final String KEY_SERDE_CLASS_CONFIG = "key.serde"; + public static final String KEY_SERDE_CLASS_DOC = "Serializer / deserializer class for key that implements the 
<code>Serde</code> interface."; - /** <code>value.deserializer</code> */ - public static final String VALUE_DESERIALIZER_CLASS_CONFIG = ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; + /** <code>value.serde</code> */ + public static final String VALUE_SERDE_CLASS_CONFIG = "value.serde"; + public static final String VALUE_SERDE_CLASS_DOC = "Serializer / deserializer class for value that implements the <code>Serde</code> interface."; /** <code>metrics.sample.window.ms</code> */ public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG; @@ -121,8 +118,6 @@ public class StreamsConfig extends AbstractConfig { /** <code>auto.offset.reset</code> */ public static final String AUTO_OFFSET_RESET_CONFIG = ConsumerConfig.AUTO_OFFSET_RESET_CONFIG; - private static final String WALLCLOCK_TIMESTAMP_EXTRACTOR = "org.apache.kafka.streams.processor.internals.WallclockTimestampExtractor"; - static { CONFIG = new ConfigDef().define(APPLICATION_ID_CONFIG, // required with no default value Type.STRING, @@ -152,32 +147,26 @@ public class StreamsConfig extends AbstractConfig { 1, Importance.MEDIUM, REPLICATION_FACTOR_DOC) - .define(KEY_SERIALIZER_CLASS_CONFIG, // required with no default value - Type.CLASS, - Importance.HIGH, - ProducerConfig.KEY_SERIALIZER_CLASS_DOC) - .define(VALUE_SERIALIZER_CLASS_CONFIG, // required with no default value - Type.CLASS, - Importance.HIGH, - ProducerConfig.VALUE_SERIALIZER_CLASS_DOC) - .define(KEY_DESERIALIZER_CLASS_CONFIG, // required with no default value - Type.CLASS, - Importance.HIGH, - ConsumerConfig.KEY_DESERIALIZER_CLASS_DOC) - .define(VALUE_DESERIALIZER_CLASS_CONFIG, // required with no default value - Type.CLASS, - Importance.HIGH, - ConsumerConfig.VALUE_DESERIALIZER_CLASS_DOC) .define(TIMESTAMP_EXTRACTOR_CLASS_CONFIG, Type.CLASS, - WALLCLOCK_TIMESTAMP_EXTRACTOR, + WallclockTimestampExtractor.class.getName(), Importance.MEDIUM, TIMESTAMP_EXTRACTOR_CLASS_DOC) 
.define(PARTITION_GROUPER_CLASS_CONFIG, Type.CLASS, - DefaultPartitionGrouper.class, + DefaultPartitionGrouper.class.getName(), Importance.MEDIUM, PARTITION_GROUPER_CLASS_DOC) + .define(KEY_SERDE_CLASS_CONFIG, + Type.CLASS, + Serdes.ByteArraySerde.class.getName(), + Importance.MEDIUM, + KEY_SERDE_CLASS_DOC) + .define(VALUE_SERDE_CLASS_CONFIG, + Type.CLASS, + Serdes.ByteArraySerde.class.getName(), + Importance.MEDIUM, + VALUE_SERDE_CLASS_DOC) .define(COMMIT_INTERVAL_MS_CONFIG, Type.LONG, 30000, @@ -273,8 +262,6 @@ public class StreamsConfig extends AbstractConfig { // remove properties that are not required for consumers removeStreamsSpecificConfigs(props); - props.remove(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG); - props.remove(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG); return props; } @@ -287,8 +274,6 @@ public class StreamsConfig extends AbstractConfig { // remove properties that are not required for producers removeStreamsSpecificConfigs(props); - props.remove(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG); - props.remove(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG); props.remove(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG); props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-producer"); @@ -302,23 +287,17 @@ public class StreamsConfig extends AbstractConfig { props.remove(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG); props.remove(StreamsConfig.NUM_STREAM_THREADS_CONFIG); props.remove(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG); + props.remove(StreamsConfig.KEY_SERDE_CLASS_CONFIG); + props.remove(StreamsConfig.VALUE_SERDE_CLASS_CONFIG); props.remove(InternalConfig.STREAM_THREAD_INSTANCE); } - public Serializer keySerializer() { - return getConfiguredInstance(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, Serializer.class); - } - - public Serializer valueSerializer() { - return getConfiguredInstance(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, Serializer.class); - } - - public Deserializer keyDeserializer() { - return 
getConfiguredInstance(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, Deserializer.class); + public Serde keySerde() { + return getConfiguredInstance(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serde.class); } - public Deserializer valueDeserializer() { - return getConfiguredInstance(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Deserializer.class); + public Serde valueSerde() { + return getConfiguredInstance(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serde.class); } public static void main(String[] args) { http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java index 1640bde..1c78652 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java @@ -17,8 +17,7 @@ package org.apache.kafka.streams.kstream; -import org.apache.kafka.common.serialization.Deserializer; -import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.ProcessorSupplier; @@ -108,18 +107,14 @@ public interface KStream<K, V> { * Sends key-value to a topic, also creates a new instance of KStream from the topic. * This is equivalent to calling to(topic) and from(topic). 
* - * @param topic the topic name - * @param keySerializer key serializer used to send key-value pairs, - * if not specified the default key serializer defined in the configuration will be used - * @param valSerializer value serializer used to send key-value pairs, - * if not specified the default value serializer defined in the configuration will be used - * @param keyDeserializer key deserializer used to create the new KStream, - * if not specified the default key deserializer defined in the configuration will be used - * @param valDeserializer value deserializer used to create the new KStream, - * if not specified the default value deserializer defined in the configuration will be used + * @param topic the topic name + * @param keySerde key serde used to send key-value pairs, + * if not specified the default key serde defined in the configuration will be used + * @param valSerde value serde used to send key-value pairs, + * if not specified the default value serde defined in the configuration will be used * @return the instance of KStream that consumes the given topic */ - KStream<K, V> through(String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, Deserializer<K> keyDeserializer, Deserializer<V> valDeserializer); + KStream<K, V> through(String topic, Serde<K> keySerde, Serde<V> valSerde); /** * Sends key-value to a topic using default serializers specified in the config. @@ -131,13 +126,13 @@ public interface KStream<K, V> { /** * Sends key-value to a topic. 
* - * @param topic the topic name - * @param keySerializer key serializer used to send key-value pairs, - * if not specified the default serializer defined in the configs will be used - * @param valSerializer value serializer used to send key-value pairs, - * if not specified the default serializer defined in the configs will be used + * @param topic the topic name + * @param keySerde key serde used to send key-value pairs, + * if not specified the default serde defined in the configs will be used + * @param valSerde value serde used to send key-value pairs, + * if not specified the default serde defined in the configs will be used */ - void to(String topic, Serializer<K> keySerializer, Serializer<V> valSerializer); + void to(String topic, Serde<K> keySerde, Serde<V> valSerde); /** * Applies a stateful transformation to all elements in this stream. @@ -171,18 +166,12 @@ public interface KStream<K, V> { * @param otherStream the instance of KStream joined with this stream * @param joiner ValueJoiner * @param windows the specification of the join window - * @param keySerializer key serializer, - * if not specified the default serializer defined in the configs will be used - * @param thisValueSerializer value serializer for this stream, - * if not specified the default serializer defined in the configs will be used - * @param otherValueSerializer value serializer for other stream, - * if not specified the default serializer defined in the configs will be used - * @param keyDeserializer key deserializer, - * if not specified the default serializer defined in the configs will be used - * @param thisValueDeserializer value deserializer for this stream, - * if not specified the default serializer defined in the configs will be used - * @param otherValueDeserializer value deserializer for other stream, - * if not specified the default serializer defined in the configs will be used + * @param keySerde key serdes, + * if not specified the default serdes defined in the configs 
will be used + * @param thisValueSerde value serdes for this stream, + * if not specified the default serdes defined in the configs will be used + * @param otherValueSerde value serdes for other stream, + * if not specified the default serdes defined in the configs will be used * @param <V1> the value type of the other stream * @param <R> the value type of the new stream */ @@ -190,12 +179,9 @@ public interface KStream<K, V> { KStream<K, V1> otherStream, ValueJoiner<V, V1, R> joiner, JoinWindows windows, - Serializer<K> keySerializer, - Serializer<V> thisValueSerializer, - Serializer<V1> otherValueSerializer, - Deserializer<K> keyDeserializer, - Deserializer<V> thisValueDeserializer, - Deserializer<V1> otherValueDeserializer); + Serde<K> keySerde, + Serde<V> thisValueSerde, + Serde<V1> otherValueSerde); /** * Combines values of this stream with another KStream using Windowed Outer Join. @@ -203,18 +189,12 @@ public interface KStream<K, V> { * @param otherStream the instance of KStream joined with this stream * @param joiner ValueJoiner * @param windows the specification of the join window - * @param keySerializer key serializer, - * if not specified the default serializer defined in the configs will be used - * @param thisValueSerializer value serializer for this stream, - * if not specified the default serializer defined in the configs will be used - * @param otherValueSerializer value serializer for other stream, - * if not specified the default serializer defined in the configs will be used - * @param keyDeserializer key deserializer, - * if not specified the default serializer defined in the configs will be used - * @param thisValueDeserializer value deserializer for this stream, - * if not specified the default serializer defined in the configs will be used - * @param otherValueDeserializer value deserializer for other stream, - * if not specified the default serializer defined in the configs will be used + * @param keySerde key serdes, + * if not specified 
the default serdes defined in the configs will be used + * @param thisValueSerde value serdes for this stream, + * if not specified the default serdes defined in the configs will be used + * @param otherValueSerde value serdes for other stream, + * if not specified the default serdes defined in the configs will be used * @param <V1> the value type of the other stream * @param <R> the value type of the new stream */ @@ -222,12 +202,9 @@ public interface KStream<K, V> { KStream<K, V1> otherStream, ValueJoiner<V, V1, R> joiner, JoinWindows windows, - Serializer<K> keySerializer, - Serializer<V> thisValueSerializer, - Serializer<V1> otherValueSerializer, - Deserializer<K> keyDeserializer, - Deserializer<V> thisValueDeserializer, - Deserializer<V1> otherValueDeserializer); + Serde<K> keySerde, + Serde<V> thisValueSerde, + Serde<V1> otherValueSerde); /** * Combines values of this stream with another KStream using Windowed Left Join. @@ -235,14 +212,10 @@ public interface KStream<K, V> { * @param otherStream the instance of KStream joined with this stream * @param joiner ValueJoiner * @param windows the specification of the join window - * @param keySerializer key serializer, - * if not specified the default serializer defined in the configs will be used - * @param otherValueSerializer value serializer for other stream, - * if not specified the default serializer defined in the configs will be used - * @param keyDeserializer key deserializer, - * if not specified the default serializer defined in the configs will be used - * @param otherValueDeserializer value deserializer for other stream, - * if not specified the default serializer defined in the configs will be used + * @param keySerde key serdes, + * if not specified the default serdes defined in the configs will be used + * @param otherValueSerde value serdes for other stream, + * if not specified the default serdes defined in the configs will be used * @param <V1> the value type of the other stream * @param <R> the 
value type of the new stream */ @@ -250,10 +223,8 @@ public interface KStream<K, V> { KStream<K, V1> otherStream, ValueJoiner<V, V1, R> joiner, JoinWindows windows, - Serializer<K> keySerializer, - Serializer<V1> otherValueSerializer, - Deserializer<K> keyDeserializer, - Deserializer<V1> otherValueDeserializer); + Serde<K> keySerde, + Serde<V1> otherValueSerde); /** * Combines values of this stream with KTable using Left Join. @@ -273,10 +244,8 @@ public interface KStream<K, V> { */ <W extends Window> KTable<Windowed<K>, V> reduceByKey(Reducer<V> reducer, Windows<W> windows, - Serializer<K> keySerializer, - Serializer<V> aggValueSerializer, - Deserializer<K> keyDeserializer, - Deserializer<V> aggValueDeserializer); + Serde<K> keySerde, + Serde<V> aggValueSerde); /** * Aggregate values of this stream by key on a window basis. @@ -284,10 +253,8 @@ public interface KStream<K, V> { * @param reducer the class of Reducer */ KTable<K, V> reduceByKey(Reducer<V> reducer, - Serializer<K> keySerializer, - Serializer<V> aggValueSerializer, - Deserializer<K> keyDeserializer, - Deserializer<V> aggValueDeserializer, + Serde<K> keySerde, + Serde<V> aggValueSerde, String name); /** @@ -301,10 +268,8 @@ public interface KStream<K, V> { <T, W extends Window> KTable<Windowed<K>, T> aggregateByKey(Initializer<T> initializer, Aggregator<K, V, T> aggregator, Windows<W> windows, - Serializer<K> keySerializer, - Serializer<T> aggValueSerializer, - Deserializer<K> keyDeserializer, - Deserializer<T> aggValueDeserializer); + Serde<K> keySerde, + Serde<T> aggValueSerde); /** * Aggregate values of this stream by key without a window basis, and hence @@ -316,10 +281,8 @@ public interface KStream<K, V> { */ <T> KTable<K, T> aggregateByKey(Initializer<T> initializer, Aggregator<K, V, T> aggregator, - Serializer<K> keySerializer, - Serializer<T> aggValueSerializer, - Deserializer<K> keyDeserializer, - Deserializer<T> aggValueDeserializer, + Serde<K> keySerde, + Serde<T> aggValueSerde, String name); 
/** @@ -328,14 +291,12 @@ public interface KStream<K, V> { * @param windows the specification of the aggregation window */ <W extends Window> KTable<Windowed<K>, Long> countByKey(Windows<W> windows, - Serializer<K> keySerializer, - Deserializer<K> keyDeserializer); + Serde<K> keySerde); /** * Count number of messages of this stream by key without a window basis, and hence * return a ever updating counting table. */ - KTable<K, Long> countByKey(Serializer<K> keySerializer, - Deserializer<K> keyDeserializer, + KTable<K, Long> countByKey(Serde<K> keySerde, String name); } http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java index 3cf198c..dfd9281 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java @@ -17,8 +17,7 @@ package org.apache.kafka.streams.kstream; -import org.apache.kafka.common.serialization.Deserializer; -import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.streams.kstream.internals.KStreamImpl; import org.apache.kafka.streams.kstream.internals.KTableImpl; import org.apache.kafka.streams.kstream.internals.KTableSource; @@ -40,7 +39,6 @@ public class KStreamBuilder extends TopologyBuilder { super(); } - // TODO: needs updated /** * Creates a KStream instance for the specified topic. * The default deserializers specified in the config are used. @@ -55,17 +53,17 @@ public class KStreamBuilder extends TopologyBuilder { /** * Creates a KStream instance for the specified topic. 
* - * @param keyDeserializer key deserializer used to read this source KStream, - * if not specified the default deserializer defined in the configs will be used - * @param valDeserializer value deserializer used to read this source KStream, - * if not specified the default deserializer defined in the configs will be used - * @param topics the topic names, if empty default to all the topics in the config + * @param keySerde key serde used to read this source KStream, + * if not specified the default serde defined in the configs will be used + * @param valSerde value serde used to read this source KStream, + * if not specified the default serde defined in the configs will be used + * @param topics the topic names, if empty default to all the topics in the config * @return KStream */ - public <K, V> KStream<K, V> stream(Deserializer<K> keyDeserializer, Deserializer<V> valDeserializer, String... topics) { + public <K, V> KStream<K, V> stream(Serde<K> keySerde, Serde<V> valSerde, String... topics) { String name = newName(KStreamImpl.SOURCE_NAME); - addSource(name, keyDeserializer, valDeserializer, topics); + addSource(name, keySerde == null ? null : keySerde.deserializer(), valSerde == null ? null : valSerde.deserializer(), topics); return new KStreamImpl<>(this, name, Collections.singleton(name)); } @@ -78,33 +76,29 @@ public class KStreamBuilder extends TopologyBuilder { * @return KTable */ public <K, V> KTable<K, V> table(String topic) { - return table(null, null, null, null, topic); + return table(null, null, topic); } /** * Creates a KTable instance for the specified topic. 
* - * @param keySerializer key serializer used to send key-value pairs, - * if not specified the default key serializer defined in the configuration will be used - * @param valSerializer value serializer used to send key-value pairs, - * if not specified the default value serializer defined in the configuration will be used - * @param keyDeserializer key deserializer used to read this source KStream, - * if not specified the default deserializer defined in the configs will be used - * @param valDeserializer value deserializer used to read this source KStream, - * if not specified the default deserializer defined in the configs will be used + * @param keySerde key serde used to send key-value pairs, + * if not specified the default key serde defined in the configuration will be used + * @param valSerde value serde used to send key-value pairs, + * if not specified the default value serde defined in the configuration will be used * @param topic the topic name * @return KStream */ - public <K, V> KTable<K, V> table(Serializer<K> keySerializer, Serializer<V> valSerializer, Deserializer<K> keyDeserializer, Deserializer<V> valDeserializer, String topic) { + public <K, V> KTable<K, V> table(Serde<K> keySerde, Serde<V> valSerde, String topic) { String source = newName(KStreamImpl.SOURCE_NAME); String name = newName(KTableImpl.SOURCE_NAME); - addSource(source, keyDeserializer, valDeserializer, topic); + addSource(source, keySerde == null ? null : keySerde.deserializer(), valSerde == null ? 
null : valSerde.deserializer(), topic); ProcessorSupplier<K, V> processorSupplier = new KTableSource<>(topic); addProcessor(name, processorSupplier, source); - return new KTableImpl<>(this, name, processorSupplier, Collections.singleton(source), keySerializer, valSerializer, keyDeserializer, valDeserializer); + return new KTableImpl<>(this, name, processorSupplier, Collections.singleton(source), keySerde, valSerde); } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java index b44ed21..0ae5150 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java @@ -17,8 +17,7 @@ package org.apache.kafka.streams.kstream; -import org.apache.kafka.common.serialization.Deserializer; -import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.streams.KeyValue; /** @@ -68,17 +67,13 @@ public interface KTable<K, V> { * This is equivalent to calling to(topic) and table(topic). 
* * @param topic the topic name - * @param keySerializer key serializer used to send key-value pairs, - * if not specified the default key serializer defined in the configuration will be used - * @param valSerializer value serializer used to send key-value pairs, - * if not specified the default value serializer defined in the configuration will be used - * @param keyDeserializer key deserializer used to create the new KStream, - * if not specified the default key deserializer defined in the configuration will be used - * @param valDeserializer value deserializer used to create the new KStream, - * if not specified the default value deserializer defined in the configuration will be used + * @param keySerde key serde used to send key-value pairs, + * if not specified the default key serde defined in the configuration will be used + * @param valSerde value serde used to send key-value pairs, + * if not specified the default value serde defined in the configuration will be used * @return the new stream that consumes the given topic */ - KTable<K, V> through(String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, Deserializer<K> keyDeserializer, Deserializer<V> valDeserializer); + KTable<K, V> through(String topic, Serde<K> keySerde, Serde<V> valSerde); /** * Sends key-value to a topic using default serializers specified in the config. @@ -90,13 +85,13 @@ public interface KTable<K, V> { /** * Sends key-value to a topic. 
* - * @param topic the topic name - * @param keySerializer key serializer used to send key-value pairs, - * if not specified the default serializer defined in the configs will be used - * @param valSerializer value serializer used to send key-value pairs, - * if not specified the default serializer defined in the configs will be used + * @param topic the topic name + * @param keySerde key serde used to send key-value pairs, + * if not specified the default serde defined in the configs will be used + * @param valSerde value serde used to send key-value pairs, + * if not specified the default serde defined in the configs will be used */ - void to(String topic, Serializer<K> keySerializer, Serializer<V> valSerializer); + void to(String topic, Serde<K> keySerde, Serde<V> valSerde); /** * Creates a new instance of KStream from this KTable @@ -152,10 +147,8 @@ public interface KTable<K, V> { <K1, V1> KTable<K1, V1> reduce(Reducer<V1> addReducer, Reducer<V1> removeReducer, KeyValueMapper<K, V, KeyValue<K1, V1>> selector, - Serializer<K1> keySerializer, - Serializer<V1> valueSerializer, - Deserializer<K1> keyDeserializer, - Deserializer<V1> valueDeserializer, + Serde<K1> keySerde, + Serde<V1> valueSerde, String name); /** @@ -174,12 +167,9 @@ public interface KTable<K, V> { Aggregator<K1, V1, T> add, Aggregator<K1, V1, T> remove, KeyValueMapper<K, V, KeyValue<K1, V1>> selector, - Serializer<K1> keySerializer, - Serializer<V1> valueSerializer, - Serializer<T> aggValueSerializer, - Deserializer<K1> keyDeserializer, - Deserializer<V1> valueDeserializer, - Deserializer<T> aggValueDeserializer, + Serde<K1> keySerde, + Serde<V1> valueSerde, + Serde<T> aggValueSerde, String name); /** @@ -191,10 +181,7 @@ public interface KTable<K, V> { * @return the instance of KTable */ <K1> KTable<K1, Long> count(KeyValueMapper<K, V, K1> selector, - Serializer<K1> keySerializer, - Serializer<V> valueSerializer, - Deserializer<K1> keyDeserializer, - Deserializer<V> valueDeserializer, + 
Serde<K1> keySerde, + Serde<V> valueSerde, String name); - }
