This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 4927eff4d501c790f74effa65e2e83a0a9b0f294
Author: Christophe Bornet <[email protected]>
AuthorDate: Tue May 20 17:19:00 2025 +0200

    [fix][io] Fix kinesis avro bytes handling (#24316)
    
    (cherry picked from commit 54ade7e2b8f2c0f7946a0d467113577fad63c854)
---
 .../pulsar/io/kinesis/json/JsonConverter.java      |  7 +++--
 .../org/apache/pulsar/io/kinesis/UtilsTest.java    | 33 +++++++++++-----------
 .../pulsar/io/kinesis/json/JsonConverterTests.java |  3 +-
 .../integration/io/sinks/KinesisSinkTester.java    | 12 ++++++--
 4 files changed, 33 insertions(+), 22 deletions(-)

diff --git 
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/json/JsonConverter.java
 
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/json/JsonConverter.java
index 22412c39575..a44d081cd2c 100644
--- 
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/json/JsonConverter.java
+++ 
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/json/JsonConverter.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.JsonNodeFactory;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import java.math.BigDecimal;
+import java.nio.ByteBuffer;
 import java.time.Instant;
 import java.time.LocalDate;
 import java.time.LocalTime;
@@ -78,11 +79,13 @@ public class JsonConverter {
             case BOOLEAN:
                 return jsonNodeFactory.booleanNode((Boolean) value);
             case BYTES:
+                byte[] bytes = new byte[((ByteBuffer) value).remaining()];
+                ((ByteBuffer) value).get(bytes);
                 // Workaround for 
https://github.com/wnameless/json-flattener/issues/91
                 if (convertBytesToString) {
-                    return 
jsonNodeFactory.textNode(Base64.getEncoder().encodeToString((byte[]) value));
+                    return 
jsonNodeFactory.textNode(Base64.getEncoder().encodeToString(bytes));
                 }
-                return jsonNodeFactory.binaryNode((byte[]) value);
+                return jsonNodeFactory.binaryNode(bytes);
             case FIXED:
                 // Workaround for 
https://github.com/wnameless/json-flattener/issues/91
                 if (convertBytesToString) {
diff --git 
a/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/UtilsTest.java 
b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/UtilsTest.java
index 1eda566df04..55505af4b7b 100644
--- 
a/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/UtilsTest.java
+++ 
b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/UtilsTest.java
@@ -30,7 +30,6 @@ import com.google.gson.Gson;
 
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
-import java.util.Base64;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -279,6 +278,7 @@ public class UtilsTest {
         RecordSchemaBuilder udtSchemaBuilder = SchemaBuilder.record("type1");
         
udtSchemaBuilder.field("a").type(SchemaType.STRING).optional().defaultValue(null);
         
udtSchemaBuilder.field("b").type(SchemaType.BOOLEAN).optional().defaultValue(null);
+        
udtSchemaBuilder.field("c").type(SchemaType.BYTES).optional().defaultValue(null);
         
udtSchemaBuilder.field("d").type(SchemaType.DOUBLE).optional().defaultValue(null);
         
udtSchemaBuilder.field("f").type(SchemaType.FLOAT).optional().defaultValue(null);
         
udtSchemaBuilder.field("i").type(SchemaType.INT32).optional().defaultValue(null);
@@ -293,8 +293,9 @@ public class UtilsTest {
                 .set("e", udtGenericSchema.newRecordBuilder()
                         .set("a", "a")
                         .set("b", true)
-                        .set("d", 1.0)
-                        .set("f", 1.0f)
+                        .set("c", 
ByteBuffer.wrap("10".getBytes(StandardCharsets.UTF_8)))
+                        .set("d", 0.5)
+                        .set("f", 0.25f)
                         .set("i", 1)
                         .set("l", 10L)
                         .build())
@@ -303,7 +304,7 @@ public class UtilsTest {
         Map<String, String> properties = new HashMap<>();
         properties.put("prop-key", "prop-value");
 
-        Record<GenericObject> genericObjectRecord = new 
Record<GenericObject>() {
+        Record<GenericObject> genericObjectRecord = new Record<>() {
             @Override
             public Optional<String> getTopicName() {
                 return Optional.of("data-ks1.table1");
@@ -321,7 +322,8 @@ public class UtilsTest {
 
             @Override
             public GenericObject getValue() {
-                return valueGenericRecord;
+                // Ensure the record is encoded and decoded correctly
+                return 
valueSchema.decode(valueSchema.encode(valueGenericRecord));
             }
 
             @Override
@@ -339,7 +341,7 @@ public class UtilsTest {
         String json = Utils.serializeRecordToJsonExpandingValue(objectMapper, 
genericObjectRecord, false);
 
         assertEquals(json, 
"{\"topicName\":\"data-ks1.table1\",\"key\":\"message-key\",\"payload\":{\"c\":\"1\","
-                + 
"\"d\":1,\"e\":{\"a\":\"a\",\"b\":true,\"d\":1.0,\"f\":1.0,\"i\":1,\"l\":10}},"
+                + 
"\"d\":1,\"e\":{\"a\":\"a\",\"b\":true,\"c\":\"MTA=\",\"d\":0.5,\"f\":0.25,\"i\":1,\"l\":10}},"
                 + 
"\"properties\":{\"prop-key\":\"prop-value\"},\"eventTime\":1648502845803}");
     }
 
@@ -369,18 +371,15 @@ public class UtilsTest {
         valueSchemaBuilder.field("e", 
udtGenericSchema).type(schemaType).optional().defaultValue(null);
         GenericSchema<GenericRecord> valueSchema = 
Schema.generic(valueSchemaBuilder.build(schemaType));
 
-        byte[] bytes = "10".getBytes(StandardCharsets.UTF_8);
         GenericRecord valueGenericRecord = valueSchema.newRecordBuilder()
                 .set("c", "1")
                 .set("d", 1)
                 .set("e", udtGenericSchema.newRecordBuilder()
                         .set("a", "a")
                         .set("b", true)
-                        // There's a bug in json-flattener that doesn't handle 
byte[] fields correctly.
-                        // But since we use AUTO_CONSUME, we won't get byte[] 
fields for JSON schema anyway.
-                        .set("c", schemaType == SchemaType.AVRO ? bytes : 
Base64.getEncoder().encodeToString(bytes))
-                        .set("d", 1.0)
-                        .set("f", 1.0f)
+                        .set("c", 
ByteBuffer.wrap("10".getBytes(StandardCharsets.UTF_8)))
+                        .set("d", 0.5)
+                        .set("f", 0.25f)
                         .set("i", 1)
                         .set("l", 10L)
                         .build())
@@ -398,7 +397,7 @@ public class UtilsTest {
 
             @Override
             public Object getNativeObject() {
-                return keyValue;
+                return keyValueSchema.decode(keyValueSchema.encode(keyValue));
             }
         };
 
@@ -441,15 +440,15 @@ public class UtilsTest {
         String json = Utils.serializeRecordToJsonExpandingValue(objectMapper, 
genericObjectRecord, false);
 
         assertEquals(json, 
"{\"topicName\":\"data-ks1.table1\",\"key\":\"message-key\","
-                + 
"\"payload\":{\"value\":{\"c\":\"1\",\"d\":1,\"e\":{\"a\":\"a\",\"b\":true,\"c\":\"MTA=\",\"d\":1.0,"
-                + 
"\"f\":1.0,\"i\":1,\"l\":10}},\"key\":{\"a\":\"1\",\"b\":1}},"
+                + 
"\"payload\":{\"value\":{\"c\":\"1\",\"d\":1,\"e\":{\"a\":\"a\",\"b\":true,\"c\":\"MTA=\",\"d\":0.5,"
+                + 
"\"f\":0.25,\"i\":1,\"l\":10}},\"key\":{\"a\":\"1\",\"b\":1}},"
                 + 
"\"properties\":{\"prop-key\":\"prop-value\"},\"eventTime\":1648502845803}");
 
         json = Utils.serializeRecordToJsonExpandingValue(objectMapper, 
genericObjectRecord, true);
 
         assertEquals(json, 
"{\"topicName\":\"data-ks1.table1\",\"key\":\"message-key\",\"payload.value.c\":\"1\","
                 + 
"\"payload.value.d\":1,\"payload.value.e.a\":\"a\",\"payload.value.e.b\":true,"
-                + 
"\"payload.value.e.c\":\"MTA=\",\"payload.value.e.d\":1.0,\"payload.value.e.f\":1.0,"
+                + 
"\"payload.value.e.c\":\"MTA=\",\"payload.value.e.d\":0.5,\"payload.value.e.f\":0.25,"
                 + 
"\"payload.value.e.i\":1,\"payload.value.e.l\":10,\"payload.key.a\":\"1\",\"payload.key.b\":1,"
                 + 
"\"properties.prop-key\":\"prop-value\",\"eventTime\":1648502845803}");
     }
@@ -476,7 +475,7 @@ public class UtilsTest {
 
             @Override
             public Object getNativeObject() {
-                return keyValue;
+                return keyValueSchema.decode(keyValueSchema.encode(keyValue));
             }
         };
 
diff --git 
a/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/json/JsonConverterTests.java
 
b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/json/JsonConverterTests.java
index c3bbaa06d01..c762b713de2 100644
--- 
a/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/json/JsonConverterTests.java
+++ 
b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/json/JsonConverterTests.java
@@ -35,6 +35,7 @@ import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.math.BigDecimal;
+import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.Calendar;
@@ -71,7 +72,7 @@ public class JsonConverterTests {
         genericRecord.put("l", 1L);
         genericRecord.put("i", 1);
         genericRecord.put("b", true);
-        genericRecord.put("bb", "10".getBytes(StandardCharsets.UTF_8));
+        genericRecord.put("bb", 
ByteBuffer.wrap("10".getBytes(StandardCharsets.UTF_8)));
         genericRecord.put("d", 10.0);
         genericRecord.put("f", 10.0f);
         genericRecord.put("s", "toto");
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/KinesisSinkTester.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/KinesisSinkTester.java
index 83cb0088cd7..e1e339f6a82 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/KinesisSinkTester.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/KinesisSinkTester.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectReader;
 import com.google.common.collect.ImmutableMap;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Base64;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -156,13 +157,15 @@ public class KinesisSinkTester extends 
SinkTester<LocalStackContainer> {
                         "f2_" + i,
                         Arrays.asList(i, i +1),
                         new HashSet<>(Arrays.asList((long) i)),
-                        ImmutableMap.of("map1_k_" + i, "map1_kv_" + i));
+                        ImmutableMap.of("map1_k_" + i, "map1_kv_" + i),
+                        ("key_bytes_" + i).getBytes(StandardCharsets.UTF_8));
                 final SimplePojo valuePojo = new SimplePojo(
                         String.valueOf(i),
                         "v2_" + i,
                         Arrays.asList(i, i +1),
                         new HashSet<>(Arrays.asList((long) i)),
-                        ImmutableMap.of("map1_v_" + i, "map1_vv_" + i));
+                        ImmutableMap.of("map1_v_" + i, "map1_vv_" + i),
+                        ("value_bytes_" + i).getBytes(StandardCharsets.UTF_8));
                 producer.newMessage()
                         .value(new KeyValue<>(keyPojo, valuePojo))
                         .send();
@@ -222,8 +225,12 @@ public class KinesisSinkTester extends 
SinkTester<LocalStackContainer> {
             JsonNode payload = READER.readTree(data).at("/payload");
             String i = payload.at("/value/field1").asText();
             assertEquals(payload.at("/value/field2").asText(), "v2_" + i);
+            assertEquals(payload.at("/value/bytes").asText(),
+                    Base64.getEncoder().encodeToString(("value_bytes_" + 
i).getBytes(StandardCharsets.UTF_8)));
             assertEquals(payload.at("/key/field1").asText(), "f1_" + i);
             assertEquals(payload.at("/key/field2").asText(), "f2_" + i);
+            assertEquals(payload.at("/key/bytes").asText(),
+                    Base64.getEncoder().encodeToString(("key_bytes_" + 
i).getBytes(StandardCharsets.UTF_8)));
             actualKvs.put(i, i);
         } else {
             actualKvs.put(partitionKey, data);
@@ -271,6 +278,7 @@ public class KinesisSinkTester extends 
SinkTester<LocalStackContainer> {
         private List<Integer> list1;
         private Set<Long> set1;
         private Map<String, String> map1;
+        private byte[] bytes;
     }
 
     @Override

Reply via email to