This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 89631a812eb [HUDI-8896] FileGroupReader Bootstrap support (#13223)
89631a812eb is described below
commit 89631a812ebc80dd5603bc525e069479f4568a1d
Author: Tim Brown <[email protected]>
AuthorDate: Tue May 6 21:54:44 2025 -0500
[HUDI-8896] FileGroupReader Bootstrap support (#13223)
---
.../apache/hudi/client/model/BootstrapRowData.java | 150 ++++++++++++++++++
.../read/HoodieFileGroupReaderOnJavaTestBase.java | 2 +-
.../hadoop/TestHoodieFileGroupReaderOnHive.java | 15 +-
.../hudi/common/model/HoodieSparkRecord.java | 3 +-
.../hudi/BaseSparkInternalRowReaderContext.java | 22 ++-
.../SparkFileFormatInternalRowReaderContext.scala | 20 ++-
.../apache/spark/sql/HoodieInternalRowUtils.scala | 68 +++++---
.../apache/hudi/avro/HoodieAvroReaderContext.java | 18 ++-
.../hudi/common/engine/HoodieReaderContext.java | 12 +-
.../hudi/common/table/PartitionPathParser.java | 176 +++++++++++++++++++++
.../common/table/read/HoodieFileGroupReader.java | 29 +++-
.../hudi/avro/TestHoodieAvroReaderContext.java | 8 +-
.../hudi/common/table/TestPartitionPathParser.java | 94 +++++++++++
.../table/read/TestHoodieFileGroupReaderBase.java | 114 ++++++++++---
.../common/testutils/HoodieTestDataGenerator.java | 24 ++-
.../resources/file-group-reader/bootstrap_data.zip | Bin 0 -> 157171 bytes
.../table/format/FlinkRowDataReaderContext.java | 18 ++-
.../hudi/hadoop/HiveHoodieReaderContext.java | 12 +-
.../hudi/hadoop/utils/ObjectInspectorCache.java | 2 +-
.../hudi/common/TestHoodieInternalRowUtils.scala | 32 +++-
pom.xml | 1 +
21 files changed, 737 insertions(+), 83 deletions(-)
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/BootstrapRowData.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/BootstrapRowData.java
new file mode 100644
index 00000000000..513b8b1994b
--- /dev/null
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/BootstrapRowData.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.model;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.types.RowKind;
+
+import java.util.Map;
+import java.util.function.Function;
+
+/**
+ * RowData implementation used when reading from Spark bootstrapped table. In
these tables, the partition values
+ * are not always written to the data files, so we need to use the values
inferred from the file's partition path.
+ */
+public class BootstrapRowData implements RowData {
+ private final RowData row;
+ private final Map<Integer, Object> partitionOrdinalToValues;
+
+ public BootstrapRowData(RowData row, Map<Integer, Object>
partitionOrdinalToValues) {
+ this.row = row;
+ this.partitionOrdinalToValues = partitionOrdinalToValues;
+ }
+
+ @Override
+ public int getArity() {
+ return row.getArity();
+ }
+
+ @Override
+ public RowKind getRowKind() {
+ return row.getRowKind();
+ }
+
+ @Override
+ public void setRowKind(RowKind kind) {
+ row.setRowKind(kind);
+ }
+
+ @Override
+ public boolean isNullAt(int pos) {
+ return !partitionOrdinalToValues.containsKey(pos) || row.isNullAt(pos);
+ }
+
+ @Override
+ public boolean getBoolean(int pos) {
+ return getValue(pos, row::getBoolean);
+ }
+
+ @Override
+ public byte getByte(int pos) {
+ return getValue(pos, row::getByte);
+ }
+
+ @Override
+ public short getShort(int pos) {
+ return getValue(pos, row::getShort);
+ }
+
+ @Override
+ public int getInt(int pos) {
+ return getValue(pos, row::getInt);
+ }
+
+ @Override
+ public long getLong(int pos) {
+ return getValue(pos, row::getLong);
+ }
+
+ @Override
+ public float getFloat(int pos) {
+ return getValue(pos, row::getFloat);
+ }
+
+ @Override
+ public double getDouble(int pos) {
+ return getValue(pos, row::getDouble);
+ }
+
+ @Override
+ public StringData getString(int pos) {
+ return getValue(pos, row::getString);
+ }
+
+ @Override
+ public DecimalData getDecimal(int pos, int precision, int scale) {
+ return getValue(pos, (p) -> row.getDecimal(p, precision, scale));
+ }
+
+ @Override
+ public TimestampData getTimestamp(int pos, int precision) {
+ return getValue(pos, (p) -> row.getTimestamp(p, precision));
+ }
+
+ @Override
+ public <T> RawValueData<T> getRawValue(int pos) {
+ return getValue(pos, row::getRawValue);
+ }
+
+ @Override
+ public byte[] getBinary(int pos) {
+ return getValue(pos, row::getBinary);
+ }
+
+ @Override
+ public ArrayData getArray(int pos) {
+ // bootstrap partition values cannot be arrays
+ return row.getArray(pos);
+ }
+
+ @Override
+ public MapData getMap(int pos) {
+ // bootstrap partition values cannot be maps
+ return row.getMap(pos);
+ }
+
+ @Override
+ public RowData getRow(int pos, int numFields) {
+ // bootstrap partition values cannot be rows
+ return row.getRow(pos, numFields);
+ }
+
+ private <T> T getValue(int pos, Function<Integer, T> getter) {
+ if (row.isNullAt(pos) && partitionOrdinalToValues.containsKey(pos)) {
+ return (T) partitionOrdinalToValues.get(pos);
+ }
+ return getter.apply(pos);
+ }
+}
diff --git
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderOnJavaTestBase.java
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderOnJavaTestBase.java
index 2eedb63b4a9..1b6c7b83fb4 100644
---
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderOnJavaTestBase.java
+++
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderOnJavaTestBase.java
@@ -48,7 +48,7 @@ public abstract class HoodieFileGroupReaderOnJavaTestBase<T>
extends TestHoodieF
@Override
public String getBasePath() {
- return tempDir.toAbsolutePath() + "/myTable";
+ return "file://" + tempDir.toAbsolutePath() + "/myTable";
}
@Override
diff --git
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/hadoop/TestHoodieFileGroupReaderOnHive.java
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/hadoop/TestHoodieFileGroupReaderOnHive.java
index ffb194696c0..c01ed5a7a33 100644
---
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/hadoop/TestHoodieFileGroupReaderOnHive.java
+++
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/hadoop/TestHoodieFileGroupReaderOnHive.java
@@ -19,7 +19,6 @@
package org.apache.hudi.hadoop;
-import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.config.HoodieMemoryConfig;
import org.apache.hudi.common.engine.HoodieReaderContext;
import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -39,6 +38,8 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.mapred.JobConf;
import org.junit.jupiter.api.AfterAll;
@@ -46,6 +47,7 @@ import org.junit.jupiter.api.BeforeAll;
import java.io.IOException;
import java.util.List;
+import java.util.Map;
import java.util.stream.Collectors;
import static
org.apache.hudi.hadoop.HoodieFileGroupReaderBasedRecordReader.getStoredPartitionFieldNames;
@@ -92,7 +94,7 @@ public class TestHoodieFileGroupReaderOnHive extends
HoodieFileGroupReaderOnJava
public HoodieReaderContext<ArrayWritable> getHoodieReaderContext(String
tablePath, Schema avroSchema, StorageConfiguration<?> storageConf,
HoodieTableMetaClient metaClient) {
HoodieFileGroupReaderBasedRecordReader.HiveReaderCreator readerCreator =
(inputSplit, jobConf) -> new
MapredParquetInputFormat().getRecordReader(inputSplit, jobConf, null);
JobConf jobConf = new JobConf(storageConf.unwrapAs(Configuration.class));
- setupJobconf(jobConf, metaClient.getTableConfig().populateMetaFields());
+ setupJobconf(jobConf, avroSchema);
return new HiveHoodieReaderContext(readerCreator,
getStoredPartitionFieldNames(new
JobConf(storageConf.unwrapAs(Configuration.class)), avroSchema),
new ObjectInspectorCache(avroSchema, jobConf), storageConf,
metaClient.getTableConfig());
@@ -103,12 +105,13 @@ public class TestHoodieFileGroupReaderOnHive extends
HoodieFileGroupReaderOnJava
ArrayWritableTestUtil.assertArrayWritableEqual(schema, expected, actual,
false);
}
- private void setupJobconf(JobConf jobConf, boolean populateMetaFields) {
- Schema schema = populateMetaFields ?
HoodieAvroUtils.addMetadataFields(HoodieTestDataGenerator.AVRO_SCHEMA) :
HoodieTestDataGenerator.AVRO_SCHEMA;
+ private void setupJobconf(JobConf jobConf, Schema schema) {
List<Schema.Field> fields = schema.getFields();
setHiveColumnNameProps(fields, jobConf, USE_FAKE_PARTITION);
- String metaFieldTypes = "string,string,string,string,string,";
- jobConf.set("columns.types", (populateMetaFields ? metaFieldTypes : "") +
HoodieTestDataGenerator.TRIP_HIVE_COLUMN_TYPES + ",string");
+ List<TypeInfo> types =
TypeInfoUtils.getTypeInfosFromTypeString(HoodieTestDataGenerator.TRIP_HIVE_COLUMN_TYPES);
+ Map<String, String> typeMappings =
HoodieTestDataGenerator.AVRO_SCHEMA.getFields().stream().collect(Collectors.toMap(Schema.Field::name,
field -> types.get(field.pos()).getTypeName()));
+ String columnTypes = fields.stream().map(field ->
typeMappings.getOrDefault(field.name(),
"string")).collect(Collectors.joining(","));
+ jobConf.set("columns.types", columnTypes + ",string");
}
private void setHiveColumnNameProps(List<Schema.Field> fields, JobConf
jobConf, boolean isPartitioned) {
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java
index d94888fd83b..c810e7628cd 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java
@@ -49,6 +49,7 @@ import org.apache.spark.unsafe.types.UTF8String;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.util.Collections;
import java.util.Map;
import java.util.Properties;
@@ -217,7 +218,7 @@ public class HoodieSparkRecord extends
HoodieRecord<InternalRow> {
StructType newStructType =
HoodieInternalRowUtils.getCachedSchema(newSchema);
Function1<InternalRow, UnsafeRow> unsafeRowWriter =
- HoodieInternalRowUtils.getCachedUnsafeRowWriter(structType,
newStructType, renameCols);
+ HoodieInternalRowUtils.getCachedUnsafeRowWriter(structType,
newStructType, renameCols, Collections.emptyMap());
UnsafeRow unsafeRow = unsafeRowWriter.apply(this.data);
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
index 9ed37e1e04a..dc2ad7f0b15 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
@@ -32,6 +32,7 @@ import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.read.BufferedRecord;
import org.apache.hudi.common.util.HoodieRecordUtils;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.avro.Schema;
@@ -43,8 +44,11 @@ import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.unsafe.types.UTF8String;
+import java.util.Collections;
+import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;
+import java.util.stream.Collectors;
import scala.Function1;
@@ -132,13 +136,23 @@ public abstract class BaseSparkInternalRowReaderContext
extends HoodieReaderCont
@Override
public UnaryOperator<InternalRow> projectRecord(Schema from, Schema to,
Map<String, String> renamedColumns) {
Function1<InternalRow, UnsafeRow> unsafeRowWriter =
- HoodieInternalRowUtils.getCachedUnsafeRowWriter(getCachedSchema(from),
getCachedSchema(to), renamedColumns);
+ HoodieInternalRowUtils.getCachedUnsafeRowWriter(getCachedSchema(from),
getCachedSchema(to), renamedColumns, Collections.emptyMap());
return row -> (InternalRow) unsafeRowWriter.apply(row);
-
}
- protected UnaryOperator<InternalRow> getIdentityProjection() {
- return row -> row;
+ /**
+ * Constructs a transformation that will take a row and convert it to a new
row with the given schema and adds in the values for the partition columns if
they are missing in the returned row.
+ * It is assumed that the `to` schema will contain the partition fields.
+ * @param from the original schema
+ * @param to the schema the row will be converted to
+ * @param partitionFieldAndValues the partition fields and their values, if
any are required by the reader
+ * @return a function for transforming the row
+ */
+ protected UnaryOperator<InternalRow> getBootstrapProjection(Schema from,
Schema to, List<Pair<String, Object>> partitionFieldAndValues) {
+ Map<Integer, Object> partitionValuesByIndex =
partitionFieldAndValues.stream().collect(Collectors.toMap(pair ->
to.getField(pair.getKey()).pos(), Pair::getRight));
+ Function1<InternalRow, UnsafeRow> unsafeRowWriter =
+ HoodieInternalRowUtils.getCachedUnsafeRowWriter(getCachedSchema(from),
getCachedSchema(to), Collections.emptyMap(), partitionValuesByIndex);
+ return row -> (InternalRow) unsafeRowWriter.apply(row);
}
@Override
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
index 1c595550405..ec9efab469a 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
@@ -27,10 +27,11 @@ import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.HoodieRecord
import
org.apache.hudi.common.table.read.PositionBasedFileGroupRecordBuffer.ROW_INDEX_TEMPORARY_COLUMN_NAME
import org.apache.hudi.common.util.ValidationUtils.checkState
-import org.apache.hudi.common.util.collection.{CachingIterator,
ClosableIterator}
+import org.apache.hudi.common.util.collection.{CachingIterator,
ClosableIterator, Pair => HPair}
import org.apache.hudi.io.storage.{HoodieSparkFileReaderFactory,
HoodieSparkParquetReader}
import org.apache.hudi.storage.{HoodieStorage, StorageConfiguration,
StoragePath}
import org.apache.hudi.util.CloseableInternalRowIterator
+
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericRecord, IndexedRecord}
import org.apache.hadoop.conf.Configuration
@@ -147,15 +148,17 @@ class
SparkFileFormatInternalRowReaderContext(parquetFileReader: SparkParquetRea
override def mergeBootstrapReaders(skeletonFileIterator:
ClosableIterator[InternalRow],
skeletonRequiredSchema: Schema,
dataFileIterator:
ClosableIterator[InternalRow],
- dataRequiredSchema: Schema):
ClosableIterator[InternalRow] = {
+ dataRequiredSchema: Schema,
+ partitionFieldAndValues:
java.util.List[HPair[String, Object]]): ClosableIterator[InternalRow] = {
doBootstrapMerge(skeletonFileIterator.asInstanceOf[ClosableIterator[Any]],
skeletonRequiredSchema,
- dataFileIterator.asInstanceOf[ClosableIterator[Any]], dataRequiredSchema)
+ dataFileIterator.asInstanceOf[ClosableIterator[Any]],
dataRequiredSchema, partitionFieldAndValues)
}
private def doBootstrapMerge(skeletonFileIterator: ClosableIterator[Any],
skeletonRequiredSchema: Schema,
dataFileIterator: ClosableIterator[Any],
- dataRequiredSchema: Schema):
ClosableIterator[InternalRow] = {
+ dataRequiredSchema: Schema,
+ partitionFieldAndValues:
java.util.List[HPair[String, Object]]): ClosableIterator[InternalRow] = {
if (supportsParquetRowIndex()) {
assert(AvroSchemaUtils.containsFieldInSchema(skeletonRequiredSchema,
ROW_INDEX_TEMPORARY_COLUMN_NAME))
assert(AvroSchemaUtils.containsFieldInSchema(dataRequiredSchema,
ROW_INDEX_TEMPORARY_COLUMN_NAME))
@@ -167,10 +170,10 @@ class
SparkFileFormatInternalRowReaderContext(parquetFileReader: SparkParquetRea
//If we need to do position based merging with log files we will leave
the row index column at the end
val dataProjection = if (getHasLogFiles &&
getShouldMergeUseRecordPosition) {
- getIdentityProjection
+ getBootstrapProjection(dataRequiredSchema, dataRequiredSchema,
partitionFieldAndValues)
} else {
- projectRecord(dataRequiredSchema,
- HoodieAvroUtils.removeFields(dataRequiredSchema, rowIndexColumn))
+ getBootstrapProjection(dataRequiredSchema,
+ HoodieAvroUtils.removeFields(dataRequiredSchema, rowIndexColumn),
partitionFieldAndValues)
}
//row index will always be the last column
@@ -224,6 +227,7 @@ class
SparkFileFormatInternalRowReaderContext(parquetFileReader: SparkParquetRea
}
}
} else {
+ val dataProjection = getBootstrapProjection(dataRequiredSchema,
dataRequiredSchema, partitionFieldAndValues)
new ClosableIterator[Any] {
val combinedRow = new JoinedRow()
@@ -251,7 +255,7 @@ class
SparkFileFormatInternalRowReaderContext(parquetFileReader: SparkParquetRea
sparkAdapter.makeColumnarBatch(vecs, s.numRows())
case (_: ColumnarBatch, _: InternalRow) => throw new
IllegalStateException("InternalRow ColumnVector mismatch")
case (_: InternalRow, _: ColumnarBatch) => throw new
IllegalStateException("InternalRow ColumnVector mismatch")
- case (s: InternalRow, d: InternalRow) => combinedRow(s, d)
+ case (s: InternalRow, d: InternalRow) => combinedRow(s,
dataProjection.apply(d))
}
}
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala
index 519519fea37..fda5f4b0a2b 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala
@@ -44,13 +44,14 @@ import scala.collection.mutable.ArrayBuffer
object HoodieInternalRowUtils {
private type RenamedColumnMap = JMap[String, String]
+ private type UpdatedColumnMap = JMap[Integer, Object]
private type UnsafeRowWriter = InternalRow => UnsafeRow
// NOTE: [[UnsafeProjection]] objects cache have to stay [[ThreadLocal]]
since these are not thread-safe
- private val unsafeWriterThreadLocal:
ThreadLocal[mutable.HashMap[(StructType, StructType, RenamedColumnMap),
UnsafeRowWriter]] =
- ThreadLocal.withInitial(new Supplier[mutable.HashMap[(StructType,
StructType, RenamedColumnMap), UnsafeRowWriter]] {
- override def get(): mutable.HashMap[(StructType, StructType,
RenamedColumnMap), UnsafeRowWriter] =
- new mutable.HashMap[(StructType, StructType, RenamedColumnMap),
UnsafeRowWriter]
+ private val unsafeWriterThreadLocal:
ThreadLocal[mutable.HashMap[(StructType, StructType, RenamedColumnMap,
UpdatedColumnMap), UnsafeRowWriter]] =
+ ThreadLocal.withInitial(new Supplier[mutable.HashMap[(StructType,
StructType, RenamedColumnMap, UpdatedColumnMap), UnsafeRowWriter]] {
+ override def get(): mutable.HashMap[(StructType, StructType,
RenamedColumnMap, UpdatedColumnMap), UnsafeRowWriter] =
+ new mutable.HashMap[(StructType, StructType, RenamedColumnMap,
UpdatedColumnMap), UnsafeRowWriter]
})
// NOTE: [[UnsafeRowWriter]] objects cache have to stay [[ThreadLocal]]
since these are not thread-safe
@@ -102,9 +103,11 @@ object HoodieInternalRowUtils {
* <li>Handling (field) renames</li>
* </ul>
*/
- def getCachedUnsafeRowWriter(from: StructType, to: StructType,
renamedColumnsMap: JMap[String, String] = JCollections.emptyMap()):
UnsafeRowWriter = {
+ def getCachedUnsafeRowWriter(from: StructType, to: StructType,
+ renamedColumnsMap: JMap[String, String] =
JCollections.emptyMap(),
+ updatedValuesMap: JMap[Integer, Object] =
JCollections.emptyMap()): UnsafeRowWriter = {
unsafeWriterThreadLocal.get()
- .getOrElseUpdate((from, to, renamedColumnsMap), genUnsafeRowWriter(from,
to, renamedColumnsMap))
+ .getOrElseUpdate((from, to, renamedColumnsMap, updatedValuesMap),
genUnsafeRowWriter(from, to, renamedColumnsMap, updatedValuesMap))
}
def getCachedPosList(structType: StructType, field: String):
Option[NestedFieldPath] = {
@@ -142,9 +145,18 @@ object HoodieInternalRowUtils {
private[sql] def genUnsafeRowWriter(prevSchema: StructType,
newSchema: StructType,
- renamedColumnsMap: JMap[String,
String]): UnsafeRowWriter = {
- val writer = newWriterRenaming(prevSchema, newSchema, renamedColumnsMap,
new JArrayDeque[String]())
+ renamedColumnsMap: JMap[String, String],
+ updatedValuesMap: JMap[Integer,
Object]): UnsafeRowWriter = {
val unsafeProjection = generateUnsafeProjection(newSchema, newSchema)
+ if (prevSchema.equals(newSchema) && renamedColumnsMap.isEmpty &&
updatedValuesMap.isEmpty) {
+ return oldRow => unsafeProjection(oldRow)
+ }
+ val writer = if (newSchema.equals(prevSchema)) {
+ // Force a modifiable row to be generated so the updated values can be
set
+ modifiableRowWriter(prevSchema, newSchema, renamedColumnsMap, new
JArrayDeque[String]())
+ } else {
+ newWriterRenaming(prevSchema, newSchema, renamedColumnsMap, new
JArrayDeque[String]())
+ }
val phonyUpdater = new CatalystDataUpdater {
var value: InternalRow = _
@@ -154,6 +166,11 @@ object HoodieInternalRowUtils {
oldRow => {
writer(phonyUpdater, 0, oldRow)
+ updatedValuesMap.forEach((index, value) => {
+ if (phonyUpdater.value.isNullAt(index)) {
+ phonyUpdater.value.update(index, value)
+ }
+ })
unsafeProjection(phonyUpdater.value)
}
}
@@ -218,6 +235,26 @@ object HoodieInternalRowUtils {
}
}
+ private def modifiableRowWriter(prevStructType: StructType,
+ newStructType: StructType,
+ renamedColumnsMap: JMap[String, String],
+ fieldNameStack: JDeque[String]):
RowFieldUpdater = {
+ val writer = genUnsafeStructWriter(prevStructType, newStructType,
renamedColumnsMap, fieldNameStack)
+
+ val newRow = new SpecificInternalRow(newStructType.fields.map(_.dataType))
+ val rowUpdater = new RowUpdater(newRow)
+
+ (fieldUpdater, ordinal, value) => {
+ // Here new row is built in 2 stages:
+ // - First, we pass mutable row (used as buffer/scratchpad) created
above wrapped into [[RowUpdater]]
+ // into generated row-writer
+ // - Upon returning from row-writer, we call back into parent row's
[[fieldUpdater]] to set returned
+ // row as a value in it
+ writer(rowUpdater, value)
+ fieldUpdater.set(ordinal, newRow)
+ }
+ }
+
private def newWriterRenaming(prevDataType: DataType,
newDataType: DataType,
renamedColumnsMap: JMap[String, String],
@@ -227,20 +264,7 @@ object HoodieInternalRowUtils {
(fieldUpdater, ordinal, value) => fieldUpdater.set(ordinal, value)
case (newStructType: StructType, prevStructType: StructType) =>
- val writer = genUnsafeStructWriter(prevStructType, newStructType,
renamedColumnsMap, fieldNameStack)
-
- val newRow = new
SpecificInternalRow(newStructType.fields.map(_.dataType))
- val rowUpdater = new RowUpdater(newRow)
-
- (fieldUpdater, ordinal, value) => {
- // Here new row is built in 2 stages:
- // - First, we pass mutable row (used as buffer/scratchpad)
created above wrapped into [[RowUpdater]]
- // into generated row-writer
- // - Upon returning from row-writer, we call back into parent
row's [[fieldUpdater]] to set returned
- // row as a value in it
- writer(rowUpdater, value)
- fieldUpdater.set(ordinal, newRow)
- }
+ modifiableRowWriter(prevStructType, newStructType, renamedColumnsMap,
fieldNameStack)
case (ArrayType(newElementType, _), ArrayType(prevElementType,
containsNull)) =>
fieldNameStack.push("element")
diff --git
a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java
b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java
index 9ede9fc1239..d2f360e5ae1 100644
---
a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java
+++
b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java
@@ -35,6 +35,7 @@ import org.apache.hudi.common.util.HoodieRecordUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.SpillableMapUtils;
import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.io.storage.HoodieAvroFileReader;
import org.apache.hudi.io.storage.HoodieIOFactory;
@@ -148,8 +149,9 @@ public class HoodieAvroReaderContext extends
HoodieReaderContext<IndexedRecord>
public ClosableIterator<IndexedRecord>
mergeBootstrapReaders(ClosableIterator<IndexedRecord> skeletonFileIterator,
Schema
skeletonRequiredSchema,
ClosableIterator<IndexedRecord> dataFileIterator,
- Schema
dataRequiredSchema) {
- return new BootstrapIterator(skeletonFileIterator, skeletonRequiredSchema,
dataFileIterator, dataRequiredSchema);
+ Schema
dataRequiredSchema,
+
List<Pair<String, Object>> partitionFieldAndValues) {
+ return new BootstrapIterator(skeletonFileIterator, skeletonRequiredSchema,
dataFileIterator, dataRequiredSchema, partitionFieldAndValues);
}
@Override
@@ -227,15 +229,20 @@ public class HoodieAvroReaderContext extends
HoodieReaderContext<IndexedRecord>
private final Schema dataRequiredSchema;
private final Schema mergedSchema;
private final int skeletonFields;
+ private final int[] partitionFieldPositions;
+ private final Object[] partitionValues;
public BootstrapIterator(ClosableIterator<IndexedRecord>
skeletonFileIterator, Schema skeletonRequiredSchema,
- ClosableIterator<IndexedRecord> dataFileIterator,
Schema dataRequiredSchema) {
+ ClosableIterator<IndexedRecord> dataFileIterator,
Schema dataRequiredSchema,
+ List<Pair<String, Object>>
partitionFieldAndValues) {
this.skeletonFileIterator = skeletonFileIterator;
this.skeletonRequiredSchema = skeletonRequiredSchema;
this.dataFileIterator = dataFileIterator;
this.dataRequiredSchema = dataRequiredSchema;
this.mergedSchema = AvroSchemaUtils.mergeSchemas(skeletonRequiredSchema,
dataRequiredSchema);
this.skeletonFields = skeletonRequiredSchema.getFields().size();
+ this.partitionFieldPositions =
partitionFieldAndValues.stream().map(Pair::getLeft).map(field ->
mergedSchema.getField(field).pos()).mapToInt(Integer::intValue).toArray();
+ this.partitionValues =
partitionFieldAndValues.stream().map(Pair::getValue).toArray();
}
@Override
@@ -265,6 +272,11 @@ public class HoodieAvroReaderContext extends
HoodieReaderContext<IndexedRecord>
Schema.Field sourceField =
dataRecord.getSchema().getField(dataField.name());
mergedRecord.put(dataField.pos() + skeletonFields,
dataRecord.get(sourceField.pos()));
}
+ for (int i = 0; i < partitionFieldPositions.length; i++) {
+ if (mergedRecord.get(partitionFieldPositions[i]) == null) {
+ mergedRecord.put(partitionFieldPositions[i], partitionValues[i]);
+ }
+ }
return mergedRecord;
}
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
index 98d6feeb0e7..c07b9b87e0c 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
@@ -23,11 +23,12 @@ import org.apache.hudi.common.config.RecordMergeMode;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.table.HoodieTableConfig;
-import org.apache.hudi.common.table.read.FileGroupReaderSchemaHandler;
import org.apache.hudi.common.table.read.BufferedRecord;
+import org.apache.hudi.common.table.read.FileGroupReaderSchemaHandler;
import org.apache.hudi.common.util.LocalAvroSchemaCache;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StorageConfiguration;
@@ -42,6 +43,7 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Collections;
+import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.UnaryOperator;
@@ -321,13 +323,17 @@ public abstract class HoodieReaderContext<T> {
* skeleton file iterator, followed by all columns in the data file iterator
*
* @param skeletonFileIterator iterator over bootstrap skeleton files that
contain hudi metadata columns
- * @param dataFileIterator iterator over data files that were
bootstrapped into the hudi table
+ * @param skeletonRequiredSchema the schema of the skeleton file iterator
+ * @param dataFileIterator iterator over data files that were bootstrapped
into the hudi table
+ * @param dataRequiredSchema the schema of the data file iterator
+ * @param requiredPartitionFieldAndValues the partition field names and
their values that are required by the query
* @return iterator that concatenates the skeletonFileIterator and
dataFileIterator
*/
public abstract ClosableIterator<T>
mergeBootstrapReaders(ClosableIterator<T> skeletonFileIterator,
Schema
skeletonRequiredSchema,
ClosableIterator<T> dataFileIterator,
- Schema
dataRequiredSchema);
+ Schema
dataRequiredSchema,
+ List<Pair<String,
Object>> requiredPartitionFieldAndValues);
/**
* Creates a function that will reorder records of schema "from" to schema
of "to"
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/PartitionPathParser.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/PartitionPathParser.java
new file mode 100644
index 00000000000..3b846330e63
--- /dev/null
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/PartitionPathParser.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table;
+
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.PartitionPathEncodeUtils;
+import org.apache.hudi.common.util.VisibleForTesting;
+
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+
+import java.nio.charset.StandardCharsets;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+
+import static org.apache.hudi.avro.AvroSchemaUtils.resolveNullableSchema;
+
+public class PartitionPathParser {
+ public static final String DEPRECATED_DEFAULT_PARTITION_PATH = "default";
+ public static final String DEFAULT_PARTITION_PATH =
"__HIVE_DEFAULT_PARTITION__";
+ private static final String EQUALS_SIGN = "=";
+ private static final String DASH = "-";
+ private static final String SLASH = "/";
+
+ public Object[] getPartitionFieldVals(Option<String[]> partitionFields,
+ String partitionPath,
+ Schema writerSchema) {
+ if (!partitionFields.isPresent()) {
+ return new Object[0];
+ }
+ return getPartitionValues(partitionFields.get(), partitionPath,
writerSchema);
+ }
+
+ private static Object[] getPartitionValues(String[] partitionFields,
+ String partitionPath,
+ Schema schema) {
+ String[] parts = partitionPath.split("/");
+ int pathSegment = 0;
+ boolean hasDateField = false;
+ Object[] partitionValues = new Object[partitionFields.length];
+ for (int i = 0; i < partitionFields.length; i++) {
+ String partitionField = partitionFields[i];
+ Schema.Field field = schema.getField(partitionField);
+ // if the field is not present in the schema, we assume it is a string
+ Schema fieldSchema = field == null ? Schema.create(Schema.Type.STRING) :
resolveNullableSchema(field.schema());
+ LogicalType logicalType = fieldSchema.getLogicalType();
+ if (isTimeBasedLogicalType(logicalType)) {
+ if (hasDateField) {
+ throw new IllegalArgumentException("Only one date field based
partition is supported");
+ }
+ hasDateField = true;
+ int numDateDirs = parts.length - partitionFields.length + 1;
+ partitionValues[i] = inferDateValue(partitionPath, parts, pathSegment,
numDateDirs, fieldSchema);
+ pathSegment += numDateDirs;
+ } else {
+ String segment = parts[pathSegment];
+ String[] segmentParts = segment.split(EQUALS_SIGN);
+ partitionValues[i] = parseValue(segmentParts[segmentParts.length - 1],
fieldSchema);
+ pathSegment++;
+ }
+ }
+ return partitionValues;
+ }
+
+ @VisibleForTesting
+ static Object parseValue(String partitionValue, Schema fieldSchema) {
+ if (partitionValue.equals(DEFAULT_PARTITION_PATH) ||
partitionValue.equals(DEPRECATED_DEFAULT_PARTITION_PATH)) {
+ return null;
+ }
+
+ switch (fieldSchema.getType()) {
+ case STRING:
+ return PartitionPathEncodeUtils.unescapePathName(partitionValue);
+ case INT:
+ return Integer.parseInt(partitionValue);
+ case LONG:
+ return Long.parseLong(partitionValue);
+ case FLOAT:
+ return Float.parseFloat(partitionValue);
+ case DOUBLE:
+ return Double.parseDouble(partitionValue);
+ case BOOLEAN:
+ return Boolean.parseBoolean(partitionValue);
+ case BYTES:
+ case FIXED:
+ if (fieldSchema.getLogicalType() instanceof LogicalTypes.Decimal) {
+ return new java.math.BigDecimal(partitionValue);
+ } else {
+ return partitionValue.getBytes(StandardCharsets.UTF_8);
+ }
+ default:
+ throw new IllegalArgumentException("Unexpected type " +
fieldSchema.getType());
+ }
+ }
+
+ private static Object inferDateValue(
+ String partitionPath,
+ String[] parts,
+ int pathSegment,
+ int numDateDirs,
+ Schema fieldSchema) {
+ StringBuilder condensedPartitionValue = new StringBuilder();
+ for (int i = 0; i < numDateDirs; i++) {
+ String partitionValue = parts[pathSegment + i];
+ // remove the field name if it is present due to hive-style partitioning
+ if (partitionValue.contains(EQUALS_SIGN)) {
+ partitionValue = partitionValue.split(EQUALS_SIGN)[1];
+ }
+ if (partitionValue.contains(DASH)) {
+ partitionValue = partitionValue.replace(DASH, "");
+ }
+ condensedPartitionValue.append(partitionValue.replace(SLASH, ""));
+ }
+ LocalDateTime time;
+ switch (condensedPartitionValue.length()) {
+ case 4: // Year
+ time =
LocalDateTime.of(Integer.parseInt(condensedPartitionValue.substring(0, 4)), 1,
1, 0, 0);
+ break;
+ case 6: // Month
+ time = LocalDateTime.of(
+ Integer.parseInt(condensedPartitionValue.substring(0, 4)),
+ Integer.parseInt(condensedPartitionValue.substring(4, 6)), 1, 0,
0);
+ break;
+ case 8: // Day
+ time = LocalDateTime.of(
+ Integer.parseInt(condensedPartitionValue.substring(0, 4)),
+ Integer.parseInt(condensedPartitionValue.substring(4, 6)),
+ Integer.parseInt(condensedPartitionValue.substring(6, 8)), 0, 0);
+ break;
+ case 10: // Hour
+ time = LocalDateTime.of(
+ Integer.parseInt(condensedPartitionValue.substring(0, 4)),
+ Integer.parseInt(condensedPartitionValue.substring(4, 6)),
+ Integer.parseInt(condensedPartitionValue.substring(6, 8)),
+ Integer.parseInt(condensedPartitionValue.substring(8, 10)), 0);
+ break;
+ default:
+ throw new IllegalArgumentException(
+ "Unknown date format for partition path: " + partitionPath);
+ }
+ if (fieldSchema.getLogicalType() instanceof LogicalTypes.Date) {
+ return Date.valueOf(time.toLocalDate());
+ }
+ return Timestamp.from(time.toInstant(ZoneOffset.UTC));
+ }
+
+ private static boolean isTimeBasedLogicalType(LogicalType logicalType) {
+ return logicalType instanceof LogicalTypes.Date
+ || logicalType instanceof LogicalTypes.TimestampMillis
+ || logicalType instanceof LogicalTypes.TimestampMicros
+ || logicalType instanceof LogicalTypes.TimeMillis
+ || logicalType instanceof LogicalTypes.TimeMicros
+ || logicalType instanceof LogicalTypes.LocalTimestampMicros
+ || logicalType instanceof LogicalTypes.LocalTimestampMillis;
+ }
+}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
index 570d9580ca0..0c972c936f5 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
@@ -31,6 +31,7 @@ import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.table.PartitionPathParser;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordReader;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
@@ -49,6 +50,8 @@ import org.apache.avro.Schema;
import java.io.Closeable;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
@@ -71,6 +74,8 @@ public final class HoodieFileGroupReader<T> implements
Closeable {
private final HoodieReaderContext<T> readerContext;
private final Option<HoodieBaseFile> hoodieBaseFileOption;
private final List<HoodieLogFile> logFiles;
+ private final String partitionPath;
+ private final Option<String[]> partitionPathFields;
private final HoodieStorage storage;
private final TypedProperties props;
// Byte offset to start reading from the base file
@@ -112,6 +117,8 @@ public final class HoodieFileGroupReader<T> implements
Closeable {
this.start = start;
this.length = length;
HoodieTableConfig tableConfig = hoodieTableMetaClient.getTableConfig();
+ this.partitionPath = fileSlice.getPartitionPath();
+ this.partitionPathFields = tableConfig.getPartitionFields();
RecordMergeMode recordMergeMode = tableConfig.getRecordMergeMode();
String mergeStrategyId = tableConfig.getRecordMergeStrategyId();
if
(!tableConfig.getTableVersion().greaterThanOrEquals(HoodieTableVersion.EIGHT)) {
@@ -228,8 +235,22 @@ public final class HoodieFileGroupReader<T> implements
Closeable {
if (start != 0) {
throw new IllegalArgumentException("Filegroup reader is doing
bootstrap merge but we are not reading from the start of the base file");
}
+ PartitionPathParser partitionPathParser = new PartitionPathParser();
+ Object[] partitionValues =
partitionPathParser.getPartitionFieldVals(partitionPathFields, partitionPath,
readerContext.getSchemaHandler().getTableSchema());
+ // filter out the partition values that are not required by the data
schema
+ List<Pair<String, Object>> partitionPathFieldsAndValues =
partitionPathFields.map(partitionFields -> {
+ Schema dataSchema = dataFileIterator.get().getRight();
+ List<Pair<String, Object>> filterFieldsAndValues = new
ArrayList<>(partitionFields.length);
+ for (int i = 0; i < partitionFields.length; i++) {
+ String field = partitionFields[i];
+ if (dataSchema.getField(field) != null) {
+ filterFieldsAndValues.add(Pair.of(field,
readerContext.convertValueToEngineType((Comparable) partitionValues[i])));
+ }
+ }
+ return filterFieldsAndValues;
+ }).orElseGet(Collections::emptyList);
return
readerContext.mergeBootstrapReaders(skeletonFileIterator.get().getLeft(),
skeletonFileIterator.get().getRight(),
- dataFileIterator.get().getLeft(), dataFileIterator.get().getRight());
+ dataFileIterator.get().getLeft(), dataFileIterator.get().getRight(),
partitionPathFieldsAndValues);
}
}
@@ -253,7 +274,11 @@ public final class HoodieFileGroupReader<T> implements
Closeable {
return
Option.of(Pair.of(readerContext.getFileRecordIterator(fileStoragePathInfo, 0,
file.getFileLen(),
readerContext.getSchemaHandler().createSchemaFromFields(allFields),
requiredSchema, storage), requiredSchema));
} else {
- return
Option.of(Pair.of(readerContext.getFileRecordIterator(file.getStoragePath(), 0,
file.getFileLen(),
+ // If the base file length passed in is invalid, i.e., -1,
+ // the file group reader fetches the length from the file system
+ long fileLength = file.getFileLen() >= 0
+ ? file.getFileLen() :
storage.getPathInfo(file.getStoragePath()).getLength();
+ return
Option.of(Pair.of(readerContext.getFileRecordIterator(file.getStoragePath(), 0,
fileLength,
readerContext.getSchemaHandler().createSchemaFromFields(allFields),
requiredSchema, storage), requiredSchema));
}
}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroReaderContext.java
b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroReaderContext.java
index bcf9fbf5d61..7d48efe594b 100644
---
a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroReaderContext.java
+++
b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroReaderContext.java
@@ -79,7 +79,7 @@ class TestHoodieAvroReaderContext {
ClosableIterator<IndexedRecord> baseIterator =
ClosableIterator.wrap(Arrays.asList(baseRecord1, baseRecord2,
baseRecord3).iterator());
List<IndexedRecord> actual = new ArrayList<>();
- try (ClosableIterator<IndexedRecord> iter =
avroReaderContext.mergeBootstrapReaders(skeletonIterator,
LIMITED_SKELETON_SCHEMA, baseIterator, LIMITED_BASE_SCHEMA)) {
+ try (ClosableIterator<IndexedRecord> iter =
avroReaderContext.mergeBootstrapReaders(skeletonIterator,
LIMITED_SKELETON_SCHEMA, baseIterator, LIMITED_BASE_SCHEMA,
Collections.emptyList())) {
iter.forEachRemaining(actual::add);
}
assertEquals(Arrays.asList(expectedRecord1, expectedRecord2,
expectedRecord3), actual);
@@ -105,7 +105,7 @@ class TestHoodieAvroReaderContext {
ClosableIterator<IndexedRecord> baseIterator =
ClosableIterator.wrap(Arrays.asList(baseRecord1, baseRecord2,
baseRecord3).iterator());
List<IndexedRecord> actual = new ArrayList<>();
- try (ClosableIterator<IndexedRecord> iter =
avroReaderContext.mergeBootstrapReaders(skeletonIterator, SKELETON_SCHEMA,
baseIterator, BASE_SCHEMA)) {
+ try (ClosableIterator<IndexedRecord> iter =
avroReaderContext.mergeBootstrapReaders(skeletonIterator, SKELETON_SCHEMA,
baseIterator, BASE_SCHEMA, Collections.emptyList())) {
iter.forEachRemaining(actual::add);
}
assertEquals(Arrays.asList(expectedRecord1, expectedRecord2,
expectedRecord3), actual);
@@ -116,7 +116,7 @@ class TestHoodieAvroReaderContext {
HoodieAvroReaderContext avroReaderContext =
getReaderContextWithMetaFields();
List<IndexedRecord> actual = new ArrayList<>();
try (ClosableIterator<IndexedRecord> iter =
avroReaderContext.mergeBootstrapReaders(ClosableIterator.wrap(Collections.emptyIterator()),
SKELETON_SCHEMA,
- ClosableIterator.wrap(Collections.emptyIterator()), BASE_SCHEMA)) {
+ ClosableIterator.wrap(Collections.emptyIterator()), BASE_SCHEMA,
Collections.emptyList())) {
iter.forEachRemaining(actual::add);
}
assertEquals(Collections.emptyList(), actual);
@@ -136,7 +136,7 @@ class TestHoodieAvroReaderContext {
List<IndexedRecord> actual = new ArrayList<>();
assertThrows(IllegalStateException.class, () -> {
- try (ClosableIterator<IndexedRecord> iter =
avroReaderContext.mergeBootstrapReaders(skeletonIterator, SKELETON_SCHEMA,
baseIterator, BASE_SCHEMA)) {
+ try (ClosableIterator<IndexedRecord> iter =
avroReaderContext.mergeBootstrapReaders(skeletonIterator, SKELETON_SCHEMA,
baseIterator, BASE_SCHEMA, Collections.emptyList())) {
iter.forEachRemaining(actual::add);
}
});
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/table/TestPartitionPathParser.java
b/hudi-common/src/test/java/org/apache/hudi/common/table/TestPartitionPathParser.java
new file mode 100644
index 00000000000..e65f4499291
--- /dev/null
+++
b/hudi-common/src/test/java/org/apache/hudi/common/table/TestPartitionPathParser.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table;
+
+import org.apache.hudi.common.util.Option;
+
+import org.apache.avro.Schema;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.nio.charset.StandardCharsets;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
/**
 * Unit tests for {@link PartitionPathParser}: end-to-end partition-path parsing across
 * hive-style, value-only, and multi-directory date layouts, plus per-type value parsing.
 */
class TestPartitionPathParser {

  // Each case: partition path, partition field names (null = unpartitioned), expected parsed values.
  // Timestamp constants are UTC epoch millis for the date encoded in the path.
  private static Stream<Arguments> partitionPathCases() {
    return Stream.of(
        Arguments.of("2025/01/03/22", new String[]{"timestamp_field"}, new Object[]{new Timestamp(1735941600000L)}),
        Arguments.of("2025-01-03-22", new String[]{"timestamp_field"}, new Object[]{new Timestamp(1735941600000L)}),
        Arguments.of("timestamp_field=2025-01-03-22", new String[]{"timestamp_field"}, new Object[]{new Timestamp(1735941600000L)}),
        Arguments.of("2025/01/03", new String[]{"date_field"}, new Object[]{Date.valueOf("2025-01-03")}),
        Arguments.of("2025/01", new String[]{"date_field"}, new Object[]{Date.valueOf("2025-01-01")}),
        Arguments.of("2025", new String[]{"date_field"}, new Object[]{Date.valueOf("2025-01-01")}),
        Arguments.of("value1/2025/01/03", new String[]{"string_field","date_field"}, new Object[]{"value1", Date.valueOf("2025-01-03")}),
        Arguments.of("2025/01/03/value1", new String[]{"date_field", "string_field"}, new Object[]{Date.valueOf("2025-01-03"), "value1"}),
        Arguments.of("string_field=value1/year=2020/month=08/day=28/hour=06", new String[]{"string_field", "timestamp_field"}, new Object[]{"value1", new Timestamp(1598594400000L)}),
        Arguments.of("year=2020/month=08/day=28/hour=06/string_field=value1", new String[]{"timestamp_field", "string_field"}, new Object[]{new Timestamp(1598594400000L), "value1"}),
        Arguments.of("", null, new Object[]{})
    );
  }

  @ParameterizedTest
  @MethodSource("partitionPathCases")
  void testGetPartitionFieldVals(String partitionPath, String[] partitionFields, Object[] expectedValues) {
    PartitionPathParser parser = new PartitionPathParser();
    // Schema declares a nullable string, a date (int/date), and a timestamp (long/timestamp-millis)
    // so each parsed value can be type-checked against its logical type.
    Schema schema = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"TestRecord\",\"fields\":[{\"name\":\"string_field\",\"type\":[\"null\", \"string\"]},"
        + "{\"name\":\"date_field\",\"type\": {\"type\":\"int\",\"logicalType\": \"date\"}},{\"name\":\"timestamp_field\",\"type\": {\"type\":\"long\",\"logicalType\": \"timestamp-millis\"}}]}");

    Object[] result = parser.getPartitionFieldVals(Option.ofNullable(partitionFields), partitionPath, schema);
    assertEquals(expectedValues.length, result.length);
    for (int i = 0; i < expectedValues.length; i++) {
      assertEquals(expectedValues[i], result[i]);
    }
  }

  // Each case: raw path value, field schema, expected parsed result
  // (null for the hive default-partition sentinels).
  private static Stream<Arguments> fieldCases() {
    return Stream.of(
        Arguments.of("123", Schema.create(Schema.Type.LONG), 123L),
        Arguments.of("123", Schema.create(Schema.Type.INT), 123),
        Arguments.of("123.45", Schema.create(Schema.Type.DOUBLE), 123.45),
        Arguments.of("123.45", Schema.create(Schema.Type.FLOAT), 123.45f),
        Arguments.of("false", Schema.create(Schema.Type.BOOLEAN), false),
        Arguments.of("__HIVE_DEFAULT_PARTITION__", Schema.create(Schema.Type.INT), null),
        Arguments.of("default", Schema.create(Schema.Type.INT), null),
        Arguments.of("2025-01-03", Schema.create(Schema.Type.STRING), "2025-01-03"),
        Arguments.of("value1", Schema.create(Schema.Type.BYTES), "value1".getBytes(StandardCharsets.UTF_8)),
        Arguments.of("value1", Schema.createFixed("fixed", "docs", null, 50), "value1".getBytes(StandardCharsets.UTF_8))
    );
  }

  @ParameterizedTest
  @MethodSource("fieldCases")
  void testValueParsing(String value, Schema fieldSchema, Object expected) {
    if (expected instanceof byte[]) {
      // byte[] has identity equals; compare via decoded string contents instead.
      String expectedString = new String((byte[]) expected, StandardCharsets.UTF_8);
      String actualString = new String((byte[]) PartitionPathParser.parseValue(value, fieldSchema));
      assertEquals(expectedString, actualString);
    } else {
      assertEquals(expected, PartitionPathParser.parseValue(value, fieldSchema));
    }
  }
}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
index 4e268d1bb0a..6135b21e950 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
@@ -29,7 +29,10 @@ import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.BaseFile;
import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.serialization.DefaultSerializer;
import org.apache.hudi.common.table.HoodieTableConfig;
@@ -37,7 +40,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.view.FileSystemViewManager;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
-import org.apache.hudi.common.table.view.SyncableFileSystemView;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.testutils.RawTripTestPayload;
@@ -51,17 +54,24 @@ import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.storage.StorageConfiguration;
+import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.avro.Schema;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.IOException;
import java.io.Serializable;
+import java.net.URI;
import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -69,6 +79,8 @@ import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
import static
org.apache.hudi.common.model.HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID;
import static org.apache.hudi.common.model.WriteOperationType.INSERT;
@@ -129,7 +141,7 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
List<HoodieRecord> initialRecords = dataGen.generateInserts("001", 100);
commitToTable(initialRecords, INSERT.value(), writeConfigs);
validateOutputFromFileGroupReader(
- getStorageConf(), getBasePath(), dataGen.getPartitionPaths(), true,
0, recordMergeMode,
+ getStorageConf(), getBasePath(), true, 0, recordMergeMode,
initialRecords, initialRecords);
// Two commits; reading one file group containing a base file and a log
file
@@ -138,7 +150,7 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
List<HoodieRecord> unmergedRecords =
CollectionUtils.combine(initialRecords, updates);
commitToTable(updates, UPSERT.value(), writeConfigs);
validateOutputFromFileGroupReader(
- getStorageConf(), getBasePath(), dataGen.getPartitionPaths(), true,
1, recordMergeMode,
+ getStorageConf(), getBasePath(), true, 1, recordMergeMode,
allRecords, unmergedRecords);
// Three commits; reading one file group containing a base file and two
log files
@@ -146,7 +158,7 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
List<HoodieRecord> finalRecords = mergeRecordLists(updates2, allRecords);
commitToTable(updates2, UPSERT.value(), writeConfigs);
validateOutputFromFileGroupReader(
- getStorageConf(), getBasePath(), dataGen.getPartitionPaths(), true,
2, recordMergeMode,
+ getStorageConf(), getBasePath(), true, 2, recordMergeMode,
finalRecords, CollectionUtils.combine(unmergedRecords, updates2));
}
}
@@ -171,7 +183,7 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
List<HoodieRecord> initialRecords = dataGen.generateInserts("001", 100);
commitToTable(initialRecords, INSERT.value(), writeConfigs);
validateOutputFromFileGroupReader(
- getStorageConf(), getBasePath(), dataGen.getPartitionPaths(), false,
1, recordMergeMode,
+ getStorageConf(), getBasePath(), false, 1, recordMergeMode,
initialRecords, initialRecords);
// Two commits; reading one file group containing two log files
@@ -179,11 +191,27 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
List<HoodieRecord> allRecords = mergeRecordLists(updates,
initialRecords);
commitToTable(updates, UPSERT.value(), writeConfigs);
validateOutputFromFileGroupReader(
- getStorageConf(), getBasePath(), dataGen.getPartitionPaths(), false,
2, recordMergeMode,
+ getStorageConf(), getBasePath(), false, 2, recordMergeMode,
allRecords, CollectionUtils.combine(initialRecords, updates));
}
}
  /**
   * Reads a pre-built bootstrapped merge-on-read table (shipped as a zip test resource)
   * through the file group reader and validates both merged and unmerged outputs against
   * the expected records stored alongside the dataset as JSON files.
   */
  @Test
  public void testReadFileGroupInBootstrapMergeOnReadTable() throws Exception {
    // Unpack the canned bootstrap dataset into this test's base path.
    Path zipOutput = Paths.get(new URI(getBasePath()));
    extract(zipOutput);
    ObjectMapper objectMapper = new ObjectMapper();
    Path basePath = zipOutput.resolve("bootstrap_data");
    // Expected results after merging log records into the base file.
    List<HoodieTestDataGenerator.RecordIdentifier> expectedRecords = new ArrayList<>();
    objectMapper.reader().forType(HoodieTestDataGenerator.RecordIdentifier.class).<HoodieTestDataGenerator.RecordIdentifier>readValues(basePath.resolve("merged_records.json").toFile())
        .forEachRemaining(expectedRecords::add);
    // Expected results when records are emitted without merging.
    List<HoodieTestDataGenerator.RecordIdentifier> expectedUnMergedRecords = new ArrayList<>();
    objectMapper.reader().forType(HoodieTestDataGenerator.RecordIdentifier.class).<HoodieTestDataGenerator.RecordIdentifier>readValues(basePath.resolve("unmerged_records.json").toFile())
        .forEachRemaining(expectedUnMergedRecords::add);
    // Dataset has a base file (bootstrap) plus one log file per slice, merged by event time.
    validateOutputFromFileGroupReaderWithExistingRecords(getStorageConf(), basePath.toString(), true, 1, RecordMergeMode.EVENT_TIME_ORDERING,
        expectedRecords, expectedUnMergedRecords);
  }
+
@ParameterizedTest
@EnumSource(value = ExternalSpillableMap.DiskMapType.class)
public void testSpillableMapUsage(ExternalSpillableMap.DiskMapType
diskMapType) throws Exception {
@@ -193,7 +221,7 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
String baseMapPath = Files.createTempDirectory(null).toString();
HoodieTableMetaClient metaClient =
HoodieTestUtils.createMetaClient(getStorageConf(), getBasePath());
Schema avroSchema = new
TableSchemaResolver(metaClient).getTableAvroSchema();
- List<FileSlice> fileSlices = getFileSlicesToRead(getStorageConf(),
getBasePath(), metaClient, dataGen.getPartitionPaths(), true, 0);
+ List<FileSlice> fileSlices = getFileSlicesToRead(getStorageConf(),
getBasePath(), metaClient, true, 0);
List<T> records = readRecordsFromFileGroup(getStorageConf(),
getBasePath(), metaClient, fileSlices,
avroSchema, RecordMergeMode.COMMIT_TIME_ORDERING, false);
HoodieReaderContext<T> readerContext =
getHoodieReaderContext(getBasePath(), avroSchema, getStorageConf(), metaClient);
@@ -257,7 +285,6 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
private void validateOutputFromFileGroupReader(StorageConfiguration<?>
storageConf,
String tablePath,
- String[] partitionPaths,
boolean containsBaseFile,
int expectedLogFileNum,
RecordMergeMode
recordMergeMode,
@@ -265,11 +292,25 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
List<HoodieRecord>
expectedHoodieUnmergedRecords) throws Exception {
HoodieTableMetaClient metaClient =
HoodieTestUtils.createMetaClient(storageConf, tablePath);
Schema avroSchema = new
TableSchemaResolver(metaClient).getTableAvroSchema();
- // use reader context for conversion to engine specific objects
- HoodieReaderContext<T> readerContext = getHoodieReaderContext(tablePath,
avroSchema, getStorageConf(), metaClient);
List<HoodieTestDataGenerator.RecordIdentifier> expectedRecords =
convertHoodieRecords(expectedHoodieRecords, avroSchema);
List<HoodieTestDataGenerator.RecordIdentifier> expectedUnmergedRecords =
convertHoodieRecords(expectedHoodieUnmergedRecords, avroSchema);
- List<FileSlice> fileSlices = getFileSlicesToRead(storageConf, tablePath,
metaClient, partitionPaths, containsBaseFile, expectedLogFileNum);
+ validateOutputFromFileGroupReaderWithExistingRecords(
+ storageConf, tablePath, containsBaseFile, expectedLogFileNum,
recordMergeMode,
+ expectedRecords, expectedUnmergedRecords);
+ }
+
+ private void
validateOutputFromFileGroupReaderWithExistingRecords(StorageConfiguration<?>
storageConf,
+ String
tablePath,
+ boolean
containsBaseFile,
+ int
expectedLogFileNum,
+
RecordMergeMode recordMergeMode,
+
List<HoodieTestDataGenerator.RecordIdentifier> expectedRecords,
+
List<HoodieTestDataGenerator.RecordIdentifier> expectedUnmergedRecords) throws
Exception {
+ HoodieTableMetaClient metaClient =
HoodieTestUtils.createMetaClient(storageConf, tablePath);
+ Schema avroSchema = new
TableSchemaResolver(metaClient).getTableAvroSchema();
+ // use reader context for conversion to engine specific objects
+ HoodieReaderContext<T> readerContext = getHoodieReaderContext(tablePath,
avroSchema, getStorageConf(), metaClient);
+ List<FileSlice> fileSlices = getFileSlicesToRead(storageConf, tablePath,
metaClient, containsBaseFile, expectedLogFileNum);
List<HoodieTestDataGenerator.RecordIdentifier> actualRecordList =
convertEngineRecords(
readRecordsFromFileGroup(storageConf, tablePath, metaClient,
fileSlices, avroSchema, recordMergeMode, false),
avroSchema, readerContext);
@@ -284,11 +325,10 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
}
private List<FileSlice> getFileSlicesToRead(StorageConfiguration<?>
storageConf,
- String tablePath,
- HoodieTableMetaClient metaClient,
- String[] partitionPaths,
- boolean containsBaseFile,
- int expectedLogFileNum) {
+ String tablePath,
+ HoodieTableMetaClient metaClient,
+ boolean containsBaseFile,
+ int expectedLogFileNum) {
HoodieEngineContext engineContext = new
HoodieLocalEngineContext(storageConf);
HoodieMetadataConfig metadataConfig =
HoodieMetadataConfig.newBuilder().build();
FileSystemViewManager viewManager =
FileSystemViewManager.createViewManager(
@@ -298,9 +338,23 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
HoodieCommonConfig.newBuilder().build(),
mc -> HoodieTableMetadata.create(
engineContext, mc.getStorage(), metadataConfig, tablePath));
- SyncableFileSystemView fsView = viewManager.getFileSystemView(metaClient);
- List<FileSlice> fileSlices =
Arrays.stream(partitionPaths).flatMap(fsView::getAllFileSlices).collect(Collectors.toList());
+ HoodieTableFileSystemView fsView =
+ (HoodieTableFileSystemView) viewManager.getFileSystemView(metaClient);
+ List<String> relativePartitionPathList = FSUtils.getAllPartitionPaths(
+ engineContext, metaClient.getStorage(),
+ metadataConfig, metaClient.getBasePath().toString());
+ List<FileSlice> fileSlices =
+ relativePartitionPathList.stream().flatMap(fsView::getAllFileSlices)
+ .collect(Collectors.toList());
fileSlices.forEach(fileSlice -> {
+ if (fileSlice.hasBootstrapBase()) {
+ // bootstrap file points to an absolute path
+ // Since the dataset is copied to a new tempDir for testing, we need
to manipulate this path
+ HoodieBaseFile baseFile = fileSlice.getBaseFile().get();
+ String bootstrapPath = baseFile.getBootstrapBaseFile().get().getPath();
+ String newBootstrapPath = tablePath + "/" +
bootstrapPath.substring(bootstrapPath.indexOf("bootstrap_table"));
+ baseFile.setBootstrapBaseFile(new BaseFile(newBootstrapPath));
+ }
List<String> logFilePathList = getLogFileListFromFileSlice(fileSlice);
assertEquals(expectedLogFileNum, logFilePathList.size());
assertEquals(containsBaseFile, fileSlice.getBaseFile().isPresent());
@@ -413,4 +467,26 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
readerContext.getValue(record, schema,
RIDER_FIELD_NAME).toString()))
.collect(Collectors.toList());
}
+
+ private void extract(Path target) throws IOException {
+ try (ZipInputStream zip = new
ZipInputStream(this.getClass().getClassLoader().getResourceAsStream("file-group-reader/bootstrap_data.zip")))
{
+ ZipEntry entry;
+
+ while ((entry = zip.getNextEntry()) != null) {
+ File file = target.resolve(entry.getName()).toFile();
+ if (entry.isDirectory()) {
+ file.mkdirs();
+ continue;
+ }
+ byte[] buffer = new byte[10000];
+ file.getParentFile().mkdirs();
+ try (BufferedOutputStream out = new
BufferedOutputStream(Files.newOutputStream(file.toPath()))) {
+ int count;
+ while ((count = zip.read(buffer)) != -1) {
+ out.write(buffer, 0, count);
+ }
+ }
+ }
+ }
+ }
}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
index 55e52855bca..3549a0ba3e8 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
@@ -39,6 +39,8 @@ import org.apache.hudi.storage.HoodieStorageUtils;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.avro.Conversions;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
@@ -1219,7 +1221,11 @@ Generate random record using TRIP_ENCODED_DECIMAL_SCHEMA
private final String partitionPath;
private final String riderValue;
- public RecordIdentifier(String recordKey, String partitionPath, String
orderingVal, String riderValue) {
+ @JsonCreator
+ public RecordIdentifier(@JsonProperty("recordKey") String recordKey,
+ @JsonProperty("partitionPath") String
partitionPath,
+ @JsonProperty("orderingVal") String orderingVal,
+ @JsonProperty("riderValue") String riderValue) {
this.recordKey = recordKey;
this.orderingVal = orderingVal;
this.partitionPath = partitionPath;
@@ -1254,5 +1260,21 @@ Generate random record using TRIP_ENCODED_DECIMAL_SCHEMA
public int hashCode() {
return Objects.hash(recordKey, orderingVal, partitionPath, riderValue);
}
+
+ public String getRecordKey() {
+ return recordKey;
+ }
+
+ public String getOrderingVal() {
+ return orderingVal;
+ }
+
+ public String getPartitionPath() {
+ return partitionPath;
+ }
+
+ public String getRiderValue() {
+ return riderValue;
+ }
}
}
diff --git
a/hudi-common/src/test/resources/file-group-reader/bootstrap_data.zip
b/hudi-common/src/test/resources/file-group-reader/bootstrap_data.zip
new file mode 100644
index 00000000000..2e6758c168b
Binary files /dev/null and
b/hudi-common/src/test/resources/file-group-reader/bootstrap_data.zip differ
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
index 87138797958..93878c35d25 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
@@ -18,6 +18,7 @@
package org.apache.hudi.table.format;
+import org.apache.hudi.client.model.BootstrapRowData;
import org.apache.hudi.client.model.CommitTimeFlinkRecordMerger;
import org.apache.hudi.client.model.EventTimeFlinkRecordMerger;
import org.apache.hudi.client.model.HoodieFlinkRecord;
@@ -37,6 +38,7 @@ import org.apache.hudi.common.util.HoodieRecordUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieValidationException;
import org.apache.hudi.source.ExpressionPredicates.Predicate;
@@ -189,7 +191,12 @@ public class FlinkRowDataReaderContext extends
HoodieReaderContext<RowData> {
ClosableIterator<RowData> skeletonFileIterator,
Schema skeletonRequiredSchema,
ClosableIterator<RowData> dataFileIterator,
- Schema dataRequiredSchema) {
+ Schema dataRequiredSchema,
+ List<Pair<String, Object>> partitionFieldAndValues) {
+ Map<Integer, Object> partitionOrdinalToValues =
partitionFieldAndValues.stream()
+ .collect(Collectors.toMap(
+ pair -> dataRequiredSchema.getField(pair.getKey()).pos(),
+ Pair::getValue));
return new ClosableIterator<RowData>() {
final JoinedRowData joinedRow = new JoinedRowData();
@Override
@@ -204,12 +211,19 @@ public class FlinkRowDataReaderContext extends
HoodieReaderContext<RowData> {
@Override
public RowData next() {
RowData skeletonRow = skeletonFileIterator.next();
- RowData dataRow = dataFileIterator.next();
+ RowData dataRow = appendPartitionFields(dataFileIterator.next());
joinedRow.setRowKind(dataRow.getRowKind());
joinedRow.replace(skeletonRow, dataRow);
return joinedRow;
}
+ private RowData appendPartitionFields(RowData dataRow) {
+ if (partitionFieldAndValues.isEmpty()) {
+ return dataRow;
+ }
+ return new BootstrapRowData(dataRow, partitionOrdinalToValues);
+ }
+
@Override
public void close() {
skeletonFileIterator.close();
diff --git
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
index cfb04082541..a9830b2ca60 100644
---
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
+++
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
@@ -35,6 +35,7 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.CloseableMappingIterator;
+import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.hadoop.utils.HoodieArrayWritableAvroUtils;
import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
import org.apache.hudi.hadoop.utils.ObjectInspectorCache;
@@ -241,9 +242,13 @@ public class HiveHoodieReaderContext extends
HoodieReaderContext<ArrayWritable>
public ClosableIterator<ArrayWritable>
mergeBootstrapReaders(ClosableIterator<ArrayWritable> skeletonFileIterator,
Schema
skeletonRequiredSchema,
ClosableIterator<ArrayWritable> dataFileIterator,
- Schema
dataRequiredSchema) {
+ Schema
dataRequiredSchema,
+
List<Pair<String, Object>> partitionFieldsAndValues) {
int skeletonLen = skeletonRequiredSchema.getFields().size();
int dataLen = dataRequiredSchema.getFields().size();
+ int[] partitionFieldPositions = partitionFieldsAndValues.stream()
+ .map(pair ->
dataRequiredSchema.getField(pair.getKey()).pos()).mapToInt(Integer::intValue).toArray();
+ Writable[] convertedPartitionValues =
partitionFieldsAndValues.stream().map(Pair::getValue).toArray(Writable[]::new);
return new ClosableIterator<ArrayWritable>() {
private final ArrayWritable returnWritable = new
ArrayWritable(Writable.class);
@@ -260,6 +265,11 @@ public class HiveHoodieReaderContext extends
HoodieReaderContext<ArrayWritable>
public ArrayWritable next() {
Writable[] skeletonWritable = skeletonFileIterator.next().get();
Writable[] dataWritable = dataFileIterator.next().get();
+ for (int i = 0; i < partitionFieldPositions.length; i++) {
+ if (dataWritable[partitionFieldPositions[i]] == null) {
+ dataWritable[partitionFieldPositions[i]] =
convertedPartitionValues[i];
+ }
+ }
Writable[] mergedWritable = new Writable[skeletonLen + dataLen];
System.arraycopy(skeletonWritable, 0, mergedWritable, 0, skeletonLen);
System.arraycopy(dataWritable, 0, mergedWritable, skeletonLen,
dataLen);
diff --git
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/ObjectInspectorCache.java
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/ObjectInspectorCache.java
index 23d6c0cb511..fd637490db8 100644
---
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/ObjectInspectorCache.java
+++
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/ObjectInspectorCache.java
@@ -59,7 +59,7 @@ public class ObjectInspectorCache {
// eg: current table is col1, col2, col3;
jobConf.get(serdeConstants.LIST_COLUMNS): col1, col2, col3
,BLOCK__OFFSET__INSIDE__FILE ...
Set<String> writerSchemaColNames = tableSchema.getFields().stream().map(f
-> f.name().toLowerCase(Locale.ROOT)).collect(Collectors.toSet());
List<String> columnNameList =
Arrays.stream(jobConf.get(serdeConstants.LIST_COLUMNS).split(",")).collect(Collectors.toList());
- List<TypeInfo> columnTypeList =
TypeInfoUtils.getTypeInfosFromTypeString(jobConf.get(serdeConstants.LIST_COLUMN_TYPES));
+ List<TypeInfo> columnTypeList =
TypeInfoUtils.getTypeInfosFromTypeString(jobConf.get(serdeConstants.LIST_COLUMN_TYPES));
int columnNameListLen = columnNameList.size() - 1;
for (int i = columnNameListLen; i >= 0; i--) {
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestHoodieInternalRowUtils.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestHoodieInternalRowUtils.scala
index 3d4e87ef289..d0d045da593 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestHoodieInternalRowUtils.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestHoodieInternalRowUtils.scala
@@ -79,13 +79,13 @@ class TestHoodieInternalRowUtils extends FunSuite with
Matchers with BeforeAndAf
val data = sparkSession.sparkContext.parallelize(rows)
val oldRow = sparkSession.createDataFrame(data,
mergedSchema).queryExecution.toRdd.first()
- val rowWriter1 = HoodieInternalRowUtils.genUnsafeRowWriter(mergedSchema,
schema1, JCollections.emptyMap())
+ val rowWriter1 = HoodieInternalRowUtils.genUnsafeRowWriter(mergedSchema,
schema1, JCollections.emptyMap(), JCollections.emptyMap())
val newRow1 = rowWriter1(oldRow)
val serDe1 = sparkAdapter.createSparkRowSerDe(schema1)
assertEquals(serDe1.deserializeRow(newRow1), Row("Andrew", 18,
Row("Mission st", "SF")));
- val rowWriter2 = HoodieInternalRowUtils.genUnsafeRowWriter(mergedSchema,
schema2, JCollections.emptyMap())
+ val rowWriter2 = HoodieInternalRowUtils.genUnsafeRowWriter(mergedSchema,
schema2, JCollections.emptyMap(), JCollections.emptyMap())
val newRow2 = rowWriter2(oldRow)
val serDe2 = sparkAdapter.createSparkRowSerDe(schema2)
@@ -95,13 +95,35 @@ class TestHoodieInternalRowUtils extends FunSuite with
Matchers with BeforeAndAf
test("Test simple rewriting (with nullable value)") {
val data = sparkSession.sparkContext.parallelize(Seq(Row("Rob", 18,
null.asInstanceOf[StructType])))
val oldRow = sparkSession.createDataFrame(data,
schema1).queryExecution.toRdd.first()
- val rowWriter = HoodieInternalRowUtils.genUnsafeRowWriter(schema1,
mergedSchema, JCollections.emptyMap())
+ val rowWriter = HoodieInternalRowUtils.genUnsafeRowWriter(schema1,
mergedSchema, JCollections.emptyMap(), JCollections.emptyMap())
val newRow = rowWriter(oldRow)
val serDe = sparkAdapter.createSparkRowSerDe(mergedSchema)
assertEquals(serDe.deserializeRow(newRow), Row("Rob", 18,
null.asInstanceOf[StructType], null.asInstanceOf[StringType],
null.asInstanceOf[IntegerType]))
}
+ test("Test rewriting with field value injections") {
+ val rowWithNull = Seq(
+ Row("Andrew", null, Row("Mission st", "SF"), "John", 19)
+ )
+ val oldRow =
sparkSession.createDataFrame(sparkSession.sparkContext.parallelize(rowWithNull),
mergedSchema).queryExecution.toRdd.first()
+
+ val updatedValuesMap: java.util.Map[Integer, Object] =
JCollections.singletonMap(1, 18).asInstanceOf[java.util.Map[Integer, Object]]
+ val rowWriter = HoodieInternalRowUtils.genUnsafeRowWriter(mergedSchema,
schema1, JCollections.emptyMap(), updatedValuesMap)
+ val newRow1 = rowWriter(oldRow)
+
+ val serDe = sparkAdapter.createSparkRowSerDe(schema1)
+ assertEquals(serDe.deserializeRow(newRow1), Row("Andrew", 18, Row("Mission
st", "SF")));
+
+ // non-null value should not be rewritten
+ val rowWithoutNull = Seq(
+ Row("Andrew", 25, Row("Mission st", "SF"), "John", 19)
+ )
+ val oldRow2 =
sparkSession.createDataFrame(sparkSession.sparkContext.parallelize(rowWithoutNull),
mergedSchema).queryExecution.toRdd.first()
+ val newRow2 = rowWriter(oldRow2)
+ assertEquals(serDe.deserializeRow(newRow2), Row("Andrew", 25, Row("Mission
st", "SF")));
+ }
+
/**
* test record data type changes.
* int => long/float/double/string
@@ -192,7 +214,7 @@ class TestHoodieInternalRowUtils extends FunSuite with
Matchers with BeforeAndAf
val newRowExpected =
AvroConversionUtils.createAvroToInternalRowConverter(newAvroSchema,
newStructTypeSchema)
.apply(newRecord).get
- val rowWriter =
HoodieInternalRowUtils.genUnsafeRowWriter(structTypeSchema,
newStructTypeSchema, new HashMap[String, String])
+ val rowWriter =
HoodieInternalRowUtils.genUnsafeRowWriter(structTypeSchema,
newStructTypeSchema, JCollections.emptyMap(), JCollections.emptyMap())
val newRow = rowWriter(row)
internalRowCompare(newRowExpected, newRow, newStructTypeSchema)
@@ -247,7 +269,7 @@ class TestHoodieInternalRowUtils extends FunSuite with
Matchers with BeforeAndAf
val row = AvroConversionUtils.createAvroToInternalRowConverter(schema,
structTypeSchema).apply(avroRecord).get
val newRowExpected =
AvroConversionUtils.createAvroToInternalRowConverter(newAvroSchema,
newStructTypeSchema).apply(newAvroRecord).get
- val rowWriter =
HoodieInternalRowUtils.genUnsafeRowWriter(structTypeSchema,
newStructTypeSchema, new HashMap[String, String])
+ val rowWriter =
HoodieInternalRowUtils.genUnsafeRowWriter(structTypeSchema,
newStructTypeSchema, JCollections.emptyMap(), JCollections.emptyMap())
val newRow = rowWriter(row)
internalRowCompare(newRowExpected, newRow, newStructTypeSchema)
diff --git a/pom.xml b/pom.xml
index 20a4ca00014..a56b62d3890 100644
--- a/pom.xml
+++ b/pom.xml
@@ -713,6 +713,7 @@
<exclude>**/test/resources/*.commit</exclude>
<exclude>**/test/resources/**/*.txt</exclude>
<exclude>**/test/resources/**/*.avsc</exclude>
+ <exclude>**/test/resources/**/*.zip</exclude>
<exclude>**/target/**</exclude>
<exclude>**/generated-sources/**</exclude>
<exclude>.github/**</exclude>