[ https://issues.apache.org/jira/browse/KAFKA-6913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16494454#comment-16494454 ]

ASF GitHub Bot commented on KAFKA-6913:
---------------------------------------

ewencp closed pull request #5034: KAFKA-6913: Add Connect converters and header 
converters for short, integer, long, float, and double (WIP)
URL: https://github.com/apache/kafka/pull/5034
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/connect/api/src/main/java/org/apache/kafka/connect/storage/DoubleConverter.java b/connect/api/src/main/java/org/apache/kafka/connect/storage/DoubleConverter.java
new file mode 100644
index 00000000000..04019a7a529
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/storage/DoubleConverter.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.storage;
+
+import org.apache.kafka.common.serialization.DoubleDeserializer;
+import org.apache.kafka.common.serialization.DoubleSerializer;
+import org.apache.kafka.connect.data.Schema;
+
+/**
+ * {@link Converter} and {@link HeaderConverter} implementation that only supports serializing to and deserializing from double values.
+ * It does support handling nulls. When converting from bytes to Kafka Connect format, the converter will always return an
+ * optional FLOAT64 schema.
+ * <p>
+ * This implementation currently does nothing with the topic names or header names.
+ */
+public class DoubleConverter extends NumberConverter<Double> {
+
+    public DoubleConverter() {
+        super("double", Schema.OPTIONAL_FLOAT64_SCHEMA, new DoubleSerializer(), new DoubleDeserializer());
+    }
+}
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/storage/FloatConverter.java b/connect/api/src/main/java/org/apache/kafka/connect/storage/FloatConverter.java
new file mode 100644
index 00000000000..16bf0e0f93f
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/storage/FloatConverter.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.storage;
+
+import org.apache.kafka.common.serialization.FloatDeserializer;
+import org.apache.kafka.common.serialization.FloatSerializer;
+import org.apache.kafka.connect.data.Schema;
+
+/**
+ * {@link Converter} and {@link HeaderConverter} implementation that only supports serializing to and deserializing from float values.
+ * It does support handling nulls. When converting from bytes to Kafka Connect format, the converter will always return an
+ * optional FLOAT32 schema.
+ * <p>
+ * This implementation currently does nothing with the topic names or header names.
+ */
+public class FloatConverter extends NumberConverter<Float> {
+
+    public FloatConverter() {
+        super("float", Schema.OPTIONAL_FLOAT32_SCHEMA, new FloatSerializer(), new FloatDeserializer());
+    }
+}
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/storage/IntegerConverter.java b/connect/api/src/main/java/org/apache/kafka/connect/storage/IntegerConverter.java
new file mode 100644
index 00000000000..6f3c78a0a73
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/storage/IntegerConverter.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.storage;
+
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.connect.data.Schema;
+
+/**
+ * {@link Converter} and {@link HeaderConverter} implementation that only supports serializing to and deserializing from integer values.
+ * It does support handling nulls. When converting from bytes to Kafka Connect format, the converter will always return an
+ * optional INT32 schema.
+ * <p>
+ * This implementation currently does nothing with the topic names or header names.
+ */
+public class IntegerConverter extends NumberConverter<Integer> {
+
+    public IntegerConverter() {
+        super("integer", Schema.OPTIONAL_INT32_SCHEMA, new IntegerSerializer(), new IntegerDeserializer());
+    }
+}
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/storage/LongConverter.java b/connect/api/src/main/java/org/apache/kafka/connect/storage/LongConverter.java
new file mode 100644
index 00000000000..600c3042502
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/storage/LongConverter.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.storage;
+
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.connect.data.Schema;
+
+/**
+ * {@link Converter} and {@link HeaderConverter} implementation that only supports serializing to and deserializing from long values.
+ * It does support handling nulls. When converting from bytes to Kafka Connect format, the converter will always return an
+ * optional INT64 schema.
+ * <p>
+ * This implementation currently does nothing with the topic names or header names.
+ */
+public class LongConverter extends NumberConverter<Long> {
+
+    public LongConverter() {
+        super("long", Schema.OPTIONAL_INT64_SCHEMA, new LongSerializer(), new LongDeserializer());
+    }
+
+}
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/storage/NumberConverter.java b/connect/api/src/main/java/org/apache/kafka/connect/storage/NumberConverter.java
new file mode 100644
index 00000000000..9180444c0c8
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/storage/NumberConverter.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.storage;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.errors.DataException;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * {@link Converter} and {@link HeaderConverter} implementation that only supports serializing to and deserializing from number values.
+ * It does support handling nulls. When converting from bytes to Kafka Connect format, the converter will always return the specified
+ * schema.
+ * <p>
+ * This implementation currently does nothing with the topic names or header names.
+ */
+abstract class NumberConverter<T extends Number> implements Converter, HeaderConverter {
+
+    private final Serializer<T> serializer;
+    private final Deserializer<T> deserializer;
+    private final String typeName;
+    private final Schema schema;
+
+    /**
+     * Create the converter.
+     *
+     * @param typeName the displayable name of the type; may not be null
+     * @param schema the optional schema to be used for all deserialized forms; may not be null
+     * @param serializer the serializer; may not be null
+     * @param deserializer the deserializer; may not be null
+     */
+    protected NumberConverter(String typeName, Schema schema, Serializer<T> serializer, Deserializer<T> deserializer) {
+        this.typeName = typeName;
+        this.schema = schema;
+        this.serializer = serializer;
+        this.deserializer = deserializer;
+        assert this.serializer != null;
+        assert this.deserializer != null;
+        assert this.typeName != null;
+        assert this.schema != null;
+    }
+
+    @Override
+    public ConfigDef config() {
+        return NumberConverterConfig.configDef();
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs) {
+        NumberConverterConfig conf = new NumberConverterConfig(configs);
+        boolean isKey = conf.type() == ConverterType.KEY;
+        serializer.configure(configs, isKey);
+        deserializer.configure(configs, isKey);
+
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs, boolean isKey) {
+        Map<String, Object> conf = new HashMap<>(configs);
+        conf.put(StringConverterConfig.TYPE_CONFIG, isKey ? ConverterType.KEY.getName() : ConverterType.VALUE.getName());
+        configure(conf);
+    }
+
+    protected T cast(Object value) {
+        return (T) value;
+    }
+
+    @Override
+    public byte[] fromConnectData(String topic, Schema schema, Object value) {
+        try {
+            return serializer.serialize(topic, value == null ? null : cast(value));
+        } catch (ClassCastException e) {
+            throw new DataException("Failed to serialize to " + typeName + " (was " + value.getClass() + "): ", e);
+        } catch (SerializationException e) {
+            throw new DataException("Failed to serialize to " + typeName + ": ", e);
+        }
+    }
+
+    @Override
+    public SchemaAndValue toConnectData(String topic, byte[] value) {
+        try {
+            return new SchemaAndValue(schema, deserializer.deserialize(topic, value));
+        } catch (SerializationException e) {
+            throw new DataException("Failed to deserialize " + typeName + ": ", e);
+        }
+    }
+
+    @Override
+    public byte[] fromConnectHeader(String topic, String headerKey, Schema schema, Object value) {
+        return fromConnectData(topic, schema, value);
+    }
+
+    @Override
+    public SchemaAndValue toConnectHeader(String topic, String headerKey, byte[] value) {
+        return toConnectData(topic, value);
+    }
+
+    @Override
+    public void close() throws IOException {
+    }
+}
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/storage/NumberConverterConfig.java b/connect/api/src/main/java/org/apache/kafka/connect/storage/NumberConverterConfig.java
new file mode 100644
index 00000000000..2f7019d0069
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/storage/NumberConverterConfig.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.storage;
+
+import org.apache.kafka.common.config.ConfigDef;
+
+import java.util.Map;
+
+/**
+ * Configuration options for instances of {@link LongConverter}, {@link IntegerConverter}, {@link ShortConverter}, {@link DoubleConverter},
+ * and {@link FloatConverter} instances.
+ */
+public class NumberConverterConfig extends ConverterConfig {
+
+    private final static ConfigDef CONFIG = ConverterConfig.newConfigDef();
+
+    public static ConfigDef configDef() {
+        return CONFIG;
+    }
+
+    public NumberConverterConfig(Map<String, ?> props) {
+        super(CONFIG, props);
+    }
+}
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/storage/ShortConverter.java b/connect/api/src/main/java/org/apache/kafka/connect/storage/ShortConverter.java
new file mode 100644
index 00000000000..9a769ffc1a5
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/storage/ShortConverter.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.storage;
+
+import org.apache.kafka.common.serialization.ShortDeserializer;
+import org.apache.kafka.common.serialization.ShortSerializer;
+import org.apache.kafka.connect.data.Schema;
+
+/**
+ * {@link Converter} and {@link HeaderConverter} implementation that only supports serializing to and deserializing from short values.
+ * It does support handling nulls. When converting from bytes to Kafka Connect format, the converter will always return an
+ * optional INT16 schema.
+ * <p>
+ * This implementation currently does nothing with the topic names or header names.
+ */
+public class ShortConverter extends NumberConverter<Short> {
+
+    public ShortConverter() {
+        super("short", Schema.OPTIONAL_INT16_SCHEMA, new ShortSerializer(), new ShortDeserializer());
+    }
+}
diff --git a/connect/api/src/test/java/org/apache/kafka/connect/storage/DoubleConverterTest.java b/connect/api/src/test/java/org/apache/kafka/connect/storage/DoubleConverterTest.java
new file mode 100644
index 00000000000..17440839445
--- /dev/null
+++ b/connect/api/src/test/java/org/apache/kafka/connect/storage/DoubleConverterTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.storage;
+
+import org.apache.kafka.common.serialization.DoubleSerializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.connect.data.Schema;
+
+public class DoubleConverterTest extends NumberConverterTest<Double> {
+
+    public Double[] samples() {
+        return new Double[]{Double.MIN_VALUE, 1234.31, Double.MAX_VALUE};
+    }
+
+    @Override
+    protected Schema schema() {
+        return Schema.OPTIONAL_FLOAT64_SCHEMA;
+    }
+
+    @Override
+    protected NumberConverter<Double> createConverter() {
+        return new DoubleConverter();
+    }
+
+    @Override
+    protected Serializer<Double> createSerializer() {
+        return new DoubleSerializer();
+    }
+}
diff --git a/connect/api/src/test/java/org/apache/kafka/connect/storage/FloatConverterTest.java b/connect/api/src/test/java/org/apache/kafka/connect/storage/FloatConverterTest.java
new file mode 100644
index 00000000000..57a18602d1e
--- /dev/null
+++ b/connect/api/src/test/java/org/apache/kafka/connect/storage/FloatConverterTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.storage;
+
+import org.apache.kafka.common.serialization.FloatSerializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.connect.data.Schema;
+
+public class FloatConverterTest extends NumberConverterTest<Float> {
+
+    public Float[] samples() {
+        return new Float[]{Float.MIN_VALUE, 1234.31f, Float.MAX_VALUE};
+    }
+
+    @Override
+    protected Schema schema() {
+        return Schema.OPTIONAL_FLOAT32_SCHEMA;
+    }
+
+    @Override
+    protected NumberConverter<Float> createConverter() {
+        return new FloatConverter();
+    }
+
+    @Override
+    protected Serializer<Float> createSerializer() {
+        return new FloatSerializer();
+    }
+}
diff --git a/connect/api/src/test/java/org/apache/kafka/connect/storage/IntegerConverterTest.java b/connect/api/src/test/java/org/apache/kafka/connect/storage/IntegerConverterTest.java
new file mode 100644
index 00000000000..33fbe600be3
--- /dev/null
+++ b/connect/api/src/test/java/org/apache/kafka/connect/storage/IntegerConverterTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.storage;
+
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.connect.data.Schema;
+
+public class IntegerConverterTest extends NumberConverterTest<Integer> {
+
+    public Integer[] samples() {
+        return new Integer[]{Integer.MIN_VALUE, 1234, Integer.MAX_VALUE};
+    }
+
+    @Override
+    protected Schema schema() {
+        return Schema.OPTIONAL_INT32_SCHEMA;
+    }
+
+    @Override
+    protected NumberConverter<Integer> createConverter() {
+        return new IntegerConverter();
+    }
+
+    @Override
+    protected Serializer<Integer> createSerializer() {
+        return new IntegerSerializer();
+    }
+}
diff --git a/connect/api/src/test/java/org/apache/kafka/connect/storage/LongConverterTest.java b/connect/api/src/test/java/org/apache/kafka/connect/storage/LongConverterTest.java
new file mode 100644
index 00000000000..8f41bb5511d
--- /dev/null
+++ b/connect/api/src/test/java/org/apache/kafka/connect/storage/LongConverterTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.storage;
+
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.connect.data.Schema;
+
+public class LongConverterTest extends NumberConverterTest<Long> {
+
+    public Long[] samples() {
+        return new Long[]{Long.MIN_VALUE, 1234L, Long.MAX_VALUE};
+    }
+
+    @Override
+    protected Schema schema() {
+        return Schema.OPTIONAL_INT64_SCHEMA;
+    }
+
+    @Override
+    protected NumberConverter<Long> createConverter() {
+        return new LongConverter();
+    }
+
+    @Override
+    protected Serializer<Long> createSerializer() {
+        return new LongSerializer();
+    }
+}
diff --git a/connect/api/src/test/java/org/apache/kafka/connect/storage/NumberConverterTest.java b/connect/api/src/test/java/org/apache/kafka/connect/storage/NumberConverterTest.java
new file mode 100644
index 00000000000..2936a719789
--- /dev/null
+++ b/connect/api/src/test/java/org/apache/kafka/connect/storage/NumberConverterTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.storage;
+
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.errors.DataException;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public abstract class NumberConverterTest<T extends Number> {
+    private static final String TOPIC = "topic";
+    private static final String HEADER_NAME = "header";
+
+    private T[] samples;
+    private Schema schema;
+    private NumberConverter<T> converter;
+    private Serializer<T> serializer;
+
+    protected abstract T[] samples();
+
+    protected abstract NumberConverter<T> createConverter();
+
+    protected abstract Serializer<T> createSerializer();
+
+    protected abstract Schema schema();
+
+    @Before
+    public void setup() {
+        converter = createConverter();
+        serializer = createSerializer();
+        schema = schema();
+        samples = samples();
+    }
+
+    @Test
+    public void testConvertingSamplesToAndFromBytes() throws UnsupportedOperationException {
+        for (T sample : samples) {
+            byte[] expected = serializer.serialize(TOPIC, sample);
+
+            // Data conversion
+            assertArrayEquals(expected, converter.fromConnectData(TOPIC, schema, sample));
+            SchemaAndValue data = converter.toConnectData(TOPIC, expected);
+            assertEquals(schema, data.schema());
+            assertEquals(sample, data.value());
+
+            // Header conversion
+            assertArrayEquals(expected, converter.fromConnectHeader(TOPIC, HEADER_NAME, schema, sample));
+            data = converter.toConnectHeader(TOPIC, HEADER_NAME, expected);
+            assertEquals(schema, data.schema());
+            assertEquals(sample, data.value());
+        }
+    }
+
+    @Test(expected = DataException.class)
+    public void testDeserializingDataWithTooManyBytes() {
+        converter.toConnectData(TOPIC, new byte[10]);
+    }
+
+    @Test(expected = DataException.class)
+    public void testDeserializingHeaderWithTooManyBytes() {
+        converter.toConnectHeader(TOPIC, HEADER_NAME, new byte[10]);
+    }
+
+    @Test(expected = DataException.class)
+    public void testSerializingIncorrectType() {
+        converter.fromConnectData(TOPIC, schema, "not a valid number");
+    }
+
+    @Test(expected = DataException.class)
+    public void testSerializingIncorrectHeader() {
+        converter.fromConnectHeader(TOPIC, HEADER_NAME, schema, "not a valid number");
+    }
+
+    @Test
+    public void testNullToBytes() {
+        assertEquals(null, converter.fromConnectData(TOPIC, schema, null));
+    }
+
+    @Test
+    public void testBytesNullToNumber() {
+        SchemaAndValue data = converter.toConnectData(TOPIC, null);
+        assertEquals(schema(), data.schema());
+        assertNull(data.value());
+    }
+}
diff --git a/connect/api/src/test/java/org/apache/kafka/connect/storage/ShortConverterTest.java b/connect/api/src/test/java/org/apache/kafka/connect/storage/ShortConverterTest.java
new file mode 100644
index 00000000000..871f39833de
--- /dev/null
+++ b/connect/api/src/test/java/org/apache/kafka/connect/storage/ShortConverterTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.storage;
+
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.ShortSerializer;
+import org.apache.kafka.connect.data.Schema;
+
+public class ShortConverterTest extends NumberConverterTest<Short> {
+
+    public Short[] samples() {
+        return new Short[]{Short.MIN_VALUE, 123, Short.MAX_VALUE};
+    }
+
+    @Override
+    protected Schema schema() {
+        return Schema.OPTIONAL_INT16_SCHEMA;
+    }
+
+    @Override
+    protected NumberConverter<Short> createConverter() {
+        return new ShortConverter();
+    }
+
+    @Override
+    protected Serializer<Short> createSerializer() {
+        return new ShortSerializer();
+    }
+}
+
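
For readers skimming the diff, the pattern in NumberConverter (hand the value to a type-specific serializer, pass nulls through, and turn a type mismatch into a single converter-level exception) can be sketched without any Kafka dependency. Everything below is a hypothetical stand-in: DelegatingConverterSketch, ConversionException and fromValue are not Kafka classes or methods, and the sketch uses a checked Class.cast and an early null return where the diff relies on an unchecked (T) cast and on the serializer accepting null.

```java
import java.nio.ByteBuffer;
import java.util.function.Function;

// Hypothetical names throughout; a simplified stand-in, not the Kafka Connect API.
class DelegatingConverterSketch<T extends Number> {

    // Plays the role that DataException plays in the diff.
    static class ConversionException extends RuntimeException {
        ConversionException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    private final String typeName;
    private final Class<T> type;
    private final Function<T, byte[]> serializer;

    DelegatingConverterSketch(String typeName, Class<T> type, Function<T, byte[]> serializer) {
        this.typeName = typeName;
        this.type = type;
        this.serializer = serializer;
    }

    // Null passes through; a value of the wrong type becomes a ConversionException.
    byte[] fromValue(Object value) {
        if (value == null) {
            return null;
        }
        try {
            // Class.cast is a checked cast, unlike the unchecked (T) cast in the diff.
            return serializer.apply(type.cast(value));
        } catch (ClassCastException e) {
            throw new ConversionException(
                    "Failed to serialize to " + typeName + " (was " + value.getClass() + ")", e);
        }
    }

    public static void main(String[] args) {
        DelegatingConverterSketch<Integer> converter = new DelegatingConverterSketch<Integer>(
                "integer", Integer.class,
                v -> ByteBuffer.allocate(Integer.BYTES).putInt(v).array());
        System.out.println(converter.fromValue(42).length);
        try {
            converter.fromValue("not a valid number");
        } catch (ConversionException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The sketch keeps only the serialize direction; the deserialize direction in the diff follows the same shape, wrapping SerializationException instead of ClassCastException.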


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add primitive numeric converters to Connect
> -------------------------------------------
>
>                 Key: KAFKA-6913
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6913
>             Project: Kafka
>          Issue Type: Improvement
>          Components: KafkaConnect
>    Affects Versions: 1.1.0
>            Reporter: Randall Hauch
>            Assignee: Randall Hauch
>            Priority: Major
>              Labels: needs-kip
>             Fix For: 2.0.0
>
>
> Kafka common includes serdes for long, int, short, float, and double types, 
> but Connect does not have converters for these. They should support null.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
