This is an automated email from the ASF dual-hosted git repository.
russellspitzer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 9ecd97bf45 [KafkaConnect] Fix RecordConverter for UUID and Fixed Types (#11346)
9ecd97bf45 is described below
commit 9ecd97bf4538ca94276fb019c2ec477d28e4bf7c
Author: Prashant Singh <[email protected]>
AuthorDate: Fri Oct 25 13:39:48 2024 -0700
[KafkaConnect] Fix RecordConverter for UUID and Fixed Types (#11346)
---
.../iceberg/connect/data/RecordConverter.java | 25 +++++++++++++----
.../iceberg/connect/data/RecordConverterTest.java | 32 ++++++++++++++++++----
2 files changed, 47 insertions(+), 10 deletions(-)
diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordConverter.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordConverter.java
index a312e69001..1a57a64448 100644
--- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordConverter.java
+++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordConverter.java
@@ -41,6 +41,7 @@ import java.util.Locale;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
+import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
@@ -60,7 +61,9 @@ import org.apache.iceberg.types.Types.MapType;
import org.apache.iceberg.types.Types.NestedField;
import org.apache.iceberg.types.Types.StructType;
import org.apache.iceberg.types.Types.TimestampType;
+import org.apache.iceberg.util.ByteBuffers;
import org.apache.iceberg.util.DateTimeUtil;
+import org.apache.iceberg.util.UUIDUtil;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
@@ -130,8 +133,9 @@ class RecordConverter {
case UUID:
return convertUUID(value);
case BINARY:
- case FIXED:
return convertBase64Binary(value);
+ case FIXED:
+ return ByteBuffers.toByteArray(convertBase64Binary(value));
case DATE:
return convertDateValue(value);
case TIME:
@@ -390,13 +394,24 @@ class RecordConverter {
throw new IllegalArgumentException("Cannot convert to string: " + value.getClass().getName());
}
- protected UUID convertUUID(Object value) {
+ protected Object convertUUID(Object value) {
+ UUID uuid;
if (value instanceof String) {
- return UUID.fromString((String) value);
+ uuid = UUID.fromString((String) value);
} else if (value instanceof UUID) {
- return (UUID) value;
+ uuid = (UUID) value;
+ } else {
+ throw new IllegalArgumentException("Cannot convert to UUID: " + value.getClass().getName());
+ }
+
+ if (FileFormat.PARQUET
+ .name()
+ .toLowerCase(Locale.ROOT)
+ .equals(config.writeProps().get(TableProperties.DEFAULT_FILE_FORMAT))) {
+ return UUIDUtil.convert(uuid);
+ } else {
+ return uuid;
}
- throw new IllegalArgumentException("Cannot convert to UUID: " + value.getClass().getName());
}
protected ByteBuffer convertBase64Binary(Object value) {
diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/RecordConverterTest.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/RecordConverterTest.java
index b494a9da85..47ee76eade 100644
--- a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/RecordConverterTest.java
+++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/RecordConverterTest.java
@@ -37,9 +37,11 @@ import java.util.Base64;
import java.util.Collection;
import java.util.Date;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
import java.util.UUID;
import java.util.function.Function;
+import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.connect.IcebergSinkConfig;
@@ -72,6 +74,7 @@ import org.apache.iceberg.types.Types.StructType;
import org.apache.iceberg.types.Types.TimeType;
import org.apache.iceberg.types.Types.TimestampType;
import org.apache.iceberg.types.Types.UUIDType;
+import org.apache.iceberg.util.UUIDUtil;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
@@ -221,6 +224,25 @@ public class RecordConverterTest {
assertRecordValues(record);
}
+ @Test
+ public void testUUIDConversionWithParquet() {
+ Table table = mock(Table.class);
+ when(table.schema())
+ .thenReturn(new org.apache.iceberg.Schema(NestedField.required(1, "uuid", UUIDType.get())));
+ when(config.writeProps())
+ .thenReturn(
+ ImmutableMap.of(
+ TableProperties.DEFAULT_FILE_FORMAT,
+ FileFormat.PARQUET.name().toLowerCase(Locale.ROOT)));
+
+ RecordConverter converter = new RecordConverter(table, config);
+ Map<String, Object> data =
+ ImmutableMap.<String, Object>builder().put("uuid", UUID_VAL.toString()).build();
+
+ Record record = converter.convert(data);
+ assertThat(record.getField("uuid")).isEqualTo(UUIDUtil.convert(UUID_VAL));
+ }
+
@Test
public void testNestedMapConvert() {
Table table = mock(Table.class);
@@ -859,7 +881,7 @@ public class RecordConverterTest {
assertThat(updateMap.get("st.ff").type()).isInstanceOf(DoubleType.class);
}
- private Map<String, Object> createMapData() {
+ public static Map<String, Object> createMapData() {
return ImmutableMap.<String, Object>builder()
.put("i", 1)
.put("l", 2L)
@@ -898,8 +920,8 @@ public class RecordConverterTest {
.put("s", STR_VAL)
.put("b", true)
.put("u", UUID_VAL.toString())
- .put("f", BYTES_VAL.array())
- .put("bi", BYTES_VAL.array())
+ .put("f", BYTES_VAL)
+ .put("bi", BYTES_VAL)
.put("li", LIST_VAL)
.put("ma", MAP_VAL);
}
@@ -921,11 +943,11 @@ public class RecordConverterTest {
assertThat(rec.getField("dec")).isEqualTo(DEC_VAL);
assertThat(rec.getField("s")).isEqualTo(STR_VAL);
assertThat(rec.getField("b")).isEqualTo(true);
- assertThat(rec.getField("u")).isEqualTo(UUID_VAL);
- assertThat(rec.getField("f")).isEqualTo(BYTES_VAL);
+ assertThat(rec.getField("f")).isEqualTo(BYTES_VAL.array());
assertThat(rec.getField("bi")).isEqualTo(BYTES_VAL);
assertThat(rec.getField("li")).isEqualTo(LIST_VAL);
assertThat(rec.getField("ma")).isEqualTo(MAP_VAL);
+ assertThat(rec.getField("u")).isEqualTo(UUID_VAL);
}
private void assertNestedRecordValues(Record record) {