[ https://issues.apache.org/jira/browse/KAFKA-6913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16494454#comment-16494454 ]
ASF GitHub Bot commented on KAFKA-6913: --------------------------------------- ewencp closed pull request #5034: KAFKA-6913: Add Connect converters and header converters for short, integer, long, float, and double (WIP) URL: https://github.com/apache/kafka/pull/5034 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/connect/api/src/main/java/org/apache/kafka/connect/storage/DoubleConverter.java b/connect/api/src/main/java/org/apache/kafka/connect/storage/DoubleConverter.java new file mode 100644 index 00000000000..04019a7a529 --- /dev/null +++ b/connect/api/src/main/java/org/apache/kafka/connect/storage/DoubleConverter.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.storage; + +import org.apache.kafka.common.serialization.DoubleDeserializer; +import org.apache.kafka.common.serialization.DoubleSerializer; +import org.apache.kafka.connect.data.Schema; + +/** + * {@link Converter} and {@link HeaderConverter} implementation that only supports serializing to and deserializing from double values. + * It does support handling nulls. When converting from bytes to Kafka Connect format, the converter will always return an + * optional FLOAT64 schema. + * <p> + * This implementation currently does nothing with the topic names or header names. + */ +public class DoubleConverter extends NumberConverter<Double> { + + public DoubleConverter() { + super("double", Schema.OPTIONAL_FLOAT64_SCHEMA, new DoubleSerializer(), new DoubleDeserializer()); + } +} diff --git a/connect/api/src/main/java/org/apache/kafka/connect/storage/FloatConverter.java b/connect/api/src/main/java/org/apache/kafka/connect/storage/FloatConverter.java new file mode 100644 index 00000000000..16bf0e0f93f --- /dev/null +++ b/connect/api/src/main/java/org/apache/kafka/connect/storage/FloatConverter.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.storage; + +import org.apache.kafka.common.serialization.FloatDeserializer; +import org.apache.kafka.common.serialization.FloatSerializer; +import org.apache.kafka.connect.data.Schema; + +/** + * {@link Converter} and {@link HeaderConverter} implementation that only supports serializing to and deserializing from float values. + * It does support handling nulls. When converting from bytes to Kafka Connect format, the converter will always return an + * optional FLOAT32 schema. + * <p> + * This implementation currently does nothing with the topic names or header names. + */ +public class FloatConverter extends NumberConverter<Float> { + + public FloatConverter() { + super("float", Schema.OPTIONAL_FLOAT32_SCHEMA, new FloatSerializer(), new FloatDeserializer()); + } +} diff --git a/connect/api/src/main/java/org/apache/kafka/connect/storage/IntegerConverter.java b/connect/api/src/main/java/org/apache/kafka/connect/storage/IntegerConverter.java new file mode 100644 index 00000000000..6f3c78a0a73 --- /dev/null +++ b/connect/api/src/main/java/org/apache/kafka/connect/storage/IntegerConverter.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.storage; + +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.connect.data.Schema; + +/** + * {@link Converter} and {@link HeaderConverter} implementation that only supports serializing to and deserializing from integer values. + * It does support handling nulls. When converting from bytes to Kafka Connect format, the converter will always return an + * optional INT32 schema. + * <p> + * This implementation currently does nothing with the topic names or header names. + */ +public class IntegerConverter extends NumberConverter<Integer> { + + public IntegerConverter() { + super("integer", Schema.OPTIONAL_INT32_SCHEMA, new IntegerSerializer(), new IntegerDeserializer()); + } +} diff --git a/connect/api/src/main/java/org/apache/kafka/connect/storage/LongConverter.java b/connect/api/src/main/java/org/apache/kafka/connect/storage/LongConverter.java new file mode 100644 index 00000000000..600c3042502 --- /dev/null +++ b/connect/api/src/main/java/org/apache/kafka/connect/storage/LongConverter.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.storage; + +import org.apache.kafka.common.serialization.LongDeserializer; +import org.apache.kafka.common.serialization.LongSerializer; +import org.apache.kafka.connect.data.Schema; + +/** + * {@link Converter} and {@link HeaderConverter} implementation that only supports serializing to and deserializing from long values. + * It does support handling nulls. When converting from bytes to Kafka Connect format, the converter will always return an + * optional INT64 schema. + * <p> + * This implementation currently does nothing with the topic names or header names. + */ +public class LongConverter extends NumberConverter<Long> { + + public LongConverter() { + super("long", Schema.OPTIONAL_INT64_SCHEMA, new LongSerializer(), new LongDeserializer()); + } + +} diff --git a/connect/api/src/main/java/org/apache/kafka/connect/storage/NumberConverter.java b/connect/api/src/main/java/org/apache/kafka/connect/storage/NumberConverter.java new file mode 100644 index 00000000000..9180444c0c8 --- /dev/null +++ b/connect/api/src/main/java/org/apache/kafka/connect/storage/NumberConverter.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.storage; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaAndValue; +import org.apache.kafka.connect.errors.DataException; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +/** + * {@link Converter} and {@link HeaderConverter} implementation that only supports serializing to and deserializing from number values. + * It does support handling nulls. When converting from bytes to Kafka Connect format, the converter will always return the specified + * schema. + * <p> + * This implementation currently does nothing with the topic names or header names. + */ +abstract class NumberConverter<T extends Number> implements Converter, HeaderConverter { + + private final Serializer<T> serializer; + private final Deserializer<T> deserializer; + private final String typeName; + private final Schema schema; + + /** + * Create the converter. + * + * @param typeName the displayable name of the type; may not be null + * @param schema the optional schema to be used for all deserialized forms; may not be null + * @param serializer the serializer; may not be null + * @param deserializer the deserializer; may not be null + */ + protected NumberConverter(String typeName, Schema schema, Serializer<T> serializer, Deserializer<T> deserializer) { + this.typeName = typeName; + this.schema = schema; + this.serializer = serializer; + this.deserializer = deserializer; + assert this.serializer != null; + assert this.deserializer != null; + assert this.typeName != null; + assert this.schema != null; + } + + @Override + public ConfigDef config() { + return NumberConverterConfig.configDef(); + } + + @Override + public void configure(Map<String, ?> configs) { + NumberConverterConfig conf = new NumberConverterConfig(configs); + boolean isKey = conf.type() == ConverterType.KEY; + serializer.configure(configs, isKey); + deserializer.configure(configs, isKey); + + } + + @Override + public void configure(Map<String, ?> configs, boolean isKey) { + Map<String, Object> conf = new HashMap<>(configs); + conf.put(StringConverterConfig.TYPE_CONFIG, isKey ? ConverterType.KEY.getName() : ConverterType.VALUE.getName()); + configure(conf); + } + + protected T cast(Object value) { + return (T) value; + } + + @Override + public byte[] fromConnectData(String topic, Schema schema, Object value) { + try { + return serializer.serialize(topic, value == null ? null : cast(value)); + } catch (ClassCastException e) { + throw new DataException("Failed to serialize to " + typeName + " (was " + value.getClass() + "): ", e); + } catch (SerializationException e) { + throw new DataException("Failed to serialize to " + typeName + ": ", e); + } + } + + @Override + public SchemaAndValue toConnectData(String topic, byte[] value) { + try { + return new SchemaAndValue(schema, deserializer.deserialize(topic, value)); + } catch (SerializationException e) { + throw new DataException("Failed to deserialize " + typeName + ": ", e); + } + } + + @Override + public byte[] fromConnectHeader(String topic, String headerKey, Schema schema, Object value) { + return fromConnectData(topic, schema, value); + } + + @Override + public SchemaAndValue toConnectHeader(String topic, String headerKey, byte[] value) { + return toConnectData(topic, value); + } + + @Override + public void close() throws IOException { + } +} diff --git a/connect/api/src/main/java/org/apache/kafka/connect/storage/NumberConverterConfig.java b/connect/api/src/main/java/org/apache/kafka/connect/storage/NumberConverterConfig.java new file mode 100644 index 00000000000..2f7019d0069 --- /dev/null +++ b/connect/api/src/main/java/org/apache/kafka/connect/storage/NumberConverterConfig.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.storage; + +import org.apache.kafka.common.config.ConfigDef; + +import java.util.Map; + +/** + * Configuration options for instances of {@link LongConverter}, {@link IntegerConverter}, {@link ShortConverter}, {@link DoubleConverter}, + * and {@link FloatConverter} instances. + */ +public class NumberConverterConfig extends ConverterConfig { + + private final static ConfigDef CONFIG = ConverterConfig.newConfigDef(); + + public static ConfigDef configDef() { + return CONFIG; + } + + public NumberConverterConfig(Map<String, ?> props) { + super(CONFIG, props); + } +} diff --git a/connect/api/src/main/java/org/apache/kafka/connect/storage/ShortConverter.java b/connect/api/src/main/java/org/apache/kafka/connect/storage/ShortConverter.java new file mode 100644 index 00000000000..9a769ffc1a5 --- /dev/null +++ b/connect/api/src/main/java/org/apache/kafka/connect/storage/ShortConverter.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.storage; + +import org.apache.kafka.common.serialization.ShortDeserializer; +import org.apache.kafka.common.serialization.ShortSerializer; +import org.apache.kafka.connect.data.Schema; + +/** + * {@link Converter} and {@link HeaderConverter} implementation that only supports serializing to and deserializing from short values. + * It does support handling nulls. When converting from bytes to Kafka Connect format, the converter will always return an + * optional INT16 schema. + * <p> + * This implementation currently does nothing with the topic names or header names. + */ +public class ShortConverter extends NumberConverter<Short> { + + public ShortConverter() { + super("short", Schema.OPTIONAL_INT16_SCHEMA, new ShortSerializer(), new ShortDeserializer()); + } +} diff --git a/connect/api/src/test/java/org/apache/kafka/connect/storage/DoubleConverterTest.java b/connect/api/src/test/java/org/apache/kafka/connect/storage/DoubleConverterTest.java new file mode 100644 index 00000000000..17440839445 --- /dev/null +++ b/connect/api/src/test/java/org/apache/kafka/connect/storage/DoubleConverterTest.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.storage; + +import org.apache.kafka.common.serialization.DoubleSerializer; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.connect.data.Schema; + +public class DoubleConverterTest extends NumberConverterTest<Double> { + + public Double[] samples() { + return new Double[]{Double.MIN_VALUE, 1234.31, Double.MAX_VALUE}; + } + + @Override + protected Schema schema() { + return Schema.OPTIONAL_FLOAT64_SCHEMA; + } + + @Override + protected NumberConverter<Double> createConverter() { + return new DoubleConverter(); + } + + @Override + protected Serializer<Double> createSerializer() { + return new DoubleSerializer(); + } +} diff --git a/connect/api/src/test/java/org/apache/kafka/connect/storage/FloatConverterTest.java b/connect/api/src/test/java/org/apache/kafka/connect/storage/FloatConverterTest.java new file mode 100644 index 00000000000..57a18602d1e --- /dev/null +++ b/connect/api/src/test/java/org/apache/kafka/connect/storage/FloatConverterTest.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.storage; + +import org.apache.kafka.common.serialization.FloatSerializer; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.connect.data.Schema; + +public class FloatConverterTest extends NumberConverterTest<Float> { + + public Float[] samples() { + return new Float[]{Float.MIN_VALUE, 1234.31f, Float.MAX_VALUE}; + } + + @Override + protected Schema schema() { + return Schema.OPTIONAL_FLOAT32_SCHEMA; + } + + @Override + protected NumberConverter<Float> createConverter() { + return new FloatConverter(); + } + + @Override + protected Serializer<Float> createSerializer() { + return new FloatSerializer(); + } +} diff --git a/connect/api/src/test/java/org/apache/kafka/connect/storage/IntegerConverterTest.java b/connect/api/src/test/java/org/apache/kafka/connect/storage/IntegerConverterTest.java new file mode 100644 index 00000000000..33fbe600be3 --- /dev/null +++ b/connect/api/src/test/java/org/apache/kafka/connect/storage/IntegerConverterTest.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.storage; + +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.connect.data.Schema; + +public class IntegerConverterTest extends NumberConverterTest<Integer> { + + public Integer[] samples() { + return new Integer[]{Integer.MIN_VALUE, 1234, Integer.MAX_VALUE}; + } + + @Override + protected Schema schema() { + return Schema.OPTIONAL_INT32_SCHEMA; + } + + @Override + protected NumberConverter<Integer> createConverter() { + return new IntegerConverter(); + } + + @Override + protected Serializer<Integer> createSerializer() { + return new IntegerSerializer(); + } +} diff --git a/connect/api/src/test/java/org/apache/kafka/connect/storage/LongConverterTest.java b/connect/api/src/test/java/org/apache/kafka/connect/storage/LongConverterTest.java new file mode 100644 index 00000000000..8f41bb5511d --- /dev/null +++ b/connect/api/src/test/java/org/apache/kafka/connect/storage/LongConverterTest.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.storage; + +import org.apache.kafka.common.serialization.LongSerializer; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.connect.data.Schema; + +public class LongConverterTest extends NumberConverterTest<Long> { + + public Long[] samples() { + return new Long[]{Long.MIN_VALUE, 1234L, Long.MAX_VALUE}; + } + + @Override + protected Schema schema() { + return Schema.OPTIONAL_INT64_SCHEMA; + } + + @Override + protected NumberConverter<Long> createConverter() { + return new LongConverter(); + } + + @Override + protected Serializer<Long> createSerializer() { + return new LongSerializer(); + } +} diff --git a/connect/api/src/test/java/org/apache/kafka/connect/storage/NumberConverterTest.java b/connect/api/src/test/java/org/apache/kafka/connect/storage/NumberConverterTest.java new file mode 100644 index 00000000000..2936a719789 --- /dev/null +++ b/connect/api/src/test/java/org/apache/kafka/connect/storage/NumberConverterTest.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.storage; + +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaAndValue; +import org.apache.kafka.connect.errors.DataException; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +public abstract class NumberConverterTest<T extends Number> { + private static final String TOPIC = "topic"; + private static final String HEADER_NAME = "header"; + + private T[] samples; + private Schema schema; + private NumberConverter<T> converter; + private Serializer<T> serializer; + + protected abstract T[] samples(); + + protected abstract NumberConverter<T> createConverter(); + + protected abstract Serializer<T> createSerializer(); + + protected abstract Schema schema(); + + @Before + public void setup() { + converter = createConverter(); + serializer = createSerializer(); + schema = schema(); + samples = samples(); + } + + @Test + public void testConvertingSamplesToAndFromBytes() throws UnsupportedOperationException { + for (T sample : samples) { + byte[] expected = serializer.serialize(TOPIC, sample); + + // Data conversion + assertArrayEquals(expected, converter.fromConnectData(TOPIC, schema, sample)); + SchemaAndValue data = converter.toConnectData(TOPIC, expected); + assertEquals(schema, data.schema()); + assertEquals(sample, data.value()); + + // Header conversion + assertArrayEquals(expected, converter.fromConnectHeader(TOPIC, HEADER_NAME, schema, sample)); + data = converter.toConnectHeader(TOPIC, HEADER_NAME, expected); + assertEquals(schema, data.schema()); + assertEquals(sample, data.value()); + } + } + + @Test(expected = DataException.class) + public void testDeserializingDataWithTooManyBytes() { + converter.toConnectData(TOPIC, new byte[10]); + } + + @Test(expected = DataException.class) + public void testDeserializingHeaderWithTooManyBytes() { + converter.toConnectHeader(TOPIC, HEADER_NAME, new byte[10]); + } + + @Test(expected = DataException.class) + public void testSerializingIncorrectType() { + converter.fromConnectData(TOPIC, schema, "not a valid number"); + } + + @Test(expected = DataException.class) + public void testSerializingIncorrectHeader() { + converter.fromConnectHeader(TOPIC, HEADER_NAME, schema, "not a valid number"); + } + + @Test + public void testNullToBytes() { + assertEquals(null, converter.fromConnectData(TOPIC, schema, null)); + } + + @Test + public void testBytesNullToNumber() { + SchemaAndValue data = converter.toConnectData(TOPIC, null); + assertEquals(schema(), data.schema()); + assertNull(data.value()); + } +} diff --git a/connect/api/src/test/java/org/apache/kafka/connect/storage/ShortConverterTest.java b/connect/api/src/test/java/org/apache/kafka/connect/storage/ShortConverterTest.java new file mode 100644 index 00000000000..871f39833de --- /dev/null +++ b/connect/api/src/test/java/org/apache/kafka/connect/storage/ShortConverterTest.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.storage; + +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.serialization.ShortSerializer; +import org.apache.kafka.connect.data.Schema; + +public class ShortConverterTest extends NumberConverterTest<Short> { + + public Short[] samples() { + return new Short[]{Short.MIN_VALUE, 123, Short.MAX_VALUE}; + } + + @Override + protected Schema schema() { + return Schema.OPTIONAL_INT16_SCHEMA; + } + + @Override + protected NumberConverter<Short> createConverter() { + return new ShortConverter(); + } + + @Override + protected Serializer<Short> createSerializer() { + return new ShortSerializer(); + } +} + ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add primitive numeric converters to Connect > ------------------------------------------- > > Key: KAFKA-6913 > URL: https://issues.apache.org/jira/browse/KAFKA-6913 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect > Affects Versions: 1.1.0 > Reporter: Randall Hauch > Assignee: Randall Hauch > Priority: Major > Labels: needs-kip > Fix For: 2.0.0 > > > Kafka common includes serdes for long, int, short, float, and double types, > but Connect does not have converters for these. They should support null. -- This message was sent by Atlassian JIRA (v7.6.3#76005)