This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 4927eff4d501c790f74effa65e2e83a0a9b0f294 Author: Christophe Bornet <[email protected]> AuthorDate: Tue May 20 17:19:00 2025 +0200 [fix][io] Fix kinesis avro bytes handling (#24316) (cherry picked from commit 54ade7e2b8f2c0f7946a0d467113577fad63c854) --- .../pulsar/io/kinesis/json/JsonConverter.java | 7 +++-- .../org/apache/pulsar/io/kinesis/UtilsTest.java | 33 +++++++++++----------- .../pulsar/io/kinesis/json/JsonConverterTests.java | 3 +- .../integration/io/sinks/KinesisSinkTester.java | 12 ++++++-- 4 files changed, 33 insertions(+), 22 deletions(-) diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/json/JsonConverter.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/json/JsonConverter.java index 22412c39575..a44d081cd2c 100644 --- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/json/JsonConverter.java +++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/json/JsonConverter.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.JsonNodeFactory; import com.fasterxml.jackson.databind.node.ObjectNode; import java.math.BigDecimal; +import java.nio.ByteBuffer; import java.time.Instant; import java.time.LocalDate; import java.time.LocalTime; @@ -78,11 +79,13 @@ public class JsonConverter { case BOOLEAN: return jsonNodeFactory.booleanNode((Boolean) value); case BYTES: + byte[] bytes = new byte[((ByteBuffer) value).remaining()]; + ((ByteBuffer) value).get(bytes); // Workaround for https://github.com/wnameless/json-flattener/issues/91 if (convertBytesToString) { - return jsonNodeFactory.textNode(Base64.getEncoder().encodeToString((byte[]) value)); + return jsonNodeFactory.textNode(Base64.getEncoder().encodeToString(bytes)); } - return jsonNodeFactory.binaryNode((byte[]) value); + return jsonNodeFactory.binaryNode(bytes); case FIXED: // Workaround for https://github.com/wnameless/json-flattener/issues/91 if (convertBytesToString) { diff --git a/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/UtilsTest.java b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/UtilsTest.java index 1eda566df04..55505af4b7b 100644 --- a/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/UtilsTest.java +++ b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/UtilsTest.java @@ -30,7 +30,6 @@ import com.google.gson.Gson; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; -import java.util.Base64; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -279,6 +278,7 @@ public class UtilsTest { RecordSchemaBuilder udtSchemaBuilder = SchemaBuilder.record("type1"); udtSchemaBuilder.field("a").type(SchemaType.STRING).optional().defaultValue(null); udtSchemaBuilder.field("b").type(SchemaType.BOOLEAN).optional().defaultValue(null); + udtSchemaBuilder.field("c").type(SchemaType.BYTES).optional().defaultValue(null); udtSchemaBuilder.field("d").type(SchemaType.DOUBLE).optional().defaultValue(null); udtSchemaBuilder.field("f").type(SchemaType.FLOAT).optional().defaultValue(null); udtSchemaBuilder.field("i").type(SchemaType.INT32).optional().defaultValue(null); @@ -293,8 +293,9 @@ public class UtilsTest { .set("e", udtGenericSchema.newRecordBuilder() .set("a", "a") .set("b", true) - .set("d", 1.0) - .set("f", 1.0f) + .set("c", ByteBuffer.wrap("10".getBytes(StandardCharsets.UTF_8))) + .set("d", 0.5) + .set("f", 0.25f) .set("i", 1) .set("l", 10L) .build()) @@ -303,7 +304,7 @@ public class UtilsTest { Map<String, String> properties = new HashMap<>(); properties.put("prop-key", "prop-value"); - Record<GenericObject> genericObjectRecord = new Record<GenericObject>() { + Record<GenericObject> genericObjectRecord = new Record<>() { @Override public Optional<String> getTopicName() { return Optional.of("data-ks1.table1"); @@ -321,7 +322,8 @@ public class UtilsTest { @Override public GenericObject getValue() { - return valueGenericRecord; + // Ensure the record in encoded amd decoded correctly + return valueSchema.decode(valueSchema.encode(valueGenericRecord)); } @Override @@ -339,7 +341,7 @@ public class UtilsTest { String json = Utils.serializeRecordToJsonExpandingValue(objectMapper, genericObjectRecord, false); assertEquals(json, "{\"topicName\":\"data-ks1.table1\",\"key\":\"message-key\",\"payload\":{\"c\":\"1\"," - + "\"d\":1,\"e\":{\"a\":\"a\",\"b\":true,\"d\":1.0,\"f\":1.0,\"i\":1,\"l\":10}}," + + "\"d\":1,\"e\":{\"a\":\"a\",\"b\":true,\"c\":\"MTA=\",\"d\":0.5,\"f\":0.25,\"i\":1,\"l\":10}}," + "\"properties\":{\"prop-key\":\"prop-value\"},\"eventTime\":1648502845803}"); } @@ -369,18 +371,15 @@ public class UtilsTest { valueSchemaBuilder.field("e", udtGenericSchema).type(schemaType).optional().defaultValue(null); GenericSchema<GenericRecord> valueSchema = Schema.generic(valueSchemaBuilder.build(schemaType)); - byte[] bytes = "10".getBytes(StandardCharsets.UTF_8); GenericRecord valueGenericRecord = valueSchema.newRecordBuilder() .set("c", "1") .set("d", 1) .set("e", udtGenericSchema.newRecordBuilder() .set("a", "a") .set("b", true) - // There's a bug in json-flattener that doesn't handle byte[] fields correctly. - // But since we use AUTO_CONSUME, we won't get byte[] fields for JSON schema anyway. - .set("c", schemaType == SchemaType.AVRO ? bytes : Base64.getEncoder().encodeToString(bytes)) - .set("d", 1.0) - .set("f", 1.0f) + .set("c", ByteBuffer.wrap("10".getBytes(StandardCharsets.UTF_8))) + .set("d", 0.5) + .set("f", 0.25f) .set("i", 1) .set("l", 10L) .build()) @@ -398,7 +397,7 @@ public class UtilsTest { @Override public Object getNativeObject() { - return keyValue; + return keyValueSchema.decode(keyValueSchema.encode(keyValue)); } }; @@ -441,15 +440,15 @@ public class UtilsTest { String json = Utils.serializeRecordToJsonExpandingValue(objectMapper, genericObjectRecord, false); assertEquals(json, "{\"topicName\":\"data-ks1.table1\",\"key\":\"message-key\"," - + "\"payload\":{\"value\":{\"c\":\"1\",\"d\":1,\"e\":{\"a\":\"a\",\"b\":true,\"c\":\"MTA=\",\"d\":1.0," - + "\"f\":1.0,\"i\":1,\"l\":10}},\"key\":{\"a\":\"1\",\"b\":1}}," + + "\"payload\":{\"value\":{\"c\":\"1\",\"d\":1,\"e\":{\"a\":\"a\",\"b\":true,\"c\":\"MTA=\",\"d\":0.5," + + "\"f\":0.25,\"i\":1,\"l\":10}},\"key\":{\"a\":\"1\",\"b\":1}}," + "\"properties\":{\"prop-key\":\"prop-value\"},\"eventTime\":1648502845803}"); json = Utils.serializeRecordToJsonExpandingValue(objectMapper, genericObjectRecord, true); assertEquals(json, "{\"topicName\":\"data-ks1.table1\",\"key\":\"message-key\",\"payload.value.c\":\"1\"," + "\"payload.value.d\":1,\"payload.value.e.a\":\"a\",\"payload.value.e.b\":true," - + "\"payload.value.e.c\":\"MTA=\",\"payload.value.e.d\":1.0,\"payload.value.e.f\":1.0," + + "\"payload.value.e.c\":\"MTA=\",\"payload.value.e.d\":0.5,\"payload.value.e.f\":0.25," + "\"payload.value.e.i\":1,\"payload.value.e.l\":10,\"payload.key.a\":\"1\",\"payload.key.b\":1," + "\"properties.prop-key\":\"prop-value\",\"eventTime\":1648502845803}"); } @@ -476,7 +475,7 @@ public class UtilsTest { @Override public Object getNativeObject() { - return keyValue; + return keyValueSchema.decode(keyValueSchema.encode(keyValue)); } }; diff --git a/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/json/JsonConverterTests.java b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/json/JsonConverterTests.java index c3bbaa06d01..c762b713de2 100644 --- a/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/json/JsonConverterTests.java +++ b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/json/JsonConverterTests.java @@ -35,6 +35,7 @@ import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.math.BigDecimal; +import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.Calendar; @@ -71,7 +72,7 @@ public class JsonConverterTests { genericRecord.put("l", 1L); genericRecord.put("i", 1); genericRecord.put("b", true); - genericRecord.put("bb", "10".getBytes(StandardCharsets.UTF_8)); + genericRecord.put("bb", ByteBuffer.wrap("10".getBytes(StandardCharsets.UTF_8))); genericRecord.put("d", 10.0); genericRecord.put("f", 10.0f); genericRecord.put("s", "toto"); diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/KinesisSinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/KinesisSinkTester.java index 83cb0088cd7..e1e339f6a82 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/KinesisSinkTester.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/KinesisSinkTester.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectReader; import com.google.common.collect.ImmutableMap; import java.util.ArrayList; import java.util.Arrays; +import java.util.Base64; import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; @@ -156,13 +157,15 @@ public class KinesisSinkTester extends SinkTester<LocalStackContainer> { "f2_" + i, Arrays.asList(i, i +1), new HashSet<>(Arrays.asList((long) i)), - ImmutableMap.of("map1_k_" + i, "map1_kv_" + i)); + ImmutableMap.of("map1_k_" + i, "map1_kv_" + i), + ("key_bytes_" + i).getBytes(StandardCharsets.UTF_8)); final SimplePojo valuePojo = new SimplePojo( String.valueOf(i), "v2_" + i, Arrays.asList(i, i +1), new HashSet<>(Arrays.asList((long) i)), - ImmutableMap.of("map1_v_" + i, "map1_vv_" + i)); + ImmutableMap.of("map1_v_" + i, "map1_vv_" + i), + ("value_bytes_" + i).getBytes(StandardCharsets.UTF_8)); producer.newMessage() .value(new KeyValue<>(keyPojo, valuePojo)) .send(); @@ -222,8 +225,12 @@ public class KinesisSinkTester extends SinkTester<LocalStackContainer> { JsonNode payload = READER.readTree(data).at("/payload"); String i = payload.at("/value/field1").asText(); assertEquals(payload.at("/value/field2").asText(), "v2_" + i); + assertEquals(payload.at("/value/bytes").asText(), + Base64.getEncoder().encodeToString(("value_bytes_" + i).getBytes(StandardCharsets.UTF_8))); assertEquals(payload.at("/key/field1").asText(), "f1_" + i); assertEquals(payload.at("/key/field2").asText(), "f2_" + i); + assertEquals(payload.at("/key/bytes").asText(), + Base64.getEncoder().encodeToString(("key_bytes_" + i).getBytes(StandardCharsets.UTF_8))); actualKvs.put(i, i); } else { actualKvs.put(partitionKey, data); @@ -271,6 +278,7 @@ public class KinesisSinkTester extends SinkTester<LocalStackContainer> { private List<Integer> list1; private Set<Long> set1; private Map<String, String> map1; + private byte[] bytes; } @Override
