This is an automated email from the ASF dual-hosted git repository.

timbrown pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-xtable.git


The following commit(s) were added to refs/heads/main by this push:
     new dafe24a8 Add support for bucket partition transform, ignore void partition in iceberg source
dafe24a8 is described below

commit dafe24a86c38bf32ce93464d14ce66c0029ca724
Author: Timothy Brown <[email protected]>
AuthorDate: Mon Mar 31 20:11:38 2025 -0500

    Add support for bucket partition transform, ignore void partition in iceberg source
---
 .../model/schema/InternalPartitionField.java       |   4 +
 .../model/schema/PartitionTransformType.java       |   3 +-
 .../xtable/delta/DeltaPartitionExtractor.java      |  23 +++-
 .../org/apache/xtable/hudi/HudiTableManager.java   |   8 ++
 .../iceberg/IcebergPartitionSpecExtractor.java     |  36 +++++-
 .../iceberg/IcebergPartitionValueConverter.java    |  13 ++-
 .../org/apache/xtable/ITConversionController.java  |  29 +++++
 .../java/org/apache/xtable/TestIcebergTable.java   |  12 +-
 .../xtable/delta/TestDeltaPartitionExtractor.java  |  37 ++++++
 .../xtable/iceberg/TestIcebergDataHelper.java      |   9 +-
 .../iceberg/TestIcebergPartitionSpecExtractor.java | 126 +++++++++++++++------
 .../TestIcebergPartitionValueConverter.java        |  25 ++++
 12 files changed, 271 insertions(+), 54 deletions(-)

diff --git 
a/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalPartitionField.java
 
b/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalPartitionField.java
index 7c1757a5..99d3daa7 100644
--- 
a/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalPartitionField.java
+++ 
b/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalPartitionField.java
@@ -20,6 +20,7 @@ package org.apache.xtable.model.schema;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 
 import lombok.Builder;
 import lombok.Value;
@@ -32,6 +33,7 @@ import lombok.Value;
 @Value
 @Builder
 public class InternalPartitionField {
+  public static final String NUM_BUCKETS = "NUM_BUCKETS";
   // Source field the partition is based on
   InternalField sourceField;
   /*
@@ -47,4 +49,6 @@ public class InternalPartitionField {
   @Builder.Default List<String> partitionFieldNames = Collections.emptyList();
   // An enum describing how the source data was transformed into the partition 
value
   PartitionTransformType transformType;
+  // Transform options such as number of buckets in the BUCKET transform type
+  @Builder.Default Map<String, Object> transformOptions = 
Collections.emptyMap();
 }
diff --git 
a/xtable-api/src/main/java/org/apache/xtable/model/schema/PartitionTransformType.java
 
b/xtable-api/src/main/java/org/apache/xtable/model/schema/PartitionTransformType.java
index d6e082f1..8af9cea8 100644
--- 
a/xtable-api/src/main/java/org/apache/xtable/model/schema/PartitionTransformType.java
+++ 
b/xtable-api/src/main/java/org/apache/xtable/model/schema/PartitionTransformType.java
@@ -30,7 +30,8 @@ public enum PartitionTransformType {
   MONTH,
   DAY,
   HOUR,
-  VALUE;
+  VALUE,
+  BUCKET;
 
   public boolean isTimeBased() {
     return this == YEAR || this == MONTH || this == DAY || this == HOUR;
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaPartitionExtractor.java
 
b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaPartitionExtractor.java
index 172ca26e..98008646 100644
--- 
a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaPartitionExtractor.java
+++ 
b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaPartitionExtractor.java
@@ -75,6 +75,7 @@ public class DeltaPartitionExtractor {
   private static final String DATE_FORMAT_FOR_DAY = "yyyy-MM-dd";
   private static final String DATE_FORMAT_FOR_MONTH = "yyyy-MM";
   private static final String DATE_FORMAT_FOR_YEAR = "yyyy";
+  private static final String BUCKET_FUNCTION = "MOD((HASH(%s) & %d), %d)";
   // For timestamp partition fields, actual partition column names in delta 
format will be of type
   // generated & and with a name like 
`delta_partition_col_{transform_type}_{source_field_name}`.
   private static final String DELTA_PARTITION_COL_NAME_FORMAT = 
"xtable_partition_col_%s_%s";
@@ -242,7 +243,7 @@ public class DeltaPartitionExtractor {
         currPartitionColumnName = 
internalPartitionField.getSourceField().getName();
         field = null;
       } else {
-        // Since partition field of timestamp type, create new field in schema.
+        // Since partition field of timestamp or bucket type, create new field 
in schema.
         field = getGeneratedField(internalPartitionField);
         currPartitionColumnName = field.name();
       }
@@ -270,6 +271,10 @@ public class DeltaPartitionExtractor {
                 "");
         partitionValuesSerialized.put(
             partitionField.getSourceField().getName(), 
partitionValueSerialized);
+      } else if (transformType == PartitionTransformType.BUCKET) {
+        partitionValueSerialized = 
partitionValue.getRange().getMaxValue().toString();
+        partitionValuesSerialized.put(
+            getGeneratedColumnName(partitionField), partitionValueSerialized);
       } else {
         // use appropriate date formatter for value serialization.
         partitionValueSerialized =
@@ -352,7 +357,6 @@ public class DeltaPartitionExtractor {
     String generatedExpression;
     DataType dataType;
     String currPartitionColumnName = 
getGeneratedColumnName(internalPartitionField);
-    Map<String, String> generatedExpressionMetadata = new HashMap<>();
     switch (internalPartitionField.getTransformType()) {
       case YEAR:
         generatedExpression =
@@ -373,10 +377,23 @@ public class DeltaPartitionExtractor {
             String.format(CAST_FUNCTION, 
internalPartitionField.getSourceField().getPath());
         dataType = DataTypes.DateType;
         break;
+      case BUCKET:
+        generatedExpression =
+            String.format(
+                BUCKET_FUNCTION,
+                internalPartitionField.getSourceField().getPath(),
+                Integer.MAX_VALUE,
+                (int)
+                    internalPartitionField
+                        .getTransformOptions()
+                        .get(InternalPartitionField.NUM_BUCKETS));
+        dataType = DataTypes.IntegerType;
+        break;
       default:
         throw new PartitionSpecException("Invalid transform type");
     }
-    generatedExpressionMetadata.put(DELTA_GENERATION_EXPRESSION, 
generatedExpression);
+    Map<String, String> generatedExpressionMetadata =
+        Collections.singletonMap(DELTA_GENERATION_EXPRESSION, 
generatedExpression);
     Metadata partitionFieldMetadata =
         new 
Metadata(ScalaUtils.convertJavaMapToScala(generatedExpressionMetadata));
     return new StructField(currPartitionColumnName, dataType, true, 
partitionFieldMetadata);
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiTableManager.java 
b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiTableManager.java
index 7c48e88d..c6ac35fb 100644
--- a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiTableManager.java
+++ b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiTableManager.java
@@ -35,10 +35,12 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.VisibleForTesting;
 import org.apache.hudi.exception.TableNotFoundException;
 
+import org.apache.xtable.exception.PartitionSpecException;
 import org.apache.xtable.exception.UpdateException;
 import org.apache.xtable.model.InternalTable;
 import org.apache.xtable.model.schema.InternalField;
 import org.apache.xtable.model.schema.InternalPartitionField;
+import org.apache.xtable.model.schema.PartitionTransformType;
 import org.apache.xtable.model.storage.DataLayoutStrategy;
 
 /** A class used to initialize new Hudi tables and load the metadata of 
existing tables. */
@@ -124,6 +126,12 @@ public class HudiTableManager {
   @VisibleForTesting
   static String getKeyGeneratorClass(
       List<InternalPartitionField> partitionFields, List<InternalField> 
recordKeyFields) {
+    if (partitionFields.stream()
+        .anyMatch(
+            internalPartitionField ->
+                internalPartitionField.getTransformType() == 
PartitionTransformType.BUCKET)) {
+      throw new PartitionSpecException("Bucket partition is not yet supported 
by Hudi targets");
+    }
     boolean multipleRecordKeyFields = recordKeyFields.size() > 1;
     boolean multiplePartitionFields = partitionFields.size() > 1;
     String keyGeneratorClass;
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergPartitionSpecExtractor.java
 
b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergPartitionSpecExtractor.java
index b51f8163..8e202f32 100644
--- 
a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergPartitionSpecExtractor.java
+++ 
b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergPartitionSpecExtractor.java
@@ -18,9 +18,14 @@
  
 package org.apache.xtable.iceberg;
 
+import static org.apache.xtable.iceberg.IcebergPartitionValueConverter.BUCKET;
+
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
@@ -32,6 +37,7 @@ import org.apache.iceberg.transforms.Transform;
 import org.apache.iceberg.types.Types;
 
 import org.apache.xtable.exception.NotSupportedException;
+import org.apache.xtable.exception.PartitionSpecException;
 import org.apache.xtable.model.schema.InternalField;
 import org.apache.xtable.model.schema.InternalPartitionField;
 import org.apache.xtable.model.schema.InternalSchema;
@@ -41,6 +47,7 @@ import org.apache.xtable.schema.SchemaFieldFinder;
 /** Partition spec builder and extractor for Iceberg. */
 @NoArgsConstructor(access = AccessLevel.PRIVATE)
 public class IcebergPartitionSpecExtractor {
+  private static final Pattern NUM_BUCKETS_MATCHER = 
Pattern.compile("bucket\\[(\\d+)\\]");
   private static final IcebergPartitionSpecExtractor INSTANCE = new 
IcebergPartitionSpecExtractor();
 
   public static IcebergPartitionSpecExtractor getInstance() {
@@ -70,6 +77,12 @@ public class IcebergPartitionSpecExtractor {
         case VALUE:
           partitionSpecBuilder.identity(fieldPath);
           break;
+        case BUCKET:
+          partitionSpecBuilder.bucket(
+              fieldPath,
+              (int)
+                  
partitioningField.getTransformOptions().get(InternalPartitionField.NUM_BUCKETS));
+          break;
         default:
           throw new IllegalArgumentException(
               "Unsupported type: " + partitioningField.getTransformType());
@@ -99,13 +112,27 @@ public class IcebergPartitionSpecExtractor {
       throw new NotSupportedException(transformName);
     }
 
-    if (transformName.startsWith("bucket")) {
-      throw new NotSupportedException(transformName);
+    if (transformName.startsWith(BUCKET)) {
+      return PartitionTransformType.BUCKET;
     }
 
     throw new NotSupportedException(transform.toString());
   }
 
+  private Map<String, Object> getPartitionTransformOptions(Transform<?, ?> 
transform) {
+    if (transform.toString().startsWith(BUCKET)) {
+      Matcher matcher = NUM_BUCKETS_MATCHER.matcher(transform.toString());
+      if (matcher.matches()) {
+        return Collections.singletonMap(
+            InternalPartitionField.NUM_BUCKETS, 
Integer.parseInt(matcher.group(1)));
+      } else {
+        throw new PartitionSpecException(
+            "Cannot parse number of buckets from partition transform: " + 
transform);
+      }
+    }
+    return Collections.emptyMap();
+  }
+
   /**
    * Generates internal representation of the Iceberg partition spec.
    *
@@ -121,6 +148,10 @@ public class IcebergPartitionSpecExtractor {
 
     List<InternalPartitionField> irPartitionFields = new 
ArrayList<>(iceSpec.fields().size());
     for (PartitionField iceField : iceSpec.fields()) {
+      // skip void transform
+      if (iceField.transform().isVoid()) {
+        continue;
+      }
       // fetch the ice field from the schema to properly handle hidden 
partition fields
       int sourceColumnId = iceField.sourceId();
       Types.NestedField iceSchemaField = iceSchema.findField(sourceColumnId);
@@ -131,6 +162,7 @@ public class IcebergPartitionSpecExtractor {
           InternalPartitionField.builder()
               .sourceField(irField)
               .transformType(fromIcebergTransform(iceField.transform()))
+              
.transformOptions(getPartitionTransformOptions(iceField.transform()))
               .build();
       irPartitionFields.add(irPartitionField);
     }
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergPartitionValueConverter.java
 
b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergPartitionValueConverter.java
index a6abd2a9..83979a66 100644
--- 
a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergPartitionValueConverter.java
+++ 
b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergPartitionValueConverter.java
@@ -66,6 +66,7 @@ public class IcebergPartitionValueConverter {
   private static final String DAY = "day";
   private static final String HOUR = "hour";
   private static final String IDENTITY = "identity";
+  static final String BUCKET = "bucket";
 
   public static IcebergPartitionValueConverter getInstance() {
     return INSTANCE;
@@ -124,8 +125,16 @@ public class IcebergPartitionValueConverter {
           transformType = PartitionTransformType.VALUE;
           break;
         default:
-          throw new NotSupportedException(
-              "Partition transform not supported: " + 
partitionField.transform().toString());
+          if (partitionField.transform().toString().startsWith(BUCKET)) {
+            value = structLike.get(fieldPosition, Integer.class);
+            transformType = PartitionTransformType.BUCKET;
+          } else if (partitionField.transform().isVoid()) {
+            // skip void type
+            continue;
+          } else {
+            throw new NotSupportedException(
+                "Partition transform not supported: " + 
partitionField.transform().toString());
+          }
       }
       Types.NestedField partitionSourceField =
           partitionSpec.schema().findField(partitionField.sourceId());
diff --git 
a/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java 
b/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java
index 3325ca67..dc90d4d5 100644
--- a/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java
+++ b/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java
@@ -100,6 +100,7 @@ import 
org.apache.xtable.delta.DeltaConversionSourceProvider;
 import org.apache.xtable.hudi.HudiConversionSourceProvider;
 import org.apache.xtable.hudi.HudiTestUtil;
 import org.apache.xtable.iceberg.IcebergConversionSourceProvider;
+import org.apache.xtable.iceberg.TestIcebergDataHelper;
 import org.apache.xtable.model.storage.TableFormat;
 import org.apache.xtable.model.sync.SyncMode;
 
@@ -772,6 +773,34 @@ public class ITConversionController {
     }
   }
 
+  @Test
+  void otherIcebergPartitionTypes() {
+    String tableName = getTableName();
+    ConversionController conversionController = new 
ConversionController(jsc.hadoopConfiguration());
+    List<String> targetTableFormats = Collections.singletonList(DELTA);
+
+    ConversionSourceProvider<?> conversionSourceProvider = 
getConversionSourceProvider(ICEBERG);
+    try (TestIcebergTable table =
+        new TestIcebergTable(
+            tableName,
+            tempDir,
+            jsc.hadoopConfiguration(),
+            "id",
+            Arrays.asList("level", "string_field"),
+            TestIcebergDataHelper.SchemaType.COMMON)) {
+      table.insertRows(100);
+
+      ConversionConfig conversionConfig =
+          getTableSyncConfig(
+              ICEBERG, SyncMode.FULL, tableName, table, targetTableFormats, 
null, null);
+      conversionController.sync(conversionConfig, conversionSourceProvider);
+      checkDatasetEquivalence(ICEBERG, table, targetTableFormats, 100);
+      // Query with filter to assert partition does not impact ability to query
+      checkDatasetEquivalenceWithFilter(
+          ICEBERG, table, targetTableFormats, "level == 'INFO' AND 
string_field > 'abc'");
+    }
+  }
+
   private Map<String, String> getTimeTravelOption(String tableFormat, Instant 
time) {
     Map<String, String> options = new HashMap<>();
     switch (tableFormat) {
diff --git a/xtable-core/src/test/java/org/apache/xtable/TestIcebergTable.java 
b/xtable-core/src/test/java/org/apache/xtable/TestIcebergTable.java
index 3b944340..a912241d 100644
--- a/xtable-core/src/test/java/org/apache/xtable/TestIcebergTable.java
+++ b/xtable-core/src/test/java/org/apache/xtable/TestIcebergTable.java
@@ -352,7 +352,7 @@ public class TestIcebergTable implements 
GenericTable<Record, String> {
     Path baseDataPath = Paths.get(icebergTable.location(), "data");
     String filePath;
     if (icebergDataHelper.getPartitionSpec().isPartitioned()) {
-      String partitionPath = getPartitionPath(partitionKey.get(0, 
String.class));
+      String partitionPath = ((PartitionKey) partitionKey).toPath();
       filePath =
           baseDataPath.resolve(partitionPath).resolve(UUID.randomUUID() + 
".parquet").toString();
     } else {
@@ -434,14 +434,4 @@ public class TestIcebergTable implements 
GenericTable<Record, String> {
         .map(entry -> writeAndGetDataFile(entry.getValue(), entry.getKey()))
         .collect(Collectors.toList());
   }
-
-  private String getPartitionPath(Object partitionValue) {
-    Preconditions.checkArgument(
-        icebergDataHelper.getPartitionFieldNames().size() == 1,
-        "Only single partition field is supported for grouping records by 
partition");
-    Preconditions.checkArgument(
-        icebergDataHelper.getPartitionFieldNames().get(0).equals("level"),
-        "Only level partition field is supported for grouping records by 
partition");
-    return "level=" + partitionValue;
-  }
 }
diff --git 
a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaPartitionExtractor.java
 
b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaPartitionExtractor.java
index ac5daa99..3575dc11 100644
--- 
a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaPartitionExtractor.java
+++ 
b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaPartitionExtractor.java
@@ -18,6 +18,7 @@
  
 package org.apache.xtable.delta;
 
+import static 
org.apache.xtable.delta.DeltaPartitionExtractor.DELTA_GENERATION_EXPRESSION;
 import static org.junit.jupiter.api.Assertions.*;
 
 import java.util.Arrays;
@@ -494,6 +495,42 @@ public class TestDeltaPartitionExtractor {
     assertEquals(expectedPartitionValues, partitionValues);
   }
 
+  @Test
+  void convertBucketPartition() {
+    InternalPartitionField internalPartitionField =
+        InternalPartitionField.builder()
+            .sourceField(
+                InternalField.builder()
+                    .name("partition_column1")
+                    .schema(
+                        InternalSchema.builder()
+                            .name("string")
+                            .dataType(InternalType.STRING)
+                            .build())
+                    .build())
+            .transformType(PartitionTransformType.BUCKET)
+            
.transformOptions(Collections.singletonMap(InternalPartitionField.NUM_BUCKETS, 
5))
+            .build();
+    Map<String, StructField> actual =
+        deltaPartitionExtractor.convertToDeltaPartitionFormat(
+            Collections.singletonList(internalPartitionField));
+    Metadata expectedPartitionFieldMetadata =
+        new Metadata(
+            ScalaUtils.convertJavaMapToScala(
+                Collections.singletonMap(
+                    DELTA_GENERATION_EXPRESSION,
+                    "MOD((HASH(partition_column1) & 2147483647), 5)")));
+    Map<String, StructField> expected =
+        Collections.singletonMap(
+            "xtable_partition_col_BUCKET_partition_column1",
+            new StructField(
+                "xtable_partition_col_BUCKET_partition_column1",
+                DataTypes.IntegerType,
+                true,
+                expectedPartitionFieldMetadata));
+    assertEquals(expected, actual);
+  }
+
   private scala.collection.mutable.Map<String, String> 
convertJavaMapToScalaMap(
       Map<String, String> javaMap) {
     return JavaConverters.mapAsScalaMapConverter(javaMap).asScala();
diff --git 
a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergDataHelper.java
 
b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergDataHelper.java
index d90ba169..a8518c1f 100644
--- 
a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergDataHelper.java
+++ 
b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergDataHelper.java
@@ -126,7 +126,7 @@ public class TestIcebergDataHelper {
   String recordKeyField;
   List<String> partitionFieldNames;
 
-  public static enum SchemaType {
+  public enum SchemaType {
     BASIC,
     COMMON,
     COMMON_WITH_ADDITIONAL_COLUMNS,
@@ -202,6 +202,13 @@ public class TestIcebergDataHelper {
     if (partitionFieldNames.isEmpty()) {
       return PartitionSpec.unpartitioned();
     }
+    if (partitionFieldNames.equals(Arrays.asList("level", "string_field"))) {
+      return PartitionSpec.builderFor(tableSchema)
+          .alwaysNull("bytes_field")
+          .identity("level")
+          .bucket("string_field", 10)
+          .build();
+    }
     if (partitionFieldNames.size() > 1) {
       throw new IllegalArgumentException(
           "Please modify the code to support multiple partition columns");
diff --git 
a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergPartitionSpecExtractor.java
 
b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergPartitionSpecExtractor.java
index 31fbb6f5..5935f4cb 100644
--- 
a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergPartitionSpecExtractor.java
+++ 
b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergPartitionSpecExtractor.java
@@ -18,6 +18,8 @@
  
 package org.apache.xtable.iceberg;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -54,7 +56,7 @@ public class TestIcebergPartitionSpecExtractor {
     PartitionSpec actual =
         IcebergPartitionSpecExtractor.getInstance().toIceberg(null, 
icebergSchema);
     PartitionSpec expected = PartitionSpec.unpartitioned();
-    Assertions.assertEquals(expected, actual);
+    assertEquals(expected, actual);
   }
 
   @Test
@@ -84,7 +86,7 @@ public class TestIcebergPartitionSpecExtractor {
             .hour("timestamp_hour")
             .identity("string_field")
             .build();
-    Assertions.assertEquals(expected, actual);
+    assertEquals(expected, actual);
   }
 
   @Test
@@ -103,7 +105,7 @@ public class TestIcebergPartitionSpecExtractor {
     PartitionSpec actual =
         
IcebergPartitionSpecExtractor.getInstance().toIceberg(partitionFieldList, 
TEST_SCHEMA);
     PartitionSpec expected = 
PartitionSpec.builderFor(TEST_SCHEMA).year("timestamp_year").build();
-    Assertions.assertEquals(expected, actual);
+    assertEquals(expected, actual);
   }
 
   @Test
@@ -122,7 +124,7 @@ public class TestIcebergPartitionSpecExtractor {
     PartitionSpec actual =
         
IcebergPartitionSpecExtractor.getInstance().toIceberg(partitionFieldList, 
TEST_SCHEMA);
     PartitionSpec expected = 
PartitionSpec.builderFor(TEST_SCHEMA).month("timestamp_month").build();
-    Assertions.assertEquals(expected, actual);
+    assertEquals(expected, actual);
   }
 
   @Test
@@ -141,7 +143,7 @@ public class TestIcebergPartitionSpecExtractor {
     PartitionSpec actual =
         
IcebergPartitionSpecExtractor.getInstance().toIceberg(partitionFieldList, 
TEST_SCHEMA);
     PartitionSpec expected = 
PartitionSpec.builderFor(TEST_SCHEMA).day("timestamp_day").build();
-    Assertions.assertEquals(expected, actual);
+    assertEquals(expected, actual);
   }
 
   @Test
@@ -160,7 +162,7 @@ public class TestIcebergPartitionSpecExtractor {
     PartitionSpec actual =
         
IcebergPartitionSpecExtractor.getInstance().toIceberg(partitionFieldList, 
TEST_SCHEMA);
     PartitionSpec expected = 
PartitionSpec.builderFor(TEST_SCHEMA).hour("timestamp_hour").build();
-    Assertions.assertEquals(expected, actual);
+    assertEquals(expected, actual);
   }
 
   @Test
@@ -187,7 +189,27 @@ public class TestIcebergPartitionSpecExtractor {
         
IcebergPartitionSpecExtractor.getInstance().toIceberg(partitionFieldList, 
icebergSchema);
     PartitionSpec expected =
         
PartitionSpec.builderFor(icebergSchema).identity("data.nested").build();
-    Assertions.assertEquals(expected, actual);
+    assertEquals(expected, actual);
+  }
+
+  @Test
+  void testBucketPartition() {
+    List<InternalPartitionField> partitionFieldList =
+        Collections.singletonList(
+            InternalPartitionField.builder()
+                .sourceField(
+                    InternalField.builder()
+                        .name("string_field")
+                        
.schema(InternalSchema.builder().dataType(InternalType.STRING).build())
+                        .build())
+                .transformType(PartitionTransformType.BUCKET)
+                
.transformOptions(Collections.singletonMap(InternalPartitionField.NUM_BUCKETS, 
3))
+                .build());
+    PartitionSpec actual =
+        
IcebergPartitionSpecExtractor.getInstance().toIceberg(partitionFieldList, 
TEST_SCHEMA);
+    PartitionSpec expected =
+        PartitionSpec.builderFor(TEST_SCHEMA).bucket("string_field", 
3).build();
+    assertEquals(expected, actual);
   }
 
   @Test
@@ -195,7 +217,7 @@ public class TestIcebergPartitionSpecExtractor {
     IcebergPartitionSpecExtractor extractor = 
IcebergPartitionSpecExtractor.getInstance();
     List<InternalPartitionField> fields =
         extractor.fromIceberg(PartitionSpec.unpartitioned(), null, null);
-    Assertions.assertEquals(0, fields.size());
+    assertEquals(0, fields.size());
   }
 
   @Test
@@ -227,13 +249,52 @@ public class TestIcebergPartitionSpecExtractor {
 
     List<InternalPartitionField> irPartitionSpec =
         extractor.fromIceberg(icePartitionSpec, iceSchema, irSchema);
-    Assertions.assertEquals(1, irPartitionSpec.size());
+    assertEquals(1, irPartitionSpec.size());
     InternalField sourceField = irPartitionSpec.get(0).getSourceField();
-    Assertions.assertEquals("key_string", sourceField.getName());
-    Assertions.assertEquals(1, sourceField.getFieldId());
-    Assertions.assertEquals(InternalType.STRING, 
sourceField.getSchema().getDataType());
-    Assertions.assertEquals(
-        PartitionTransformType.VALUE, 
irPartitionSpec.get(0).getTransformType());
+    assertEquals("key_string", sourceField.getName());
+    assertEquals(1, sourceField.getFieldId());
+    assertEquals(InternalType.STRING, sourceField.getSchema().getDataType());
+    assertEquals(PartitionTransformType.VALUE, 
irPartitionSpec.get(0).getTransformType());
+  }
+
+  @Test
+  void testFromIcebergBucket() {
+    IcebergPartitionSpecExtractor extractor = 
IcebergPartitionSpecExtractor.getInstance();
+
+    Schema iceSchema =
+        new Schema(
+            Types.NestedField.required(0, "data_int", Types.IntegerType.get()),
+            Types.NestedField.required(1, "key_string", 
Types.StringType.get()));
+    PartitionSpec icePartitionSpec =
+        PartitionSpec.builderFor(iceSchema).bucket("data_int", 2).build();
+
+    InternalSchema irSchema =
+        InternalSchema.builder()
+            .name("test_schema")
+            .fields(
+                Arrays.asList(
+                    InternalField.builder()
+                        .name("data_int")
+                        
.schema(InternalSchema.builder().dataType(InternalType.INT).build())
+                        .build(),
+                    InternalField.builder()
+                        .name("key_string")
+                        .fieldId(1)
+                        
.schema(InternalSchema.builder().dataType(InternalType.STRING).build())
+                        .build()))
+            .build();
+
+    List<InternalPartitionField> irPartitionSpec =
+        extractor.fromIceberg(icePartitionSpec, iceSchema, irSchema);
+
+    InternalPartitionField expected =
+        InternalPartitionField.builder()
+            .sourceField(irSchema.getFields().get(0))
+            .transformType(PartitionTransformType.BUCKET)
+            
.transformOptions(Collections.singletonMap(InternalPartitionField.NUM_BUCKETS, 
2))
+            .build();
+
+    assertEquals(Collections.singletonList(expected), irPartitionSpec);
   }
 
   @Test
@@ -266,37 +327,34 @@ public class TestIcebergPartitionSpecExtractor {
 
     List<InternalPartitionField> irPartitionSpec =
         extractor.fromIceberg(icePartitionSpec, iceSchema, irSchema);
-    Assertions.assertEquals(2, irPartitionSpec.size());
+    assertEquals(2, irPartitionSpec.size());
 
     InternalField sourceField = irPartitionSpec.get(0).getSourceField();
-    Assertions.assertEquals("key_string", sourceField.getName());
-    Assertions.assertEquals(11, sourceField.getFieldId());
-    Assertions.assertEquals(InternalType.STRING, 
sourceField.getSchema().getDataType());
-    Assertions.assertEquals(
-        PartitionTransformType.VALUE, 
irPartitionSpec.get(0).getTransformType());
+    assertEquals("key_string", sourceField.getName());
+    assertEquals(11, sourceField.getFieldId());
+    assertEquals(InternalType.STRING, sourceField.getSchema().getDataType());
+    assertEquals(PartitionTransformType.VALUE, 
irPartitionSpec.get(0).getTransformType());
 
     sourceField = irPartitionSpec.get(1).getSourceField();
-    Assertions.assertEquals("key_year", sourceField.getName());
-    Assertions.assertEquals(10, sourceField.getFieldId());
-    Assertions.assertEquals(InternalType.DATE, 
sourceField.getSchema().getDataType());
-    Assertions.assertEquals(PartitionTransformType.YEAR, 
irPartitionSpec.get(1).getTransformType());
+    assertEquals("key_year", sourceField.getName());
+    assertEquals(10, sourceField.getFieldId());
+    assertEquals(InternalType.DATE, sourceField.getSchema().getDataType());
+    assertEquals(PartitionTransformType.YEAR, 
irPartitionSpec.get(1).getTransformType());
   }
 
   @Test
   public void fromIcebergTransformType() {
     IcebergPartitionSpecExtractor extractor = 
IcebergPartitionSpecExtractor.getInstance();
-    Assertions.assertEquals(
-        PartitionTransformType.YEAR, 
extractor.fromIcebergTransform(Transforms.year()));
-    Assertions.assertEquals(
-        PartitionTransformType.MONTH, 
extractor.fromIcebergTransform(Transforms.month()));
-    Assertions.assertEquals(
-        PartitionTransformType.DAY, 
extractor.fromIcebergTransform(Transforms.day()));
-    Assertions.assertEquals(
-        PartitionTransformType.HOUR, 
extractor.fromIcebergTransform(Transforms.hour()));
-    Assertions.assertEquals(
+    assertEquals(PartitionTransformType.YEAR, 
extractor.fromIcebergTransform(Transforms.year()));
+    assertEquals(PartitionTransformType.MONTH, 
extractor.fromIcebergTransform(Transforms.month()));
+    assertEquals(PartitionTransformType.DAY, 
extractor.fromIcebergTransform(Transforms.day()));
+    assertEquals(PartitionTransformType.HOUR, 
extractor.fromIcebergTransform(Transforms.hour()));
+    assertEquals(
         PartitionTransformType.VALUE, 
extractor.fromIcebergTransform(Transforms.identity()));
+    assertEquals(
+        PartitionTransformType.BUCKET, 
extractor.fromIcebergTransform(Transforms.bucket(2)));
 
     Assertions.assertThrows(
-        NotSupportedException.class, () -> 
extractor.fromIcebergTransform(Transforms.bucket(10)));
+        NotSupportedException.class, () -> 
extractor.fromIcebergTransform(Transforms.truncate(10)));
   }
 }
diff --git 
a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergPartitionValueConverter.java
 
b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergPartitionValueConverter.java
index 7ff331ec..bdbb444c 100644
--- 
a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergPartitionValueConverter.java
+++ 
b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergPartitionValueConverter.java
@@ -109,6 +109,31 @@ public class TestIcebergPartitionValueConverter {
     assertEquals(expectedPartitionValues, partitionValues);
   }
 
+  @Test
+  void testToXTableBucketPartitioned() {
+    Schema schemaWithPartition =
+        new Schema(
+            Types.NestedField.optional(1, "id", Types.IntegerType.get()),
+            Types.NestedField.optional(2, "name", Types.StringType.get()),
+            Types.NestedField.optional(3, "birthDate", 
Types.TimestampType.withZone()),
+            Types.NestedField.optional(4, "name_bucket", 
Types.IntegerType.get()));
+    StructLike structLike = Row.of(schemaWithPartition, 1, "abc", 
1614556800000L, 5);
+    List<PartitionValue> expectedPartitionValues =
+        Collections.singletonList(
+            PartitionValue.builder()
+                .partitionField(getPartitionField("name", 
PartitionTransformType.BUCKET))
+                .range(Range.scalar(5))
+                .build());
+    PartitionSpec partitionSpec = 
PartitionSpec.builderFor(SCHEMA).bucket("name", 8).build();
+    List<PartitionValue> partitionValues =
+        partitionValueConverter.toXTable(
+            buildInternalTable(true, "name", PartitionTransformType.BUCKET),
+            structLike,
+            partitionSpec);
+    assertEquals(1, partitionValues.size());
+    assertEquals(expectedPartitionValues, partitionValues);
+  }
+
   private InternalTable buildInternalTable(boolean isPartitioned) {
     return buildInternalTable(isPartitioned, null, null);
   }

Reply via email to