This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 35a0de3 KAFKA-6161 Add default implementation to close() and configure() for Serdes (#5348) 35a0de3 is described below commit 35a0de32ee3823dfb548a1cd5d5faf4f7c99e4e0 Author: Chia-Ping Tsai <chia7...@gmail.com> AuthorDate: Fri Feb 22 01:05:13 2019 +0800 KAFKA-6161 Add default implementation to close() and configure() for Serdes (#5348) Reviewers: Matthias J. Sax <matth...@confluent.io>, Guozhang Wang <wangg...@gmail.com> --- .../kafka/common/serialization/ByteArrayDeserializer.java | 12 ------------ .../kafka/common/serialization/ByteArraySerializer.java | 13 ------------- .../kafka/common/serialization/ByteBufferDeserializer.java | 10 ---------- .../kafka/common/serialization/ByteBufferSerializer.java | 10 ---------- .../kafka/common/serialization/BytesDeserializer.java | 11 ----------- .../apache/kafka/common/serialization/BytesSerializer.java | 11 ----------- .../apache/kafka/common/serialization/Deserializer.java | 13 +++++++++++-- .../kafka/common/serialization/DoubleDeserializer.java | 12 ------------ .../kafka/common/serialization/DoubleSerializer.java | 13 ------------- .../kafka/common/serialization/FloatDeserializer.java | 14 -------------- .../apache/kafka/common/serialization/FloatSerializer.java | 13 ------------- .../kafka/common/serialization/IntegerDeserializer.java | 11 ----------- .../kafka/common/serialization/IntegerSerializer.java | 11 ----------- .../kafka/common/serialization/LongDeserializer.java | 11 ----------- .../apache/kafka/common/serialization/LongSerializer.java | 11 ----------- .../java/org/apache/kafka/common/serialization/Serde.java | 9 +++++++-- .../org/apache/kafka/common/serialization/Serializer.java | 10 +++++++--- .../kafka/common/serialization/ShortDeserializer.java | 10 ---------- .../apache/kafka/common/serialization/ShortSerializer.java | 11 ----------- .../kafka/common/serialization/StringDeserializer.java | 5 ----- .../kafka/common/serialization/StringSerializer.java | 5 ----- .../kafka/common/serialization/UUIDDeserializer.java | 5 ----- .../apache/kafka/common/serialization/UUIDSerializer.java | 5 ----- .../test/java/org/apache/kafka/test/MockSerializer.java | 5 ----- .../org/apache/kafka/connect/json/JsonDeserializer.java | 10 ---------- .../java/org/apache/kafka/connect/json/JsonSerializer.java | 11 ----------- .../test/scala/kafka/tools/CustomDeserializerTest.scala | 5 ----- docs/streams/upgrade-guide.html | 5 +++++ .../src/main/java/org/apache/kafka/streams/Topology.java | 4 +++- .../streams/kstream/internals/ChangedDeserializer.java | 6 ------ .../kafka/streams/kstream/internals/ChangedSerializer.java | 6 ------ .../java/org/apache/kafka/streams/StreamsConfigTest.java | 3 --- .../java/org/apache/kafka/streams/perf/YahooBenchmark.java | 12 ------------ .../kafka/streams/processor/internals/SourceNodeTest.java | 7 ------- .../streams/state/internals/SerdeThatDoesntHandleNull.java | 12 ------------ .../org/apache/kafka/streams/TopologyTestDriverTest.java | 8 -------- .../org/apache/kafka/tools/ClientCompatibilityTest.java | 10 ---------- 37 files changed, 33 insertions(+), 307 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/ByteArrayDeserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/ByteArrayDeserializer.java index 2672115..1147f45 100644 --- a/clients/src/main/java/org/apache/kafka/common/serialization/ByteArrayDeserializer.java +++ b/clients/src/main/java/org/apache/kafka/common/serialization/ByteArrayDeserializer.java @@ -16,22 +16,10 @@ */ package org.apache.kafka.common.serialization; -import java.util.Map; - public class ByteArrayDeserializer implements Deserializer<byte[]> { @Override - public void configure(Map<String, ?> configs, boolean isKey) { - // nothing to do - } - - @Override public byte[] deserialize(String topic, byte[] data) { return data; } - - @Override - public void close() { - // nothing to do - } } diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/ByteArraySerializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/ByteArraySerializer.java index d069e94..6bebaa6 100644 --- a/clients/src/main/java/org/apache/kafka/common/serialization/ByteArraySerializer.java +++ b/clients/src/main/java/org/apache/kafka/common/serialization/ByteArraySerializer.java @@ -16,22 +16,9 @@ */ package org.apache.kafka.common.serialization; -import java.util.Map; - public class ByteArraySerializer implements Serializer<byte[]> { - - @Override - public void configure(Map<String, ?> configs, boolean isKey) { - // nothing to do - } - @Override public byte[] serialize(String topic, byte[] data) { return data; } - - @Override - public void close() { - // nothing to do - } } diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferDeserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferDeserializer.java index d41f03c..0dfcf5f2 100644 --- a/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferDeserializer.java +++ b/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferDeserializer.java @@ -17,22 +17,12 @@ package org.apache.kafka.common.serialization; import java.nio.ByteBuffer; -import java.util.Map; public class ByteBufferDeserializer implements Deserializer<ByteBuffer> { - - public void configure(Map<String, ?> configs, boolean isKey) { - // nothing to do - } - public ByteBuffer deserialize(String topic, byte[] data) { if (data == null) return null; return ByteBuffer.wrap(data); } - - public void close() { - // nothing to do - } } diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java index c8c3692..9fb1254 100644 --- a/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java +++ b/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java @@ -17,14 +17,8 @@ package org.apache.kafka.common.serialization; import java.nio.ByteBuffer; -import java.util.Map; public class ByteBufferSerializer implements Serializer<ByteBuffer> { - - public void configure(Map<String, ?> configs, boolean isKey) { - // nothing to do - } - public byte[] serialize(String topic, ByteBuffer data) { if (data == null) return null; @@ -43,8 +37,4 @@ public class ByteBufferSerializer implements Serializer<ByteBuffer> { data.rewind(); return ret; } - - public void close() { - // nothing to do - } } diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/BytesDeserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/BytesDeserializer.java index 66b07eb..1350dca 100644 --- a/clients/src/main/java/org/apache/kafka/common/serialization/BytesDeserializer.java +++ b/clients/src/main/java/org/apache/kafka/common/serialization/BytesDeserializer.java @@ -18,22 +18,11 @@ package org.apache.kafka.common.serialization; import org.apache.kafka.common.utils.Bytes; -import java.util.Map; - public class BytesDeserializer implements Deserializer<Bytes> { - - public void configure(Map<String, ?> configs, boolean isKey) { - // nothing to do - } - public Bytes deserialize(String topic, byte[] data) { if (data == null) return null; return new Bytes(data); } - - public void close() { - // nothing to do - } } diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/BytesSerializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/BytesSerializer.java index 0dc4476..62ea6ec 100644 --- a/clients/src/main/java/org/apache/kafka/common/serialization/BytesSerializer.java +++ b/clients/src/main/java/org/apache/kafka/common/serialization/BytesSerializer.java @@ -18,23 +18,12 @@ package org.apache.kafka.common.serialization; import org.apache.kafka.common.utils.Bytes; -import java.util.Map; - public class BytesSerializer implements Serializer<Bytes> { - - public void configure(Map<String, ?> configs, boolean isKey) { - // nothing to do - } - public byte[] serialize(String topic, Bytes data) { if (data == null) return null; return data.get(); } - - public void close() { - // nothing to do - } } diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java index bc1a714..eb56485 100644 --- a/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java +++ b/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java @@ -37,7 +37,9 @@ public interface Deserializer<T> extends Closeable { * @param configs configs in key/value pairs * @param isKey whether is for key or value */ - void configure(Map<String, ?> configs, boolean isKey); + default void configure(Map<String, ?> configs, boolean isKey) { + // intentionally left blank + } /** * Deserialize a record value from a byte array into a value or object. @@ -58,6 +60,13 @@ public interface Deserializer<T> extends Closeable { return deserialize(topic, data); } + /** + * Close this deserializer. + * <p> + * This method must be idempotent as it may be called multiple times. + */ @Override - void close(); + default void close() { + // intentionally left blank + } } diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/DoubleDeserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/DoubleDeserializer.java index 24f6007..0fa1cce 100644 --- a/clients/src/main/java/org/apache/kafka/common/serialization/DoubleDeserializer.java +++ b/clients/src/main/java/org/apache/kafka/common/serialization/DoubleDeserializer.java @@ -18,16 +18,9 @@ package org.apache.kafka.common.serialization; import org.apache.kafka.common.errors.SerializationException; -import java.util.Map; - public class DoubleDeserializer implements Deserializer<Double> { @Override - public void configure(Map<String, ?> configs, boolean isKey) { - // nothing to do - } - - @Override public Double deserialize(String topic, byte[] data) { if (data == null) return null; @@ -42,9 +35,4 @@ public class DoubleDeserializer implements Deserializer<Double> { } return Double.longBitsToDouble(value); } - - @Override - public void close() { - // nothing to do - } } diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/DoubleSerializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/DoubleSerializer.java index 7dd4edc..99781b5 100644 --- a/clients/src/main/java/org/apache/kafka/common/serialization/DoubleSerializer.java +++ b/clients/src/main/java/org/apache/kafka/common/serialization/DoubleSerializer.java @@ -16,15 +16,7 @@ */ package org.apache.kafka.common.serialization; -import java.util.Map; - public class DoubleSerializer implements Serializer<Double> { - - @Override - public void configure(Map<String, ?> configs, boolean isKey) { - // nothing to do - } - @Override public byte[] serialize(String topic, Double data) { if (data == null) @@ -42,9 +34,4 @@ public class DoubleSerializer implements Serializer<Double> { (byte) bits }; } - - @Override - public void close() { - // nothing to do - } } \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/FloatDeserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/FloatDeserializer.java index 3834ce2..0903177 100644 --- a/clients/src/main/java/org/apache/kafka/common/serialization/FloatDeserializer.java +++ b/clients/src/main/java/org/apache/kafka/common/serialization/FloatDeserializer.java @@ -18,15 +18,7 @@ package org.apache.kafka.common.serialization; import org.apache.kafka.common.errors.SerializationException; -import java.util.Map; - public class FloatDeserializer implements Deserializer<Float> { - - @Override - public void configure(final Map<String, ?> configs, final boolean isKey) { - // nothing to do - } - @Override public Float deserialize(final String topic, final byte[] data) { if (data == null) @@ -42,10 +34,4 @@ public class FloatDeserializer implements Deserializer<Float> { } return Float.intBitsToFloat(value); } - - @Override - public void close() { - // nothing to do - } - } \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/FloatSerializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/FloatSerializer.java index 6eb766d..aa72d43 100644 --- a/clients/src/main/java/org/apache/kafka/common/serialization/FloatSerializer.java +++ b/clients/src/main/java/org/apache/kafka/common/serialization/FloatSerializer.java @@ -16,15 +16,7 @@ */ package org.apache.kafka.common.serialization; -import java.util.Map; - public class FloatSerializer implements Serializer<Float> { - - @Override - public void configure(final Map<String, ?> configs, final boolean isKey) { - // nothing to do - } - @Override public byte[] serialize(final String topic, final Float data) { if (data == null) @@ -38,9 +30,4 @@ public class FloatSerializer implements Serializer<Float> { (byte) bits }; } - - @Override - public void close() { - // nothing to do - } } \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/IntegerDeserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/IntegerDeserializer.java index 45f8cf1..20ca63f 100644 --- a/clients/src/main/java/org/apache/kafka/common/serialization/IntegerDeserializer.java +++ b/clients/src/main/java/org/apache/kafka/common/serialization/IntegerDeserializer.java @@ -18,14 +18,7 @@ package org.apache.kafka.common.serialization; import org.apache.kafka.common.errors.SerializationException; -import java.util.Map; - public class IntegerDeserializer implements Deserializer<Integer> { - - public void configure(Map<String, ?> configs, boolean isKey) { - // nothing to do - } - public Integer deserialize(String topic, byte[] data) { if (data == null) return null; @@ -40,8 +33,4 @@ public class IntegerDeserializer implements Deserializer<Integer> { } return value; } - - public void close() { - // nothing to do - } } diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/IntegerSerializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/IntegerSerializer.java index f2144ce..8ab5310 100644 --- a/clients/src/main/java/org/apache/kafka/common/serialization/IntegerSerializer.java +++ b/clients/src/main/java/org/apache/kafka/common/serialization/IntegerSerializer.java @@ -16,14 +16,7 @@ */ package org.apache.kafka.common.serialization; -import java.util.Map; - public class IntegerSerializer implements Serializer<Integer> { - - public void configure(Map<String, ?> configs, boolean isKey) { - // nothing to do - } - public byte[] serialize(String topic, Integer data) { if (data == null) return null; @@ -35,8 +28,4 @@ public class IntegerSerializer implements Serializer<Integer> { data.byteValue() }; } - - public void close() { - // nothing to do - } } \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/LongDeserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/LongDeserializer.java index a58b1d3..1e445d2 100644 --- a/clients/src/main/java/org/apache/kafka/common/serialization/LongDeserializer.java +++ b/clients/src/main/java/org/apache/kafka/common/serialization/LongDeserializer.java @@ -18,14 +18,7 @@ package org.apache.kafka.common.serialization; import org.apache.kafka.common.errors.SerializationException; -import java.util.Map; - public class LongDeserializer implements Deserializer<Long> { - - public void configure(Map<String, ?> configs, boolean isKey) { - // nothing to do - } - public Long deserialize(String topic, byte[] data) { if (data == null) return null; @@ -40,8 +33,4 @@ public class LongDeserializer implements Deserializer<Long> { } return value; } - - public void close() { - // nothing to do - } } diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/LongSerializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/LongSerializer.java index d37842c..436f0e0 100644 --- a/clients/src/main/java/org/apache/kafka/common/serialization/LongSerializer.java +++ b/clients/src/main/java/org/apache/kafka/common/serialization/LongSerializer.java @@ -16,14 +16,7 @@ */ package org.apache.kafka.common.serialization; -import java.util.Map; - public class LongSerializer implements Serializer<Long> { - - public void configure(Map<String, ?> configs, boolean isKey) { - // nothing to do - } - public byte[] serialize(String topic, Long data) { if (data == null) return null; @@ -39,8 +32,4 @@ public class LongSerializer implements Serializer<Long> { data.byteValue() }; } - - public void close() { - // nothing to do - } } \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/Serde.java b/clients/src/main/java/org/apache/kafka/common/serialization/Serde.java index fbcc7c2..5b052e6 100644 --- a/clients/src/main/java/org/apache/kafka/common/serialization/Serde.java +++ b/clients/src/main/java/org/apache/kafka/common/serialization/Serde.java @@ -34,14 +34,19 @@ public interface Serde<T> extends Closeable { * @param configs configs in key/value pairs * @param isKey whether is for key or value */ - void configure(Map<String, ?> configs, boolean isKey); + default void configure(Map<String, ?> configs, boolean isKey) { + // intentionally left blank + } /** * Close this serde class, which will close the underlying serializer and deserializer. + * <p> * This method has to be idempotent because it might be called multiple times. */ @Override - void close(); + default void close() { + // intentionally left blank + } Serializer<T> serializer(); diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java index c5d4760..144b5ab 100644 --- a/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java +++ b/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java @@ -37,7 +37,9 @@ public interface Serializer<T> extends Closeable { * @param configs configs in key/value pairs * @param isKey whether is for key or value */ - void configure(Map<String, ?> configs, boolean isKey); + default void configure(Map<String, ?> configs, boolean isKey) { + // intentionally left blank + } /** * Convert {@code data} into a byte array. @@ -62,9 +64,11 @@ public interface Serializer<T> extends Closeable { /** * Close this serializer. - * + * <p> * This method must be idempotent as it may be called multiple times. */ @Override - void close(); + default void close() { + // intentionally left blank + } } diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/ShortDeserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/ShortDeserializer.java index 45aa8ae..7814a7b 100644 --- a/clients/src/main/java/org/apache/kafka/common/serialization/ShortDeserializer.java +++ b/clients/src/main/java/org/apache/kafka/common/serialization/ShortDeserializer.java @@ -18,14 +18,8 @@ package org.apache.kafka.common.serialization; import org.apache.kafka.common.errors.SerializationException; -import java.util.Map; - public class ShortDeserializer implements Deserializer<Short> { - public void configure(Map<String, ?> configs, boolean isKey) { - // nothing to do - } - public Short deserialize(String topic, byte[] data) { if (data == null) return null; @@ -40,8 +34,4 @@ public class ShortDeserializer implements Deserializer<Short> { } return value; } - - public void close() { - // nothing to do - } } diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/ShortSerializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/ShortSerializer.java index a66aaa0..e54354b 100644 --- a/clients/src/main/java/org/apache/kafka/common/serialization/ShortSerializer.java +++ b/clients/src/main/java/org/apache/kafka/common/serialization/ShortSerializer.java @@ -16,14 +16,7 @@ */ package org.apache.kafka.common.serialization; -import java.util.Map; - public class ShortSerializer implements Serializer<Short> { - - public void configure(Map<String, ?> configs, boolean isKey) { - // nothing to do - } - public byte[] serialize(String topic, Short data) { if (data == null) return null; @@ -33,8 +26,4 @@ public class ShortSerializer implements Serializer<Short> { data.byteValue() }; } - - public void close() { - // nothing to do - } } \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/StringDeserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/StringDeserializer.java index 0398a1b..68e6c40 100644 --- a/clients/src/main/java/org/apache/kafka/common/serialization/StringDeserializer.java +++ b/clients/src/main/java/org/apache/kafka/common/serialization/StringDeserializer.java @@ -49,9 +49,4 @@ public class StringDeserializer implements Deserializer<String> { throw new SerializationException("Error when deserializing byte[] to string due to unsupported encoding " + encoding); } } - - @Override - public void close() { - // nothing to do - } } diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/StringSerializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/StringSerializer.java index 28e4174..e16e19a 100644 --- a/clients/src/main/java/org/apache/kafka/common/serialization/StringSerializer.java +++ b/clients/src/main/java/org/apache/kafka/common/serialization/StringSerializer.java @@ -49,9 +49,4 @@ public class StringSerializer implements Serializer<String> { throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + encoding); } } - - @Override - public void close() { - // nothing to do - } } \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/UUIDDeserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/UUIDDeserializer.java index a6eb2ea..e852fc9 100644 --- a/clients/src/main/java/org/apache/kafka/common/serialization/UUIDDeserializer.java +++ b/clients/src/main/java/org/apache/kafka/common/serialization/UUIDDeserializer.java @@ -52,9 +52,4 @@ public class UUIDDeserializer implements Deserializer<UUID> { throw new SerializationException("Error parsing data into UUID", e); } } - - @Override - public void close() { - // do nothing - } } diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/UUIDSerializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/UUIDSerializer.java index d8e2524..908c202 100644 --- a/clients/src/main/java/org/apache/kafka/common/serialization/UUIDSerializer.java +++ b/clients/src/main/java/org/apache/kafka/common/serialization/UUIDSerializer.java @@ -50,9 +50,4 @@ public class UUIDSerializer implements Serializer<UUID> { throw new SerializationException("Error when serializing UUID to byte[] due to unsupported encoding " + encoding); } } - - @Override - public void close() { - // nothing to do - } } diff --git a/clients/src/test/java/org/apache/kafka/test/MockSerializer.java b/clients/src/test/java/org/apache/kafka/test/MockSerializer.java index 0c597c8..1c14445 100644 --- a/clients/src/test/java/org/apache/kafka/test/MockSerializer.java +++ b/clients/src/test/java/org/apache/kafka/test/MockSerializer.java @@ -20,7 +20,6 @@ import org.apache.kafka.common.ClusterResourceListener; import org.apache.kafka.common.ClusterResource; import org.apache.kafka.common.serialization.Serializer; -import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -36,10 +35,6 @@ public class MockSerializer implements ClusterResourceListener, Serializer<byte[ } @Override - public void configure(Map<String, ?> configs, boolean isKey) { - } - - @Override public byte[] serialize(String topic, byte[] data) { // This will ensure that we get the cluster metadata when serialize is called for the first time // as subsequent compareAndSet operations will fail. diff --git a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonDeserializer.java b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonDeserializer.java index 8f2171b..b006e22 100644 --- a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonDeserializer.java +++ b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonDeserializer.java @@ -21,8 +21,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.serialization.Deserializer; -import java.util.Map; - /** * JSON deserializer for Jackson's JsonNode tree model. Using the tree model allows it to work with arbitrarily * structured data without having associated Java classes. This deserializer also supports Connect schemas. @@ -36,9 +34,6 @@ public class JsonDeserializer implements Deserializer<JsonNode> { public JsonDeserializer() { } - @Override - public void configure(Map<String, ?> props, boolean isKey) { - } @Override public JsonNode deserialize(String topic, byte[] bytes) { @@ -54,9 +49,4 @@ public class JsonDeserializer implements Deserializer<JsonNode> { return data; } - - @Override - public void close() { - - } } diff --git a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonSerializer.java b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonSerializer.java index 438daa1..94ec0a8 100644 --- a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonSerializer.java +++ b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonSerializer.java @@ -21,8 +21,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.serialization.Serializer; -import java.util.Map; - /** * Serialize Jackson JsonNode tree model objects to UTF-8 JSON. Using the tree model allows handling arbitrarily * structured data without corresponding Java classes. This serializer also supports Connect schemas. @@ -38,10 +36,6 @@ public class JsonSerializer implements Serializer<JsonNode> { } @Override - public void configure(Map<String, ?> config, boolean isKey) { - } - - @Override public byte[] serialize(String topic, JsonNode data) { if (data == null) return null; @@ -52,9 +46,4 @@ public class JsonSerializer implements Serializer<JsonNode> { throw new SerializationException("Error serializing JSON message", e); } } - - @Override - public void close() { - } - } diff --git a/core/src/test/scala/kafka/tools/CustomDeserializerTest.scala b/core/src/test/scala/kafka/tools/CustomDeserializerTest.scala index f94a900..7fb3cf3 100644 --- a/core/src/test/scala/kafka/tools/CustomDeserializerTest.scala +++ b/core/src/test/scala/kafka/tools/CustomDeserializerTest.scala @@ -27,16 +27,11 @@ import org.junit.Test import org.scalatest.mockito.MockitoSugar class CustomDeserializer extends Deserializer[String] { - override def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit = { - } override def deserialize(topic: String, data: Array[Byte]): String = { assertThat("topic must not be null", topic, CoreMatchers.notNullValue) new String(data) } - - override def close(): Unit = { - } } class CustomDeserializerTest extends MockitoSugar { diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html index b4d957a..9071dc2 100644 --- a/docs/streams/upgrade-guide.html +++ b/docs/streams/upgrade-guide.html @@ -73,6 +73,11 @@ As of 2.3.0 Streams now offers an in-memory version of the window store, in addition to the persistent one based on RocksDB. The new public interface <code>inMemoryWindowStore()</code> is added to Stores that provides a built-in in-memory window store. </p> + <p> + In 2.3.0 we have added default implementation to close() and configure() for <code>Serializer</code>, <code>Deserializer</code> and <code>Serde</code> so that they can be + implemented by lambda expression. For more details please read <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-331+Add+default+implementation+to+close%28%29+and+configure%28%29+for+Serializer%2C+Deserializer+and+Serde">KIP-331</a>. + </p> + <h3><a id="streams_api_changes_220" href="#streams_api_changes_220">Streams API changes in 2.2.0</a></h3> <p> We've simplified the <code>KafkaStreams#state</code> transition diagram during the starting up phase a bit in 2.2.0: in older versions the state will transit from <code>CREATED</code> to <code>RUNNING</code>, and then to <code>REBALANCING</code> to get the first diff --git a/streams/src/main/java/org/apache/kafka/streams/Topology.java b/streams/src/main/java/org/apache/kafka/streams/Topology.java index 8b2a46b..d13e4a8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/Topology.java +++ b/streams/src/main/java/org/apache/kafka/streams/Topology.java @@ -304,6 +304,7 @@ public class Topology { * @return itself * @throws TopologyException if processor is already added or if topics have already been registered by name */ + @SuppressWarnings("overloads") public synchronized Topology addSource(final AutoOffsetReset offsetReset, final String name, final Deserializer keyDeserializer, @@ -359,7 +360,7 @@ public class Topology { * @return itself * @throws TopologyException if processor is already added or if topics have already been registered by another source */ - + @SuppressWarnings("overloads") public synchronized Topology addSource(final AutoOffsetReset offsetReset, final String name, final TimestampExtractor timestampExtractor, @@ -391,6 +392,7 @@ public class Topology { * @return itself * @throws TopologyException if processor is already added or if topics have already been registered by name */ + @SuppressWarnings("overloads") public synchronized Topology addSource(final AutoOffsetReset offsetReset, final String name, final TimestampExtractor timestampExtractor, diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java index 56193d5..36f77b8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java @@ -20,7 +20,6 @@ import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.serialization.Deserializer; import java.nio.ByteBuffer; -import java.util.Map; public class ChangedDeserializer<T> implements Deserializer<Change<T>> { @@ -41,11 +40,6 @@ public class ChangedDeserializer<T> implements Deserializer<Change<T>> { } @Override - public void configure(final Map<String, ?> configs, final boolean isKey) { - // do nothing - } - - @Override public Change<T> deserialize(final String topic, final Headers headers, final byte[] data) { final byte[] bytes = new byte[data.length - NEWFLAG_SIZE]; diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java index 7fa34b7..bfd0afa 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java @@ -21,7 +21,6 @@ import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.errors.StreamsException; import java.nio.ByteBuffer; -import java.util.Map; public class ChangedSerializer<T> implements Serializer<Change<T>> { @@ -41,11 +40,6 @@ public class ChangedSerializer<T> implements Serializer<Change<T>> { this.inner = inner; } - @Override - public void configure(final Map<String, ?> configs, final boolean isKey) { - // do nothing - } - /** * @throws StreamsException if both old and new values of data are null, or if * both values are not null diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java index afebfdb..2c9a97b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java @@ -646,9 +646,6 @@ public class StreamsConfigTest { } @Override - public void close() {} - - @Override public Serializer serializer() { return null; } diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java index 8607902..2cab626 100644 --- a/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java +++ b/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java @@ -189,9 +189,6 @@ public class YahooBenchmark { public JsonPOJOSerializer() {} @Override - public void configure(final Map<String, ?> props, final boolean isKey) {} - - @Override public byte[] serialize(final String topic, final T data) { if (data == null) { return null; @@ -203,10 +200,6 @@ public class YahooBenchmark { throw new SerializationException("Error serializing JSON message", e); } } - - @Override - public void close() {} - } // Note: these are also in the streams example package, eventuall use 1 file @@ -242,11 +235,6 @@ public class YahooBenchmark { return data; } - - @Override - public void close() { - - } } private KafkaStreams createYahooBenchmarkStreams(final Properties streamConfig, final String campaignsTopic, final String eventsTopic, diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java index 452dd7b..08112a6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java @@ -23,7 +23,6 @@ import org.apache.kafka.test.MockSourceNode; import org.junit.Test; import java.nio.charset.StandardCharsets; -import java.util.Map; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; @@ -52,14 +51,8 @@ public class SourceNodeTest { } @Override - public void configure(final Map<String, ?> configs, final boolean isKey) { } - - @Override public String deserialize(final String topic, final byte[] data) { return deserialize(topic, null, data); } - - @Override - public void close() { } } } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SerdeThatDoesntHandleNull.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SerdeThatDoesntHandleNull.java index bf1d030..03e0c3a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SerdeThatDoesntHandleNull.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SerdeThatDoesntHandleNull.java @@ -22,20 +22,8 @@ import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; -import java.util.Map; - class SerdeThatDoesntHandleNull implements Serde<String> { @Override - public void configure(final Map<String, ?> configs, final boolean isKey) { - - } - - @Override - public void close() { - - } - - @Override public Serializer<String> serializer() { return new StringSerializer(); } diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java index 9de7798..6f6c51e 100644 --- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java +++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java @@ -473,10 +473,6 @@ public class TopologyTestDriverTest { } return Serdes.Integer().serializer().serialize(topic, (Integer) data); } - @Override - public void close() {} - @Override - public void configure(final Map configs, final boolean isKey) {} }, new Serializer<Object>() { @Override @@ -486,10 +482,6 @@ public class TopologyTestDriverTest { } return Serdes.Double().serializer().serialize(topic, (Double) data); } - @Override - public void close() {} - @Override - public void configure(final Map configs, final boolean isKey) {} }, processor); diff --git a/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java b/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java index 5b7e228..887bdc4 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java +++ b/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java @@ -340,21 +340,11 @@ public class ClientCompatibilityTest { } @Override - public void configure(Map<String, ?> configs, boolean isKey) { - // nothing to do - } - - @Override public byte[] deserialize(String topic, byte[] data) { return data; } @Override - public void close() { - // nothing to do - } - - @Override public void onUpdate(ClusterResource clusterResource) { if (expectClusterId) { if (clusterResource.clusterId() == null) {