This is an automated email from the ASF dual-hosted git repository. valdar pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
The following commit(s) were added to refs/heads/main by this push: new 9323f1e fix #1332 : CameHeader value is wrongly interpreted as BigDecimal and causes ClassCastException. 9323f1e is described below commit 9323f1ece25dd23d557c6b3b12b9c193a6e46f07 Author: Andrea Tarocchi <andrea.taroc...@gmail.com> AuthorDate: Mon Jan 31 13:52:51 2022 +0100 fix #1332 : CameHeader value is wrongly interpreted as BigDecimal and causes ClassCastException. --- .../org/apache/camel/kafkaconnector/CamelSinkTask.java | 4 +++- .../org/apache/camel/kafkaconnector/CamelSourceTask.java | 6 +++--- .../org/apache/camel/kafkaconnector/CamelSinkTaskTest.java | 14 ++++++++++++-- .../apache/camel/kafkaconnector/CamelSourceTaskTest.java | 2 +- 4 files changed, 19 insertions(+), 7 deletions(-) diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java index f5d942e..a53f298 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java @@ -240,7 +240,9 @@ public class CamelSinkTask extends SinkTask { final String key = StringHelper.after(header.key(), prefix, header.key()); final Schema schema = header.schema(); - if (schema.type().equals(Schema.BYTES_SCHEMA.type()) && Objects.equals(schema.name(), Decimal.LOGICAL_NAME)) { + if (schema.type().equals(Schema.BYTES_SCHEMA.type()) + && Objects.equals(schema.name(), Decimal.LOGICAL_NAME) + && header.value() instanceof byte[]) { destination.put(key, Decimal.toLogical(schema, (byte[]) header.value())); } else { destination.put(key, header.value()); diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java index 05a0d96..8b993a2 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java @@ -36,7 +36,6 @@ import org.apache.camel.kafkaconnector.utils.SchemaHelper; import org.apache.camel.kafkaconnector.utils.TaskHelper; import org.apache.camel.support.UnitOfWorkHelper; import org.apache.kafka.clients.producer.RecordMetadata; -import org.apache.kafka.connect.data.Decimal; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.source.SourceRecord; @@ -357,8 +356,9 @@ public class CamelSourceTask extends SourceTask { } else if (value instanceof Date) { record.headers().addTimestamp(keyCamelHeader, (Date)value); } else if (value instanceof BigDecimal) { - Schema schema = Decimal.schema(((BigDecimal)value).scale()); - record.headers().add(keyCamelHeader, Decimal.fromLogical(schema, (BigDecimal)value), schema); + //XXX: kafka connect configured header converter takes care of the encoding, + //default: org.apache.kafka.connect.storage.SimpleHeaderConverter + record.headers().addDecimal(keyCamelHeader, (BigDecimal)value); } else if (value instanceof Double) { record.headers().addDouble(keyCamelHeader, (double)value); } else if (value instanceof Float) { diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java index bab0a5d..c08a4f7 100644 --- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java +++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java @@ -33,6 +33,7 @@ import org.apache.kafka.connect.data.SchemaAndValue; import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.sink.SinkRecord; +import org.apache.kafka.connect.storage.SimpleHeaderConverter; import org.junit.jupiter.api.Test; import static org.assertj.core.api.Assertions.assertThat; @@ -118,7 +119,13 @@ public class CamelSinkTaskTest { int myInteger = 100; Long myLong = new Long("100"); BigDecimal myBigDecimal = new BigDecimal(1234567890); - Schema schema = Decimal.schema(myBigDecimal.scale()); + Schema myBigDecimalSchema = Decimal.schema(myBigDecimal.scale()); + //reproducing bigDecimal encoding by kafka connect + BigDecimal kafkaBigDecimal = new BigDecimal("6.9203120E+787"); + Schema kafkaBigDecimalSchema = Decimal.schema(kafkaBigDecimal.scale()); + SimpleHeaderConverter shc = new SimpleHeaderConverter(); + byte[] persistedBytes = shc.fromConnectHeader("", "MyBigDecimal", kafkaBigDecimalSchema, kafkaBigDecimal); + SchemaAndValue sav = shc.toConnectHeader("", "MyBigDecimal", persistedBytes); List<SinkRecord> records = new ArrayList<SinkRecord>(); SinkRecord record = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42); @@ -129,7 +136,9 @@ public class CamelSinkTaskTest { record.headers().addDouble(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyDouble", myDouble); record.headers().addInt(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyInteger", myInteger); record.headers().addLong(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyLong", myLong); - record.headers().add(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyBigDecimal", Decimal.fromLogical(schema, myBigDecimal), schema); + record.headers().add(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyBigDecimal", Decimal.fromLogical(myBigDecimalSchema, myBigDecimal), myBigDecimalSchema); + record.headers().add(CamelSinkTask.HEADER_CAMEL_PREFIX + "KafkaBigDecimal", sav.value(), sav.schema()); + records.add(record); sinkTask.put(records); @@ -145,6 +154,7 @@ public class CamelSinkTaskTest { assertEquals(myInteger, exchange.getIn().getHeader("MyInteger")); assertEquals(myLong, exchange.getIn().getHeader("MyLong", Long.class)); assertEquals(myBigDecimal, exchange.getIn().getHeader("MyBigDecimal", BigDecimal.class)); + assertEquals(kafkaBigDecimal, exchange.getIn().getHeader("KafkaBigDecimal", BigDecimal.class)); sinkTask.stop(); } diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java index 36ae9e2..8542a4f 100644 --- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java +++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java @@ -312,7 +312,7 @@ public class CamelSourceTaskTest { List<SourceRecord> results = sourceTask.poll(); assertEquals(1, results.size()); Header bigDecimalHeader = results.get(0).headers().allWithName(CamelSourceTask.HEADER_CAMEL_PREFIX + "bigdecimal").next(); - assertEquals("[B", bigDecimalHeader.value().getClass().getName()); + assertTrue(bigDecimalHeader.value() instanceof BigDecimal); assertEquals(Decimal.class.getName(), bigDecimalHeader.schema().name()); assertEquals(Schema.Type.BYTES, bigDecimalHeader.schema().type());