This is an automated email from the ASF dual-hosted git repository.
timbrown pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-xtable.git
The following commit(s) were added to refs/heads/main by this push:
new a0d86a84 Support UUID type when iceberg to delta
a0d86a84 is described below
commit a0d86a8489eab74c3e23d08e5790cad488cd7c7a
Author: Daniel Tu <[email protected]>
AuthorDate: Sun Sep 22 22:23:34 2024 -0700
Support UUID type when iceberg to delta
---
.../apache/xtable/model/schema/InternalSchema.java | 2 +
.../apache/xtable/model/schema/InternalType.java | 1 +
.../apache/xtable/avro/AvroSchemaConverter.java | 14 +-
.../apache/xtable/delta/DeltaSchemaExtractor.java | 33 ++++-
.../xtable/hudi/HudiPartitionValuesExtractor.java | 1 +
.../xtable/iceberg/IcebergSchemaExtractor.java | 4 +-
.../test/java/org/apache/xtable/GenericTable.java | 16 +++
.../org/apache/xtable/ITConversionController.java | 148 ++++++++++++++++++++-
.../java/org/apache/xtable/TestIcebergTable.java | 19 ++-
.../xtable/avro/TestAvroSchemaConverter.java | 45 +++++++
.../xtable/delta/TestDeltaSchemaExtractor.java | 43 ++++++
.../xtable/iceberg/TestIcebergDataHelper.java | 35 ++++-
.../xtable/iceberg/TestIcebergSchemaExtractor.java | 5 +-
13 files changed, 342 insertions(+), 24 deletions(-)
diff --git
a/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalSchema.java
b/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalSchema.java
index 5e001c6c..20af37e0 100644
---
a/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalSchema.java
+++
b/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalSchema.java
@@ -78,6 +78,8 @@ public class InternalSchema {
MILLIS
}
+ public static final String XTABLE_LOGICAL_TYPE = "xtableLogicalType";
+
/**
* Performs a level-order traversal of the schema and returns a list of all
fields. Use this
* method to get a list that includes nested fields. Use {@link
InternalSchema#getFields()} when
diff --git
a/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalType.java
b/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalType.java
index c2f1a223..e1b1049d 100644
--- a/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalType.java
+++ b/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalType.java
@@ -38,6 +38,7 @@ public enum InternalType {
LIST,
MAP,
UNION,
+ UUID,
FIXED,
STRING,
BYTES,
diff --git
a/xtable-core/src/main/java/org/apache/xtable/avro/AvroSchemaConverter.java
b/xtable-core/src/main/java/org/apache/xtable/avro/AvroSchemaConverter.java
index 346dcded..9f40d29e 100644
--- a/xtable-core/src/main/java/org/apache/xtable/avro/AvroSchemaConverter.java
+++ b/xtable-core/src/main/java/org/apache/xtable/avro/AvroSchemaConverter.java
@@ -132,8 +132,13 @@ public class AvroSchemaConverter {
break;
}
if (schema.getType() == Schema.Type.FIXED) {
- metadata.put(InternalSchema.MetadataKey.FIXED_BYTES_SIZE,
schema.getFixedSize());
- newDataType = InternalType.FIXED;
+ String xtableLogicalType =
schema.getProp(InternalSchema.XTABLE_LOGICAL_TYPE);
+ if ("uuid".equals(xtableLogicalType)) {
+ newDataType = InternalType.UUID;
+ } else {
+ metadata.put(InternalSchema.MetadataKey.FIXED_BYTES_SIZE,
schema.getFixedSize());
+ newDataType = InternalType.FIXED;
+ }
} else {
newDataType = InternalType.BYTES;
}
@@ -435,6 +440,11 @@ public class AvroSchemaConverter {
Schema.createFixed(
internalSchema.getName(), internalSchema.getComment(), null,
fixedSize),
internalSchema);
+ case UUID:
+ Schema uuidSchema =
+ Schema.createFixed(internalSchema.getName(),
internalSchema.getComment(), null, 16);
+ uuidSchema.addProp(InternalSchema.XTABLE_LOGICAL_TYPE, "uuid");
+ return finalizeSchema(uuidSchema, internalSchema);
default:
throw new UnsupportedSchemaTypeException(
"Encountered unhandled type during InternalSchema to Avro
conversion: "
diff --git
a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaSchemaExtractor.java
b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaSchemaExtractor.java
index fa425ef2..e312761f 100644
---
a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaSchemaExtractor.java
+++
b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaSchemaExtractor.java
@@ -33,6 +33,7 @@ import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.DecimalType;
import org.apache.spark.sql.types.MapType;
import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.MetadataBuilder;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
@@ -73,7 +74,7 @@ public class DeltaSchemaExtractor {
field.getName(),
convertFieldType(field),
field.getSchema().isNullable(),
- Metadata.empty()))
+ getMetaData(field.getSchema().getDataType())))
.toArray(StructField[]::new);
return new StructType(fields);
}
@@ -90,6 +91,7 @@ public class DeltaSchemaExtractor {
return DataTypes.LongType;
case BYTES:
case FIXED:
+ case UUID:
return DataTypes.BinaryType;
case BOOLEAN:
return DataTypes.BooleanType;
@@ -142,12 +144,24 @@ public class DeltaSchemaExtractor {
}
}
+ private Metadata getMetaData(InternalType type) {
+ if (type == InternalType.UUID) {
+ return new
MetadataBuilder().putString(InternalSchema.XTABLE_LOGICAL_TYPE, "uuid").build();
+ } else {
+ return Metadata.empty();
+ }
+ }
+
public InternalSchema toInternalSchema(StructType structType) {
- return toInternalSchema(structType, null, false, null);
+ return toInternalSchema(structType, null, false, null, null);
}
private InternalSchema toInternalSchema(
- DataType dataType, String parentPath, boolean nullable, String comment) {
+ DataType dataType,
+ String parentPath,
+ boolean nullable,
+ String comment,
+ Metadata originalMetadata) {
Map<InternalSchema.MetadataKey, Object> metadata = null;
List<InternalField> fields = null;
InternalType type;
@@ -172,7 +186,12 @@ public class DeltaSchemaExtractor {
type = InternalType.DOUBLE;
break;
case "binary":
- type = InternalType.BYTES;
+ if (originalMetadata.contains(InternalSchema.XTABLE_LOGICAL_TYPE)
+ &&
"uuid".equals(originalMetadata.getString(InternalSchema.XTABLE_LOGICAL_TYPE))) {
+ type = InternalType.UUID;
+ } else {
+ type = InternalType.BYTES;
+ }
break;
case "long":
type = InternalType.LONG;
@@ -210,7 +229,8 @@ public class DeltaSchemaExtractor {
field.dataType(),
SchemaUtils.getFullyQualifiedPath(parentPath,
field.name()),
field.nullable(),
- fieldComment);
+ fieldComment,
+ field.metadata());
return InternalField.builder()
.name(field.name())
.fieldId(fieldId)
@@ -238,6 +258,7 @@ public class DeltaSchemaExtractor {
SchemaUtils.getFullyQualifiedPath(
parentPath,
InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME),
arrayType.containsNull(),
+ null,
null);
InternalField elementField =
InternalField.builder()
@@ -256,6 +277,7 @@ public class DeltaSchemaExtractor {
SchemaUtils.getFullyQualifiedPath(
parentPath, InternalField.Constants.MAP_VALUE_FIELD_NAME),
false,
+ null,
null);
InternalField keyField =
InternalField.builder()
@@ -269,6 +291,7 @@ public class DeltaSchemaExtractor {
SchemaUtils.getFullyQualifiedPath(
parentPath, InternalField.Constants.MAP_VALUE_FIELD_NAME),
mapType.valueContainsNull(),
+ null,
null);
InternalField valueField =
InternalField.builder()
diff --git
a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiPartitionValuesExtractor.java
b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiPartitionValuesExtractor.java
index 0b95e65e..a55968d9 100644
---
a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiPartitionValuesExtractor.java
+++
b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiPartitionValuesExtractor.java
@@ -159,6 +159,7 @@ public class HudiPartitionValuesExtractor {
break;
case FIXED:
case BYTES:
+ case UUID:
parsedValue = valueAsString.getBytes(StandardCharsets.UTF_8);
break;
case BOOLEAN:
diff --git
a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaExtractor.java
b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaExtractor.java
index 3acd7856..4cf825d7 100644
---
a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaExtractor.java
+++
b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaExtractor.java
@@ -199,6 +199,8 @@ public class IcebergSchemaExtractor {
return Types.DecimalType.of(precision, scale);
case RECORD:
return Types.StructType.of(convertFields(field.getSchema(),
fieldIdTracker));
+ case UUID:
+ return Types.UUIDType.get();
case MAP:
InternalField key =
field.getSchema().getFields().stream()
@@ -305,7 +307,7 @@ public class IcebergSchemaExtractor {
InternalSchema.MetadataKey.FIXED_BYTES_SIZE,
fixedType.length());
break;
case UUID:
- type = InternalType.FIXED;
+ type = InternalType.UUID;
metadata =
Collections.singletonMap(InternalSchema.MetadataKey.FIXED_BYTES_SIZE, 16);
break;
case STRUCT:
diff --git a/xtable-core/src/test/java/org/apache/xtable/GenericTable.java
b/xtable-core/src/test/java/org/apache/xtable/GenericTable.java
index c6b75c31..dce0f21a 100644
--- a/xtable-core/src/test/java/org/apache/xtable/GenericTable.java
+++ b/xtable-core/src/test/java/org/apache/xtable/GenericTable.java
@@ -127,6 +127,22 @@ public interface GenericTable<T, Q> extends AutoCloseable {
}
}
+ static GenericTable getInstanceWithUUIDColumns(
+ String tableName,
+ Path tempDir,
+ SparkSession sparkSession,
+ JavaSparkContext jsc,
+ String sourceFormat,
+ boolean isPartitioned) {
+ switch (sourceFormat) {
+ case ICEBERG:
+ return TestIcebergTable.forSchemaWithUUIDColumns(
+ tableName, isPartitioned ? "level" : null, tempDir,
jsc.hadoopConfiguration());
+ default:
+ throw new IllegalArgumentException("Unsupported source format: " +
sourceFormat);
+ }
+ }
+
static String getTableName() {
return "test_table_" + UUID.randomUUID().toString().replaceAll("-", "_");
}
diff --git
a/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java
b/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java
index 58f0f982..3d539766 100644
--- a/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java
+++ b/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java
@@ -28,6 +28,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import java.net.URI;
+import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
@@ -38,12 +39,14 @@ import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
+import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -82,6 +85,10 @@ import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.spark.sql.delta.DeltaLog;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableList;
import org.apache.xtable.conversion.ConversionConfig;
@@ -100,6 +107,7 @@ public class ITConversionController {
@TempDir public static Path tempDir;
private static final DateTimeFormatter DATE_FORMAT =
DateTimeFormatter.ofPattern("yyyy-MM-dd
HH:mm:ss.SSS").withZone(ZoneId.of("UTC"));
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static JavaSparkContext jsc;
private static SparkSession sparkSession;
@@ -142,6 +150,19 @@ public class ITConversionController {
return arguments.stream();
}
+ private static Stream<Arguments> generateTestParametersForUUID() {
+ List<Arguments> arguments = new ArrayList<>();
+ for (SyncMode syncMode : SyncMode.values()) {
+ for (boolean isPartitioned : new boolean[] {true, false}) {
+ // TODO: Add Hudi UUID support later
(https://github.com/apache/incubator-xtable/issues/543)
+      // The current Spark parquet reader cannot handle fixed-size byte arrays
with the UUID logical type
+ List<String> targetTableFormats = Arrays.asList(DELTA);
+ arguments.add(Arguments.of(ICEBERG, targetTableFormats, syncMode,
isPartitioned));
+ }
+ }
+ return arguments.stream();
+ }
+
private static Stream<Arguments> testCasesWithSyncModes() {
return Stream.of(Arguments.of(SyncMode.INCREMENTAL),
Arguments.of(SyncMode.FULL));
}
@@ -261,6 +282,54 @@ public class ITConversionController {
}
}
+  // The test content is a simplified version of testVariousOperations
+ // The difference is that the data source from Iceberg contains UUID columns
+ @ParameterizedTest
+ @MethodSource("generateTestParametersForUUID")
+ public void testVariousOperationsWithUUID(
+ String sourceTableFormat,
+ List<String> targetTableFormats,
+ SyncMode syncMode,
+ boolean isPartitioned) {
+ String tableName = getTableName();
+ ConversionController conversionController = new
ConversionController(jsc.hadoopConfiguration());
+ String partitionConfig = null;
+ if (isPartitioned) {
+ partitionConfig = "level:VALUE";
+ }
+ ConversionSourceProvider<?> conversionSourceProvider =
+ getConversionSourceProvider(sourceTableFormat);
+ List<?> insertRecords;
+ try (GenericTable table =
+ GenericTable.getInstanceWithUUIDColumns(
+ tableName, tempDir, sparkSession, jsc, sourceTableFormat,
isPartitioned)) {
+ insertRecords = table.insertRows(100);
+
+ ConversionConfig conversionConfig =
+ getTableSyncConfig(
+ sourceTableFormat,
+ syncMode,
+ tableName,
+ table,
+ targetTableFormats,
+ partitionConfig,
+ null);
+ conversionController.sync(conversionConfig, conversionSourceProvider);
+ checkDatasetEquivalence(sourceTableFormat, table, targetTableFormats,
100);
+
+ // Upsert some records and sync again
+ table.upsertRows(insertRecords.subList(0, 20));
+ conversionController.sync(conversionConfig, conversionSourceProvider);
+ checkDatasetEquivalence(sourceTableFormat, table, targetTableFormats,
100);
+
+ table.deleteRows(insertRecords.subList(30, 50));
+ conversionController.sync(conversionConfig, conversionSourceProvider);
+ checkDatasetEquivalence(sourceTableFormat, table, targetTableFormats,
80);
+ checkDatasetEquivalenceWithFilter(
+ sourceTableFormat, table, targetTableFormats,
table.getFilterQuery());
+ }
+ }
+
@ParameterizedTest
@MethodSource("testCasesWithPartitioningAndSyncModes")
public void testConcurrentInsertWritesInSource(
@@ -797,13 +866,84 @@ public class ITConversionController {
// if count is not known ahead of time, ensure datasets are
non-empty
assertFalse(dataset1Rows.isEmpty());
}
+
+ if (containsUUIDFields(dataset1Rows) &&
containsUUIDFields(dataset2Rows)) {
+ compareDatasetWithUUID(dataset1Rows, dataset2Rows);
+ } else {
+ assertEquals(
+ dataset1Rows,
+ dataset2Rows,
+ String.format(
+ "Datasets are not equivalent when reading from Spark.
Source: %s, Target: %s",
+ sourceFormat, format));
+ }
+ });
+ }
+
+ /**
+ * Compares two datasets where dataset1Rows is for Iceberg and dataset2Rows
is for other formats
+ * (such as Delta or Hudi). - For the "uuid_field", if present, the UUID
from dataset1 (Iceberg)
+ * is compared with the Base64-encoded UUID from dataset2 (other formats),
after decoding. - For
+ * all other fields, the values are compared directly. - If neither row
contains the "uuid_field",
+ * the rows are compared as plain JSON strings.
+ *
+ * @param dataset1Rows List of JSON rows representing the dataset in Iceberg
format (UUID is
+ * stored as a string).
+ * @param dataset2Rows List of JSON rows representing the dataset in other
formats (UUID might be
+ * Base64-encoded).
+ */
+ private void compareDatasetWithUUID(List<String> dataset1Rows, List<String>
dataset2Rows) {
+ for (int i = 0; i < dataset1Rows.size(); i++) {
+ String row1 = dataset1Rows.get(i);
+ String row2 = dataset2Rows.get(i);
+ if (row1.contains("uuid_field") && row2.contains("uuid_field")) {
+ try {
+ JsonNode node1 = OBJECT_MAPPER.readTree(row1);
+ JsonNode node2 = OBJECT_MAPPER.readTree(row2);
+
+ // check uuid field
+ String uuidStr1 = node1.get("uuid_field").asText();
+ byte[] bytes =
Base64.getDecoder().decode(node2.get("uuid_field").asText());
+ ByteBuffer bb = ByteBuffer.wrap(bytes);
+ UUID uuid2 = new UUID(bb.getLong(), bb.getLong());
+ String uuidStr2 = uuid2.toString();
assertEquals(
- dataset1Rows,
- dataset2Rows,
+ uuidStr1,
+ uuidStr2,
String.format(
"Datasets are not equivalent when reading from Spark.
Source: %s, Target: %s",
- sourceFormat, format));
- });
+ uuidStr1, uuidStr2));
+
+ // check other fields
+ ((ObjectNode) node1).remove("uuid_field");
+ ((ObjectNode) node2).remove("uuid_field");
+ assertEquals(
+ node1.toString(),
+ node2.toString(),
+ String.format(
+ "Datasets are not equivalent when comparing other fields.
Source: %s, Target: %s",
+ node1, node2));
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ } else {
+ assertEquals(
+ row1,
+ row2,
+ String.format(
+ "Datasets are not equivalent when reading from Spark. Source:
%s, Target: %s",
+ row1, row2));
+ }
+ }
+ }
+
+ private boolean containsUUIDFields(List<String> rows) {
+ for (String row : rows) {
+ if (row.contains("\"uuid_field\"")) {
+ return true;
+ }
+ }
+ return false;
}
private static Stream<Arguments> addBasicPartitionCases(Stream<Arguments>
arguments) {
diff --git a/xtable-core/src/test/java/org/apache/xtable/TestIcebergTable.java
b/xtable-core/src/test/java/org/apache/xtable/TestIcebergTable.java
index bb63667a..0c8336fe 100644
--- a/xtable-core/src/test/java/org/apache/xtable/TestIcebergTable.java
+++ b/xtable-core/src/test/java/org/apache/xtable/TestIcebergTable.java
@@ -88,7 +88,7 @@ public class TestIcebergTable implements GenericTable<Record,
String> {
hadoopConf,
DEFAULT_RECORD_KEY_FIELD,
Collections.singletonList(partitionField),
- false);
+ TestIcebergDataHelper.SchemaType.COMMON);
}
public static TestIcebergTable forSchemaWithAdditionalColumnsAndPartitioning(
@@ -99,7 +99,18 @@ public class TestIcebergTable implements
GenericTable<Record, String> {
hadoopConf,
DEFAULT_RECORD_KEY_FIELD,
Collections.singletonList(partitionField),
- true);
+ TestIcebergDataHelper.SchemaType.COMMON_WITH_ADDITIONAL_COLUMNS);
+ }
+
+ public static TestIcebergTable forSchemaWithUUIDColumns(
+ String tableName, String partitionField, Path tempDir, Configuration
hadoopConf) {
+ return new TestIcebergTable(
+ tableName,
+ tempDir,
+ hadoopConf,
+ DEFAULT_RECORD_KEY_FIELD,
+ Collections.singletonList(partitionField),
+ TestIcebergDataHelper.SchemaType.COMMON_WITH_UUID_COLUMN);
}
public TestIcebergTable(
@@ -108,12 +119,12 @@ public class TestIcebergTable implements
GenericTable<Record, String> {
Configuration hadoopConf,
String recordKeyField,
List<String> partitionFields,
- boolean includeAdditionalColumns) {
+ TestIcebergDataHelper.SchemaType schemaType) {
this.tableName = tableName;
this.basePath = tempDir.toUri().toString();
this.icebergDataHelper =
TestIcebergDataHelper.createIcebergDataHelper(
- recordKeyField, filterNullFields(partitionFields),
includeAdditionalColumns);
+ recordKeyField, filterNullFields(partitionFields), schemaType);
this.schema = icebergDataHelper.getTableSchema();
PartitionSpec partitionSpec = icebergDataHelper.getPartitionSpec();
diff --git
a/xtable-core/src/test/java/org/apache/xtable/avro/TestAvroSchemaConverter.java
b/xtable-core/src/test/java/org/apache/xtable/avro/TestAvroSchemaConverter.java
index 20d41139..0b6823a1 100644
---
a/xtable-core/src/test/java/org/apache/xtable/avro/TestAvroSchemaConverter.java
+++
b/xtable-core/src/test/java/org/apache/xtable/avro/TestAvroSchemaConverter.java
@@ -844,4 +844,49 @@ public class TestAvroSchemaConverter {
.build();
assertEquals(internalSchema,
AvroSchemaConverter.getInstance().toInternalSchema(schemaWithIds));
}
+
+ @Test
+ public void testIcebergToAvroUUIDSupport() {
+ String schemaName = "testRecord";
+ String doc = "What's up doc";
+ Schema avroRepresentation =
+ new Schema.Parser()
+ .parse(
+ "{\"type\":\"record\",\"name\":\"testRecord\",\"doc\":\"What's
up doc\",\"fields\":["
+ +
"{\"name\":\"requiredUUID\",\"type\":{\"type\":\"fixed\",\"name\":\"required_uuid\",\"size\":16,\"xtableLogicalType\":\"uuid\"}},"
+ +
"{\"name\":\"optionalUUID\",\"type\":[\"null\",{\"type\":\"fixed\",\"name\":\"optional_uuid\",\"size\":16,\"xtableLogicalType\":\"uuid\"}],\"default\":null}"
+ + "]}");
+ InternalSchema internalSchema =
+ InternalSchema.builder()
+ .name(schemaName)
+ .comment(doc)
+ .dataType(InternalType.RECORD)
+ .isNullable(false)
+ .fields(
+ Arrays.asList(
+ InternalField.builder()
+ .name("requiredUUID")
+ .schema(
+ InternalSchema.builder()
+ .name("required_uuid")
+ .dataType(InternalType.UUID)
+ .isNullable(false)
+ .build())
+ .build(),
+ InternalField.builder()
+ .name("optionalUUID")
+ .schema(
+ InternalSchema.builder()
+ .name("optional_uuid")
+ .dataType(InternalType.UUID)
+ .isNullable(true)
+ .build())
+
.defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+ .build()))
+ .build();
+ assertEquals(
+ avroRepresentation,
AvroSchemaConverter.getInstance().fromInternalSchema(internalSchema));
+ assertEquals(
+ internalSchema,
AvroSchemaConverter.getInstance().toInternalSchema(avroRepresentation));
+ }
}
diff --git
a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSchemaExtractor.java
b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSchemaExtractor.java
index 45c90660..4b0eacd0 100644
---
a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSchemaExtractor.java
+++
b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSchemaExtractor.java
@@ -25,6 +25,7 @@ import java.util.Map;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.MetadataBuilder;
import org.apache.spark.sql.types.StructType;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -892,4 +893,46 @@ public class TestDeltaSchemaExtractor {
Assertions.assertEquals(
internalSchema,
DeltaSchemaExtractor.getInstance().toInternalSchema(structRepresentation));
}
+
+ @Test
+ public void testIcebergToDeltaUUIDSupport() {
+ Metadata metadata =
+ new MetadataBuilder().putString(InternalSchema.XTABLE_LOGICAL_TYPE,
"uuid").build();
+ StructType structRepresentation =
+ new StructType()
+ .add("requiredUUID", DataTypes.BinaryType, false, metadata)
+ .add("optionalUUID", DataTypes.BinaryType, true, metadata);
+ InternalSchema internalSchema =
+ InternalSchema.builder()
+ .name("struct")
+ .dataType(InternalType.RECORD)
+ .isNullable(false)
+ .fields(
+ Arrays.asList(
+ InternalField.builder()
+ .name("requiredUUID")
+ .schema(
+ InternalSchema.builder()
+ .name("binary")
+ .dataType(InternalType.UUID)
+ .isNullable(false)
+ .build())
+ .build(),
+ InternalField.builder()
+ .name("optionalUUID")
+ .schema(
+ InternalSchema.builder()
+ .name("binary")
+ .dataType(InternalType.UUID)
+ .isNullable(true)
+ .build())
+
.defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+ .build()))
+ .build();
+ Assertions.assertEquals(
+ structRepresentation,
+ DeltaSchemaExtractor.getInstance().fromInternalSchema(internalSchema));
+ Assertions.assertEquals(
+ internalSchema,
DeltaSchemaExtractor.getInstance().toInternalSchema(structRepresentation));
+ }
}
diff --git
a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergDataHelper.java
b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergDataHelper.java
index cf806fd6..1d10fe7a 100644
---
a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergDataHelper.java
+++
b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergDataHelper.java
@@ -104,12 +104,16 @@ public class TestIcebergDataHelper {
Arrays.asList(
NestedField.optional(31, "additional_column1",
Types.StringType.get()),
NestedField.optional(32, "additional_column2",
Types.LongType.get()));
+ private static final List<Types.NestedField> UUID_FIELDS =
+ Arrays.asList(NestedField.optional(33, "uuid_field",
Types.UUIDType.get()));
private static final Schema BASE_SCHEMA = new Schema(COMMON_FIELDS);
private static final Schema SCHEMA_WITH_ADDITIONAL_COLUMNS =
new Schema(
Stream.concat(COMMON_FIELDS.stream(), ADDITIONAL_FIELDS.stream())
.collect(Collectors.toList()));
-
+ private static final Schema SCHEMA_WITH_UUID_COLUMN =
+ new Schema(
+ Stream.concat(COMMON_FIELDS.stream(),
UUID_FIELDS.stream()).collect(Collectors.toList()));
private static final OffsetDateTime EPOCH =
Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();
@@ -117,9 +121,15 @@ public class TestIcebergDataHelper {
String recordKeyField;
List<String> partitionFieldNames;
+ public static enum SchemaType {
+ COMMON,
+ COMMON_WITH_ADDITIONAL_COLUMNS,
+ COMMON_WITH_UUID_COLUMN,
+ }
+
public static TestIcebergDataHelper createIcebergDataHelper(
- String recordKeyField, List<String> partitionFields, boolean
includeAdditionalColumns) {
- Schema tableSchema = getSchema(includeAdditionalColumns);
+ String recordKeyField, List<String> partitionFields, SchemaType
schemaType) {
+ Schema tableSchema = getSchema(schemaType);
return TestIcebergDataHelper.builder()
.tableSchema(tableSchema)
.recordKeyField(recordKeyField)
@@ -127,8 +137,17 @@ public class TestIcebergDataHelper {
.build();
}
- private static Schema getSchema(boolean includeAdditionalColumns) {
- return includeAdditionalColumns ? SCHEMA_WITH_ADDITIONAL_COLUMNS :
BASE_SCHEMA;
+ private static Schema getSchema(SchemaType schemaType) {
+ switch (schemaType) {
+ case COMMON:
+ return BASE_SCHEMA;
+ case COMMON_WITH_ADDITIONAL_COLUMNS:
+ return SCHEMA_WITH_ADDITIONAL_COLUMNS;
+ case COMMON_WITH_UUID_COLUMN:
+ return SCHEMA_WITH_UUID_COLUMN;
+ default:
+ throw new IllegalArgumentException("Unknown schema type: " +
schemaType);
+ }
}
public List<Record> generateInsertRecords(int numRecords) {
@@ -299,7 +318,11 @@ public class TestIcebergDataHelper {
case STRUCT:
return generateInsertRecord(timeLowerBound, timeUpperBound,
fieldType.asStructType());
case UUID:
- return UUID.randomUUID().toString();
+ UUID uuid = UUID.randomUUID();
+ ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[16]);
+ byteBuffer.putLong(uuid.getMostSignificantBits());
+ byteBuffer.putLong(uuid.getLeastSignificantBits());
+ return byteBuffer.array();
case LIST:
Types.ListType listType = (Types.ListType) fieldType;
int listSize = RANDOM.nextInt(5) + 1;
diff --git
a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSchemaExtractor.java
b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSchemaExtractor.java
index 7559a5e8..28776541 100644
---
a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSchemaExtractor.java
+++
b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSchemaExtractor.java
@@ -372,7 +372,7 @@ public class TestIcebergSchemaExtractor {
.schema(
InternalSchema.builder()
.name("uuid")
- .dataType(InternalType.FIXED)
+ .dataType(InternalType.UUID)
.isNullable(false)
.metadata(fixedMetadata)
.build())
@@ -383,7 +383,7 @@ public class TestIcebergSchemaExtractor {
.schema(
InternalSchema.builder()
.name("uuid")
- .dataType(InternalType.FIXED)
+ .dataType(InternalType.UUID)
.isNullable(true)
.metadata(fixedMetadata)
.build())
@@ -391,6 +391,7 @@ public class TestIcebergSchemaExtractor {
.build()))
.build();
assertEquals(expectedSchema, (SCHEMA_EXTRACTOR.fromIceberg(inputSchema)));
+
assertTrue(inputSchema.sameSchema(SCHEMA_EXTRACTOR.toIceberg(expectedSchema)));
}
@Test