vaibhavk1992 commented on code in PR #729: URL: https://github.com/apache/incubator-xtable/pull/729#discussion_r2559681587
########## xtable-core/src/main/java/org/apache/xtable/kernel/DeltaKernelPartitionExtractor.java: ########## @@ -0,0 +1,535 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.kernel; + +import static org.apache.xtable.collectors.CustomCollectors.toList; +import static org.apache.xtable.delta.DeltaValueConverter.convertFromDeltaPartitionValue; +import static org.apache.xtable.delta.DeltaValueConverter.convertToDeltaPartitionValue; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import lombok.AccessLevel; +import lombok.Builder; +import lombok.NoArgsConstructor; +import lombok.extern.log4j.Log4j2; + +import scala.collection.JavaConverters; + +import com.google.common.collect.Iterators; +import com.google.common.collect.PeekingIterator; + +import io.delta.kernel.types.*; +import io.delta.kernel.types.FieldMetadata; + +import org.apache.xtable.exception.PartitionSpecException; +import 
org.apache.xtable.model.schema.InternalPartitionField; +import org.apache.xtable.model.schema.InternalSchema; +import org.apache.xtable.model.schema.PartitionTransformType; +import org.apache.xtable.model.stat.PartitionValue; +import org.apache.xtable.model.stat.Range; +import org.apache.xtable.model.storage.InternalDataFile; +import org.apache.xtable.schema.SchemaFieldFinder; + +@Log4j2 +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public class DeltaKernelPartitionExtractor { + private static final DeltaKernelPartitionExtractor INSTANCE = new DeltaKernelPartitionExtractor(); + private static final String CAST_FUNCTION = "CAST(%s as DATE)"; + private static final String DATE_FORMAT_FUNCTION = "DATE_FORMAT(%s, '%s')"; + private static final String YEAR_FUNCTION = "YEAR(%s)"; + private static final String DATE_FORMAT_FOR_HOUR = "yyyy-MM-dd-HH"; + private static final String DATE_FORMAT_FOR_DAY = "yyyy-MM-dd"; + private static final String DATE_FORMAT_FOR_MONTH = "yyyy-MM"; + private static final String DATE_FORMAT_FOR_YEAR = "yyyy"; + private static final String BUCKET_FUNCTION = "MOD((HASH(%s) & %d), %d)"; + // For timestamp partition fields, actual partition column names in delta format will be of type + // generated, with a name like `xtable_partition_col_{transform_type}_{source_field_name}`. + private static final String DELTA_PARTITION_COL_NAME_FORMAT = "xtable_partition_col_%s_%s"; + static final String DELTA_GENERATION_EXPRESSION = "delta.generationExpression"; + private static final List<ParsedGeneratedExpr.GeneratedExprType> GRANULARITIES = + Arrays.asList( + ParsedGeneratedExpr.GeneratedExprType.YEAR, + ParsedGeneratedExpr.GeneratedExprType.MONTH, + ParsedGeneratedExpr.GeneratedExprType.DAY, + ParsedGeneratedExpr.GeneratedExprType.HOUR); + + public static DeltaKernelPartitionExtractor getInstance() { + return INSTANCE; + } + + /** + * Extracts partition fields from delta table. Partitioning by nested columns isn't supported.
+ * Example: Given a delta table and a reference to DeltaLog, method parameters can be obtained by + * deltaLog = DeltaLog.forTable(spark, deltaTablePath); InternalSchema internalSchema = + * DeltaSchemaExtractor.getInstance().toInternalSchema(deltaLog.snapshot().schema()); StructType + * partitionSchema = deltaLog.metadata().partitionSchema(); + * + * @param internalSchema canonical representation of the schema. + * @param partitionSchema partition schema of the delta table. + * @return list of canonical representation of the partition fields + */ + public List<InternalPartitionField> convertFromDeltaPartitionFormat( + InternalSchema internalSchema, StructType partitionSchema) { + if (partitionSchema.fields().size() == 0) { + return Collections.emptyList(); + } + return getInternalPartitionFields(partitionSchema, internalSchema); + } + + /** + * If all of the partition fields are value partitions, each one is processed individually and + * returned. If they contain month they should contain year as well. If they contain day they + * should contain month and year as well. If they contain hour they should contain day, month and + * year as well. Also supports CAST(col as DATE) and DATE_FORMAT(col, 'yyyy-MM-dd'). Partitioning + * by nested fields may not be fully supported.
+ */ + private List<InternalPartitionField> getInternalPartitionFields( + StructType partitionSchema, InternalSchema internalSchema) { + PeekingIterator<StructField> itr = + Iterators.peekingIterator(partitionSchema.fields().iterator()); + List<InternalPartitionField> partitionFields = new ArrayList<>(partitionSchema.fields().size()); + while (itr.hasNext()) { + StructField currPartitionField = itr.peek(); + if (!currPartitionField.getMetadata().contains(DELTA_GENERATION_EXPRESSION)) { + partitionFields.add( + InternalPartitionField.builder() + .sourceField( + SchemaFieldFinder.getInstance() + .findFieldByPath(internalSchema, currPartitionField.getName())) + .transformType(PartitionTransformType.VALUE) + .build()); + itr.next(); // consume the field. + } else { + // Partition contains generated expression. + // if it starts with year we should consume until we hit field with no generated expression + // or we hit a field with generated expression that is of cast or date format. + String expr = currPartitionField.getMetadata().getString(DELTA_GENERATION_EXPRESSION); + ParsedGeneratedExpr parsedGeneratedExpr = + ParsedGeneratedExpr.buildFromString(currPartitionField.getName(), expr); + if (ParsedGeneratedExpr.GeneratedExprType.CAST == parsedGeneratedExpr.generatedExprType) { + partitionFields.add( + getPartitionWithDateTransform( + currPartitionField.getName(), parsedGeneratedExpr, internalSchema)); + itr.next(); // consume the field. + } else if (ParsedGeneratedExpr.GeneratedExprType.DATE_FORMAT + == parsedGeneratedExpr.generatedExprType) { + partitionFields.add( + getPartitionWithDateFormatTransform( + currPartitionField.getName(), parsedGeneratedExpr, internalSchema)); + itr.next(); // consume the field. + } else { + // consume until we hit field with no generated expression or generated expression + // that is not of type cast or date format. 
+ List<ParsedGeneratedExpr> parsedGeneratedExprs = new ArrayList<>(); + while (itr.hasNext() + && currPartitionField.getMetadata().contains(DELTA_GENERATION_EXPRESSION)) { + expr = currPartitionField.getMetadata().getString(DELTA_GENERATION_EXPRESSION); + parsedGeneratedExpr = + ParsedGeneratedExpr.buildFromString(currPartitionField.getName(), expr); + + if (ParsedGeneratedExpr.GeneratedExprType.CAST == parsedGeneratedExpr.generatedExprType + || ParsedGeneratedExpr.GeneratedExprType.DATE_FORMAT + == parsedGeneratedExpr.generatedExprType) { + break; + } + parsedGeneratedExprs.add(parsedGeneratedExpr); + itr.next(); // consume the field + if (itr.hasNext()) { + currPartitionField = itr.peek(); + } + } + partitionFields.add( + getPartitionColumnsForHourOrDayOrMonthOrYear(parsedGeneratedExprs, internalSchema)); + } + } + } + return partitionFields; + } + + private InternalPartitionField getPartitionColumnsForHourOrDayOrMonthOrYear( + List<ParsedGeneratedExpr> parsedGeneratedExprs, InternalSchema internalSchema) { + if (parsedGeneratedExprs.size() > 4) { + throw new IllegalStateException("Invalid partition transform"); + } + validate( + parsedGeneratedExprs, new HashSet<>(GRANULARITIES.subList(0, parsedGeneratedExprs.size()))); + + ParsedGeneratedExpr transform = parsedGeneratedExprs.get(0); + List<String> partitionColumns = + parsedGeneratedExprs.stream() + .map(parsedGeneratedExpr -> parsedGeneratedExpr.partitionColumnName) + .collect(toList(parsedGeneratedExprs.size())); + return InternalPartitionField.builder() + .sourceField( + SchemaFieldFinder.getInstance().findFieldByPath(internalSchema, transform.sourceColumn)) + .partitionFieldNames(partitionColumns) + .transformType( + parsedGeneratedExprs.get(parsedGeneratedExprs.size() - 1) + .internalPartitionTransformType) + .build(); + } + + // Cast has default format of yyyy-MM-dd. 
+ private InternalPartitionField getPartitionWithDateTransform( + String partitionColumnName, + ParsedGeneratedExpr parsedGeneratedExpr, + InternalSchema internalSchema) { + return InternalPartitionField.builder() + .sourceField( + SchemaFieldFinder.getInstance() + .findFieldByPath(internalSchema, parsedGeneratedExpr.sourceColumn)) + .partitionFieldNames(Collections.singletonList(partitionColumnName)) + .transformType(PartitionTransformType.DAY) + .build(); + } + + private InternalPartitionField getPartitionWithDateFormatTransform( + String partitionColumnName, + ParsedGeneratedExpr parsedGeneratedExpr, + InternalSchema internalSchema) { + return InternalPartitionField.builder() + .sourceField( + SchemaFieldFinder.getInstance() + .findFieldByPath(internalSchema, parsedGeneratedExpr.sourceColumn)) + .partitionFieldNames(Collections.singletonList(partitionColumnName)) + .transformType(parsedGeneratedExpr.internalPartitionTransformType) + .build(); + } + + public Map<String, StructField> convertToDeltaPartitionFormat( + List<InternalPartitionField> partitionFields) { + if (partitionFields == null) { + return null; + } + Map<String, StructField> nameToStructFieldMap = new HashMap<>(); + for (InternalPartitionField internalPartitionField : partitionFields) { + String currPartitionColumnName; + StructField field; + + if (internalPartitionField.getTransformType() == PartitionTransformType.VALUE) { + currPartitionColumnName = internalPartitionField.getSourceField().getName(); + field = null; + } else { + // Since the partition field is of timestamp or bucket type, create a new field in the schema.
+ field = getGeneratedField(internalPartitionField); + currPartitionColumnName = field.getName(); + } + nameToStructFieldMap.put(currPartitionColumnName, field); + } + return nameToStructFieldMap; + } + + public Map<String, String> partitionValueSerialization(InternalDataFile internalDataFile) { + Map<String, String> partitionValuesSerialized = new HashMap<>(); + if (internalDataFile.getPartitionValues() == null + || internalDataFile.getPartitionValues().isEmpty()) { + return partitionValuesSerialized; + } + for (PartitionValue partitionValue : internalDataFile.getPartitionValues()) { + InternalPartitionField partitionField = partitionValue.getPartitionField(); + PartitionTransformType transformType = partitionField.getTransformType(); + String partitionValueSerialized; + if (transformType == PartitionTransformType.VALUE) { + partitionValueSerialized = + convertToDeltaPartitionValue( + partitionValue.getRange().getMaxValue(), + partitionField.getSourceField().getSchema().getDataType(), + transformType, + ""); + partitionValuesSerialized.put( + partitionField.getSourceField().getName(), partitionValueSerialized); + } else if (transformType == PartitionTransformType.BUCKET) { + partitionValueSerialized = partitionValue.getRange().getMaxValue().toString(); + partitionValuesSerialized.put( + getGeneratedColumnName(partitionField), partitionValueSerialized); + } else { + // use appropriate date formatter for value serialization. 
+ partitionValueSerialized = + convertToDeltaPartitionValue( + partitionValue.getRange().getMaxValue(), + partitionField.getSourceField().getSchema().getDataType(), + transformType, + getDateFormat(partitionField.getTransformType())); + partitionValuesSerialized.put( + getGeneratedColumnName(partitionField), partitionValueSerialized); + } + } + return partitionValuesSerialized; + } + + public List<PartitionValue> partitionValueExtraction( + java.util.Map<String, String> values, List<InternalPartitionField> partitionFields) { Review Comment: done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
