This is an automated email from the ASF dual-hosted git repository.
timbrown pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-xtable.git
The following commit(s) were added to refs/heads/main by this push:
new dafe24a8 Add support for bucket partition transform, ignore void
partition in iceberg source
dafe24a8 is described below
commit dafe24a86c38bf32ce93464d14ce66c0029ca724
Author: Timothy Brown <[email protected]>
AuthorDate: Mon Mar 31 20:11:38 2025 -0500
Add support for bucket partition transform, ignore void partition in
iceberg source
---
.../model/schema/InternalPartitionField.java | 4 +
.../model/schema/PartitionTransformType.java | 3 +-
.../xtable/delta/DeltaPartitionExtractor.java | 23 +++-
.../org/apache/xtable/hudi/HudiTableManager.java | 8 ++
.../iceberg/IcebergPartitionSpecExtractor.java | 36 +++++-
.../iceberg/IcebergPartitionValueConverter.java | 13 ++-
.../org/apache/xtable/ITConversionController.java | 29 +++++
.../java/org/apache/xtable/TestIcebergTable.java | 12 +-
.../xtable/delta/TestDeltaPartitionExtractor.java | 37 ++++++
.../xtable/iceberg/TestIcebergDataHelper.java | 9 +-
.../iceberg/TestIcebergPartitionSpecExtractor.java | 126 +++++++++++++++------
.../TestIcebergPartitionValueConverter.java | 25 ++++
12 files changed, 271 insertions(+), 54 deletions(-)
diff --git
a/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalPartitionField.java
b/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalPartitionField.java
index 7c1757a5..99d3daa7 100644
---
a/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalPartitionField.java
+++
b/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalPartitionField.java
@@ -20,6 +20,7 @@ package org.apache.xtable.model.schema;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import lombok.Builder;
import lombok.Value;
@@ -32,6 +33,7 @@ import lombok.Value;
@Value
@Builder
public class InternalPartitionField {
+ public static final String NUM_BUCKETS = "NUM_BUCKETS";
// Source field the partition is based on
InternalField sourceField;
/*
@@ -47,4 +49,6 @@ public class InternalPartitionField {
@Builder.Default List<String> partitionFieldNames = Collections.emptyList();
// An enum describing how the source data was transformed into the partition
value
PartitionTransformType transformType;
+ // Transform options such as number of buckets in the BUCKET transform type
+ @Builder.Default Map<String, Object> transformOptions =
Collections.emptyMap();
}
diff --git
a/xtable-api/src/main/java/org/apache/xtable/model/schema/PartitionTransformType.java
b/xtable-api/src/main/java/org/apache/xtable/model/schema/PartitionTransformType.java
index d6e082f1..8af9cea8 100644
---
a/xtable-api/src/main/java/org/apache/xtable/model/schema/PartitionTransformType.java
+++
b/xtable-api/src/main/java/org/apache/xtable/model/schema/PartitionTransformType.java
@@ -30,7 +30,8 @@ public enum PartitionTransformType {
MONTH,
DAY,
HOUR,
- VALUE;
+ VALUE,
+ BUCKET;
public boolean isTimeBased() {
return this == YEAR || this == MONTH || this == DAY || this == HOUR;
diff --git
a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaPartitionExtractor.java
b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaPartitionExtractor.java
index 172ca26e..98008646 100644
---
a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaPartitionExtractor.java
+++
b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaPartitionExtractor.java
@@ -75,6 +75,7 @@ public class DeltaPartitionExtractor {
private static final String DATE_FORMAT_FOR_DAY = "yyyy-MM-dd";
private static final String DATE_FORMAT_FOR_MONTH = "yyyy-MM";
private static final String DATE_FORMAT_FOR_YEAR = "yyyy";
+ private static final String BUCKET_FUNCTION = "MOD((HASH(%s) & %d), %d)";
// For timestamp partition fields, actual partition column names in delta
format will be of type
// generated & and with a name like
`delta_partition_col_{transform_type}_{source_field_name}`.
private static final String DELTA_PARTITION_COL_NAME_FORMAT =
"xtable_partition_col_%s_%s";
@@ -242,7 +243,7 @@ public class DeltaPartitionExtractor {
currPartitionColumnName =
internalPartitionField.getSourceField().getName();
field = null;
} else {
- // Since partition field of timestamp type, create new field in schema.
+ // The partition field is of timestamp or bucket type, so create a new field
in the schema.
field = getGeneratedField(internalPartitionField);
currPartitionColumnName = field.name();
}
@@ -270,6 +271,10 @@ public class DeltaPartitionExtractor {
"");
partitionValuesSerialized.put(
partitionField.getSourceField().getName(),
partitionValueSerialized);
+ } else if (transformType == PartitionTransformType.BUCKET) {
+ partitionValueSerialized =
partitionValue.getRange().getMaxValue().toString();
+ partitionValuesSerialized.put(
+ getGeneratedColumnName(partitionField), partitionValueSerialized);
} else {
// use appropriate date formatter for value serialization.
partitionValueSerialized =
@@ -352,7 +357,6 @@ public class DeltaPartitionExtractor {
String generatedExpression;
DataType dataType;
String currPartitionColumnName =
getGeneratedColumnName(internalPartitionField);
- Map<String, String> generatedExpressionMetadata = new HashMap<>();
switch (internalPartitionField.getTransformType()) {
case YEAR:
generatedExpression =
@@ -373,10 +377,23 @@ public class DeltaPartitionExtractor {
String.format(CAST_FUNCTION,
internalPartitionField.getSourceField().getPath());
dataType = DataTypes.DateType;
break;
+ case BUCKET:
+ generatedExpression =
+ String.format(
+ BUCKET_FUNCTION,
+ internalPartitionField.getSourceField().getPath(),
+ Integer.MAX_VALUE,
+ (int)
+ internalPartitionField
+ .getTransformOptions()
+ .get(InternalPartitionField.NUM_BUCKETS));
+ dataType = DataTypes.IntegerType;
+ break;
default:
throw new PartitionSpecException("Invalid transform type");
}
- generatedExpressionMetadata.put(DELTA_GENERATION_EXPRESSION,
generatedExpression);
+ Map<String, String> generatedExpressionMetadata =
+ Collections.singletonMap(DELTA_GENERATION_EXPRESSION,
generatedExpression);
Metadata partitionFieldMetadata =
new
Metadata(ScalaUtils.convertJavaMapToScala(generatedExpressionMetadata));
return new StructField(currPartitionColumnName, dataType, true,
partitionFieldMetadata);
diff --git
a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiTableManager.java
b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiTableManager.java
index 7c48e88d..c6ac35fb 100644
--- a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiTableManager.java
+++ b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiTableManager.java
@@ -35,10 +35,12 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.hudi.exception.TableNotFoundException;
+import org.apache.xtable.exception.PartitionSpecException;
import org.apache.xtable.exception.UpdateException;
import org.apache.xtable.model.InternalTable;
import org.apache.xtable.model.schema.InternalField;
import org.apache.xtable.model.schema.InternalPartitionField;
+import org.apache.xtable.model.schema.PartitionTransformType;
import org.apache.xtable.model.storage.DataLayoutStrategy;
/** A class used to initialize new Hudi tables and load the metadata of
existing tables. */
@@ -124,6 +126,12 @@ public class HudiTableManager {
@VisibleForTesting
static String getKeyGeneratorClass(
List<InternalPartitionField> partitionFields, List<InternalField>
recordKeyFields) {
+ if (partitionFields.stream()
+ .anyMatch(
+ internalPartitionField ->
+ internalPartitionField.getTransformType() ==
PartitionTransformType.BUCKET)) {
+ throw new PartitionSpecException("Bucket partition is not yet supported
by Hudi targets");
+ }
boolean multipleRecordKeyFields = recordKeyFields.size() > 1;
boolean multiplePartitionFields = partitionFields.size() > 1;
String keyGeneratorClass;
diff --git
a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergPartitionSpecExtractor.java
b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergPartitionSpecExtractor.java
index b51f8163..8e202f32 100644
---
a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergPartitionSpecExtractor.java
+++
b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergPartitionSpecExtractor.java
@@ -18,9 +18,14 @@
package org.apache.xtable.iceberg;
+import static org.apache.xtable.iceberg.IcebergPartitionValueConverter.BUCKET;
+
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
@@ -32,6 +37,7 @@ import org.apache.iceberg.transforms.Transform;
import org.apache.iceberg.types.Types;
import org.apache.xtable.exception.NotSupportedException;
+import org.apache.xtable.exception.PartitionSpecException;
import org.apache.xtable.model.schema.InternalField;
import org.apache.xtable.model.schema.InternalPartitionField;
import org.apache.xtable.model.schema.InternalSchema;
@@ -41,6 +47,7 @@ import org.apache.xtable.schema.SchemaFieldFinder;
/** Partition spec builder and extractor for Iceberg. */
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class IcebergPartitionSpecExtractor {
+ private static final Pattern NUM_BUCKETS_MATCHER =
Pattern.compile("bucket\\[(\\d+)\\]");
private static final IcebergPartitionSpecExtractor INSTANCE = new
IcebergPartitionSpecExtractor();
public static IcebergPartitionSpecExtractor getInstance() {
@@ -70,6 +77,12 @@ public class IcebergPartitionSpecExtractor {
case VALUE:
partitionSpecBuilder.identity(fieldPath);
break;
+ case BUCKET:
+ partitionSpecBuilder.bucket(
+ fieldPath,
+ (int)
+
partitioningField.getTransformOptions().get(InternalPartitionField.NUM_BUCKETS));
+ break;
default:
throw new IllegalArgumentException(
"Unsupported type: " + partitioningField.getTransformType());
@@ -99,13 +112,27 @@ public class IcebergPartitionSpecExtractor {
throw new NotSupportedException(transformName);
}
- if (transformName.startsWith("bucket")) {
- throw new NotSupportedException(transformName);
+ if (transformName.startsWith(BUCKET)) {
+ return PartitionTransformType.BUCKET;
}
throw new NotSupportedException(transform.toString());
}
+ private Map<String, Object> getPartitionTransformOptions(Transform<?, ?>
transform) {
+ if (transform.toString().startsWith(BUCKET)) {
+ Matcher matcher = NUM_BUCKETS_MATCHER.matcher(transform.toString());
+ if (matcher.matches()) {
+ return Collections.singletonMap(
+ InternalPartitionField.NUM_BUCKETS,
Integer.parseInt(matcher.group(1)));
+ } else {
+ throw new PartitionSpecException(
+ "Cannot parse number of buckets from partition transform: " +
transform);
+ }
+ }
+ return Collections.emptyMap();
+ }
+
/**
* Generates internal representation of the Iceberg partition spec.
*
@@ -121,6 +148,10 @@ public class IcebergPartitionSpecExtractor {
List<InternalPartitionField> irPartitionFields = new
ArrayList<>(iceSpec.fields().size());
for (PartitionField iceField : iceSpec.fields()) {
+ // skip void transform
+ if (iceField.transform().isVoid()) {
+ continue;
+ }
// fetch the ice field from the schema to properly handle hidden
partition fields
int sourceColumnId = iceField.sourceId();
Types.NestedField iceSchemaField = iceSchema.findField(sourceColumnId);
@@ -131,6 +162,7 @@ public class IcebergPartitionSpecExtractor {
InternalPartitionField.builder()
.sourceField(irField)
.transformType(fromIcebergTransform(iceField.transform()))
+
.transformOptions(getPartitionTransformOptions(iceField.transform()))
.build();
irPartitionFields.add(irPartitionField);
}
diff --git
a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergPartitionValueConverter.java
b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergPartitionValueConverter.java
index a6abd2a9..83979a66 100644
---
a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergPartitionValueConverter.java
+++
b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergPartitionValueConverter.java
@@ -66,6 +66,7 @@ public class IcebergPartitionValueConverter {
private static final String DAY = "day";
private static final String HOUR = "hour";
private static final String IDENTITY = "identity";
+ static final String BUCKET = "bucket";
public static IcebergPartitionValueConverter getInstance() {
return INSTANCE;
@@ -124,8 +125,16 @@ public class IcebergPartitionValueConverter {
transformType = PartitionTransformType.VALUE;
break;
default:
- throw new NotSupportedException(
- "Partition transform not supported: " +
partitionField.transform().toString());
+ if (partitionField.transform().toString().startsWith(BUCKET)) {
+ value = structLike.get(fieldPosition, Integer.class);
+ transformType = PartitionTransformType.BUCKET;
+ } else if (partitionField.transform().isVoid()) {
+ // skip void type
+ continue;
+ } else {
+ throw new NotSupportedException(
+ "Partition transform not supported: " +
partitionField.transform().toString());
+ }
}
Types.NestedField partitionSourceField =
partitionSpec.schema().findField(partitionField.sourceId());
diff --git
a/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java
b/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java
index 3325ca67..dc90d4d5 100644
--- a/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java
+++ b/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java
@@ -100,6 +100,7 @@ import
org.apache.xtable.delta.DeltaConversionSourceProvider;
import org.apache.xtable.hudi.HudiConversionSourceProvider;
import org.apache.xtable.hudi.HudiTestUtil;
import org.apache.xtable.iceberg.IcebergConversionSourceProvider;
+import org.apache.xtable.iceberg.TestIcebergDataHelper;
import org.apache.xtable.model.storage.TableFormat;
import org.apache.xtable.model.sync.SyncMode;
@@ -772,6 +773,34 @@ public class ITConversionController {
}
}
+ @Test
+ void otherIcebergPartitionTypes() {
+ String tableName = getTableName();
+ ConversionController conversionController = new
ConversionController(jsc.hadoopConfiguration());
+ List<String> targetTableFormats = Collections.singletonList(DELTA);
+
+ ConversionSourceProvider<?> conversionSourceProvider =
getConversionSourceProvider(ICEBERG);
+ try (TestIcebergTable table =
+ new TestIcebergTable(
+ tableName,
+ tempDir,
+ jsc.hadoopConfiguration(),
+ "id",
+ Arrays.asList("level", "string_field"),
+ TestIcebergDataHelper.SchemaType.COMMON)) {
+ table.insertRows(100);
+
+ ConversionConfig conversionConfig =
+ getTableSyncConfig(
+ ICEBERG, SyncMode.FULL, tableName, table, targetTableFormats,
null, null);
+ conversionController.sync(conversionConfig, conversionSourceProvider);
+ checkDatasetEquivalence(ICEBERG, table, targetTableFormats, 100);
+ // Query with filter to assert partition does not impact ability to query
+ checkDatasetEquivalenceWithFilter(
+ ICEBERG, table, targetTableFormats, "level == 'INFO' AND
string_field > 'abc'");
+ }
+ }
+
private Map<String, String> getTimeTravelOption(String tableFormat, Instant
time) {
Map<String, String> options = new HashMap<>();
switch (tableFormat) {
diff --git a/xtable-core/src/test/java/org/apache/xtable/TestIcebergTable.java
b/xtable-core/src/test/java/org/apache/xtable/TestIcebergTable.java
index 3b944340..a912241d 100644
--- a/xtable-core/src/test/java/org/apache/xtable/TestIcebergTable.java
+++ b/xtable-core/src/test/java/org/apache/xtable/TestIcebergTable.java
@@ -352,7 +352,7 @@ public class TestIcebergTable implements
GenericTable<Record, String> {
Path baseDataPath = Paths.get(icebergTable.location(), "data");
String filePath;
if (icebergDataHelper.getPartitionSpec().isPartitioned()) {
- String partitionPath = getPartitionPath(partitionKey.get(0,
String.class));
+ String partitionPath = ((PartitionKey) partitionKey).toPath();
filePath =
baseDataPath.resolve(partitionPath).resolve(UUID.randomUUID() +
".parquet").toString();
} else {
@@ -434,14 +434,4 @@ public class TestIcebergTable implements
GenericTable<Record, String> {
.map(entry -> writeAndGetDataFile(entry.getValue(), entry.getKey()))
.collect(Collectors.toList());
}
-
- private String getPartitionPath(Object partitionValue) {
- Preconditions.checkArgument(
- icebergDataHelper.getPartitionFieldNames().size() == 1,
- "Only single partition field is supported for grouping records by
partition");
- Preconditions.checkArgument(
- icebergDataHelper.getPartitionFieldNames().get(0).equals("level"),
- "Only level partition field is supported for grouping records by
partition");
- return "level=" + partitionValue;
- }
}
diff --git
a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaPartitionExtractor.java
b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaPartitionExtractor.java
index ac5daa99..3575dc11 100644
---
a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaPartitionExtractor.java
+++
b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaPartitionExtractor.java
@@ -18,6 +18,7 @@
package org.apache.xtable.delta;
+import static
org.apache.xtable.delta.DeltaPartitionExtractor.DELTA_GENERATION_EXPRESSION;
import static org.junit.jupiter.api.Assertions.*;
import java.util.Arrays;
@@ -494,6 +495,42 @@ public class TestDeltaPartitionExtractor {
assertEquals(expectedPartitionValues, partitionValues);
}
+ @Test
+ void convertBucketPartition() {
+ InternalPartitionField internalPartitionField =
+ InternalPartitionField.builder()
+ .sourceField(
+ InternalField.builder()
+ .name("partition_column1")
+ .schema(
+ InternalSchema.builder()
+ .name("string")
+ .dataType(InternalType.STRING)
+ .build())
+ .build())
+ .transformType(PartitionTransformType.BUCKET)
+
.transformOptions(Collections.singletonMap(InternalPartitionField.NUM_BUCKETS,
5))
+ .build();
+ Map<String, StructField> actual =
+ deltaPartitionExtractor.convertToDeltaPartitionFormat(
+ Collections.singletonList(internalPartitionField));
+ Metadata expectedPartitionFieldMetadata =
+ new Metadata(
+ ScalaUtils.convertJavaMapToScala(
+ Collections.singletonMap(
+ DELTA_GENERATION_EXPRESSION,
+ "MOD((HASH(partition_column1) & 2147483647), 5)")));
+ Map<String, StructField> expected =
+ Collections.singletonMap(
+ "xtable_partition_col_BUCKET_partition_column1",
+ new StructField(
+ "xtable_partition_col_BUCKET_partition_column1",
+ DataTypes.IntegerType,
+ true,
+ expectedPartitionFieldMetadata));
+ assertEquals(expected, actual);
+ }
+
private scala.collection.mutable.Map<String, String>
convertJavaMapToScalaMap(
Map<String, String> javaMap) {
return JavaConverters.mapAsScalaMapConverter(javaMap).asScala();
diff --git
a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergDataHelper.java
b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergDataHelper.java
index d90ba169..a8518c1f 100644
---
a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergDataHelper.java
+++
b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergDataHelper.java
@@ -126,7 +126,7 @@ public class TestIcebergDataHelper {
String recordKeyField;
List<String> partitionFieldNames;
- public static enum SchemaType {
+ public enum SchemaType {
BASIC,
COMMON,
COMMON_WITH_ADDITIONAL_COLUMNS,
@@ -202,6 +202,13 @@ public class TestIcebergDataHelper {
if (partitionFieldNames.isEmpty()) {
return PartitionSpec.unpartitioned();
}
+ if (partitionFieldNames.equals(Arrays.asList("level", "string_field"))) {
+ return PartitionSpec.builderFor(tableSchema)
+ .alwaysNull("bytes_field")
+ .identity("level")
+ .bucket("string_field", 10)
+ .build();
+ }
if (partitionFieldNames.size() > 1) {
throw new IllegalArgumentException(
"Please modify the code to support multiple partition columns");
diff --git
a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergPartitionSpecExtractor.java
b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergPartitionSpecExtractor.java
index 31fbb6f5..5935f4cb 100644
---
a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergPartitionSpecExtractor.java
+++
b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergPartitionSpecExtractor.java
@@ -18,6 +18,8 @@
package org.apache.xtable.iceberg;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@@ -54,7 +56,7 @@ public class TestIcebergPartitionSpecExtractor {
PartitionSpec actual =
IcebergPartitionSpecExtractor.getInstance().toIceberg(null,
icebergSchema);
PartitionSpec expected = PartitionSpec.unpartitioned();
- Assertions.assertEquals(expected, actual);
+ assertEquals(expected, actual);
}
@Test
@@ -84,7 +86,7 @@ public class TestIcebergPartitionSpecExtractor {
.hour("timestamp_hour")
.identity("string_field")
.build();
- Assertions.assertEquals(expected, actual);
+ assertEquals(expected, actual);
}
@Test
@@ -103,7 +105,7 @@ public class TestIcebergPartitionSpecExtractor {
PartitionSpec actual =
IcebergPartitionSpecExtractor.getInstance().toIceberg(partitionFieldList,
TEST_SCHEMA);
PartitionSpec expected =
PartitionSpec.builderFor(TEST_SCHEMA).year("timestamp_year").build();
- Assertions.assertEquals(expected, actual);
+ assertEquals(expected, actual);
}
@Test
@@ -122,7 +124,7 @@ public class TestIcebergPartitionSpecExtractor {
PartitionSpec actual =
IcebergPartitionSpecExtractor.getInstance().toIceberg(partitionFieldList,
TEST_SCHEMA);
PartitionSpec expected =
PartitionSpec.builderFor(TEST_SCHEMA).month("timestamp_month").build();
- Assertions.assertEquals(expected, actual);
+ assertEquals(expected, actual);
}
@Test
@@ -141,7 +143,7 @@ public class TestIcebergPartitionSpecExtractor {
PartitionSpec actual =
IcebergPartitionSpecExtractor.getInstance().toIceberg(partitionFieldList,
TEST_SCHEMA);
PartitionSpec expected =
PartitionSpec.builderFor(TEST_SCHEMA).day("timestamp_day").build();
- Assertions.assertEquals(expected, actual);
+ assertEquals(expected, actual);
}
@Test
@@ -160,7 +162,7 @@ public class TestIcebergPartitionSpecExtractor {
PartitionSpec actual =
IcebergPartitionSpecExtractor.getInstance().toIceberg(partitionFieldList,
TEST_SCHEMA);
PartitionSpec expected =
PartitionSpec.builderFor(TEST_SCHEMA).hour("timestamp_hour").build();
- Assertions.assertEquals(expected, actual);
+ assertEquals(expected, actual);
}
@Test
@@ -187,7 +189,27 @@ public class TestIcebergPartitionSpecExtractor {
IcebergPartitionSpecExtractor.getInstance().toIceberg(partitionFieldList,
icebergSchema);
PartitionSpec expected =
PartitionSpec.builderFor(icebergSchema).identity("data.nested").build();
- Assertions.assertEquals(expected, actual);
+ assertEquals(expected, actual);
+ }
+
+ @Test
+ void testBucketPartition() {
+ List<InternalPartitionField> partitionFieldList =
+ Collections.singletonList(
+ InternalPartitionField.builder()
+ .sourceField(
+ InternalField.builder()
+ .name("string_field")
+
.schema(InternalSchema.builder().dataType(InternalType.STRING).build())
+ .build())
+ .transformType(PartitionTransformType.BUCKET)
+
.transformOptions(Collections.singletonMap(InternalPartitionField.NUM_BUCKETS,
3))
+ .build());
+ PartitionSpec actual =
+
IcebergPartitionSpecExtractor.getInstance().toIceberg(partitionFieldList,
TEST_SCHEMA);
+ PartitionSpec expected =
+ PartitionSpec.builderFor(TEST_SCHEMA).bucket("string_field",
3).build();
+ assertEquals(expected, actual);
}
@Test
@@ -195,7 +217,7 @@ public class TestIcebergPartitionSpecExtractor {
IcebergPartitionSpecExtractor extractor =
IcebergPartitionSpecExtractor.getInstance();
List<InternalPartitionField> fields =
extractor.fromIceberg(PartitionSpec.unpartitioned(), null, null);
- Assertions.assertEquals(0, fields.size());
+ assertEquals(0, fields.size());
}
@Test
@@ -227,13 +249,52 @@ public class TestIcebergPartitionSpecExtractor {
List<InternalPartitionField> irPartitionSpec =
extractor.fromIceberg(icePartitionSpec, iceSchema, irSchema);
- Assertions.assertEquals(1, irPartitionSpec.size());
+ assertEquals(1, irPartitionSpec.size());
InternalField sourceField = irPartitionSpec.get(0).getSourceField();
- Assertions.assertEquals("key_string", sourceField.getName());
- Assertions.assertEquals(1, sourceField.getFieldId());
- Assertions.assertEquals(InternalType.STRING,
sourceField.getSchema().getDataType());
- Assertions.assertEquals(
- PartitionTransformType.VALUE,
irPartitionSpec.get(0).getTransformType());
+ assertEquals("key_string", sourceField.getName());
+ assertEquals(1, sourceField.getFieldId());
+ assertEquals(InternalType.STRING, sourceField.getSchema().getDataType());
+ assertEquals(PartitionTransformType.VALUE,
irPartitionSpec.get(0).getTransformType());
+ }
+
+ @Test
+ void testFromIcebergBucket() {
+ IcebergPartitionSpecExtractor extractor =
IcebergPartitionSpecExtractor.getInstance();
+
+ Schema iceSchema =
+ new Schema(
+ Types.NestedField.required(0, "data_int", Types.IntegerType.get()),
+ Types.NestedField.required(1, "key_string",
Types.StringType.get()));
+ PartitionSpec icePartitionSpec =
+ PartitionSpec.builderFor(iceSchema).bucket("data_int", 2).build();
+
+ InternalSchema irSchema =
+ InternalSchema.builder()
+ .name("test_schema")
+ .fields(
+ Arrays.asList(
+ InternalField.builder()
+ .name("data_int")
+
.schema(InternalSchema.builder().dataType(InternalType.INT).build())
+ .build(),
+ InternalField.builder()
+ .name("key_string")
+ .fieldId(1)
+
.schema(InternalSchema.builder().dataType(InternalType.STRING).build())
+ .build()))
+ .build();
+
+ List<InternalPartitionField> irPartitionSpec =
+ extractor.fromIceberg(icePartitionSpec, iceSchema, irSchema);
+
+ InternalPartitionField expected =
+ InternalPartitionField.builder()
+ .sourceField(irSchema.getFields().get(0))
+ .transformType(PartitionTransformType.BUCKET)
+
.transformOptions(Collections.singletonMap(InternalPartitionField.NUM_BUCKETS,
2))
+ .build();
+
+ assertEquals(Collections.singletonList(expected), irPartitionSpec);
}
@Test
@@ -266,37 +327,34 @@ public class TestIcebergPartitionSpecExtractor {
List<InternalPartitionField> irPartitionSpec =
extractor.fromIceberg(icePartitionSpec, iceSchema, irSchema);
- Assertions.assertEquals(2, irPartitionSpec.size());
+ assertEquals(2, irPartitionSpec.size());
InternalField sourceField = irPartitionSpec.get(0).getSourceField();
- Assertions.assertEquals("key_string", sourceField.getName());
- Assertions.assertEquals(11, sourceField.getFieldId());
- Assertions.assertEquals(InternalType.STRING,
sourceField.getSchema().getDataType());
- Assertions.assertEquals(
- PartitionTransformType.VALUE,
irPartitionSpec.get(0).getTransformType());
+ assertEquals("key_string", sourceField.getName());
+ assertEquals(11, sourceField.getFieldId());
+ assertEquals(InternalType.STRING, sourceField.getSchema().getDataType());
+ assertEquals(PartitionTransformType.VALUE,
irPartitionSpec.get(0).getTransformType());
sourceField = irPartitionSpec.get(1).getSourceField();
- Assertions.assertEquals("key_year", sourceField.getName());
- Assertions.assertEquals(10, sourceField.getFieldId());
- Assertions.assertEquals(InternalType.DATE,
sourceField.getSchema().getDataType());
- Assertions.assertEquals(PartitionTransformType.YEAR,
irPartitionSpec.get(1).getTransformType());
+ assertEquals("key_year", sourceField.getName());
+ assertEquals(10, sourceField.getFieldId());
+ assertEquals(InternalType.DATE, sourceField.getSchema().getDataType());
+ assertEquals(PartitionTransformType.YEAR,
irPartitionSpec.get(1).getTransformType());
}
@Test
public void fromIcebergTransformType() {
IcebergPartitionSpecExtractor extractor =
IcebergPartitionSpecExtractor.getInstance();
- Assertions.assertEquals(
- PartitionTransformType.YEAR,
extractor.fromIcebergTransform(Transforms.year()));
- Assertions.assertEquals(
- PartitionTransformType.MONTH,
extractor.fromIcebergTransform(Transforms.month()));
- Assertions.assertEquals(
- PartitionTransformType.DAY,
extractor.fromIcebergTransform(Transforms.day()));
- Assertions.assertEquals(
- PartitionTransformType.HOUR,
extractor.fromIcebergTransform(Transforms.hour()));
- Assertions.assertEquals(
+ assertEquals(PartitionTransformType.YEAR,
extractor.fromIcebergTransform(Transforms.year()));
+ assertEquals(PartitionTransformType.MONTH,
extractor.fromIcebergTransform(Transforms.month()));
+ assertEquals(PartitionTransformType.DAY,
extractor.fromIcebergTransform(Transforms.day()));
+ assertEquals(PartitionTransformType.HOUR,
extractor.fromIcebergTransform(Transforms.hour()));
+ assertEquals(
PartitionTransformType.VALUE,
extractor.fromIcebergTransform(Transforms.identity()));
+ assertEquals(
+ PartitionTransformType.BUCKET,
extractor.fromIcebergTransform(Transforms.bucket(2)));
Assertions.assertThrows(
- NotSupportedException.class, () ->
extractor.fromIcebergTransform(Transforms.bucket(10)));
+ NotSupportedException.class, () ->
extractor.fromIcebergTransform(Transforms.truncate(10)));
}
}
diff --git
a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergPartitionValueConverter.java
b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergPartitionValueConverter.java
index 7ff331ec..bdbb444c 100644
---
a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergPartitionValueConverter.java
+++
b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergPartitionValueConverter.java
@@ -109,6 +109,31 @@ public class TestIcebergPartitionValueConverter {
assertEquals(expectedPartitionValues, partitionValues);
}
+ @Test
+ void testToXTableBucketPartitioned() {
+ Schema schemaWithPartition =
+ new Schema(
+ Types.NestedField.optional(1, "id", Types.IntegerType.get()),
+ Types.NestedField.optional(2, "name", Types.StringType.get()),
+ Types.NestedField.optional(3, "birthDate",
Types.TimestampType.withZone()),
+ Types.NestedField.optional(4, "name_bucket",
Types.IntegerType.get()));
+ StructLike structLike = Row.of(schemaWithPartition, 1, "abc",
1614556800000L, 5);
+ List<PartitionValue> expectedPartitionValues =
+ Collections.singletonList(
+ PartitionValue.builder()
+ .partitionField(getPartitionField("name",
PartitionTransformType.BUCKET))
+ .range(Range.scalar(5))
+ .build());
+ PartitionSpec partitionSpec =
PartitionSpec.builderFor(SCHEMA).bucket("name", 8).build();
+ List<PartitionValue> partitionValues =
+ partitionValueConverter.toXTable(
+ buildInternalTable(true, "name", PartitionTransformType.BUCKET),
+ structLike,
+ partitionSpec);
+ assertEquals(1, partitionValues.size());
+ assertEquals(expectedPartitionValues, partitionValues);
+ }
+
private InternalTable buildInternalTable(boolean isPartitioned) {
return buildInternalTable(isPartitioned, null, null);
}