This is an automated email from the ASF dual-hosted git repository.

russellspitzer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 9ecd97bf45 [KafkaConnect] Fix RecordConverter for UUID and Fixed Types (#11346)
9ecd97bf45 is described below

commit 9ecd97bf4538ca94276fb019c2ec477d28e4bf7c
Author: Prashant Singh <[email protected]>
AuthorDate: Fri Oct 25 13:39:48 2024 -0700

    [KafkaConnect] Fix RecordConverter for UUID and Fixed Types (#11346)
---
 .../iceberg/connect/data/RecordConverter.java      | 25 +++++++++++++----
 .../iceberg/connect/data/RecordConverterTest.java  | 32 ++++++++++++++++++----
 2 files changed, 47 insertions(+), 10 deletions(-)

diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordConverter.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordConverter.java
index a312e69001..1a57a64448 100644
--- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordConverter.java
+++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordConverter.java
@@ -41,6 +41,7 @@ import java.util.Locale;
 import java.util.Map;
 import java.util.UUID;
 import java.util.stream.Collectors;
+import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
@@ -60,7 +61,9 @@ import org.apache.iceberg.types.Types.MapType;
 import org.apache.iceberg.types.Types.NestedField;
 import org.apache.iceberg.types.Types.StructType;
 import org.apache.iceberg.types.Types.TimestampType;
+import org.apache.iceberg.util.ByteBuffers;
 import org.apache.iceberg.util.DateTimeUtil;
+import org.apache.iceberg.util.UUIDUtil;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.errors.ConnectException;
 
@@ -130,8 +133,9 @@ class RecordConverter {
       case UUID:
         return convertUUID(value);
       case BINARY:
-      case FIXED:
         return convertBase64Binary(value);
+      case FIXED:
+        return ByteBuffers.toByteArray(convertBase64Binary(value));
       case DATE:
         return convertDateValue(value);
       case TIME:
@@ -390,13 +394,24 @@ class RecordConverter {
     throw new IllegalArgumentException("Cannot convert to string: " + value.getClass().getName());
   }
 
-  protected UUID convertUUID(Object value) {
+  protected Object convertUUID(Object value) {
+    UUID uuid;
     if (value instanceof String) {
-      return UUID.fromString((String) value);
+      uuid = UUID.fromString((String) value);
     } else if (value instanceof UUID) {
-      return (UUID) value;
+      uuid = (UUID) value;
+    } else {
+      throw new IllegalArgumentException("Cannot convert to UUID: " + value.getClass().getName());
+    }
+
+    if (FileFormat.PARQUET
+        .name()
+        .toLowerCase(Locale.ROOT)
+        .equals(config.writeProps().get(TableProperties.DEFAULT_FILE_FORMAT))) {
+      return UUIDUtil.convert(uuid);
+    } else {
+      return uuid;
     }
-    throw new IllegalArgumentException("Cannot convert to UUID: " + value.getClass().getName());
   }
 
   protected ByteBuffer convertBase64Binary(Object value) {
diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/RecordConverterTest.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/RecordConverterTest.java
index b494a9da85..47ee76eade 100644
--- a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/RecordConverterTest.java
+++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/RecordConverterTest.java
@@ -37,9 +37,11 @@ import java.util.Base64;
 import java.util.Collection;
 import java.util.Date;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.UUID;
 import java.util.function.Function;
+import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.connect.IcebergSinkConfig;
@@ -72,6 +74,7 @@ import org.apache.iceberg.types.Types.StructType;
 import org.apache.iceberg.types.Types.TimeType;
 import org.apache.iceberg.types.Types.TimestampType;
 import org.apache.iceberg.types.Types.UUIDType;
+import org.apache.iceberg.util.UUIDUtil;
 import org.apache.kafka.connect.data.Decimal;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaBuilder;
@@ -221,6 +224,25 @@ public class RecordConverterTest {
     assertRecordValues(record);
   }
 
+  @Test
+  public void testUUIDConversionWithParquet() {
+    Table table = mock(Table.class);
+    when(table.schema())
+        .thenReturn(new org.apache.iceberg.Schema(NestedField.required(1, "uuid", UUIDType.get())));
+    when(config.writeProps())
+        .thenReturn(
+            ImmutableMap.of(
+                TableProperties.DEFAULT_FILE_FORMAT,
+                FileFormat.PARQUET.name().toLowerCase(Locale.ROOT)));
+
+    RecordConverter converter = new RecordConverter(table, config);
+    Map<String, Object> data =
+        ImmutableMap.<String, Object>builder().put("uuid", UUID_VAL.toString()).build();
+
+    Record record = converter.convert(data);
+    assertThat(record.getField("uuid")).isEqualTo(UUIDUtil.convert(UUID_VAL));
+  }
+
   @Test
   public void testNestedMapConvert() {
     Table table = mock(Table.class);
@@ -859,7 +881,7 @@ public class RecordConverterTest {
     assertThat(updateMap.get("st.ff").type()).isInstanceOf(DoubleType.class);
   }
 
-  private Map<String, Object> createMapData() {
+  public static Map<String, Object> createMapData() {
     return ImmutableMap.<String, Object>builder()
         .put("i", 1)
         .put("l", 2L)
@@ -898,8 +920,8 @@ public class RecordConverterTest {
         .put("s", STR_VAL)
         .put("b", true)
         .put("u", UUID_VAL.toString())
-        .put("f", BYTES_VAL.array())
-        .put("bi", BYTES_VAL.array())
+        .put("f", BYTES_VAL)
+        .put("bi", BYTES_VAL)
         .put("li", LIST_VAL)
         .put("ma", MAP_VAL);
   }
@@ -921,11 +943,11 @@ public class RecordConverterTest {
     assertThat(rec.getField("dec")).isEqualTo(DEC_VAL);
     assertThat(rec.getField("s")).isEqualTo(STR_VAL);
     assertThat(rec.getField("b")).isEqualTo(true);
-    assertThat(rec.getField("u")).isEqualTo(UUID_VAL);
-    assertThat(rec.getField("f")).isEqualTo(BYTES_VAL);
+    assertThat(rec.getField("f")).isEqualTo(BYTES_VAL.array());
     assertThat(rec.getField("bi")).isEqualTo(BYTES_VAL);
     assertThat(rec.getField("li")).isEqualTo(LIST_VAL);
     assertThat(rec.getField("ma")).isEqualTo(MAP_VAL);
+    assertThat(rec.getField("u")).isEqualTo(UUID_VAL);
   }
 
   private void assertNestedRecordValues(Record record) {

Reply via email to