the-other-tim-brown commented on code in PR #729:
URL: https://github.com/apache/incubator-xtable/pull/729#discussion_r2540283029


##########
xtable-core/src/main/java/org/apache/xtable/kernel/DeltaKernelActionsConverter.java:
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.kernel;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+import scala.collection.JavaConverters;
+
+import io.delta.kernel.Table;
+import io.delta.kernel.defaults.engine.DefaultEngine;
+import io.delta.kernel.engine.Engine;
+import io.delta.kernel.internal.actions.AddFile;
+import io.delta.kernel.internal.actions.RemoveFile;
+
+import org.apache.xtable.exception.NotSupportedException;
+import org.apache.xtable.model.schema.InternalField;
+import org.apache.xtable.model.schema.InternalPartitionField;
+import org.apache.xtable.model.stat.ColumnStat;
+import org.apache.xtable.model.stat.FileStats;
+import org.apache.xtable.model.storage.FileFormat;
+import org.apache.xtable.model.storage.InternalDataFile;
+
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class DeltaKernelActionsConverter {
+  private static final DeltaKernelActionsConverter INSTANCE = new 
DeltaKernelActionsConverter();
+
+  public static DeltaKernelActionsConverter getInstance() {
+    return INSTANCE;
+  }
+
+  public InternalDataFile convertAddActionToInternalDataFile(
+      AddFile addFile,
+      Table table,
+      FileFormat fileFormat,
+      List<InternalPartitionField> partitionFields,
+      List<InternalField> fields,
+      boolean includeColumnStats,
+      DeltaKernelPartitionExtractor partitionExtractor,
+      DeltaKernelStatsExtractor fileStatsExtractor,
+      Map<String, String> partitionValues) {
+    FileStats fileStats = fileStatsExtractor.getColumnStatsForFile(addFile, 
fields);
+    List<ColumnStat> columnStats =
+        includeColumnStats ? fileStats.getColumnStats() : 
Collections.emptyList();
+    long recordCount = fileStats.getNumRecords();
+    // The immutable map from Java to Scala is not working, need to

Review Comment:
   This comment seems incomplete. Is there something we need to fix in the
   Scala/Java conversion?



##########
xtable-core/src/test/java/org/apache/xtable/DeltaTableKernel.java:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable;
+
+// import org.junit.jupiter.api.Test;
+//
+import static io.delta.kernel.internal.util.Utils.singletonCloseableIterator;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.delta.kernel.*;
+import io.delta.kernel.data.ColumnVector;
+import io.delta.kernel.data.ColumnarBatch;
+import io.delta.kernel.data.FilteredColumnarBatch;
+import io.delta.kernel.data.Row;
+import io.delta.kernel.defaults.*;
+import io.delta.kernel.defaults.engine.DefaultEngine;
+import io.delta.kernel.engine.Engine;
+import io.delta.kernel.internal.InternalScanFileUtils;
+import io.delta.kernel.internal.data.ScanStateRow;
+import io.delta.kernel.types.StructType;
+import io.delta.kernel.utils.CloseableIterator;
+import io.delta.kernel.utils.FileStatus;
+
+public class DeltaTableKernel {
+  private static final Logger logger = 
LoggerFactory.getLogger(DeltaTableKernel.class);
+
+  @Test
+  public void readDeltaKernel() throws IOException {
+    String myTablePath =
+        
"/Users/vaibhakumar/Desktop/opensource/iceberg/warehouse/demo/nyc/taxis"; // 
fully qualified

Review Comment:
   This looks like leftover local testing. Do we still need it? If so, is there a
   way to make it run against a table created in the TempDir?



##########
xtable-core/src/test/java/org/apache/xtable/kernel/TestDeltaKernelStatsExtractor.java:
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.kernel;
+
+import static org.apache.xtable.testutil.ColumnStatMapUtil.getColumnStats;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.util.*;
+
+import org.junit.jupiter.api.Test;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import io.delta.kernel.expressions.Column;
+import io.delta.kernel.expressions.Literal;
+import io.delta.kernel.types.StringType;
+
+import org.apache.xtable.model.schema.InternalField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.schema.InternalType;
+import org.apache.xtable.model.stat.ColumnStat;
+import org.apache.xtable.testutil.ColumnStatMapUtil;
+
+public class TestDeltaKernelStatsExtractor {
+  private static final ObjectMapper MAPPER = new ObjectMapper();
+
+  @Test
+  public void testDeltaStats() throws JsonProcessingException {
+    InternalSchema schema = ColumnStatMapUtil.getSchema();
+
+    List<ColumnStat> columnStats = getColumnStats();
+
+    String actualStats =
+        
DeltaKernelStatsExtractor.getInstance().convertStatsToDeltaFormat(schema, 50L, 
columnStats);
+    Map<String, Object> actualStatsMap = MAPPER.readValue(actualStats, 
HashMap.class);
+    assertEquals(50, actualStatsMap.get("numRecords"));
+    Map<String, Object> minValueStatsMap =
+        (HashMap<String, Object>) actualStatsMap.get("minValues");
+    assertEquals(10, minValueStatsMap.get("long_field"));
+    assertEquals("a", minValueStatsMap.get("string_field"));
+    assertEquals(null, minValueStatsMap.get("null_string_field"));
+    assertEquals("2022-10-08 21:08:17", 
minValueStatsMap.get("timestamp_field"));
+    assertEquals("2022-10-08 21:08:17", 
minValueStatsMap.get("timestamp_micros_field"));
+    assertEquals(1.23, minValueStatsMap.get("float_field"));
+    assertEquals(1.23, minValueStatsMap.get("double_field"));
+    assertEquals(1.0, minValueStatsMap.get("decimal_field"));
+    // TODO: The local timestamp depends on the environment where the test is run; it is
+    // non-deterministic and has to be computed dynamically.
+    // assertEquals("2022-10-08 14:08:17", 
minValueStatsMap.get("local_timestamp_field"));
+    assertEquals("2019-10-12", minValueStatsMap.get("date_field"));
+    Map<String, Object> nestedMapInMinValueStatsMap =
+        (HashMap<String, Object>) minValueStatsMap.get("nested_struct_field");
+    assertEquals(500, nestedMapInMinValueStatsMap.get("nested_long_field"));
+
+    Map<String, Object> maxValueStatsMap =
+        (HashMap<String, Object>) actualStatsMap.get("maxValues");
+    assertEquals(20, maxValueStatsMap.get("long_field"));
+    assertEquals("c", maxValueStatsMap.get("string_field"));
+    assertEquals(null, maxValueStatsMap.get("null_string_field"));
+    assertEquals("2022-10-10 21:08:17", 
maxValueStatsMap.get("timestamp_field"));
+    assertEquals("2022-10-10 21:08:17", 
maxValueStatsMap.get("timestamp_micros_field"));
+    // TODO: The local timestamp depends on the environment where the test is run; it is
+    // non-deterministic and has to be computed dynamically.
+    // assertEquals("2022-10-10 14:08:17", 
maxValueStatsMap.get("local_timestamp_field"));
+    assertEquals("2020-10-12", maxValueStatsMap.get("date_field"));
+    assertEquals(6.54321, maxValueStatsMap.get("float_field"));
+    assertEquals(6.54321, maxValueStatsMap.get("double_field"));
+    assertEquals(2.0, maxValueStatsMap.get("decimal_field"));
+    Map<String, Object> nestedMapInMaxValueStatsMap =
+        (HashMap<String, Object>) maxValueStatsMap.get("nested_struct_field");
+    assertEquals(600, nestedMapInMaxValueStatsMap.get("nested_long_field"));
+
+    Map<String, Object> nullValueStatsMap =
+        (HashMap<String, Object>) actualStatsMap.get("nullCount");
+    assertEquals(4, nullValueStatsMap.get("long_field"));
+    assertEquals(1, nullValueStatsMap.get("string_field"));
+
+    assertEquals(3, nullValueStatsMap.get("null_string_field"));
+    assertEquals(105, nullValueStatsMap.get("timestamp_field"));
+    assertEquals(1, nullValueStatsMap.get("timestamp_micros_field"));
+    assertEquals(1, nullValueStatsMap.get("local_timestamp_field"));
+    assertEquals(250, nullValueStatsMap.get("date_field"));
+    assertEquals(2, nullValueStatsMap.get("float_field"));
+    assertEquals(3, nullValueStatsMap.get("double_field"));
+    assertEquals(1, nullValueStatsMap.get("decimal_field"));
+    Map<String, Object> nestedMapInNullCountMap =
+        (HashMap<String, Object>) nullValueStatsMap.get("nested_struct_field");
+    assertEquals(4, nestedMapInNullCountMap.get("nested_long_field"));
+  }
+
+  private Map<Column, Literal> parseValues(JsonNode valuesNode) {
+    Map<Column, Literal> values = new HashMap<>();
+    if (valuesNode == null || valuesNode.isNull()) {
+      return values;
+    }
+
+    Iterator<Map.Entry<String, JsonNode>> fields = valuesNode.fields();
+    while (fields.hasNext()) {
+      Map.Entry<String, JsonNode> entry = fields.next();
+      String columnName = entry.getKey();
+      JsonNode valueNode = entry.getValue();
+      values.put(new Column(columnName), convertToLiteral(valueNode));
+    }
+    return values;
+  }
+
+  private Literal convertToLiteral(JsonNode valueNode) {
+    System.out.println("ValueNode: " + valueNode);

Review Comment:
   Remove this println



##########
xtable-core/src/main/java/org/apache/xtable/kernel/DeltaKernelPartitionExtractor.java:
##########
@@ -0,0 +1,538 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.kernel;
+
+import static org.apache.xtable.collectors.CustomCollectors.toList;
+import static 
org.apache.xtable.delta.DeltaValueConverter.convertFromDeltaPartitionValue;
+import static 
org.apache.xtable.delta.DeltaValueConverter.convertToDeltaPartitionValue;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import lombok.AccessLevel;
+import lombok.Builder;
+import lombok.NoArgsConstructor;
+import lombok.extern.log4j.Log4j2;
+
+import scala.collection.JavaConverters;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.PeekingIterator;
+
+import io.delta.kernel.types.*;
+import io.delta.kernel.types.FieldMetadata;
+
+import org.apache.xtable.exception.PartitionSpecException;
+import org.apache.xtable.model.schema.InternalPartitionField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.schema.PartitionTransformType;
+import org.apache.xtable.model.stat.PartitionValue;
+import org.apache.xtable.model.stat.Range;
+import org.apache.xtable.model.storage.InternalDataFile;
+import org.apache.xtable.schema.SchemaFieldFinder;
+
+@Log4j2
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class DeltaKernelPartitionExtractor {
+  private static final DeltaKernelPartitionExtractor INSTANCE = new 
DeltaKernelPartitionExtractor();
+  private static final String CAST_FUNCTION = "CAST(%s as DATE)";
+  private static final String DATE_FORMAT_FUNCTION = "DATE_FORMAT(%s, '%s')";
+  private static final String YEAR_FUNCTION = "YEAR(%s)";
+  private static final String DATE_FORMAT_FOR_HOUR = "yyyy-MM-dd-HH";
+  private static final String DATE_FORMAT_FOR_DAY = "yyyy-MM-dd";
+  private static final String DATE_FORMAT_FOR_MONTH = "yyyy-MM";
+  private static final String DATE_FORMAT_FOR_YEAR = "yyyy";
+  private static final String BUCKET_FUNCTION = "MOD((HASH(%s) & %d), %d)";
+  // For timestamp partition fields, the actual partition columns in delta format are generated
+  // columns with names like `xtable_partition_col_{transform_type}_{source_field_name}`.
+  private static final String DELTA_PARTITION_COL_NAME_FORMAT = 
"xtable_partition_col_%s_%s";
+  static final String DELTA_GENERATION_EXPRESSION = 
"delta.generationExpression";
+  private static final List<ParsedGeneratedExpr.GeneratedExprType> 
GRANULARITIES =
+      Arrays.asList(
+          ParsedGeneratedExpr.GeneratedExprType.YEAR,
+          ParsedGeneratedExpr.GeneratedExprType.MONTH,
+          ParsedGeneratedExpr.GeneratedExprType.DAY,
+          ParsedGeneratedExpr.GeneratedExprType.HOUR);
+
+  public static DeltaKernelPartitionExtractor getInstance() {
+    return INSTANCE;
+  }
+
+  /**
+   * Extracts partition fields from a delta table. Partitioning by nested columns isn't supported.
+   *
+   * <p>Example: given a delta table and a reference to DeltaLog, the method parameters can be
+   * obtained by:
+   *
+   * <pre>{@code
+   * deltaLog = DeltaLog.forTable(spark, deltaTablePath);
+   * InternalSchema internalSchema =
+   *     DeltaSchemaExtractor.getInstance().toInternalSchema(deltaLog.snapshot().schema());
+   * StructType partitionSchema = deltaLog.metadata().partitionSchema();
+   * }</pre>
+   *
+   * @param internalSchema canonical representation of the schema.
+   * @param partitionSchema partition schema of the delta table.
+   * @return list of canonical representation of the partition fields
+   */
+  public List<InternalPartitionField> convertFromDeltaPartitionFormat(
+      InternalSchema internalSchema, StructType partitionSchema) {
+    if (partitionSchema.fields().size() == 0) {
+      return Collections.emptyList();
+    }
+    return getInternalPartitionFields(partitionSchema, internalSchema);
+  }
+
+  /**
+   * If all of the partition fields are plain values, each is processed individually and returned.
+   * If they contain month they should contain year as well. If they contain day they should
+   * contain month and year as well. If they contain hour they should contain day, month and year
+   * as well. CAST(col as DATE) and DATE_FORMAT(col, 'yyyy-MM-dd') are also supported. Partition by
+   * nested fields may not be fully supported.
+   */
+  private List<InternalPartitionField> getInternalPartitionFields(
+      StructType partitionSchema, InternalSchema internalSchema) {
+    PeekingIterator<StructField> itr =
+        Iterators.peekingIterator(partitionSchema.fields().iterator());
+    List<InternalPartitionField> partitionFields = new 
ArrayList<>(partitionSchema.fields().size());
+    while (itr.hasNext()) {
+      StructField currPartitionField = itr.peek();
+      if 
(!currPartitionField.getMetadata().contains(DELTA_GENERATION_EXPRESSION)) {
+        partitionFields.add(
+            InternalPartitionField.builder()
+                .sourceField(
+                    SchemaFieldFinder.getInstance()
+                        .findFieldByPath(internalSchema, 
currPartitionField.getName()))
+                .transformType(PartitionTransformType.VALUE)
+                .build());
+        itr.next(); // consume the field.
+      } else {
+        // Partition contains generated expression.
+        // if it starts with year we should consume until we hit field with no 
generated expression
+        // or we hit a field with generated expression that is of cast or date 
format.
+        String expr = 
currPartitionField.getMetadata().getString(DELTA_GENERATION_EXPRESSION);
+        ParsedGeneratedExpr parsedGeneratedExpr =
+            ParsedGeneratedExpr.buildFromString(currPartitionField.getName(), 
expr);
+        if (ParsedGeneratedExpr.GeneratedExprType.CAST == 
parsedGeneratedExpr.generatedExprType) {
+          partitionFields.add(
+              getPartitionWithDateTransform(
+                  currPartitionField.getName(), parsedGeneratedExpr, 
internalSchema));
+          itr.next(); // consume the field.
+        } else if (ParsedGeneratedExpr.GeneratedExprType.DATE_FORMAT
+            == parsedGeneratedExpr.generatedExprType) {
+          partitionFields.add(
+              getPartitionWithDateFormatTransform(
+                  currPartitionField.getName(), parsedGeneratedExpr, 
internalSchema));
+          itr.next(); // consume the field.
+        } else {
+          // consume until we hit a field with no generated expression or a generated
+          // expression that is of type cast or date format.
+          List<ParsedGeneratedExpr> parsedGeneratedExprs = new ArrayList<>();
+          while (itr.hasNext()
+              && 
currPartitionField.getMetadata().contains(DELTA_GENERATION_EXPRESSION)) {
+            expr = 
currPartitionField.getMetadata().getString(DELTA_GENERATION_EXPRESSION);
+            parsedGeneratedExpr =
+                
ParsedGeneratedExpr.buildFromString(currPartitionField.getName(), expr);
+
+            if (ParsedGeneratedExpr.GeneratedExprType.CAST == 
parsedGeneratedExpr.generatedExprType
+                || ParsedGeneratedExpr.GeneratedExprType.DATE_FORMAT
+                    == parsedGeneratedExpr.generatedExprType) {
+              break;
+            }
+            parsedGeneratedExprs.add(parsedGeneratedExpr);
+            itr.next(); // consume the field
+            if (itr.hasNext()) {
+              currPartitionField = itr.peek();
+            }
+          }
+          partitionFields.add(
+              
getPartitionColumnsForHourOrDayOrMonthOrYear(parsedGeneratedExprs, 
internalSchema));
+        }
+      }
+    }
+    return partitionFields;
+  }
+
+  private InternalPartitionField getPartitionColumnsForHourOrDayOrMonthOrYear(
+      List<ParsedGeneratedExpr> parsedGeneratedExprs, InternalSchema 
internalSchema) {
+    if (parsedGeneratedExprs.size() > 4) {
+      throw new IllegalStateException("Invalid partition transform");
+    }
+    validate(
+        parsedGeneratedExprs, new HashSet<>(GRANULARITIES.subList(0, 
parsedGeneratedExprs.size())));
+
+    ParsedGeneratedExpr transform = parsedGeneratedExprs.get(0);
+    List<String> partitionColumns =
+        parsedGeneratedExprs.stream()
+            .map(parsedGeneratedExpr -> 
parsedGeneratedExpr.partitionColumnName)
+            .collect(toList(parsedGeneratedExprs.size()));
+    return InternalPartitionField.builder()
+        .sourceField(
+            SchemaFieldFinder.getInstance().findFieldByPath(internalSchema, 
transform.sourceColumn))
+        .partitionFieldNames(partitionColumns)
+        .transformType(
+            parsedGeneratedExprs.get(parsedGeneratedExprs.size() - 1)
+                .internalPartitionTransformType)
+        .build();
+  }
+
+  // Cast has default format of yyyy-MM-dd.
+  private InternalPartitionField getPartitionWithDateTransform(
+      String partitionColumnName,
+      ParsedGeneratedExpr parsedGeneratedExpr,
+      InternalSchema internalSchema) {
+    return InternalPartitionField.builder()
+        .sourceField(
+            SchemaFieldFinder.getInstance()
+                .findFieldByPath(internalSchema, 
parsedGeneratedExpr.sourceColumn))
+        .partitionFieldNames(Collections.singletonList(partitionColumnName))
+        .transformType(PartitionTransformType.DAY)
+        .build();
+  }
+
+  private InternalPartitionField getPartitionWithDateFormatTransform(
+      String partitionColumnName,
+      ParsedGeneratedExpr parsedGeneratedExpr,
+      InternalSchema internalSchema) {
+    return InternalPartitionField.builder()
+        .sourceField(
+            SchemaFieldFinder.getInstance()
+                .findFieldByPath(internalSchema, 
parsedGeneratedExpr.sourceColumn))
+        .partitionFieldNames(Collections.singletonList(partitionColumnName))
+        .transformType(parsedGeneratedExpr.internalPartitionTransformType)
+        .build();
+  }
+
+  public Map<String, StructField> convertToDeltaPartitionFormat(
+      List<InternalPartitionField> partitionFields) {
+    if (partitionFields == null) {
+      return null;
+    }
+    Map<String, StructField> nameToStructFieldMap = new HashMap<>();
+    for (InternalPartitionField internalPartitionField : partitionFields) {
+      String currPartitionColumnName;
+      StructField field;
+
+      if (internalPartitionField.getTransformType() == 
PartitionTransformType.VALUE) {
+        System.out.println("if coming");

Review Comment:
   Remove the println here and below



##########
xtable-service/pom.xml:
##########
@@ -60,7 +60,10 @@
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-aws</artifactId>
         </dependency>
-
+        <dependency>

Review Comment:
   Why is this required as part of this change?



##########
xtable-core/src/main/java/org/apache/xtable/kernel/DeltaKernelPartitionExtractor.java:
##########
@@ -0,0 +1,538 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.kernel;
+
+import static org.apache.xtable.collectors.CustomCollectors.toList;
+import static 
org.apache.xtable.delta.DeltaValueConverter.convertFromDeltaPartitionValue;
+import static 
org.apache.xtable.delta.DeltaValueConverter.convertToDeltaPartitionValue;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import lombok.AccessLevel;
+import lombok.Builder;
+import lombok.NoArgsConstructor;
+import lombok.extern.log4j.Log4j2;
+
+import scala.collection.JavaConverters;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.PeekingIterator;
+
+import io.delta.kernel.types.*;
+import io.delta.kernel.types.FieldMetadata;
+
+import org.apache.xtable.exception.PartitionSpecException;
+import org.apache.xtable.model.schema.InternalPartitionField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.schema.PartitionTransformType;
+import org.apache.xtable.model.stat.PartitionValue;
+import org.apache.xtable.model.stat.Range;
+import org.apache.xtable.model.storage.InternalDataFile;
+import org.apache.xtable.schema.SchemaFieldFinder;
+
+@Log4j2
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class DeltaKernelPartitionExtractor {
+  private static final DeltaKernelPartitionExtractor INSTANCE = new 
DeltaKernelPartitionExtractor();
+  private static final String CAST_FUNCTION = "CAST(%s as DATE)";
+  private static final String DATE_FORMAT_FUNCTION = "DATE_FORMAT(%s, '%s')";
+  private static final String YEAR_FUNCTION = "YEAR(%s)";
+  private static final String DATE_FORMAT_FOR_HOUR = "yyyy-MM-dd-HH";
+  private static final String DATE_FORMAT_FOR_DAY = "yyyy-MM-dd";
+  private static final String DATE_FORMAT_FOR_MONTH = "yyyy-MM";
+  private static final String DATE_FORMAT_FOR_YEAR = "yyyy";
+  private static final String BUCKET_FUNCTION = "MOD((HASH(%s) & %d), %d)";
+  // For timestamp partition fields, the actual partition columns in delta format are generated
+  // columns with names like `xtable_partition_col_{transform_type}_{source_field_name}`.
+  private static final String DELTA_PARTITION_COL_NAME_FORMAT = 
"xtable_partition_col_%s_%s";
+  static final String DELTA_GENERATION_EXPRESSION = 
"delta.generationExpression";
+  private static final List<ParsedGeneratedExpr.GeneratedExprType> 
GRANULARITIES =
+      Arrays.asList(
+          ParsedGeneratedExpr.GeneratedExprType.YEAR,
+          ParsedGeneratedExpr.GeneratedExprType.MONTH,
+          ParsedGeneratedExpr.GeneratedExprType.DAY,
+          ParsedGeneratedExpr.GeneratedExprType.HOUR);
+
+  public static DeltaKernelPartitionExtractor getInstance() {
+    return INSTANCE;
+  }
+
+  /**
+   * Extracts partition fields from a delta table. Partitioning by nested columns isn't supported.
+   *
+   * <p>Example: given a delta table and a reference to DeltaLog, the method parameters can be
+   * obtained by:
+   *
+   * <pre>{@code
+   * deltaLog = DeltaLog.forTable(spark, deltaTablePath);
+   * InternalSchema internalSchema =
+   *     DeltaSchemaExtractor.getInstance().toInternalSchema(deltaLog.snapshot().schema());
+   * StructType partitionSchema = deltaLog.metadata().partitionSchema();
+   * }</pre>
+   *
+   * @param internalSchema canonical representation of the schema.
+   * @param partitionSchema partition schema of the delta table.
+   * @return list of canonical representation of the partition fields
+   */
+  public List<InternalPartitionField> convertFromDeltaPartitionFormat(
+      InternalSchema internalSchema, StructType partitionSchema) {
+    if (partitionSchema.fields().size() == 0) {
+      return Collections.emptyList();
+    }
+    return getInternalPartitionFields(partitionSchema, internalSchema);
+  }
+
+  /**
+   * If all of the partition fields are plain values, each is processed individually and returned.
+   * If they contain month they should contain year as well. If they contain day they should
+   * contain month and year as well. If they contain hour they should contain day, month and year
+   * as well. CAST(col as DATE) and DATE_FORMAT(col, 'yyyy-MM-dd') are also supported. Partition by
+   * nested fields may not be fully supported.
+   */
+  private List<InternalPartitionField> getInternalPartitionFields(
+      StructType partitionSchema, InternalSchema internalSchema) {
+    PeekingIterator<StructField> itr =
+        Iterators.peekingIterator(partitionSchema.fields().iterator());
+    List<InternalPartitionField> partitionFields = new 
ArrayList<>(partitionSchema.fields().size());
+    while (itr.hasNext()) {
+      StructField currPartitionField = itr.peek();
+      if 
(!currPartitionField.getMetadata().contains(DELTA_GENERATION_EXPRESSION)) {
+        partitionFields.add(
+            InternalPartitionField.builder()
+                .sourceField(
+                    SchemaFieldFinder.getInstance()
+                        .findFieldByPath(internalSchema, 
currPartitionField.getName()))
+                .transformType(PartitionTransformType.VALUE)
+                .build());
+        itr.next(); // consume the field.
+      } else {
+        // Partition contains generated expression.
+        // if it starts with year we should consume until we hit field with no 
generated expression
+        // or we hit a field with generated expression that is of cast or date 
format.
+        String expr = 
currPartitionField.getMetadata().getString(DELTA_GENERATION_EXPRESSION);
+        ParsedGeneratedExpr parsedGeneratedExpr =
+            ParsedGeneratedExpr.buildFromString(currPartitionField.getName(), 
expr);
+        if (ParsedGeneratedExpr.GeneratedExprType.CAST == 
parsedGeneratedExpr.generatedExprType) {
+          partitionFields.add(
+              getPartitionWithDateTransform(
+                  currPartitionField.getName(), parsedGeneratedExpr, 
internalSchema));
+          itr.next(); // consume the field.
+        } else if (ParsedGeneratedExpr.GeneratedExprType.DATE_FORMAT
+            == parsedGeneratedExpr.generatedExprType) {
+          partitionFields.add(
+              getPartitionWithDateFormatTransform(
+                  currPartitionField.getName(), parsedGeneratedExpr, 
internalSchema));
+          itr.next(); // consume the field.
+        } else {
+          // consume until we hit a field with no generated expression or a generated
+          // expression that is of type cast or date format.
+          List<ParsedGeneratedExpr> parsedGeneratedExprs = new ArrayList<>();
+          while (itr.hasNext()
+              && 
currPartitionField.getMetadata().contains(DELTA_GENERATION_EXPRESSION)) {
+            expr = 
currPartitionField.getMetadata().getString(DELTA_GENERATION_EXPRESSION);
+            parsedGeneratedExpr =
+                
ParsedGeneratedExpr.buildFromString(currPartitionField.getName(), expr);
+
+            if (ParsedGeneratedExpr.GeneratedExprType.CAST == 
parsedGeneratedExpr.generatedExprType
+                || ParsedGeneratedExpr.GeneratedExprType.DATE_FORMAT
+                    == parsedGeneratedExpr.generatedExprType) {
+              break;
+            }
+            parsedGeneratedExprs.add(parsedGeneratedExpr);
+            itr.next(); // consume the field
+            if (itr.hasNext()) {
+              currPartitionField = itr.peek();
+            }
+          }
+          partitionFields.add(
+              
getPartitionColumnsForHourOrDayOrMonthOrYear(parsedGeneratedExprs, 
internalSchema));
+        }
+      }
+    }
+    return partitionFields;
+  }
+
+  /**
+   * Combines a run of generated-column expressions into a single partition field.
+   *
+   * <p>The first expression supplies the source column, every expression contributes its
+   * generated partition column name, and the last expression supplies the transform type.
+   *
+   * @param parsedGeneratedExprs the generated expressions to combine, in order
+   * @param internalSchema schema in which the source column path is looked up
+   * @return the partition field spanning all generated columns
+   * @throws IllegalStateException if more than four expressions are supplied
+   */
+  private InternalPartitionField getPartitionColumnsForHourOrDayOrMonthOrYear(
+      List<ParsedGeneratedExpr> parsedGeneratedExprs, InternalSchema internalSchema) {
+    int exprCount = parsedGeneratedExprs.size();
+    if (exprCount > 4) {
+      throw new IllegalStateException("Invalid partition transform");
+    }
+    // The expressions are checked against the first exprCount entries of GRANULARITIES.
+    validate(parsedGeneratedExprs, new HashSet<>(GRANULARITIES.subList(0, exprCount)));
+
+    ParsedGeneratedExpr firstExpr = parsedGeneratedExprs.get(0);
+    ParsedGeneratedExpr lastExpr = parsedGeneratedExprs.get(exprCount - 1);
+    List<String> partitionColumns =
+        parsedGeneratedExprs.stream()
+            .map(expr -> expr.partitionColumnName)
+            .collect(toList(exprCount));
+
+    SchemaFieldFinder fieldFinder = SchemaFieldFinder.getInstance();
+    return InternalPartitionField.builder()
+        .sourceField(fieldFinder.findFieldByPath(internalSchema, firstExpr.sourceColumn))
+        .partitionFieldNames(partitionColumns)
+        .transformType(lastExpr.internalPartitionTransformType)
+        .build();
+  }
+
+  /**
+   * Builds the partition field for a generated column whose expression is a cast.
+   *
+   * <p>Cast has a default format of yyyy-MM-dd, so the transform type is always DAY.
+   *
+   * @param partitionColumnName name of the generated partition column
+   * @param parsedGeneratedExpr parsed expression providing the source column path
+   * @param internalSchema schema in which the source column path is looked up
+   * @return a DAY partition field with a single partition column name
+   */
+  private InternalPartitionField getPartitionWithDateTransform(
+      String partitionColumnName,
+      ParsedGeneratedExpr parsedGeneratedExpr,
+      InternalSchema internalSchema) {
+    SchemaFieldFinder fieldFinder = SchemaFieldFinder.getInstance();
+    return InternalPartitionField.builder()
+        .sourceField(
+            fieldFinder.findFieldByPath(internalSchema, parsedGeneratedExpr.sourceColumn))
+        .partitionFieldNames(Collections.singletonList(partitionColumnName))
+        .transformType(PartitionTransformType.DAY)
+        .build();
+  }
+
+  /**
+   * Builds the partition field for a generated column whose expression is a date format.
+   *
+   * <p>The transform type is taken from the parsed expression itself.
+   *
+   * @param partitionColumnName name of the generated partition column
+   * @param parsedGeneratedExpr parsed expression providing the source column and transform type
+   * @param internalSchema schema in which the source column path is looked up
+   * @return a partition field with a single partition column name
+   */
+  private InternalPartitionField getPartitionWithDateFormatTransform(
+      String partitionColumnName,
+      ParsedGeneratedExpr parsedGeneratedExpr,
+      InternalSchema internalSchema) {
+    SchemaFieldFinder fieldFinder = SchemaFieldFinder.getInstance();
+    return InternalPartitionField.builder()
+        .sourceField(
+            fieldFinder.findFieldByPath(internalSchema, parsedGeneratedExpr.sourceColumn))
+        .partitionFieldNames(Collections.singletonList(partitionColumnName))
+        .transformType(parsedGeneratedExpr.internalPartitionTransformType)
+        .build();
+  }
+
+  /**
+   * Maps each partition field to the Delta partition column it is stored under.
+   *
+   * <p>For a VALUE transform the key is the source field name and the mapped value is
+   * {@code null}, since no new column is created. For every other transform a generated field is
+   * created via {@code getGeneratedField} and stored under that field's name.
+   *
+   * @param partitionFields partition fields to convert; may be {@code null}
+   * @return map from partition column name to its generated field ({@code null} value for VALUE
+   *     partitions), or {@code null} when {@code partitionFields} is {@code null}
+   */
+  public Map<String, StructField> convertToDeltaPartitionFormat(
+      List<InternalPartitionField> partitionFields) {
+    if (partitionFields == null) {
+      return null;
+    }
+    Map<String, StructField> nameToStructFieldMap = new HashMap<>();
+    for (InternalPartitionField internalPartitionField : partitionFields) {
+      if (internalPartitionField.getTransformType() == PartitionTransformType.VALUE) {
+        // Partitioned directly on the source column: nothing to generate.
+        nameToStructFieldMap.put(internalPartitionField.getSourceField().getName(), null);
+      } else {
+        // Since partition field of timestamp or bucket type, create new field in schema.
+        StructField field = getGeneratedField(internalPartitionField);
+        nameToStructFieldMap.put(field.getName(), field);
+      }
+    }
+    return nameToStructFieldMap;
+  }
+
+  public Map<String, String> partitionValueSerialization(InternalDataFile 
internalDataFile) {
+    Map<String, String> partitionValuesSerialized = new HashMap<>();
+    if (internalDataFile.getPartitionValues() == null
+        || internalDataFile.getPartitionValues().isEmpty()) {
+      return partitionValuesSerialized;
+    }
+    for (PartitionValue partitionValue : 
internalDataFile.getPartitionValues()) {
+      InternalPartitionField partitionField = 
partitionValue.getPartitionField();
+      PartitionTransformType transformType = partitionField.getTransformType();
+      String partitionValueSerialized;
+      if (transformType == PartitionTransformType.VALUE) {
+        partitionValueSerialized =
+            convertToDeltaPartitionValue(
+                partitionValue.getRange().getMaxValue(),
+                partitionField.getSourceField().getSchema().getDataType(),
+                transformType,
+                "");
+        partitionValuesSerialized.put(
+            partitionField.getSourceField().getName(), 
partitionValueSerialized);
+      } else if (transformType == PartitionTransformType.BUCKET) {
+        partitionValueSerialized = 
partitionValue.getRange().getMaxValue().toString();
+        partitionValuesSerialized.put(
+            getGeneratedColumnName(partitionField), partitionValueSerialized);
+      } else {
+        // use appropriate date formatter for value serialization.
+        partitionValueSerialized =
+            convertToDeltaPartitionValue(
+                partitionValue.getRange().getMaxValue(),
+                partitionField.getSourceField().getSchema().getDataType(),
+                transformType,
+                getDateFormat(partitionField.getTransformType()));
+        partitionValuesSerialized.put(
+            getGeneratedColumnName(partitionField), partitionValueSerialized);
+      }
+    }
+    return partitionValuesSerialized;
+  }
+
+  public List<PartitionValue> partitionValueExtraction(
+      scala.collection.Map<String, String> values, 
List<InternalPartitionField> partitionFields) {

Review Comment:
   Can we just use a Java map (`java.util.Map`) here instead of `scala.collection.Map`?



##########
xtable-core/src/test/java/org/apache/xtable/kernel/ITDeltaKernelConversionSource.java:
##########
@@ -0,0 +1,734 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.kernel;
+
+import static org.apache.xtable.testutil.ITTestUtils.validateTable;
+import static org.junit.jupiter.api.Assertions.*;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.spark.serializer.KryoSerializer;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.*;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import io.delta.kernel.Snapshot;
+import io.delta.kernel.Table;
+import io.delta.kernel.defaults.engine.DefaultEngine;
+import io.delta.kernel.engine.Engine;
+import io.delta.kernel.types.StructField;
+import io.delta.kernel.types.StructType;
+
+import org.apache.xtable.GenericTable;
+import org.apache.xtable.TestSparkDeltaTable;
+import org.apache.xtable.ValidationTestHelper;
+import org.apache.xtable.conversion.SourceTable;
+import org.apache.xtable.model.*;
+import org.apache.xtable.model.schema.*;
+import org.apache.xtable.model.stat.ColumnStat;
+import org.apache.xtable.model.stat.PartitionValue;
+import org.apache.xtable.model.stat.Range;
+import org.apache.xtable.model.storage.*;
+import org.apache.xtable.model.storage.DataLayoutStrategy;
+import org.apache.xtable.model.storage.TableFormat;
+
+public class ITDeltaKernelConversionSource {
+  // Expected fields for the test tables created from "SELECT * FROM VALUES (...)".
+  private static final InternalField COL1_INT_FIELD = nullableIntField("col1");
+
+  private static final InternalField COL2_INT_FIELD = nullableIntField("col2");
+
+  // NOTE(review): despite the "STR" in its name this is an INT field; the name is kept because
+  // the tests below reference it.
+  private static final InternalField COL3_STR_FIELD = nullableIntField("col3");
+
+  private static final ColumnStat COL2_COLUMN_STAT =
+      ColumnStat.builder()
+          .field(COL2_INT_FIELD)
+          .range(Range.vector(2, 2))
+          .numNulls(0)
+          .numValues(1)
+          .totalSize(0)
+          .build();
+  private static final ColumnStat COL1_COLUMN_STAT =
+      ColumnStat.builder()
+          .field(COL1_INT_FIELD)
+          .range(Range.vector(1, 1))
+          .numNulls(0)
+          .numValues(1)
+          .totalSize(0)
+          .build();
+
+  private DeltaKernelConversionSourceProvider conversionSourceProvider;
+  private static SparkSession sparkSession;
+
+  /** Builds a nullable INT field with the given name and a NULL default value. */
+  private static InternalField nullableIntField(String name) {
+    return InternalField.builder()
+        .name(name)
+        .schema(
+            InternalSchema.builder()
+                .name("integer")
+                .dataType(InternalType.INT)
+                .isNullable(true)
+                .build())
+        .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+        .build();
+  }
+
+  @BeforeAll
+  public static void setupOnce() {
+    sparkSession =
+        SparkSession.builder()
+            .appName("TestDeltaTable")
+            .master("local[4]")
+            .config("spark.sql.extensions", 
"io.delta.sql.DeltaSparkSessionExtension")
+            .config(
+                "spark.sql.catalog.spark_catalog",
+                "org.apache.spark.sql.delta.catalog.DeltaCatalog")
+            .config("spark.databricks.delta.retentionDurationCheck.enabled", 
"false")
+            .config("spark.databricks.delta.schema.autoMerge.enabled", "true")
+            .config("spark.sql.shuffle.partitions", "1")
+            .config("spark.default.parallelism", "1")
+            .config("spark.serializer", KryoSerializer.class.getName())
+            .getOrCreate();
+  }
+
+  @TempDir private static Path tempDir;
+
+  /** Clears the catalog cache and stops the shared Spark session, if one was created. */
+  @AfterAll
+  public static void tearDownSparkSession() {
+    if (sparkSession == null) {
+      return;
+    }
+    sparkSession.catalog().clearCache();
+    sparkSession.stop();
+  }
+
+  /** Creates a fresh conversion source provider backed by the local file system. */
+  @BeforeEach
+  void setUp() {
+    conversionSourceProvider = new DeltaKernelConversionSourceProvider();
+
+    Configuration localFsConf = new Configuration();
+    localFsConf.set("fs.defaultFS", "file:///");
+    conversionSourceProvider.init(localFsConf);
+  }
+
+  /**
+   * Creates a one-row, two-column unpartitioned Delta table and validates both the table
+   * metadata and the single file group reported by the current snapshot.
+   */
+  @Test
+  void getCurrentSnapshotNonPartitionedTest() throws URISyntaxException {
+    final String tableName = GenericTable.getTableName();
+    final Path basePath = tempDir.resolve(tableName);
+    // Write a single row (1, 2) through Spark.
+    String createTableSql =
+        "CREATE TABLE `"
+            + tableName
+            + "` USING DELTA LOCATION '"
+            + basePath
+            + "' AS SELECT * FROM VALUES (1, 2)";
+    sparkSession.sql(createTableSql);
+
+    // Read the table back through the Delta Kernel conversion source.
+    SourceTable tableConfig =
+        SourceTable.builder()
+            .name(tableName)
+            .basePath(basePath.toString())
+            .formatName(TableFormat.DELTA)
+            .build();
+    DeltaKernelConversionSource conversionSource =
+        conversionSourceProvider.getConversionSourceInstance(tableConfig);
+    InternalSnapshot snapshot = conversionSource.getCurrentSnapshot();
+
+    // Table-level expectations.
+    InternalSchema expectedSchema =
+        InternalSchema.builder()
+            .name("struct")
+            .dataType(InternalType.RECORD)
+            .fields(Arrays.asList(COL1_INT_FIELD, COL2_INT_FIELD))
+            .build();
+    validateTable(
+        snapshot.getTable(),
+        tableName,
+        TableFormat.DELTA,
+        expectedSchema,
+        DataLayoutStrategy.FLAT,
+        "file://" + basePath,
+        snapshot.getTable().getLatestMetadataPath(),
+        Collections.emptyList());
+
+    // File-level expectations: exactly one group holding one parquet file.
+    Assertions.assertEquals(1, snapshot.getPartitionedDataFiles().size());
+    InternalDataFile expectedFile =
+        InternalDataFile.builder()
+            .physicalPath("file:/fake/path")
+            .fileFormat(FileFormat.APACHE_PARQUET)
+            .partitionValues(Collections.emptyList())
+            .fileSizeBytes(716)
+            .recordCount(1)
+            .columnStats(Arrays.asList(COL1_COLUMN_STAT, COL2_COLUMN_STAT))
+            .build();
+    PartitionFileGroup expectedGroup =
+        PartitionFileGroup.builder()
+            .files(Collections.singletonList(expectedFile))
+            .partitionValues(Collections.emptyList())
+            .build();
+    validatePartitionDataFiles(expectedGroup, snapshot.getPartitionedDataFiles().get(0));
+  }
+
+  /**
+   * Creates a one-row, three-column unpartitioned Delta table and validates the table metadata
+   * returned by {@code getCurrentTable()}.
+   */
+  @Test
+  void getCurrentTableTest() {
+    // Table name
+    final String tableName = GenericTable.getTableName();
+    final Path basePath = tempDir.resolve(tableName);
+    // Create table with a single row using Spark
+    sparkSession.sql(
+        "CREATE TABLE `"
+            + tableName
+            + "` USING DELTA LOCATION '"
+            + basePath
+            + "' AS SELECT * FROM VALUES (1, 2, 3)");
+    // Create Delta source
+    SourceTable tableConfig =
+        SourceTable.builder()
+            .name(tableName)
+            .basePath(basePath.toString())
+            .formatName(TableFormat.DELTA)
+            .build();
+    DeltaKernelConversionSource conversionSource =
+        conversionSourceProvider.getConversionSourceInstance(tableConfig);
+    // Get current table
+    InternalTable internalTable = conversionSource.getCurrentTable();
+    // All three columns are expected to be nullable INT fields.
+    List<InternalField> fields = Arrays.asList(COL1_INT_FIELD, COL2_INT_FIELD, COL3_STR_FIELD);
+    validateTable(
+        internalTable,
+        tableName,
+        TableFormat.DELTA,
+        InternalSchema.builder()
+            .name("struct")
+            .dataType(InternalType.RECORD)
+            .fields(fields)
+            .build(),
+        DataLayoutStrategy.FLAT,
+        "file://" + basePath,
+        internalTable.getLatestMetadataPath(),
+        Collections.emptyList());
+  }
+
+  /**
+   * Creates a one-row Delta table partitioned by a string column and validates the table
+   * metadata, the partition field, and the single file group reported by the current snapshot.
+   */
+  @Test
+  void getCurrentSnapshotPartitionedTest() throws URISyntaxException {
+    final String tableName = GenericTable.getTableName();
+    final Path basePath = tempDir.resolve(tableName);
+    // Write a single row partitioned by part_col through Spark.
+    String createTableSql =
+        "CREATE TABLE `"
+            + tableName
+            + "` USING DELTA PARTITIONED BY (part_col)\n"
+            + "LOCATION '"
+            + basePath
+            + "' AS SELECT 'SingleValue' AS part_col, 1 AS col1, 2 AS col2";
+    sparkSession.sql(createTableSql);
+
+    // Read the table back through the Delta Kernel conversion source.
+    SourceTable tableConfig =
+        SourceTable.builder()
+            .name(tableName)
+            .basePath(basePath.toString())
+            .formatName(TableFormat.DELTA)
+            .build();
+    DeltaKernelConversionSource conversionSource =
+        conversionSourceProvider.getConversionSourceInstance(tableConfig);
+    InternalSnapshot snapshot = conversionSource.getCurrentSnapshot();
+
+    // Expected partition column and the identity partition field built on it.
+    InternalSchema stringSchema =
+        InternalSchema.builder()
+            .name("string")
+            .dataType(InternalType.STRING)
+            .isNullable(true)
+            .build();
+    InternalField partCol =
+        InternalField.builder()
+            .name("part_col")
+            .schema(stringSchema)
+            .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+            .build();
+    InternalPartitionField expectedPartitionField =
+        InternalPartitionField.builder()
+            .sourceField(partCol)
+            .transformType(PartitionTransformType.VALUE)
+            .build();
+
+    // Table-level expectations.
+    InternalSchema expectedSchema =
+        InternalSchema.builder()
+            .name("struct")
+            .dataType(InternalType.RECORD)
+            .fields(Arrays.asList(partCol, COL1_INT_FIELD, COL2_INT_FIELD))
+            .build();
+    validateTable(
+        snapshot.getTable(),
+        tableName,
+        TableFormat.DELTA,
+        expectedSchema,
+        DataLayoutStrategy.HIVE_STYLE_PARTITION,
+        "file://" + basePath,
+        snapshot.getTable().getLatestMetadataPath(),
+        Collections.singletonList(expectedPartitionField));
+
+    // File-level expectations: one group, one file, one partition value.
+    Assertions.assertEquals(1, snapshot.getPartitionedDataFiles().size());
+    List<PartitionValue> partitionValue =
+        Collections.singletonList(
+            PartitionValue.builder()
+                .partitionField(expectedPartitionField)
+                .range(Range.scalar("SingleValue"))
+                .build());
+    InternalDataFile expectedFile =
+        InternalDataFile.builder()
+            .physicalPath("file:/fake/path")
+            .fileFormat(FileFormat.APACHE_PARQUET)
+            .partitionValues(partitionValue)
+            .fileSizeBytes(716)
+            .recordCount(1)
+            .columnStats(Arrays.asList(COL1_COLUMN_STAT, COL2_COLUMN_STAT))
+            .build();
+    PartitionFileGroup expectedGroup =
+        PartitionFileGroup.builder()
+            .partitionValues(partitionValue)
+            .files(Collections.singletonList(expectedFile))
+            .build();
+    validatePartitionDataFiles(expectedGroup, snapshot.getPartitionedDataFiles().get(0));
+  }
+
+  @ParameterizedTest
+  @MethodSource("testWithPartitionToggle")
+  public void testInsertsUpsertsAndDeletes(boolean isPartitioned) {
+    String tableName = GenericTable.getTableName();
+    TestSparkDeltaTable testSparkDeltaTable =
+        new TestSparkDeltaTable(
+            tableName, tempDir, sparkSession, isPartitioned ? "yearOfBirth" : 
null, false);
+    //    System.out.println("testSparkDeltaTable" + 
testSparkDeltaTable.getColumnsToSelect());

Review Comment:
   Can we remove this commented-out debug statement?



##########
xtable-core/src/test/java/org/apache/xtable/kernel/TestDeltaKernelSchemaExtractor.java:
##########
@@ -0,0 +1,864 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.kernel;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import io.delta.kernel.types.*;
+import io.delta.kernel.types.FieldMetadata;
+import io.delta.kernel.types.StructType;
+
+import org.apache.xtable.model.schema.InternalField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.schema.InternalType;
+
+public class TestDeltaKernelSchemaExtractor {
+  @Test
+  public void testPrimitiveTypes() {
+    Map<InternalSchema.MetadataKey, Object> decimalMetadata = new HashMap<>();
+    decimalMetadata.put(InternalSchema.MetadataKey.DECIMAL_PRECISION, 10);
+    decimalMetadata.put(InternalSchema.MetadataKey.DECIMAL_SCALE, 2);
+
+    InternalSchema internalSchema =
+        InternalSchema.builder()
+            .name("struct")
+            .dataType(InternalType.RECORD)
+            .isNullable(false)
+            .fields(
+                Arrays.asList(
+                    InternalField.builder()
+                        .name("requiredBoolean")
+                        .schema(
+                            InternalSchema.builder()
+                                .name("boolean")
+                                .dataType(InternalType.BOOLEAN)
+                                .isNullable(false)
+                                .comment("requiredBooleanComment")
+                                .build())
+                        .build(),
+                    InternalField.builder()
+                        .name("optionalBoolean")
+                        .schema(
+                            InternalSchema.builder()
+                                .name("boolean")
+                                .dataType(InternalType.BOOLEAN)
+                                .isNullable(true)
+                                .build())
+                        
.defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+                        .build(),
+                    InternalField.builder()
+                        .name("requiredInt")
+                        .schema(
+                            InternalSchema.builder()
+                                .name("integer")
+                                .dataType(InternalType.INT)
+                                .isNullable(false)
+                                .build())
+                        .build(),
+                    InternalField.builder()
+                        .name("optionalInt")
+                        .schema(
+                            InternalSchema.builder()
+                                .name("integer")
+                                .dataType(InternalType.INT)
+                                .isNullable(true)
+                                .build())
+                        
.defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+                        .build(),
+                    InternalField.builder()
+                        .name("requiredLong")
+                        .schema(
+                            InternalSchema.builder()
+                                .name("long")
+                                .dataType(InternalType.LONG)
+                                .isNullable(false)
+                                .build())
+                        .build(),
+                    InternalField.builder()
+                        .name("optionalLong")
+                        .schema(
+                            InternalSchema.builder()
+                                .name("long")
+                                .dataType(InternalType.LONG)
+                                .isNullable(true)
+                                .build())
+                        
.defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+                        .build(),
+                    InternalField.builder()
+                        .name("requiredDouble")
+                        .schema(
+                            InternalSchema.builder()
+                                .name("double")
+                                .dataType(InternalType.DOUBLE)
+                                .isNullable(false)
+                                .build())
+                        .build(),
+                    InternalField.builder()
+                        .name("optionalDouble")
+                        .schema(
+                            InternalSchema.builder()
+                                .name("double")
+                                .dataType(InternalType.DOUBLE)
+                                .isNullable(true)
+                                .build())
+                        
.defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+                        .build(),
+                    InternalField.builder()
+                        .name("requiredFloat")
+                        .schema(
+                            InternalSchema.builder()
+                                .name("float")
+                                .dataType(InternalType.FLOAT)
+                                .isNullable(false)
+                                .build())
+                        .build(),
+                    InternalField.builder()
+                        .name("optionalFloat")
+                        .schema(
+                            InternalSchema.builder()
+                                .name("float")
+                                .dataType(InternalType.FLOAT)
+                                .isNullable(true)
+                                .build())
+                        
.defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+                        .build(),
+                    InternalField.builder()
+                        .name("requiredString")
+                        .schema(
+                            InternalSchema.builder()
+                                .name("string")
+                                .dataType(InternalType.STRING)
+                                .isNullable(false)
+                                .build())
+                        .build(),
+                    InternalField.builder()
+                        .name("optionalString")
+                        .schema(
+                            InternalSchema.builder()
+                                .name("string")
+                                .dataType(InternalType.STRING)
+                                .isNullable(true)
+                                .build())
+                        
.defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+                        .build(),
+                    InternalField.builder()
+                        .name("requiredBytes")
+                        .schema(
+                            InternalSchema.builder()
+                                .name("binary")
+                                .dataType(InternalType.BYTES)
+                                .isNullable(false)
+                                .build())
+                        .build(),
+                    InternalField.builder()
+                        .name("optionalBytes")
+                        .schema(
+                            InternalSchema.builder()
+                                .name("binary")
+                                .dataType(InternalType.BYTES)
+                                .isNullable(true)
+                                .build())
+                        
.defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+                        .build(),
+                    InternalField.builder()
+                        .name("requiredDate")
+                        .schema(
+                            InternalSchema.builder()
+                                .name("date")
+                                .dataType(InternalType.DATE)
+                                .isNullable(false)
+                                .build())
+                        .build(),
+                    InternalField.builder()
+                        .name("optionalDate")
+                        .schema(
+                            InternalSchema.builder()
+                                .name("date")
+                                .dataType(InternalType.DATE)
+                                .isNullable(true)
+                                .build())
+                        
.defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+                        .build(),
+                    InternalField.builder()
+                        .name("requiredDecimal")
+                        .schema(
+                            InternalSchema.builder()
+                                .name("decimal")
+                                .dataType(InternalType.DECIMAL)
+                                .isNullable(false)
+                                .metadata(decimalMetadata)
+                                .build())
+                        .build(),
+                    InternalField.builder()
+                        .name("optionalDecimal")
+                        .schema(
+                            InternalSchema.builder()
+                                .name("decimal")
+                                .dataType(InternalType.DECIMAL)
+                                .isNullable(true)
+                                .metadata(decimalMetadata)
+                                .build())
+                        
.defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+                        .build()))
+            .build();
+    io.delta.kernel.types.StructType structRepresentation =

Review Comment:
   Let's import the class instead of relying on the fully qualified class name.



##########
pom.xml:
##########
@@ -724,7 +725,7 @@
                 </executions>
                 <configuration>
                     <skip>${skipUTs}</skip>
-                    <redirectTestOutputToFile>true</redirectTestOutputToFile>
+                    <redirectTestOutputToFile>false</redirectTestOutputToFile>

Review Comment:
   Let's keep this as `true` for now?



##########
xtable-core/src/test/java/org/apache/xtable/kernel/TestDeltaKernelPartitionExtractor.java:
##########
@@ -0,0 +1,560 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.kernel;
+
+import static 
org.apache.xtable.kernel.DeltaKernelPartitionExtractor.DELTA_GENERATION_EXPRESSION;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.*;
+import java.util.stream.Collectors;
+
+import org.junit.jupiter.api.Test;
+
+import scala.collection.JavaConverters;
+
+import io.delta.kernel.types.*;
+import io.delta.kernel.types.FieldMetadata;
+import io.delta.kernel.types.StructField;
+import io.delta.kernel.types.StructType;
+
+import org.apache.xtable.model.schema.*;
+import org.apache.xtable.model.stat.PartitionValue;
+import org.apache.xtable.model.stat.Range;
+
+public class TestDeltaKernelPartitionExtractor {
+  private static final Map<String, StructField> STRUCT_FIELD_MAP =
+      new HashMap<String, StructField>() {
+        {
+          put("id", new StructField("id", IntegerType.INTEGER, false));
+          put("firstName", new StructField("firstName", StringType.STRING, 
false));
+          put("gender", new StructField("gender", StringType.STRING, false));
+          put("birthDate", new StructField("birthDate", 
TimestampType.TIMESTAMP, false));
+          put(
+              "dateOfBirth",
+              new StructField(
+                  "dateOfBirth",
+                  DateType.DATE,
+                  false,
+                  FieldMetadata.builder()
+                      .putString("delta.generationExpression", "CAST(birthDate 
AS DATE)")
+                      .build()));
+
+          put(
+              "dateFmt",
+              new StructField(
+                  "dateFmt",
+                  StringType.STRING,
+                  false,
+                  FieldMetadata.builder()
+                      .putString(
+                          "delta.generationExpression", 
"DATE_FORMAT(birthDate, 'yyyy-MM-dd-HH')")
+                      .build()));
+
+          put(
+              "yearOfBirth",
+              new StructField(
+                  "yearOfBirth",
+                  IntegerType.INTEGER,
+                  false,
+                  FieldMetadata.builder()
+                      .putString("delta.generationExpression", 
"YEAR(birthDate)")
+                      .build()));
+          put(
+              "monthOfBirth",
+              new StructField(
+                  "monthOfBirth",
+                  IntegerType.INTEGER,
+                  false,
+                  FieldMetadata.builder()
+                      .putString("delta.generationExpression", 
"MONTH(birthDate)")
+                      .build()));
+
+          put(
+              "dayOfBirth",
+              new StructField(
+                  "dayOfBirth",
+                  IntegerType.INTEGER,
+                  false,
+                  FieldMetadata.builder()
+                      .putString("delta.generationExpression", 
"DAY(birthDate)")
+                      .build()));
+
+          put(
+              "hourOfBirth",
+              new StructField(
+                  "hourOfBirth",
+                  IntegerType.INTEGER,
+                  false,
+                  FieldMetadata.builder()
+                      .putString("delta.generationExpression", 
"HOUR(birthDate)")
+                      .build()));
+        }
+      };
+  private static final InternalSchema TIMESTAMP_SCHEMA =
+      InternalSchema.builder()
+          .name("timestamp")
+          .dataType(InternalType.TIMESTAMP)
+          .metadata(
+              Collections.singletonMap(
+                  InternalSchema.MetadataKey.TIMESTAMP_PRECISION,
+                  InternalSchema.MetadataValue.MICROS))
+          .build();
+  private final DeltaKernelPartitionExtractor deltaKernelPartitionExtractor =
+      DeltaKernelPartitionExtractor.getInstance();
+  private final DeltaKernelSchemaExtractor deltaKernelSchemaExtractor =
+      DeltaKernelSchemaExtractor.getInstance();
+
+  @Test
+  public void testUnpartitionedTable() {
+    StructType tableSchema =
+        getSchemaWithFields(Arrays.asList("id", "firstName", "gender", 
"birthDate"));
+    InternalSchema internalSchema = 
deltaKernelSchemaExtractor.toInternalSchema(tableSchema);
+    List<InternalPartitionField> internalPartitionFields =
+        deltaKernelPartitionExtractor.convertFromDeltaPartitionFormat(
+            internalSchema, new StructType());
+    assertTrue(internalPartitionFields.isEmpty());
+  }
+
+  @Test
+  public void testSimplePartitionedTable() {
+    StructType tableSchema =
+        getSchemaWithFields(Arrays.asList("id", "firstName", "gender", 
"birthDate"));
+    StructType partitionSchema = getSchemaWithFields(Arrays.asList("gender"));
+    InternalSchema internalSchema = 
deltaKernelSchemaExtractor.toInternalSchema(tableSchema);
+    List<InternalPartitionField> expectedInternalPartitionFields =
+        Arrays.asList(
+            InternalPartitionField.builder()
+                .sourceField(
+                    InternalField.builder()
+                        .name("gender")
+                        .schema(
+                            InternalSchema.builder()
+                                .name("string")
+                                .dataType(InternalType.STRING)
+                                .build())
+                        .build())
+                .transformType(PartitionTransformType.VALUE)
+                .build());
+    List<InternalPartitionField> internalPartitionFields =
+        deltaKernelPartitionExtractor.convertFromDeltaPartitionFormat(
+            internalSchema, partitionSchema);
+    assertEquals(expectedInternalPartitionFields, internalPartitionFields);
+  }
+
+  @Test
+  public void testDatePartitionedGeneratedColumnsTable() {
+    StructType tableSchema =
+        getSchemaWithFields(Arrays.asList("id", "firstName", "gender", 
"birthDate", "dateOfBirth"));
+    StructType partitionSchema = 
getSchemaWithFields(Arrays.asList("dateOfBirth"));
+
+    InternalSchema internalSchema = 
deltaKernelSchemaExtractor.toInternalSchema(tableSchema);
+    List<InternalPartitionField> expectedInternalPartitionFields =
+        Arrays.asList(
+            InternalPartitionField.builder()
+                .sourceField(
+                    
InternalField.builder().name("birthDate").schema(TIMESTAMP_SCHEMA).build())
+                .transformType(PartitionTransformType.DAY)
+                .partitionFieldNames(Collections.singletonList("dateOfBirth"))
+                .build());
+    List<InternalPartitionField> internalPartitionFields =
+        deltaKernelPartitionExtractor.convertFromDeltaPartitionFormat(
+            internalSchema, partitionSchema);
+    assertEquals(expectedInternalPartitionFields, internalPartitionFields);
+  }
+
+  @Test
+  public void testDateFormatPartitionedGeneratedColumnsTable() {
+    StructType tableSchema =
+        getSchemaWithFields(Arrays.asList("id", "firstName", "gender", 
"birthDate", "dateFmt"));
+    StructType partitionSchema = getSchemaWithFields(Arrays.asList("dateFmt"));
+    InternalSchema internalSchema = 
deltaKernelSchemaExtractor.toInternalSchema(tableSchema);
+    List<InternalPartitionField> expectedInternalPartitionFields =
+        Arrays.asList(
+            InternalPartitionField.builder()
+                .sourceField(
+                    
InternalField.builder().name("birthDate").schema(TIMESTAMP_SCHEMA).build())
+                .transformType(PartitionTransformType.HOUR)
+                .partitionFieldNames(Collections.singletonList("dateFmt"))
+                .build());
+    List<InternalPartitionField> internalPartitionFields =
+        deltaKernelPartitionExtractor.convertFromDeltaPartitionFormat(
+            internalSchema, partitionSchema);
+    assertEquals(expectedInternalPartitionFields, internalPartitionFields);
+  }
+
+  @Test
+  public void yearPartitionedGeneratedColumnsTable() {
+    StructType tableSchema =
+        getSchemaWithFields(Arrays.asList("id", "firstName", "gender", 
"birthDate", "yearOfBirth"));
+    StructType partitionSchema = 
getSchemaWithFields(Arrays.asList("yearOfBirth"));
+    InternalSchema internalSchema = 
deltaKernelSchemaExtractor.toInternalSchema(tableSchema);
+    List<InternalPartitionField> expectedInternalPartitionFields =
+        Arrays.asList(
+            InternalPartitionField.builder()
+                .sourceField(
+                    
InternalField.builder().name("birthDate").schema(TIMESTAMP_SCHEMA).build())
+                .transformType(PartitionTransformType.YEAR)
+                .partitionFieldNames(Collections.singletonList("yearOfBirth"))
+                .build());
+    List<InternalPartitionField> internalPartitionFields =
+        deltaKernelPartitionExtractor.convertFromDeltaPartitionFormat(
+            internalSchema, partitionSchema);
+    assertEquals(expectedInternalPartitionFields, internalPartitionFields);
+  }
+
+  @Test
+  public void yearAndSimpleCombinedPartitionedGeneratedColumnsTable() {
+    StructType tableSchema =
+        getSchemaWithFields(Arrays.asList("id", "firstName", "gender", 
"birthDate", "yearOfBirth"));
+    StructType partitionSchema = 
getSchemaWithFields(Arrays.asList("yearOfBirth", "id"));
+    InternalSchema internalSchema = 
deltaKernelSchemaExtractor.toInternalSchema(tableSchema);
+    List<InternalPartitionField> expectedInternalPartitionFields =
+        Arrays.asList(
+            InternalPartitionField.builder()
+                .sourceField(
+                    
InternalField.builder().name("birthDate").schema(TIMESTAMP_SCHEMA).build())
+                .transformType(PartitionTransformType.YEAR)
+                .partitionFieldNames(Collections.singletonList("yearOfBirth"))
+                .build(),
+            InternalPartitionField.builder()
+                .sourceField(
+                    InternalField.builder()
+                        .name("id")
+                        .schema(
+                            InternalSchema.builder()
+                                .name("integer")
+                                .dataType(InternalType.INT)
+                                .build())
+                        .build())
+                .transformType(PartitionTransformType.VALUE)
+                .build());
+    List<InternalPartitionField> internalPartitionFields =
+        deltaKernelPartitionExtractor.convertFromDeltaPartitionFormat(
+            internalSchema, partitionSchema);
+    assertEquals(expectedInternalPartitionFields, internalPartitionFields);
+  }
+
+  @Test
+  public void yearMonthDayHourPartitionedGeneratedColumnsTable() {
+    StructType tableSchema =
+        getSchemaWithFields(
+            Arrays.asList(
+                "id",
+                "firstName",
+                "gender",
+                "birthDate",
+                "yearOfBirth",
+                "monthOfBirth",
+                "dayOfBirth",
+                "hourOfBirth"));
+    StructType partitionSchema =
+        getSchemaWithFields(
+            Arrays.asList("yearOfBirth", "monthOfBirth", "dayOfBirth", 
"hourOfBirth"));
+    InternalSchema internalSchema = 
deltaKernelSchemaExtractor.toInternalSchema(tableSchema);
+    List<InternalPartitionField> expectedInternalPartitionFields =
+        Arrays.asList(
+            InternalPartitionField.builder()
+                .sourceField(
+                    
InternalField.builder().name("birthDate").schema(TIMESTAMP_SCHEMA).build())
+                .partitionFieldNames(
+                    Arrays.asList("yearOfBirth", "monthOfBirth", "dayOfBirth", 
"hourOfBirth"))
+                .transformType(PartitionTransformType.HOUR)
+                .build());
+    List<InternalPartitionField> internalPartitionFields =
+        deltaKernelPartitionExtractor.convertFromDeltaPartitionFormat(
+            internalSchema, partitionSchema);
+    assertEquals(expectedInternalPartitionFields, internalPartitionFields);
+  }
+
+  // Test for preserving order of partition columns.
+  @Test
+  public void testCombinationOfPlainAndGeneratedColumns() {
+    StructType tableSchema =
+        getSchemaWithFields(Arrays.asList("id", "firstName", "gender", 
"birthDate", "dateFmt"));
+    StructType partitionSchema =
+        getSchemaWithFields(Arrays.asList("id", "dateFmt", "gender", 
"dateOfBirth"));
+    InternalSchema internalSchema = 
deltaKernelSchemaExtractor.toInternalSchema(tableSchema);
+    List<InternalPartitionField> expectedInternalPartitionFields =
+        Arrays.asList(
+            InternalPartitionField.builder()
+                .sourceField(
+                    InternalField.builder()
+                        .name("id")
+                        .schema(
+                            InternalSchema.builder()
+                                .name("integer")
+                                .dataType(InternalType.INT)
+                                .build())
+                        .build())
+                .transformType(PartitionTransformType.VALUE)
+                .build(),
+            InternalPartitionField.builder()
+                .sourceField(
+                    
InternalField.builder().name("birthDate").schema(TIMESTAMP_SCHEMA).build())
+                .transformType(PartitionTransformType.HOUR)
+                .partitionFieldNames(Collections.singletonList("dateFmt"))
+                .build(),
+            InternalPartitionField.builder()
+                .sourceField(
+                    InternalField.builder()
+                        .name("gender")
+                        .schema(
+                            InternalSchema.builder()
+                                .name("string")
+                                .dataType(InternalType.STRING)
+                                .build())
+                        .build())
+                .transformType(PartitionTransformType.VALUE)
+                .build(),
+            InternalPartitionField.builder()
+                .sourceField(
+                    
InternalField.builder().name("birthDate").schema(TIMESTAMP_SCHEMA).build())
+                .transformType(PartitionTransformType.DAY)
+                .partitionFieldNames(Collections.singletonList("dateOfBirth"))
+                .build());
+    List<InternalPartitionField> internalPartitionFields =
+        deltaKernelPartitionExtractor.convertFromDeltaPartitionFormat(
+            internalSchema, partitionSchema);
+    assertEquals(expectedInternalPartitionFields, internalPartitionFields);
+  }
+
+  @Test
+  public void testDateFormatGeneratedPartitionValueExtraction() {
+    // date_partition_column is generated in the table as DATE_FORMAT(some_date_column,
+    // 'yyyy-MM-dd-HH')
+    // where some_date_column is of timestamp type.
+    Map<String, String> partitionValuesMap =
+        new HashMap<String, String>() {
+          {
+            put("partition_column1", "partition_value1");
+            put("date_partition_column", "2013-08-20-10");
+          }
+        };
+    scala.collection.mutable.Map<String, String> scalaMap =
+        convertJavaMapToScalaMap(partitionValuesMap);
+    InternalPartitionField internalPartitionField1 =
+        InternalPartitionField.builder()
+            .sourceField(
+                InternalField.builder()
+                    .name("partition_column1")
+                    .schema(
+                        InternalSchema.builder()
+                            .name("string")
+                            .dataType(InternalType.STRING)
+                            .build())
+                    .build())
+            .transformType(PartitionTransformType.VALUE)
+            .build();
+    InternalPartitionField internalPartitionField2 =
+        InternalPartitionField.builder()
+            .sourceField(
+                InternalField.builder()
+                    .name("some_date_column")
+                    .schema(
+                        InternalSchema.builder()
+                            .name("timestamp")
+                            .dataType(InternalType.TIMESTAMP)
+                            .build())
+                    .build())
+            
.partitionFieldNames(Collections.singletonList("date_partition_column"))
+            .transformType(PartitionTransformType.HOUR)
+            .build();
+    Range rangeForPartitionField1 = Range.scalar("partition_value1");
+    Range rangeForPartitionField2 = Range.scalar(1376992800000L);
+    List<PartitionValue> expectedPartitionValues =
+        Arrays.asList(
+            PartitionValue.builder()
+                .partitionField(internalPartitionField1)
+                .range(rangeForPartitionField1)
+                .build(),
+            PartitionValue.builder()
+                .partitionField(internalPartitionField2)
+                .range(rangeForPartitionField2)
+                .build());
+    List<PartitionValue> partitionValues =
+        deltaKernelPartitionExtractor.partitionValueExtraction(
+            scalaMap, Arrays.asList(internalPartitionField1, 
internalPartitionField2));
+    assertEquals(expectedPartitionValues, partitionValues);
+  }
+
+  @Test
+  public void testSimplePartitionValueExtraction() {
+    Map<String, String> partitionValuesMap =
+        new HashMap<String, String>() {
+          {
+            put("partition_column1", "partition_value1");
+            put("partition_column2", "partition_value2");
+          }
+        };
+    scala.collection.mutable.Map<String, String> scalaMap =
+        convertJavaMapToScalaMap(partitionValuesMap);
+    InternalPartitionField internalPartitionField1 =
+        InternalPartitionField.builder()
+            .sourceField(
+                InternalField.builder()
+                    .name("partition_column1")
+                    .schema(
+                        InternalSchema.builder()
+                            .name("string")
+                            .dataType(InternalType.STRING)
+                            .build())
+                    .build())
+            .transformType(PartitionTransformType.VALUE)
+            .build();
+    InternalPartitionField internalPartitionField2 =
+        InternalPartitionField.builder()
+            .sourceField(
+                InternalField.builder()
+                    .name("partition_column2")
+                    .schema(
+                        InternalSchema.builder()
+                            .name("string")
+                            .dataType(InternalType.STRING)
+                            .build())
+                    .build())
+            .transformType(PartitionTransformType.VALUE)
+            .build();
+    Range rangeForPartitionField1 = Range.scalar("partition_value1");
+    Range rangeForPartitionField2 = Range.scalar("partition_value2");
+    List<PartitionValue> expectedPartitionValues =
+        Arrays.asList(
+            PartitionValue.builder()
+                .partitionField(internalPartitionField1)
+                .range(rangeForPartitionField1)
+                .build(),
+            PartitionValue.builder()
+                .partitionField(internalPartitionField2)
+                .range(rangeForPartitionField2)
+                .build());
+    List<PartitionValue> partitionValues =
+        deltaKernelPartitionExtractor.partitionValueExtraction(
+            scalaMap, Arrays.asList(internalPartitionField1, 
internalPartitionField2));
+    assertEquals(expectedPartitionValues, partitionValues);
+  }
+
+  @Test
+  public void testYearMonthDayHourGeneratedPartitionValueExtraction() {
+    // year, month and day are generated in the table based on some_date_column, which is of
+    // timestamp type.
+    Map<String, String> partitionValuesMap =
+        new HashMap<String, String>() {
+          {
+            put("partition_column1", "partition_value1");
+            put("year_partition_column", "2013");
+            put("month_partition_column", "8");
+            put("day_partition_column", "20");
+          }
+        };
+    scala.collection.mutable.Map<String, String> scalaMap =
+        convertJavaMapToScalaMap(partitionValuesMap);
+    InternalPartitionField internalPartitionField1 =
+        InternalPartitionField.builder()
+            .sourceField(
+                InternalField.builder()
+                    .name("partition_column1")
+                    .schema(
+                        InternalSchema.builder()
+                            .name("string")
+                            .dataType(InternalType.STRING)
+                            .build())
+                    .build())
+            .transformType(PartitionTransformType.VALUE)
+            .build();
+    InternalPartitionField internalPartitionField2 =
+        InternalPartitionField.builder()
+            .sourceField(
+                InternalField.builder()
+                    .name("some_date_column")
+                    .schema(
+                        InternalSchema.builder()
+                            .name("timestamp")
+                            .dataType(InternalType.TIMESTAMP)
+                            .build())
+                    .build())
+            .partitionFieldNames(
+                Arrays.asList(
+                    "year_partition_column", "month_partition_column", 
"day_partition_column"))
+            .transformType(PartitionTransformType.DAY)
+            .build();
+    Range rangeForPartitionField1 = Range.scalar("partition_value1");
+    Range rangeForPartitionField2 = Range.scalar(1376956800000L);
+    List<PartitionValue> expectedPartitionValues =
+        Arrays.asList(
+            PartitionValue.builder()
+                .partitionField(internalPartitionField1)
+                .range(rangeForPartitionField1)
+                .build(),
+            PartitionValue.builder()
+                .partitionField(internalPartitionField2)
+                .range(rangeForPartitionField2)
+                .build());
+    List<PartitionValue> partitionValues =
+        deltaKernelPartitionExtractor.partitionValueExtraction(
+            scalaMap, Arrays.asList(internalPartitionField1, 
internalPartitionField2));
+    assertEquals(expectedPartitionValues, partitionValues);
+  }
+
+  @Test
+  void convertBucketPartition() {
+    InternalPartitionField internalPartitionField =
+        InternalPartitionField.builder()
+            .sourceField(
+                InternalField.builder()
+                    .name("partition_column1")
+                    .schema(
+                        InternalSchema.builder()
+                            .name("string")
+                            .dataType(InternalType.STRING)
+                            .build())
+                    .build())
+            .transformType(PartitionTransformType.BUCKET)
+            
.transformOptions(Collections.singletonMap(InternalPartitionField.NUM_BUCKETS, 
5))
+            .build();
+    System.out.println("internalPartitionField" + internalPartitionField);

Review Comment:
   Remove the `System.out.println` debug statement in this class as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to