This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new 0d7cc87d687 [HUDI-7840] Add position merging to the new file group reader (#11413) 0d7cc87d687 is described below commit 0d7cc87d687bd235bac099e481535cb9f223b501 Author: Jon Vexler <jbvex...@gmail.com> AuthorDate: Fri Jun 7 19:47:42 2024 -0400 [HUDI-7840] Add position merging to the new file group reader (#11413) Co-authored-by: Jonathan Vexler <=> Co-authored-by: Sagar Sumit <sagarsumi...@gmail.com> --- .../SparkFileFormatInternalRowReaderContext.scala | 202 +++++++++++++++---- .../hudi/common/engine/HoodieReaderContext.java | 22 +++ .../common/table/read/HoodieFileGroupReader.java | 6 +- .../HoodiePositionBasedFileGroupRecordBuffer.java | 4 +- .../read/HoodiePositionBasedSchemaHandler.java | 75 ++++++++ ...odieFileGroupReaderBasedParquetFileFormat.scala | 2 +- ...stSparkFileFormatInternalRowReaderContext.scala | 72 +++++++ ...stHoodiePositionBasedFileGroupRecordBuffer.java | 214 +++++++++++++++++++++ .../functional/TestFiltersInFileGroupReader.java | 109 +++++++++++ .../read/TestHoodieFileGroupReaderOnSpark.scala | 2 +- .../TestSpark35RecordPositionMetadataColumn.scala | 143 ++++++++++++++ 11 files changed, 812 insertions(+), 39 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala index 640f1219fbf..715e2d9a9ab 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala @@ -22,10 +22,14 @@ package org.apache.hudi import org.apache.avro.Schema import org.apache.avro.generic.IndexedRecord import org.apache.hadoop.conf.Configuration +import org.apache.hudi.SparkFileFormatInternalRowReaderContext.{filterIsSafeForBootstrap, getAppliedRequiredSchema} +import org.apache.hudi.avro.AvroSchemaUtils import org.apache.hudi.common.engine.HoodieReaderContext import org.apache.hudi.common.fs.FSUtils +import org.apache.hudi.common.model.HoodieRecord +import org.apache.hudi.common.table.read.HoodiePositionBasedFileGroupRecordBuffer.ROW_INDEX_TEMPORARY_COLUMN_NAME import org.apache.hudi.common.util.ValidationUtils.checkState -import org.apache.hudi.common.util.collection.{ClosableIterator, CloseableMappingIterator} +import org.apache.hudi.common.util.collection.{CachingIterator, ClosableIterator, CloseableMappingIterator} import org.apache.hudi.io.storage.{HoodieSparkFileReaderFactory, HoodieSparkParquetReader} import org.apache.hudi.storage.{HoodieStorage, StorageConfiguration, StoragePath} import org.apache.hudi.util.CloseableInternalRowIterator @@ -37,7 +41,7 @@ import org.apache.spark.sql.execution.datasources.PartitionedFile import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, SparkParquetReader} import org.apache.spark.sql.hudi.SparkAdapter import org.apache.spark.sql.sources.Filter -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.types.{LongType, MetadataBuilder, StructField, StructType} import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch} import scala.collection.mutable @@ -53,12 +57,20 @@ import scala.collection.mutable * not required for reading a file group with only log files. 
* @param recordKeyColumn column name for the recordkey * @param filters spark filters that might be pushed down into the reader + * @param requiredFilters filters that are required and should always be used, even in merging situations */ class SparkFileFormatInternalRowReaderContext(parquetFileReader: SparkParquetReader, recordKeyColumn: String, - filters: Seq[Filter]) extends BaseSparkInternalRowReaderContext { + filters: Seq[Filter], + requiredFilters: Seq[Filter]) extends BaseSparkInternalRowReaderContext { lazy val sparkAdapter: SparkAdapter = SparkAdapterSupport.sparkAdapter + private lazy val bootstrapSafeFilters: Seq[Filter] = filters.filter(filterIsSafeForBootstrap) ++ requiredFilters private val deserializerMap: mutable.Map[Schema, HoodieAvroDeserializer] = mutable.Map() + private lazy val allFilters = filters ++ requiredFilters + + override def supportsParquetRowIndex: Boolean = { + HoodieSparkUtils.gteqSpark3_5 + } override def getFileRecordIterator(filePath: StoragePath, start: Long, @@ -66,6 +78,10 @@ class SparkFileFormatInternalRowReaderContext(parquetFileReader: SparkParquetRea dataSchema: Schema, requiredSchema: Schema, storage: HoodieStorage): ClosableIterator[InternalRow] = { + val hasRowIndexField = AvroSchemaUtils.containsFieldInSchema(requiredSchema, ROW_INDEX_TEMPORARY_COLUMN_NAME) + if (hasRowIndexField) { + assert(supportsParquetRowIndex()) + } val structType = HoodieInternalRowUtils.getCachedSchema(requiredSchema) if (FSUtils.isLogFile(filePath)) { val projection = HoodieInternalRowUtils.getCachedUnsafeProjection(structType, structType) @@ -84,8 +100,20 @@ class SparkFileFormatInternalRowReaderContext(parquetFileReader: SparkParquetRea // each row if they are given. That is the only usage of the partition values in the reader. 
val fileInfo = sparkAdapter.getSparkPartitionedFileUtils .createPartitionedFile(InternalRow.empty, filePath, start, length) + val (readSchema, readFilters) = getSchemaAndFiltersForRead(structType, hasRowIndexField) new CloseableInternalRowIterator(parquetFileReader.read(fileInfo, - structType, StructType(Seq.empty), Seq.empty, storage.getConf.asInstanceOf[StorageConfiguration[Configuration]])) + readSchema, StructType(Seq.empty), readFilters, storage.getConf.asInstanceOf[StorageConfiguration[Configuration]])) + } + } + + private def getSchemaAndFiltersForRead(structType: StructType, hasRowIndexField: Boolean): (StructType, Seq[Filter]) = { + val schemaForRead = getAppliedRequiredSchema(structType, hasRowIndexField) + if (!getHasLogFiles && !getNeedsBootstrapMerge) { + (schemaForRead, allFilters) + } else if (!getHasLogFiles && hasRowIndexField) { + (schemaForRead, bootstrapSafeFilters) + } else { + (schemaForRead, requiredFilters) } } @@ -116,45 +144,153 @@ class SparkFileFormatInternalRowReaderContext(parquetFileReader: SparkParquetRea skeletonRequiredSchema: Schema, dataFileIterator: ClosableIterator[InternalRow], dataRequiredSchema: Schema): ClosableIterator[InternalRow] = { - doBootstrapMerge(skeletonFileIterator.asInstanceOf[ClosableIterator[Any]], - dataFileIterator.asInstanceOf[ClosableIterator[Any]]) + doBootstrapMerge(skeletonFileIterator.asInstanceOf[ClosableIterator[Any]], skeletonRequiredSchema, + dataFileIterator.asInstanceOf[ClosableIterator[Any]], dataRequiredSchema) } - protected def doBootstrapMerge(skeletonFileIterator: ClosableIterator[Any], dataFileIterator: ClosableIterator[Any]): ClosableIterator[InternalRow] = { - new ClosableIterator[Any] { - val combinedRow = new JoinedRow() + private def doBootstrapMerge(skeletonFileIterator: ClosableIterator[Any], + skeletonRequiredSchema: Schema, + dataFileIterator: ClosableIterator[Any], + dataRequiredSchema: Schema): ClosableIterator[InternalRow] = { + if (supportsParquetRowIndex()) { + assert(AvroSchemaUtils.containsFieldInSchema(skeletonRequiredSchema, ROW_INDEX_TEMPORARY_COLUMN_NAME)) + assert(AvroSchemaUtils.containsFieldInSchema(dataRequiredSchema, ROW_INDEX_TEMPORARY_COLUMN_NAME)) + val rowIndexColumn = new java.util.HashSet[String]() + rowIndexColumn.add(ROW_INDEX_TEMPORARY_COLUMN_NAME) + //always remove the row index column from the skeleton because the data file will also have the same column + val skeletonProjection = projectRecord(skeletonRequiredSchema, + AvroSchemaUtils.removeFieldsFromSchema(skeletonRequiredSchema, rowIndexColumn)) - override def hasNext: Boolean = { - //If the iterators are out of sync it is probably due to filter pushdown - checkState(dataFileIterator.hasNext == skeletonFileIterator.hasNext, - "Bootstrap data-file iterator and skeleton-file iterator have to be in-sync!") - dataFileIterator.hasNext && skeletonFileIterator.hasNext + //If we need to do position based merging with log files we will leave the row index column at the end + val dataProjection = if (getHasLogFiles && getShouldMergeUseRecordPosition) { + getIdentityProjection + } else { + projectRecord(dataRequiredSchema, + AvroSchemaUtils.removeFieldsFromSchema(dataRequiredSchema, rowIndexColumn)) } - override def next(): Any = { - (skeletonFileIterator.next(), dataFileIterator.next()) match { - case (s: ColumnarBatch, d: ColumnarBatch) => - val numCols = s.numCols() + d.numCols() - val vecs: Array[ColumnVector] = new Array[ColumnVector](numCols) - for (i <- 0 until numCols) { - if (i < s.numCols()) { - vecs(i) = s.column(i) + //row 
index will always be the last column + val skeletonRowIndex = skeletonRequiredSchema.getFields.size() - 1 + val dataRowIndex = dataRequiredSchema.getFields.size() - 1 + + //Always use internal row for positional merge because + //we need to iterate row by row when merging + new CachingIterator[InternalRow] { + val combinedRow = new JoinedRow() + + private def getNextSkeleton: (InternalRow, Long) = { + val nextSkeletonRow = skeletonFileIterator.next().asInstanceOf[InternalRow] + (nextSkeletonRow, nextSkeletonRow.getLong(skeletonRowIndex)) + } + + private def getNextData: (InternalRow, Long) = { + val nextDataRow = dataFileIterator.next().asInstanceOf[InternalRow] + (nextDataRow, nextDataRow.getLong(dataRowIndex)) + } + + override def close(): Unit = { + skeletonFileIterator.close() + dataFileIterator.close() + } + + override protected def doHasNext(): Boolean = { + if (!dataFileIterator.hasNext || !skeletonFileIterator.hasNext) { + false + } else { + var nextSkeleton = getNextSkeleton + var nextData = getNextData + while (nextSkeleton._2 != nextData._2) { + if (nextSkeleton._2 > nextData._2) { + if (!dataFileIterator.hasNext) { + return false + } else { + nextData = getNextData + } } else { - vecs(i) = d.column(i - s.numCols()) + if (!skeletonFileIterator.hasNext) { + return false + } else { + nextSkeleton = getNextSkeleton + } } } - assert(s.numRows() == d.numRows()) - sparkAdapter.makeColumnarBatch(vecs, s.numRows()) - case (_: ColumnarBatch, _: InternalRow) => throw new IllegalStateException("InternalRow ColumnVector mismatch") - case (_: InternalRow, _: ColumnarBatch) => throw new IllegalStateException("InternalRow ColumnVector mismatch") - case (s: InternalRow, d: InternalRow) => combinedRow(s, d) + nextRecord = combinedRow(skeletonProjection.apply(nextSkeleton._1), dataProjection.apply(nextData._1)) + true + } } } + } else { + new ClosableIterator[Any] { + val combinedRow = new JoinedRow() - override def close(): Unit = { - skeletonFileIterator.close() - dataFileIterator.close() - } - }.asInstanceOf[ClosableIterator[InternalRow]] + override def hasNext: Boolean = { + //If the iterators are out of sync it is probably due to filter pushdown + checkState(dataFileIterator.hasNext == skeletonFileIterator.hasNext, + "Bootstrap data-file iterator and skeleton-file iterator have to be in-sync!") + dataFileIterator.hasNext && skeletonFileIterator.hasNext + } + + override def next(): Any = { + (skeletonFileIterator.next(), dataFileIterator.next()) match { + case (s: ColumnarBatch, d: ColumnarBatch) => + //This will not be used until [HUDI-7693] is implemented + val numCols = s.numCols() + d.numCols() + val vecs: Array[ColumnVector] = new Array[ColumnVector](numCols) + for (i <- 0 until numCols) { + if (i < s.numCols()) { + vecs(i) = s.column(i) + } else { + vecs(i) = d.column(i - s.numCols()) + } + } + assert(s.numRows() == d.numRows()) + sparkAdapter.makeColumnarBatch(vecs, s.numRows()) + case (_: ColumnarBatch, _: InternalRow) => throw new IllegalStateException("InternalRow ColumnVector mismatch") + case (_: InternalRow, _: ColumnarBatch) => throw new IllegalStateException("InternalRow ColumnVector mismatch") + case (s: InternalRow, d: InternalRow) => combinedRow(s, d) + } + } + + override def close(): Unit = { + skeletonFileIterator.close() + dataFileIterator.close() + } + }.asInstanceOf[ClosableIterator[InternalRow]] + } } } + +object SparkFileFormatInternalRowReaderContext { + // From "namedExpressions.scala": Used to construct to record position field metadata. 
+ private val FILE_SOURCE_GENERATED_METADATA_COL_ATTR_KEY = "__file_source_generated_metadata_col" + private val FILE_SOURCE_METADATA_COL_ATTR_KEY = "__file_source_metadata_col" + private val METADATA_COL_ATTR_KEY = "__metadata_col" + + def getAppliedRequiredSchema(requiredSchema: StructType, shouldAddRecordPosition: Boolean): StructType = { + if (shouldAddRecordPosition) { + val metadata = new MetadataBuilder() + .putString(METADATA_COL_ATTR_KEY, ROW_INDEX_TEMPORARY_COLUMN_NAME) + .putBoolean(FILE_SOURCE_METADATA_COL_ATTR_KEY, value = true) + .putString(FILE_SOURCE_GENERATED_METADATA_COL_ATTR_KEY, ROW_INDEX_TEMPORARY_COLUMN_NAME) + .build() + val rowIndexField = StructField(ROW_INDEX_TEMPORARY_COLUMN_NAME, LongType, nullable = false, metadata) + StructType(requiredSchema.fields.filterNot(isIndexTempColumn) :+ rowIndexField) + } else { + requiredSchema + } + } + + /** + * Only valid if there is support for RowIndexField and no log files + * Filters are safe for bootstrap if meta col filters are independent from data col filters. + */ + def filterIsSafeForBootstrap(filter: Filter): Boolean = { + val metaRefCount = filter.references.count(c => HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION.contains(c.toLowerCase)) + metaRefCount == filter.references.length || metaRefCount == 0 + } + + private def isIndexTempColumn(field: StructField): Boolean = { + field.name.equals(ROW_INDEX_TEMPORARY_COLUMN_NAME) + } + +} \ No newline at end of file diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java index 81db7ef9e70..218e0eb4b03 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java @@ -58,6 +58,7 @@ public abstract class HoodieReaderContext<T> { private Boolean hasLogFiles = null; private Boolean hasBootstrapBaseFile = null; private Boolean needsBootstrapMerge = null; + private Boolean shouldMergeUseRecordPosition = null; // Getter and Setter for schemaHandler public HoodieFileGroupReaderSchemaHandler<T> getSchemaHandler() { @@ -122,6 +123,15 @@ public abstract class HoodieReaderContext<T> { this.needsBootstrapMerge = needsBootstrapMerge; } + // Getter and Setter for useRecordPosition + public boolean getShouldMergeUseRecordPosition() { + return shouldMergeUseRecordPosition; + } + + public void setShouldMergeUseRecordPosition(boolean shouldMergeUseRecordPosition) { + this.shouldMergeUseRecordPosition = shouldMergeUseRecordPosition; + } + // These internal key names are only used in memory for record metadata and merging, // and should not be persisted to storage. public static final String INTERNAL_META_RECORD_KEY = "_0"; @@ -301,9 +311,21 @@ public abstract class HoodieReaderContext<T> { * @return the record position in the base file. */ public long extractRecordPosition(T record, Schema schema, String fieldName, long providedPositionIfNeeded) { + if (supportsParquetRowIndex()) { + Object position = getValue(record, schema, fieldName); + if (position != null) { + return (long) position; + } else { + throw new IllegalStateException("Record position extraction failed"); + } + } return providedPositionIfNeeded; } + public boolean supportsParquetRowIndex() { + return false; + } + /** * Constructs engine specific delete record. 
*/ diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java index fe450bb0165..396da4166a7 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java @@ -106,10 +106,12 @@ public final class HoodieFileGroupReader<T> implements Closeable { readerContext.setRecordMerger(this.recordMerger); readerContext.setTablePath(tablePath); readerContext.setLatestCommitTime(latestCommitTime); + readerContext.setShouldMergeUseRecordPosition(shouldUseRecordPosition); readerContext.setHasLogFiles(!this.logFiles.isEmpty()); readerContext.setHasBootstrapBaseFile(hoodieBaseFileOption.isPresent() && hoodieBaseFileOption.get().getBootstrapBaseFile().isPresent()); - readerContext.setSchemaHandler(new HoodieFileGroupReaderSchemaHandler<>(readerContext, dataSchema, - requestedSchema, internalSchemaOpt, tableConfig)); + readerContext.setSchemaHandler(readerContext.supportsParquetRowIndex() + ? new HoodiePositionBasedSchemaHandler<>(readerContext, dataSchema, requestedSchema, internalSchemaOpt, tableConfig) + : new HoodieFileGroupReaderSchemaHandler<>(readerContext, dataSchema, requestedSchema, internalSchemaOpt, tableConfig)); this.outputConverter = readerContext.getSchemaHandler().getOutputConverter(); this.recordBuffer = this.logFiles.isEmpty() ? null diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java index 29f05b015ed..bc25bb96f5e 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java @@ -53,7 +53,7 @@ import static org.apache.hudi.common.model.HoodieRecordMerger.DEFAULT_MERGER_STR * {@link #hasNext} method is called. */ public class HoodiePositionBasedFileGroupRecordBuffer<T> extends HoodieBaseFileGroupRecordBuffer<T> { - public static final String ROW_INDEX_COLUMN_NAME = "row_index"; + private static final String ROW_INDEX_COLUMN_NAME = "row_index"; public static final String ROW_INDEX_TEMPORARY_COLUMN_NAME = "_tmp_metadata_" + ROW_INDEX_COLUMN_NAME; private long nextRecordPosition = 0L; @@ -180,7 +180,7 @@ public class HoodiePositionBasedFileGroupRecordBuffer<T> extends HoodieBaseFileG // Handle merging. 
while (baseFileIterator.hasNext()) { T baseRecord = baseFileIterator.next(); - nextRecordPosition = readerContext.extractRecordPosition(baseRecord, readerSchema, ROW_INDEX_COLUMN_NAME, nextRecordPosition); + nextRecordPosition = readerContext.extractRecordPosition(baseRecord, readerSchema, ROW_INDEX_TEMPORARY_COLUMN_NAME, nextRecordPosition); Pair<Option<T>, Map<String, Object>> logRecordInfo = records.remove(nextRecordPosition++); if (hasNextBaseRecord(baseRecord, logRecordInfo)) { return true; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedSchemaHandler.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedSchemaHandler.java new file mode 100644 index 00000000000..87c4266e350 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedSchemaHandler.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.common.table.read; + +import org.apache.hudi.common.engine.HoodieReaderContext; +import org.apache.hudi.common.table.HoodieTableConfig; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.internal.schema.InternalSchema; + +import org.apache.avro.Schema; + +import java.util.Collections; +import java.util.List; + +import static org.apache.hudi.avro.AvroSchemaUtils.appendFieldsToSchemaDedupNested; + +/** + * This class is responsible for handling the schema for the file group reader that supports positional merge. + */ +public class HoodiePositionBasedSchemaHandler<T> extends HoodieFileGroupReaderSchemaHandler<T> { + public HoodiePositionBasedSchemaHandler(HoodieReaderContext<T> readerContext, + Schema dataSchema, + Schema requestedSchema, + Option<InternalSchema> internalSchemaOpt, + HoodieTableConfig hoodieTableConfig) { + super(readerContext, dataSchema, requestedSchema, internalSchemaOpt, hoodieTableConfig); + } + + @Override + protected Schema prepareRequiredSchema() { + Schema preMergeSchema = super.prepareRequiredSchema(); + return readerContext.getShouldMergeUseRecordPosition() && readerContext.getHasLogFiles() + ? 
addPositionalMergeCol(preMergeSchema) + : preMergeSchema; + } + + @Override + public Pair<List<Schema.Field>,List<Schema.Field>> getBootstrapRequiredFields() { + Pair<List<Schema.Field>,List<Schema.Field>> dataAndMetaCols = super.getBootstrapRequiredFields(); + if (readerContext.supportsParquetRowIndex()) { + if (!dataAndMetaCols.getLeft().isEmpty() && !dataAndMetaCols.getRight().isEmpty()) { + dataAndMetaCols.getLeft().add(getPositionalMergeField()); + dataAndMetaCols.getRight().add(getPositionalMergeField()); + } + } + return dataAndMetaCols; + } + + private static Schema addPositionalMergeCol(Schema input) { + return appendFieldsToSchemaDedupNested(input, Collections.singletonList(getPositionalMergeField())); + } + + private static Schema.Field getPositionalMergeField() { + return new Schema.Field(HoodiePositionBasedFileGroupRecordBuffer.ROW_INDEX_TEMPORARY_COLUMN_NAME, + Schema.create(Schema.Type.LONG), "", -1L); + } +} diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala index 0412d202148..c9e001dde59 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala @@ -149,7 +149,7 @@ class HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState, val hoodieBaseFile = fileSlice.getBaseFile.get() baseFileReader(createPartitionedFile(fileSliceMapping.getPartitionValues, hoodieBaseFile.getStoragePath, 0, hoodieBaseFile.getFileLen)) } else { - val readerContext = new SparkFileFormatInternalRowReaderContext(parquetFileReader.value, tableState.recordKeyField, filters) + val readerContext = new SparkFileFormatInternalRowReaderContext(parquetFileReader.value, tableState.recordKeyField, filters, requiredFilters) val storageConf = broadcastedStorageConf.value val metaClient: HoodieTableMetaClient = HoodieTableMetaClient .builder().setConf(storageConf).setBasePath(tableState.tablePath).build diff --git a/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/spark/execution/datasources/parquet/TestSparkFileFormatInternalRowReaderContext.scala b/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/spark/execution/datasources/parquet/TestSparkFileFormatInternalRowReaderContext.scala new file mode 100644 index 00000000000..abf9d238dd6 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/spark/execution/datasources/parquet/TestSparkFileFormatInternalRowReaderContext.scala @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.execution.datasources.parquet + +import org.apache.hudi.SparkFileFormatInternalRowReaderContext +import org.apache.hudi.SparkFileFormatInternalRowReaderContext.filterIsSafeForBootstrap +import org.apache.hudi.common.model.HoodieRecord +import org.apache.hudi.common.table.read.HoodiePositionBasedFileGroupRecordBuffer.ROW_INDEX_TEMPORARY_COLUMN_NAME +import org.apache.hudi.testutils.SparkClientFunctionalTestHarness +import org.apache.spark.sql.sources.{And, IsNotNull, Or} +import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType} +import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue} +import org.junit.jupiter.api.Test + +class TestSparkFileFormatInternalRowReaderContext extends SparkClientFunctionalTestHarness { + + @Test + def testBootstrapFilters(): Unit = { + val recordKeyField = HoodieRecord.HoodieMetadataField.RECORD_KEY_METADATA_FIELD.getFieldName + val commitTimeField = HoodieRecord.HoodieMetadataField.COMMIT_TIME_METADATA_FIELD.getFieldName + + val recordKeyFilter = IsNotNull(recordKeyField) + assertTrue(filterIsSafeForBootstrap(recordKeyFilter)) + val commitTimeFilter = IsNotNull(commitTimeField) + assertTrue(filterIsSafeForBootstrap(commitTimeFilter)) + + val dataFieldFilter = IsNotNull("someotherfield") + assertTrue(filterIsSafeForBootstrap(dataFieldFilter)) + + val legalComplexFilter = Or(recordKeyFilter, commitTimeFilter) + assertTrue(filterIsSafeForBootstrap(legalComplexFilter)) + + val illegalComplexFilter = Or(recordKeyFilter, dataFieldFilter) + assertFalse(filterIsSafeForBootstrap(illegalComplexFilter)) + + val illegalNestedFilter = And(legalComplexFilter, illegalComplexFilter) + assertFalse(filterIsSafeForBootstrap(illegalNestedFilter)) + + val legalNestedFilter = And(legalComplexFilter, recordKeyFilter) + assertTrue(filterIsSafeForBootstrap(legalNestedFilter)) + } + + @Test + def testGetAppliedRequiredSchema(): Unit = { + val fields = Array( + StructField("column_a", LongType, nullable = false), + StructField("column_b", StringType, nullable = false)) + val requiredSchema = StructType(fields) + + val appliedSchema: StructType = SparkFileFormatInternalRowReaderContext.getAppliedRequiredSchema( + requiredSchema, true) + assertEquals(3, appliedSchema.fields.length) + assertTrue(appliedSchema.fields.map(f => f.name).contains(ROW_INDEX_TEMPORARY_COLUMN_NAME)) + } +} diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodiePositionBasedFileGroupRecordBuffer.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodiePositionBasedFileGroupRecordBuffer.java new file mode 100644 index 00000000000..e59e65bea3e --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodiePositionBasedFileGroupRecordBuffer.java @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi; + +import org.apache.hudi.common.config.HoodieStorageConfig; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.engine.HoodieReaderContext; +import org.apache.hudi.common.model.DeleteRecord; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordMerger; +import org.apache.hudi.common.table.HoodieTableConfig; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.TableSchemaResolver; +import org.apache.hudi.common.table.log.block.HoodieDeleteBlock; +import org.apache.hudi.common.table.log.block.HoodieLogBlock; +import org.apache.hudi.common.table.read.HoodiePositionBasedFileGroupRecordBuffer; +import org.apache.hudi.common.table.read.HoodiePositionBasedSchemaHandler; +import org.apache.hudi.common.table.read.TestHoodieFileGroupReaderOnSpark; +import org.apache.hudi.common.testutils.HoodieTestDataGenerator; +import org.apache.hudi.common.testutils.SchemaTestUtil; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.common.util.collection.ExternalSpillableMap; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieValidationException; +import org.apache.hudi.keygen.constant.KeyGeneratorOptions; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; +import org.apache.spark.sql.catalyst.InternalRow; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.apache.hudi.common.engine.HoodieReaderContext.INTERNAL_META_RECORD_KEY; +import static org.apache.hudi.common.model.WriteOperationType.INSERT; +import static org.apache.hudi.common.testutils.HoodieTestUtils.createMetaClient; +import static org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class TestHoodiePositionBasedFileGroupRecordBuffer extends TestHoodieFileGroupReaderOnSpark { + private final HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(0xDEEF); + private HoodieTableMetaClient metaClient; + private Schema avroSchema; + private HoodiePositionBasedFileGroupRecordBuffer<InternalRow> buffer; + private String partitionPath; + + public void prepareBuffer(boolean useCustomMerger) throws Exception { + Map<String, String> writeConfigs = new 
HashMap<>(); + writeConfigs.put(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet"); + writeConfigs.put(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "_row_key"); + writeConfigs.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "partition_path"); + writeConfigs.put("hoodie.datasource.write.precombine.field", "timestamp"); + writeConfigs.put("hoodie.payload.ordering.field", "timestamp"); + writeConfigs.put(HoodieTableConfig.HOODIE_TABLE_NAME_KEY, "hoodie_test"); + writeConfigs.put("hoodie.insert.shuffle.parallelism", "4"); + writeConfigs.put("hoodie.upsert.shuffle.parallelism", "4"); + writeConfigs.put("hoodie.bulkinsert.shuffle.parallelism", "2"); + writeConfigs.put("hoodie.delete.shuffle.parallelism", "1"); + writeConfigs.put("hoodie.merge.small.file.group.candidates.limit", "0"); + writeConfigs.put("hoodie.compact.inline", "false"); + writeConfigs.put(HoodieWriteConfig.WRITE_RECORD_POSITIONS.key(), "true"); + commitToTable(recordsToStrings(dataGen.generateInserts("001", 100)), INSERT.value(), writeConfigs); + + String[] partitionPaths = dataGen.getPartitionPaths(); + String[] partitionValues = new String[1]; + partitionPath = partitionPaths[0]; + partitionValues[0] = partitionPath; + + metaClient = createMetaClient(getStorageConf(), getBasePath()); + avroSchema = new TableSchemaResolver(metaClient).getTableAvroSchema(); + Option<String[]> partitionFields = metaClient.getTableConfig().getPartitionFields(); + Option<String> partitionNameOpt = StringUtils.isNullOrEmpty(partitionPaths[0]) + ? Option.empty() : Option.of(partitionPaths[0]); + + HoodieReaderContext ctx = getHoodieReaderContext(getBasePath(), avroSchema, getStorageConf()); + ctx.setTablePath(metaClient.getBasePathV2().toString()); + ctx.setLatestCommitTime(metaClient.createNewInstantTime()); + ctx.setShouldMergeUseRecordPosition(true); + ctx.setHasBootstrapBaseFile(false); + ctx.setHasLogFiles(true); + ctx.setNeedsBootstrapMerge(false); + ctx.setRecordMerger(useCustomMerger ? 
new CustomMerger() : new HoodieSparkRecordMerger()); + ctx.setSchemaHandler(new HoodiePositionBasedSchemaHandler<>(ctx, avroSchema, avroSchema, + Option.empty(), metaClient.getTableConfig())); + buffer = new HoodiePositionBasedFileGroupRecordBuffer<>( + ctx, + metaClient, + partitionNameOpt, + partitionFields, + ctx.getRecordMerger(), + new TypedProperties(), + 1024 * 1024 * 1000, + metaClient.getTempFolderPath(), + ExternalSpillableMap.DiskMapType.ROCKS_DB, + false); + } + + public Map<HoodieLogBlock.HeaderMetadataType, String> getHeader() { + Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>(); + header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, avroSchema.toString()); + header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100"); + return header; + } + + public List<DeleteRecord> getDeleteRecords() throws IOException, URISyntaxException { + SchemaTestUtil testUtil = new SchemaTestUtil(); + List<IndexedRecord> records = testUtil.generateHoodieTestRecords(0, 100); + + List<DeleteRecord> deletedRecords = records.stream() + .map(s -> (DeleteRecord.create(((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(), + ((GenericRecord) s).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString()))) + .collect(Collectors.toList()).subList(0, 50); + return deletedRecords; + } + + public HoodieDeleteBlock getDeleteBlockWithPositions() throws IOException, URISyntaxException { + List<DeleteRecord> deletedRecords = getDeleteRecords(); + List<Pair<DeleteRecord, Long>> deleteRecordList = new ArrayList<>(); + + long position = 0; + for (DeleteRecord dr : deletedRecords) { + deleteRecordList.add(Pair.of(dr, position++)); + } + return new HoodieDeleteBlock(deleteRecordList, true, getHeader()); + } + + public HoodieDeleteBlock getDeleteBlockWithoutPositions() throws IOException, URISyntaxException { + List<DeleteRecord> deletedRecords = getDeleteRecords(); + List<Pair<DeleteRecord, Long>> deleteRecordList = new ArrayList<>(); + + for (DeleteRecord dr : deletedRecords) { + deleteRecordList.add(Pair.of(dr, -1L)); + } + return new HoodieDeleteBlock(deleteRecordList, true, getHeader()); + } + + @Test + public void testProcessDeleteBlockWithPositions() throws Exception { + prepareBuffer(false); + HoodieDeleteBlock deleteBlock = getDeleteBlockWithPositions(); + buffer.processDeleteBlock(deleteBlock); + assertEquals(50, buffer.getLogRecords().size()); + // With record positions, we do not need the record keys. 
+ assertNull(buffer.getLogRecords().get(0L).getRight().get(INTERNAL_META_RECORD_KEY)); + } + + @Test + public void testProcessDeleteBlockWithCustomMerger() throws Exception { + prepareBuffer(true); + HoodieDeleteBlock deleteBlock = getDeleteBlockWithPositions(); + buffer.processDeleteBlock(deleteBlock); + assertEquals(50, buffer.getLogRecords().size()); + assertNotNull(buffer.getLogRecords().get(0L).getRight().get(INTERNAL_META_RECORD_KEY)); + } + + @Test + public void testProcessDeleteBlockWithoutPositions() throws Exception { + prepareBuffer(false); + HoodieDeleteBlock deleteBlock = getDeleteBlockWithoutPositions(); + Exception exception = assertThrows( + HoodieValidationException.class, () -> buffer.processDeleteBlock(deleteBlock)); + assertTrue(exception.getMessage().contains("No record position info is found")); + } + + public static class CustomMerger implements HoodieRecordMerger { + @Override + public String getMergingStrategy() { + return "random_strategy"; + } + + @Override + public Option<Pair<HoodieRecord, Schema>> merge( + HoodieRecord older, Schema oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props + ) throws IOException { + throw new IOException("Not implemented"); + } + + @Override + public HoodieRecord.HoodieRecordType getRecordType() { + return HoodieRecord.HoodieRecordType.SPARK; + } + } +} + diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestFiltersInFileGroupReader.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestFiltersInFileGroupReader.java new file mode 100644 index 00000000000..b8ca6373237 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestFiltersInFileGroupReader.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.functional; + +import org.apache.hudi.common.config.HoodieReaderConfig; +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.config.HoodieWriteConfig; + +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.internal.SQLConf; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import java.util.Map; + + +/** + * Ensure that parquet filters are not being pushed down when they shouldn't be + */ +@Tag("functional") +public class TestFiltersInFileGroupReader extends TestBootstrapReadBase { + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testFiltersInFileFormat(boolean mergeUseRecordPositions) { + this.bootstrapType = "mixed"; + this.dashPartitions = true; + this.tableType = HoodieTableType.MERGE_ON_READ; + this.nPartitions = 2; + this.nInserts = 100000; + this.nUpdates = 20000; + sparkSession.conf().set(SQLConf.PARQUET_RECORD_FILTER_ENABLED().key(), "true"); + setupDirs(); + + //do bootstrap + Map<String, String> options = setBootstrapOptions(); + Dataset<Row> bootstrapDf = sparkSession.emptyDataFrame(); + bootstrapDf.write().format("hudi") + .options(options) + .mode(SaveMode.Overwrite) + .save(bootstrapTargetPath); + runComparison(mergeUseRecordPositions); + + + options = basicOptions(); + options.put(HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS.key(), String.valueOf(mergeUseRecordPositions)); + options.put(HoodieWriteConfig.WRITE_RECORD_POSITIONS.key(), String.valueOf(mergeUseRecordPositions)); + + doUpdate(options, "001"); + runComparison(mergeUseRecordPositions); + + doInsert(options, "002"); + runComparison(mergeUseRecordPositions); + + doDelete(options, "003"); + runComparison(mergeUseRecordPositions); + } + + protected void runComparison(boolean mergeUseRecordPositions) { + compareDf(createDf(hudiBasePath, true, mergeUseRecordPositions), createDf(hudiBasePath, false, false)); + compareDf(createDf(bootstrapTargetPath, true, mergeUseRecordPositions), createDf(bootstrapTargetPath, false, false)); + compareDf(createDf2(hudiBasePath, true, mergeUseRecordPositions), createDf2(hudiBasePath, false, false)); + compareDf(createDf2(bootstrapTargetPath, true, mergeUseRecordPositions), createDf2(bootstrapTargetPath, false, false)); + } + + protected Dataset<Row> createDf(String tableBasePath, Boolean fgReaderEnabled, Boolean mergeUseRecordPositions) { + //The chances of a uuid containing 00 with the 8-4-4-4-12 format is around 90% + //for bootstrap, _hoodie_record_key is in the skeleton while begin_lat is in the data + //We have a record key filter so that tests MORs filter pushdown with position based merging + return sparkSession.read().format("hudi") + .option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), fgReaderEnabled) + .option(HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS.key(), mergeUseRecordPositions) + .load(tableBasePath) + .drop("city_to_state") + .where("begin_lat > 0.5 and _hoodie_record_key LIKE '%00%'"); + } + + protected Dataset<Row> createDf2(String tableBasePath, Boolean fgReaderEnabled, Boolean mergeUseRecordPositions) { + //The chances of a uuid containing 00 with the 8-4-4-4-12 format is around 90% + //for bootstrap, _hoodie_record_key is in the skeleton while begin_lat is in the data + //We have a record key filter so that tests MORs filter pushdown with position based merging + return 
sparkSession.read().format("hudi") + .option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), fgReaderEnabled) + .option(HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS.key(), mergeUseRecordPositions) + .load(tableBasePath) + .drop("city_to_state") + .where("begin_lat > 0.5 or _hoodie_record_key LIKE '%00%'"); + } +} diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala index 49277884eb1..747fcb9a2eb 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala @@ -84,7 +84,7 @@ class TestHoodieFileGroupReaderOnSpark extends TestHoodieFileGroupReaderBase[Int val reader = sparkAdapter.createParquetFileReader(vectorized = false, spark.sessionState.conf, Map.empty, storageConf.unwrapAs(classOf[Configuration])) val metaClient = HoodieTableMetaClient.builder().setConf(storageConf).setBasePath(tablePath).build val recordKeyField = new HoodieSparkRecordMerger().getMandatoryFieldsForMerging(metaClient.getTableConfig)(0) - new SparkFileFormatInternalRowReaderContext(reader, recordKeyField, Seq.empty) + new SparkFileFormatInternalRowReaderContext(reader, recordKeyField, Seq.empty, Seq.empty) } override def commitToTable(recordList: util.List[String], operation: String, options: util.Map[String, String]): Unit = { diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestSpark35RecordPositionMetadataColumn.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestSpark35RecordPositionMetadataColumn.scala new file mode 100644 index 00000000000..61b7b0ded04 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestSpark35RecordPositionMetadataColumn.scala @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.common.table.read + +import org.apache.hadoop.conf.Configuration +import org.apache.hudi.SparkAdapterSupport.sparkAdapter +import org.apache.hudi.common.config.{HoodieReaderConfig, HoodieStorageConfig} +import org.apache.hudi.common.model.HoodieTableType +import org.apache.hudi.common.testutils.HoodieTestTable +import org.apache.hudi.config.HoodieWriteConfig +import org.apache.hudi.hadoop.fs.HadoopFSUtils +import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration +import org.apache.hudi.testutils.SparkClientFunctionalTestHarness +import org.apache.hudi.util.CloseableInternalRowIterator +import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkUtils, SparkFileFormatInternalRowReaderContext} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} +import org.junit.jupiter.api.Assertions.{assertArrayEquals, assertEquals, assertFalse} +import org.junit.jupiter.api.{BeforeEach, Test} + +class TestSpark35RecordPositionMetadataColumn extends SparkClientFunctionalTestHarness { + private val PARQUET_FORMAT = "parquet" + private val SPARK_MERGER = "org.apache.hudi.HoodieSparkRecordMerger" + + @BeforeEach + def setUp(): Unit = { + val _spark = spark + import _spark.implicits._ + + val userToCountryDF = Seq( + (1, "US", "1001"), + (2, "China", "1003"), + (3, "US", "1002"), + (4, "Singapore", "1004")) + .toDF("userid", "country", "ts") + + // Create the file with record positions. + userToCountryDF.write.format("hudi") + .option(DataSourceWriteOptions.RECORDKEY_FIELD.key, "userid") + .option(DataSourceWriteOptions.PRECOMBINE_FIELD.key, "ts") + .option(HoodieWriteConfig.TBL_NAME.key, "user_to_country") + .option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, SPARK_MERGER) + .option(HoodieWriteConfig.WRITE_RECORD_POSITIONS.key, "true") + .option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key, PARQUET_FORMAT) + .option( + DataSourceWriteOptions.TABLE_TYPE.key(), + HoodieTableType.MERGE_ON_READ.name()) + .save(basePath) + } + + @Test + def testRecordPositionColumn(): Unit = { + val _spark = spark + // Prepare the schema + val dataSchema = new StructType( + Array( + StructField("userid", IntegerType, nullable = false), + StructField("country", StringType, nullable = false), + StructField("ts", StringType, nullable = false) + ) + ) + + // Prepare the file and Parquet file reader. + _spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false") + + val hadoopConf = new Configuration(spark().sparkContext.hadoopConfiguration) + val props = Map("spark.sql.parquet.enableVectorizedReader" -> "false") + _spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false") + val reader = sparkAdapter.createParquetFileReader(vectorized = false, _spark.sessionState.conf, props, hadoopConf) + + val metaClient = getHoodieMetaClient(HadoopFSUtils.getStorageConfWithCopy(_spark.sparkContext.hadoopConfiguration), basePath) + val allBaseFiles = HoodieTestTable.of(metaClient).listAllBaseFiles + assertFalse(allBaseFiles.isEmpty) + + val requiredSchema = SparkFileFormatInternalRowReaderContext.getAppliedRequiredSchema(dataSchema, + new SparkFileFormatInternalRowReaderContext(reader, "userid", Seq.empty, Seq.empty).supportsParquetRowIndex) + + // Confirm if the schema is as expected. 
+ if (HoodieSparkUtils.gteqSpark3_5) { + assertEquals(4, requiredSchema.fields.length) + assertEquals( + "StructField(_tmp_metadata_row_index,LongType,false)", + requiredSchema.fields(3).toString) + } + + // Make sure we can read all the positions out from base file. + // Here we don't add filters since enabling filter push-down + // for parquet file is tricky. + if (HoodieSparkUtils.gteqSpark3_5) { + val fileInfo = sparkAdapter.getSparkPartitionedFileUtils + .createPartitionedFile( + InternalRow.empty, + allBaseFiles.get(0).getPath, + 0, + allBaseFiles.get(0).getLength) + val iterator = new CloseableInternalRowIterator(reader.read(fileInfo, requiredSchema, + StructType(Seq.empty), Seq.empty, new HadoopStorageConfiguration(hadoopConf))) + var rowIndices: Set[Long] = Set() + while (iterator.hasNext) { + val row = iterator.next() + rowIndices += row.getLong(3) + } + iterator.close() + val expectedRowIndices: Set[Long] = Set(0L, 1L, 2L, 3L) + assertEquals(expectedRowIndices, rowIndices) + } + } + + @Test + def testUseFileGroupReaderDirectly(): Unit = { + val _spark = spark + import _spark.implicits._ + + // Read the records out with record positions. + val allRecords = _spark.read.format("hudi") + .option(HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS.key, "true") + .load(basePath) + + // Ensure the number of outcomes are correct for all Spark versions + // including Spark3.5. + val usRecords = allRecords + .select("userid") + .filter("country = 'US'").map(_.getInt(0)).collect() + assertArrayEquals(Array[Int](1, 3), usRecords) + } +}