This is an automated email from the ASF dual-hosted git repository.
vhs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 46738d00bdf3 feat(schema): Migrate hudi spark client to use HoodieSchema (#17743)
46738d00bdf3 is described below
commit 46738d00bdf3c476e300247176cceb2900ddd871
Author: Rahil C <[email protected]>
AuthorDate: Tue Dec 30 21:13:56 2025 -0500
feat(schema): Migrate hudi spark client to use HoodieSchema (#17743)
* spark client changes so far
* feat(schema): Migrate hudi spark client to use HoodieSchema
* fix compilation issues
* fix tests
* address comments
* fix merge conflict
---
.../bootstrap/HoodieBootstrapSchemaProvider.java | 12 ++++----
.../HoodieSparkBootstrapSchemaProvider.java | 16 +++++------
.../MultipleSparkJobExecutionStrategy.java | 6 ++--
...SparkJobConsistentHashingExecutionStrategy.java | 13 ++++-----
...onsistentBucketClusteringExecutionStrategy.java | 3 +-
.../SparkSingleFileSortExecutionStrategy.java | 5 ++--
.../SparkSortAndSizeExecutionStrategy.java | 5 ++--
.../client/utils/SparkMetadataWriterUtils.java | 10 +++----
.../hudi/common/model/HoodieSparkRecord.java | 33 ++++++++++++----------
.../hudi/execution/SparkLazyInsertIterable.java | 15 +++++-----
.../bulkinsert/RDDBucketIndexPartitioner.java | 11 ++++----
.../bulkinsert/RDDSpatialCurveSortPartitioner.java | 2 +-
.../io/storage/HoodieSparkFileWriterFactory.java | 4 +--
.../hudi/io/storage/HoodieSparkParquetReader.java | 12 ++++----
.../apache/hudi/merge/SparkRecordMergingUtils.java | 4 +--
.../bootstrap/BaseBootstrapMetadataHandler.java | 11 ++++----
.../bootstrap/OrcBootstrapMetadataHandler.java | 12 ++++----
.../bootstrap/ParquetBootstrapMetadataHandler.java | 16 +++++------
.../hudi/BaseSparkInternalRecordContext.java | 13 ++++-----
.../hudi/BaseSparkInternalRowReaderContext.java | 2 +-
.../hudi/HoodieDatasetBulkInsertHelper.scala | 7 ++---
.../scala/org/apache/hudi/HoodieSparkUtils.scala | 5 ++--
.../SparkFileFormatInternalRecordContext.scala | 13 ++++-----
.../SparkFileFormatInternalRowReaderContext.scala | 4 +--
.../apache/spark/sql/HoodieInternalRowUtils.scala | 23 ++++++++-------
.../callback/TestHoodieClientInitCallback.java | 16 +++++------
.../hudi/client/TestUpdateSchemaEvolution.java | 5 ++--
.../functional/TestExternalPathHandling.java | 12 ++++----
.../functional/TestHoodieBackedTableMetadata.java | 7 ++---
.../hudi/keygen/KeyGeneratorTestUtilities.java | 8 +++---
.../SparkClientFunctionalTestHarness.java | 4 +--
.../org/apache/hudi/HoodieCreateRecordUtils.scala | 4 +--
.../org/apache/hudi/HoodieSparkSqlWriter.scala | 2 +-
.../org/apache/hudi/cdc/CDCFileGroupIterator.scala | 2 +-
.../hudi/functional/TestBufferedRecordMerger.java | 2 +-
.../read/TestHoodieFileGroupReaderOnSpark.scala | 2 +-
.../functional/TestParquetColumnProjection.scala | 6 ++--
.../hudi/common/TestHoodieInternalRowUtils.scala | 8 +++---
38 files changed, 161 insertions(+), 174 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/bootstrap/HoodieBootstrapSchemaProvider.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/bootstrap/HoodieBootstrapSchemaProvider.java
index ce4c42ba1665..34848df31591 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/bootstrap/HoodieBootstrapSchemaProvider.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/bootstrap/HoodieBootstrapSchemaProvider.java
@@ -18,14 +18,12 @@
package org.apache.hudi.client.bootstrap;
-import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieFileStatus;
import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.avro.Schema;
-
import java.util.List;
/**
@@ -45,11 +43,11 @@ public abstract class HoodieBootstrapSchemaProvider {
* @param partitions List of partitions with files within them
* @return Avro Schema
*/
- public final Schema getBootstrapSchema(HoodieEngineContext context,
List<Pair<String, List<HoodieFileStatus>>> partitions) {
+ public final HoodieSchema getBootstrapSchema(HoodieEngineContext context,
List<Pair<String, List<HoodieFileStatus>>> partitions) {
if (writeConfig.getSchema() != null) {
// Use schema specified by user if set
- Schema userSchema = new Schema.Parser().parse(writeConfig.getSchema());
- if (!HoodieAvroUtils.getNullSchema().equals(userSchema)) {
+ HoodieSchema userSchema = HoodieSchema.parse(writeConfig.getSchema());
+ if (!HoodieSchema.NULL_SCHEMA.equals(userSchema)) {
return userSchema;
}
}
@@ -63,7 +61,7 @@ public abstract class HoodieBootstrapSchemaProvider {
* @param partitions List of partitions with files within them
* @return Avro Schema
*/
- protected abstract Schema getBootstrapSourceSchema(HoodieEngineContext
context,
+ protected abstract HoodieSchema getBootstrapSourceSchema(HoodieEngineContext
context,
List<Pair<String, List<HoodieFileStatus>>> partitions);
}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/bootstrap/HoodieSparkBootstrapSchemaProvider.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/bootstrap/HoodieSparkBootstrapSchemaProvider.java
index b8aa55975a17..efdd945c2940 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/bootstrap/HoodieSparkBootstrapSchemaProvider.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/bootstrap/HoodieSparkBootstrapSchemaProvider.java
@@ -18,19 +18,19 @@
package org.apache.hudi.client.bootstrap;
-import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.HoodieSchemaConversionUtils;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieFileStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.util.AvroOrcUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
-import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.orc.OrcFile;
@@ -52,8 +52,8 @@ public class HoodieSparkBootstrapSchemaProvider extends
HoodieBootstrapSchemaPro
}
@Override
- protected Schema getBootstrapSourceSchema(HoodieEngineContext context,
List<Pair<String, List<HoodieFileStatus>>> partitions) {
- Schema schema = partitions.stream().flatMap(p ->
p.getValue().stream()).map(fs -> {
+ protected HoodieSchema getBootstrapSourceSchema(HoodieEngineContext context,
List<Pair<String, List<HoodieFileStatus>>> partitions) {
+ HoodieSchema schema = partitions.stream().flatMap(p ->
p.getValue().stream()).map(fs -> {
Path filePath = HadoopFSUtils.toPath(fs.getPath());
String extension = FSUtils.getFileExtension(filePath.getName());
if (PARQUET.getFileExtension().equals(extension)) {
@@ -69,7 +69,7 @@ public class HoodieSparkBootstrapSchemaProvider extends
HoodieBootstrapSchemaPro
return schema;
}
- private static Schema getBootstrapSourceSchemaParquet(HoodieWriteConfig
writeConfig, HoodieEngineContext context, Path filePath) {
+ private static HoodieSchema
getBootstrapSourceSchemaParquet(HoodieWriteConfig writeConfig,
HoodieEngineContext context, Path filePath) {
// NOTE: The type inference of partition column in the parquet table is
turned off explicitly,
// to be consistent with the existing bootstrap behavior, where the
partition column is String
// typed in Hudi table.
@@ -85,10 +85,10 @@ public class HoodieSparkBootstrapSchemaProvider extends
HoodieBootstrapSchemaPro
String structName = tableName + "_record";
String recordNamespace = "hoodie." + tableName;
- return AvroConversionUtils.convertStructTypeToAvroSchema(parquetSchema,
structName, recordNamespace);
+ return
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(parquetSchema,
structName, recordNamespace);
}
- private static Schema getBootstrapSourceSchemaOrc(HoodieWriteConfig
writeConfig, HoodieEngineContext context, Path filePath) {
+ private static HoodieSchema getBootstrapSourceSchemaOrc(HoodieWriteConfig
writeConfig, HoodieEngineContext context, Path filePath) {
Reader orcReader = null;
try {
orcReader = OrcFile.createReader(filePath,
OrcFile.readerOptions(context.getStorageConf().unwrapAs(Configuration.class)));
@@ -99,7 +99,7 @@ public class HoodieSparkBootstrapSchemaProvider extends
HoodieBootstrapSchemaPro
String tableName =
HoodieAvroUtils.sanitizeName(writeConfig.getTableName());
String structName = tableName + "_record";
String recordNamespace = "hoodie." + tableName;
- return AvroOrcUtils.createAvroSchemaWithDefaultValue(orcSchema,
structName, recordNamespace, true);
+ return
HoodieSchema.fromAvroSchema(AvroOrcUtils.createAvroSchemaWithDefaultValue(orcSchema,
structName, recordNamespace, true));
}
}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
index 2c409a50bf1b..f378ef46a5a3 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
@@ -20,7 +20,6 @@ package org.apache.hudi.client.clustering.run.strategy;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.SparkAdapterSupport$;
-import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieClusteringGroup;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.client.SparkTaskContextSupplier;
@@ -65,7 +64,6 @@ import org.apache.hudi.table.action.HoodieWriteMetadata;
import
org.apache.hudi.table.action.cluster.strategy.ClusteringExecutionStrategy;
import lombok.extern.slf4j.Slf4j;
-import org.apache.avro.Schema;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
@@ -172,7 +170,7 @@ public abstract class MultipleSparkJobExecutionStrategy<T>
final int numOutputGroups,
final String instantTime,
final Map<String, String> strategyParams,
-
final Schema schema,
+
final HoodieSchema schema,
final List<HoodieFileGroupId> fileGroupIdList,
final boolean shouldPreserveHoodieMetadata,
final Map<String, String> extraMetadata);
@@ -230,7 +228,7 @@ public abstract class MultipleSparkJobExecutionStrategy<T>
return CompletableFuture.supplyAsync(() -> {
JavaSparkContext jsc =
HoodieSparkEngineContext.getSparkContext(getEngineContext());
HoodieData<HoodieRecord<T>> inputRecords = readRecordsForGroup(jsc,
clusteringGroup, instantTime);
- Schema readerSchema = HoodieAvroUtils.addMetadataFields(new
Schema.Parser().parse(getWriteConfig().getSchema()));
+ HoodieSchema readerSchema =
HoodieSchemaUtils.addMetadataFields(HoodieSchema.parse(getWriteConfig().getSchema()));
// NOTE: Record have to be cloned here to make sure if it holds
low-level engine-specific
// payload pointing into a shared, mutable (underlying) buffer we
get a clean copy of
// it since these records will be shuffled later.
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobConsistentHashingExecutionStrategy.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobConsistentHashingExecutionStrategy.java
index 5dccf0637b9f..960107d06547 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobConsistentHashingExecutionStrategy.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobConsistentHashingExecutionStrategy.java
@@ -18,7 +18,6 @@
package org.apache.hudi.client.clustering.run.strategy;
-import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.utils.LazyConcatenatingIterator;
@@ -32,6 +31,7 @@ import org.apache.hudi.common.model.ConsistentHashingNode;
import org.apache.hudi.common.model.HoodieConsistentHashingMetadata;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
@@ -49,7 +49,6 @@ import
org.apache.hudi.table.action.cluster.strategy.BaseConsistentHashingBucket
import org.apache.hudi.util.ExecutorFactory;
import lombok.extern.slf4j.Slf4j;
-import org.apache.avro.Schema;
import java.util.ArrayList;
import java.util.Collections;
@@ -71,13 +70,13 @@ import java.util.function.Supplier;
public class SingleSparkJobConsistentHashingExecutionStrategy<T> extends
SingleSparkJobExecutionStrategy<T> {
private final String indexKeyFields;
- private final Schema readerSchema;
+ private final HoodieSchema readerSchema;
public SingleSparkJobConsistentHashingExecutionStrategy(HoodieTable table,
HoodieEngineContext engineContext,
HoodieWriteConfig
writeConfig) {
super(table, engineContext, writeConfig);
this.indexKeyFields = table.getConfig().getBucketIndexHashField();
- this.readerSchema = HoodieAvroUtils.addMetadataFields(new
Schema.Parser().parse(writeConfig.getSchema()));
+ this.readerSchema =
HoodieSchemaUtils.addMetadataFields(HoodieSchema.parse(writeConfig.getSchema()));
}
@Override
@@ -147,10 +146,10 @@ public class
SingleSparkJobConsistentHashingExecutionStrategy<T> extends SingleS
private final boolean recordsSorted;
private final Map<String/*fileIdPrefix*/, HoodieWriteHandle> writeHandles;
private final Function<HoodieRecord, String> fileIdPrefixExtractor;
- private final Schema schema;
+ private final HoodieSchema schema;
public InsertHandler(HoodieWriteConfig config, String instantTime,
HoodieTable hoodieTable, TaskContextSupplier taskContextSupplier,
- WriteHandleFactory writeHandleFactory, boolean
recordsSorted, Function<HoodieRecord, String> fileIdPrefixExtractor, Schema
schema) {
+ WriteHandleFactory writeHandleFactory, boolean
recordsSorted, Function<HoodieRecord, String> fileIdPrefixExtractor,
HoodieSchema schema) {
this.config = config;
this.instantTime = instantTime;
this.hoodieTable = hoodieTable;
@@ -175,7 +174,7 @@ public class
SingleSparkJobConsistentHashingExecutionStrategy<T> extends SingleS
handle = writeHandleFactory.create(config, instantTime, hoodieTable,
record.getPartitionPath(), fileIdPrefix, taskContextSupplier);
writeHandles.put(fileIdPrefix, handle);
}
- handle.write(record, HoodieSchema.fromAvroSchema(schema),
config.getProps());
+ handle.write(record, schema, config.getProps());
}
@Override
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkConsistentBucketClusteringExecutionStrategy.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkConsistentBucketClusteringExecutionStrategy.java
index 3b63e84bda07..379f2cb4b858 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkConsistentBucketClusteringExecutionStrategy.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkConsistentBucketClusteringExecutionStrategy.java
@@ -37,7 +37,6 @@ import
org.apache.hudi.table.action.cluster.strategy.BaseConsistentHashingBucket
import org.apache.hudi.table.action.commit.SparkBulkInsertHelper;
import lombok.extern.slf4j.Slf4j;
-import org.apache.avro.Schema;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
@@ -84,7 +83,7 @@ public class
SparkConsistentBucketClusteringExecutionStrategy<T extends HoodieRe
@Override
public HoodieData<WriteStatus>
performClusteringWithRecordsRDD(HoodieData<HoodieRecord<T>> inputRecords, int
numOutputGroups, String instantTime,
- Map<String,
String> strategyParams, Schema schema, List<HoodieFileGroupId> fileGroupIdList,
+ Map<String,
String> strategyParams, HoodieSchema schema, List<HoodieFileGroupId>
fileGroupIdList,
boolean
preserveHoodieMetadata, Map<String, String> extraMetadata) {
log.info("Starting clustering for a group, parallelism:{} commit:{}",
numOutputGroups, instantTime);
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSingleFileSortExecutionStrategy.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSingleFileSortExecutionStrategy.java
index 6f201e5b4c70..ea778ce4fc90 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSingleFileSortExecutionStrategy.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSingleFileSortExecutionStrategy.java
@@ -35,7 +35,6 @@ import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.commit.SparkBulkInsertHelper;
import lombok.extern.slf4j.Slf4j;
-import org.apache.avro.Schema;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
@@ -89,7 +88,7 @@ public class SparkSingleFileSortExecutionStrategy<T>
int
numOutputGroups,
String
instantTime,
Map<String,
String> strategyParams,
- Schema schema,
+ HoodieSchema
schema,
List<HoodieFileGroupId> fileGroupIdList,
boolean
shouldPreserveHoodieMetadata,
Map<String,
String> extraMetadata) {
@@ -105,7 +104,7 @@ public class SparkSingleFileSortExecutionStrategy<T>
newConfig.setValue(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE,
String.valueOf(Long.MAX_VALUE));
return (HoodieData<WriteStatus>)
SparkBulkInsertHelper.newInstance().bulkInsert(inputRecords, instantTime,
getHoodieTable(), newConfig,
- false, getRDDPartitioner(strategyParams,
HoodieSchema.fromAvroSchema(schema)), true, numOutputGroups,
+ false, getRDDPartitioner(strategyParams, schema), true,
numOutputGroups,
new SingleFileHandleCreateFactory(fileGroupIdList.get(0).getFileId(),
shouldPreserveHoodieMetadata));
}
}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java
index d66691d77fdd..05d9b23a8934 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java
@@ -33,7 +33,6 @@ import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.commit.SparkBulkInsertHelper;
import lombok.extern.slf4j.Slf4j;
-import org.apache.avro.Schema;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
@@ -82,7 +81,7 @@ public class SparkSortAndSizeExecutionStrategy<T>
final int
numOutputGroups,
final String
instantTime,
final
Map<String, String> strategyParams,
- final Schema
schema,
+ final
HoodieSchema schema,
final
List<HoodieFileGroupId> fileGroupIdList,
final boolean
shouldPreserveHoodieMetadata,
final
Map<String, String> extraMetadata) {
@@ -95,6 +94,6 @@ public class SparkSortAndSizeExecutionStrategy<T>
newConfig.setValue(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE,
String.valueOf(getWriteConfig().getClusteringTargetFileMaxBytes()));
return (HoodieData<WriteStatus>)
SparkBulkInsertHelper.newInstance().bulkInsert(inputRecords, instantTime,
getHoodieTable(),
- newConfig, false, getRDDPartitioner(strategyParams,
HoodieSchema.fromAvroSchema(schema)), true, numOutputGroups, new
CreateHandleFactory(shouldPreserveHoodieMetadata));
+ newConfig, false, getRDDPartitioner(strategyParams, schema), true,
numOutputGroups, new CreateHandleFactory(shouldPreserveHoodieMetadata));
}
}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
index 9b1f36908b59..b86fa9612d1e 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
@@ -72,7 +72,6 @@ import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.util.JavaScalaConverters;
import lombok.extern.slf4j.Slf4j;
-import org.apache.avro.Schema;
import org.apache.spark.api.java.function.FlatMapGroupsFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Column;
@@ -104,7 +103,6 @@ import java.util.stream.Stream;
import scala.collection.immutable.Seq;
-import static org.apache.hudi.avro.HoodieAvroUtils.addMetadataFields;
import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
import static org.apache.hudi.common.util.ValidationUtils.checkState;
@@ -351,7 +349,7 @@ public class SparkMetadataWriterUtils {
.build();
try {
ClosableIterator<InternalRow> rowsForFilePath =
fileGroupReader.getClosableIterator();
- SparkRowSerDe sparkRowSerDe =
HoodieSparkUtils.getCatalystRowSerDe(HoodieInternalRowUtils.getCachedSchema(readerSchema.toAvroSchema()));
+ SparkRowSerDe sparkRowSerDe =
HoodieSparkUtils.getCatalystRowSerDe(HoodieInternalRowUtils.getCachedSchema(readerSchema));
return getRowsWithExpressionIndexMetadata(rowsForFilePath,
sparkRowSerDe, partition, relativeFilePath, fileSize);
} catch (IOException ex) {
throw new HoodieIOException("Error reading file " + filePath, ex);
@@ -385,14 +383,14 @@ public class SparkMetadataWriterUtils {
HoodieIndexDefinition indexDefinition =
HoodieTableMetadataUtil.getHoodieIndexDefinition(indexPartition,
dataMetaClient);
List<String> columnsToIndex =
Collections.singletonList(indexDefinition.getSourceFields().get(0));
try {
- Option<Schema> writerSchema =
+ Option<HoodieSchema> writerSchema =
Option.ofNullable(commitMetadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY))
.flatMap(writerSchemaStr ->
isNullOrEmpty(writerSchemaStr)
? Option.empty()
- : Option.of(new Schema.Parser().parse(writerSchemaStr)));
+ : Option.of(HoodieSchema.parse(writerSchemaStr)));
HoodieTableConfig tableConfig = dataMetaClient.getTableConfig();
- HoodieSchema tableSchema = writerSchema.map(schema ->
tableConfig.populateMetaFields() ? addMetadataFields(schema) :
schema).map(HoodieSchema::fromAvroSchema)
+ HoodieSchema tableSchema = writerSchema.map(schema ->
tableConfig.populateMetaFields() ? HoodieSchemaUtils.addMetadataFields(schema)
: schema)
.orElseThrow(() -> new IllegalStateException(String.format("Expected
writer schema in commit metadata %s", commitMetadata)));
List<Pair<String, HoodieSchema>> columnsToIndexSchemaMap =
columnsToIndex.stream()
.map(columnToIndex -> HoodieSchemaUtils.getNestedField(tableSchema,
columnToIndex))
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java
index 9606f255eef0..6747e1512a38 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java
@@ -22,6 +22,7 @@ import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.SparkAdapterSupport$;
import org.apache.hudi.SparkFileFormatInternalRecordContext;
import org.apache.hudi.client.model.HoodieInternalRow;
+import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.table.read.DeleteContext;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.OrderingValues;
@@ -171,7 +172,7 @@ public class HoodieSparkRecord extends
HoodieRecord<InternalRow> {
if (key != null) {
return getRecordKey();
}
- StructType structType =
HoodieInternalRowUtils.getCachedSchema(recordSchema);
+ StructType structType =
HoodieInternalRowUtils.getCachedSchema(HoodieSchema.fromAvroSchema(recordSchema));
return keyGeneratorOpt.isPresent()
? ((SparkKeyGeneratorInterface)
keyGeneratorOpt.get()).getRecordKey(data, structType).toString()
:
data.getString(HoodieMetadataField.RECORD_KEY_METADATA_FIELD.ordinal());
@@ -182,7 +183,7 @@ public class HoodieSparkRecord extends
HoodieRecord<InternalRow> {
if (key != null) {
return getRecordKey();
}
- StructType structType =
HoodieInternalRowUtils.getCachedSchema(recordSchema);
+ StructType structType =
HoodieInternalRowUtils.getCachedSchema(HoodieSchema.fromAvroSchema(recordSchema));
DataType dataType = structType.apply(keyFieldName).dataType();
int pos = structType.fieldIndex(keyFieldName);
return data.get(pos, dataType).toString();
@@ -195,7 +196,7 @@ public class HoodieSparkRecord extends
HoodieRecord<InternalRow> {
@Override
public Object[] getColumnValues(Schema recordSchema, String[] columns,
boolean consistentLogicalTimestampEnabled) {
- StructType structType =
HoodieInternalRowUtils.getCachedSchema(recordSchema);
+ StructType structType =
HoodieInternalRowUtils.getCachedSchema(HoodieSchema.fromAvroSchema(recordSchema));
Object[] objects = new Object[columns.length];
for (int i = 0; i < objects.length; i++) {
objects[i] = getValue(structType, columns[i], data);
@@ -205,12 +206,12 @@ public class HoodieSparkRecord extends
HoodieRecord<InternalRow> {
@Override
public Object getColumnValueAsJava(Schema recordSchema, String column,
Properties props) {
- return getFieldValueFromInternalRowAsJava(data, recordSchema, column);
+ return getFieldValueFromInternalRowAsJava(data,
HoodieSchema.fromAvroSchema(recordSchema), column);
}
@Override
public HoodieRecord joinWith(HoodieRecord other, Schema targetSchema) {
- StructType targetStructType =
HoodieInternalRowUtils.getCachedSchema(targetSchema);
+ StructType targetStructType =
HoodieInternalRowUtils.getCachedSchema(HoodieSchema.fromAvroSchema(targetSchema));
InternalRow mergeRow = new JoinedRow(data, (InternalRow) other.getData());
UnsafeProjection projection =
getCachedUnsafeProjection(targetStructType, targetStructType);
@@ -219,8 +220,8 @@ public class HoodieSparkRecord extends
HoodieRecord<InternalRow> {
@Override
public HoodieRecord prependMetaFields(Schema recordSchema, Schema
targetSchema, MetadataValues metadataValues, Properties props) {
- StructType structType =
HoodieInternalRowUtils.getCachedSchema(recordSchema);
- StructType targetStructType =
HoodieInternalRowUtils.getCachedSchema(targetSchema);
+ StructType structType =
HoodieInternalRowUtils.getCachedSchema(HoodieSchema.fromAvroSchema(recordSchema));
+ StructType targetStructType =
HoodieInternalRowUtils.getCachedSchema(HoodieSchema.fromAvroSchema(targetSchema));
HoodieInternalRow updatableRow = wrapIntoUpdatableOverlay(this.data,
structType);
updateMetadataValuesInternal(updatableRow, metadataValues);
@@ -230,7 +231,7 @@ public class HoodieSparkRecord extends
HoodieRecord<InternalRow> {
@Override
public HoodieRecord updateMetaField(Schema recordSchema, int ordinal, String
value) {
- StructType structType =
HoodieInternalRowUtils.getCachedSchema(recordSchema);
+ StructType structType =
HoodieInternalRowUtils.getCachedSchema(HoodieSchema.fromAvroSchema(recordSchema));
HoodieInternalRow updatableRow = wrapIntoUpdatableOverlay(this.data,
structType);
updatableRow.update(ordinal,
CatalystTypeConverters.convertToCatalyst(value));
return new HoodieSparkRecord(getKey(), updatableRow, structType,
getOperation(), this.currentLocation, this.newLocation, false);
@@ -238,8 +239,8 @@ public class HoodieSparkRecord extends
HoodieRecord<InternalRow> {
@Override
public HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema,
Properties props, Schema newSchema, Map<String, String> renameCols) {
- StructType structType =
HoodieInternalRowUtils.getCachedSchema(recordSchema);
- StructType newStructType =
HoodieInternalRowUtils.getCachedSchema(newSchema);
+ StructType structType =
HoodieInternalRowUtils.getCachedSchema(HoodieSchema.fromAvroSchema(recordSchema));
+ StructType newStructType =
HoodieInternalRowUtils.getCachedSchema(HoodieSchema.fromAvroSchema(newSchema));
Function1<InternalRow, UnsafeRow> unsafeRowWriter =
HoodieInternalRowUtils.getCachedUnsafeRowWriter(structType,
newStructType, renameCols, Collections.emptyMap());
@@ -251,7 +252,7 @@ public class HoodieSparkRecord extends
HoodieRecord<InternalRow> {
@Override
public HoodieRecord truncateRecordKey(Schema recordSchema, Properties props,
String keyFieldName) {
- StructType structType =
HoodieInternalRowUtils.getCachedSchema(recordSchema);
+ StructType structType =
HoodieInternalRowUtils.getCachedSchema(HoodieSchema.fromAvroSchema(recordSchema));
int pos = structType.fieldIndex(keyFieldName);
data.update(pos,
CatalystTypeConverters.convertToCatalyst(StringUtils.EMPTY_STRING));
return this;
@@ -279,8 +280,10 @@ public class HoodieSparkRecord extends
HoodieRecord<InternalRow> {
Option<String> partitionNameOp,
Boolean populateMetaFields,
Option<Schema> schemaWithoutMetaFields) {
- StructType structType =
HoodieInternalRowUtils.getCachedSchema(recordSchema);
- Option<StructType> structTypeWithoutMetaFields =
schemaWithoutMetaFields.map(HoodieInternalRowUtils::getCachedSchema);
+ StructType structType =
HoodieInternalRowUtils.getCachedSchema(HoodieSchema.fromAvroSchema(recordSchema));
+ Option<StructType> structTypeWithoutMetaFields = schemaWithoutMetaFields
+ .map(HoodieSchema::fromAvroSchema)
+ .map(HoodieInternalRowUtils::getCachedSchema);
if (populateMetaFields) {
return convertToHoodieSparkRecord(structType, this, withOperation);
} else if (simpleKeyGenFieldsOpt.isPresent()) {
@@ -292,7 +295,7 @@ public class HoodieSparkRecord extends
HoodieRecord<InternalRow> {
@Override
public HoodieRecord wrapIntoHoodieRecordPayloadWithKeyGen(Schema
recordSchema, Properties props, Option<BaseKeyGenerator> keyGen) {
- StructType structType =
HoodieInternalRowUtils.getCachedSchema(recordSchema);
+ StructType structType =
HoodieInternalRowUtils.getCachedSchema(HoodieSchema.fromAvroSchema(recordSchema));
String key;
String partition;
boolean populateMetaFields =
Boolean.parseBoolean(props.getOrDefault(POPULATE_META_FIELDS.key(),
@@ -341,7 +344,7 @@ public class HoodieSparkRecord extends
HoodieRecord<InternalRow> {
@Override
protected Comparable<?> doGetOrderingValue(Schema recordSchema, Properties
props, String[] orderingFields) {
- StructType structType =
HoodieInternalRowUtils.getCachedSchema(recordSchema);
+ StructType structType =
HoodieInternalRowUtils.getCachedSchema(HoodieSchema.fromAvroSchema(recordSchema));
if (orderingFields != null) {
return OrderingValues.create(orderingFields, field -> {
scala.Option<NestedFieldPath> cachedNestedFieldPath =
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java
index 45e29a18110d..29370697ddc8 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java
@@ -18,11 +18,12 @@
package org.apache.hudi.execution;
-import org.apache.hudi.avro.AvroSchemaCache;
-import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaCache;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
import org.apache.hudi.common.util.queue.HoodieExecutor;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
@@ -30,8 +31,6 @@ import org.apache.hudi.io.WriteHandleFactory;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.util.ExecutorFactory;
-import org.apache.avro.Schema;
-
import java.util.Iterator;
import java.util.List;
@@ -69,14 +68,14 @@ public class SparkLazyInsertIterable<T> extends
HoodieLazyInsertIterable<T> {
HoodieExecutor<List<WriteStatus>> bufferedIteratorExecutor = null;
try {
// config.getSchema is not canonicalized, while config.getWriteSchema is
canonicalized. So, we have to use the canonicalized schema to read the existing
data.
- Schema schema = new Schema.Parser().parse(hoodieConfig.getWriteSchema());
+ HoodieSchema schema = HoodieSchema.parse(hoodieConfig.getWriteSchema());
if (useWriterSchema) {
- schema = HoodieAvroUtils.addMetadataFields(schema);
+ schema = HoodieSchemaUtils.addMetadataFields(schema);
}
- schema = AvroSchemaCache.intern(schema);
+ schema = HoodieSchemaCache.intern(schema);
bufferedIteratorExecutor = ExecutorFactory.create(hoodieConfig,
inputItr, getInsertHandler(),
- getTransformer(schema, hoodieConfig),
hoodieTable.getPreExecuteRunnable());
+ getTransformer(schema.getAvroSchema(), hoodieConfig),
hoodieTable.getPreExecuteRunnable());
final List<WriteStatus> result = bufferedIteratorExecutor.execute();
return result;
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDBucketIndexPartitioner.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDBucketIndexPartitioner.java
index fdbfae12ce32..f8f34d188c8b 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDBucketIndexPartitioner.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDBucketIndexPartitioner.java
@@ -19,15 +19,14 @@
package org.apache.hudi.execution.bulkinsert;
import org.apache.hudi.SparkAdapterSupport$;
-import org.apache.hudi.avro.HoodieAvroUtils;
-import org.apache.hudi.common.config.SerializableSchema;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
import org.apache.hudi.common.util.collection.FlatLists;
import org.apache.hudi.table.BucketIndexBulkInsertPartitioner;
import org.apache.hudi.table.HoodieTable;
-import org.apache.avro.Schema;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.spark.Partitioner;
@@ -87,12 +86,12 @@ public abstract class RDDBucketIndexPartitioner<T> extends
BucketIndexBulkInsert
*/
private JavaRDD<HoodieRecord<T>>
doPartitionAndCustomColumnSort(JavaRDD<HoodieRecord<T>> records, Partitioner
partitioner) {
final String[] sortColumns = sortColumnNames;
- final SerializableSchema schema = new
SerializableSchema(HoodieAvroUtils.addMetadataFields((new
Schema.Parser().parse(table.getConfig().getSchema()))));
+ final HoodieSchema schema =
HoodieSchemaUtils.addMetadataFields((HoodieSchema.parse(table.getConfig().getSchema())));
Comparator<HoodieRecord<T>> comparator = (Comparator<HoodieRecord<T>> &
Serializable) (t1, t2) -> {
FlatLists.ComparableList obj1 =
FlatLists.ofComparableArray(utf8StringFactory.wrapArrayOfObjects(
- t1.getColumnValues(schema.get(), sortColumns,
consistentLogicalTimestampEnabled)));
+ t1.getColumnValues(schema.getAvroSchema(), sortColumns,
consistentLogicalTimestampEnabled)));
FlatLists.ComparableList obj2 =
FlatLists.ofComparableArray(utf8StringFactory.wrapArrayOfObjects(
- t2.getColumnValues(schema.get(), sortColumns,
consistentLogicalTimestampEnabled)));
+ t2.getColumnValues(schema.getAvroSchema(), sortColumns,
consistentLogicalTimestampEnabled)));
return obj1.compareTo(obj2);
};
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java
index 327eb8919529..b85c5f52a0b0 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java
@@ -89,7 +89,7 @@ public class RDDSpatialCurveSortPartitioner<T>
return hoodieRecord;
});
} else if (recordType == HoodieRecordType.SPARK) {
- StructType structType =
HoodieInternalRowUtils.getCachedSchema(schema.toAvroSchema());
+ StructType structType = HoodieInternalRowUtils.getCachedSchema(schema);
Dataset<Row> sourceDataset =
SparkConversionUtils.createDataFrame(records.rdd(),
sparkEngineContext.getSqlContext().sparkSession(), structType);
Dataset<Row> sortedDataset = reorder(sourceDataset,
outputSparkPartitions);
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriterFactory.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriterFactory.java
index 4b5264ab759f..c251eae33e2e 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriterFactory.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriterFactory.java
@@ -109,7 +109,7 @@ public class HoodieSparkFileWriterFactory extends
HoodieFileWriterFactory {
protected HoodieFileWriter newLanceFileWriter(String instantTime,
StoragePath path, HoodieConfig config, HoodieSchema schema,
TaskContextSupplier
taskContextSupplier) throws IOException {
boolean populateMetaFields =
config.getBooleanOrDefault(HoodieTableConfig.POPULATE_META_FIELDS);
- StructType structType =
HoodieInternalRowUtils.getCachedSchema(schema.getAvroSchema());
+ StructType structType = HoodieInternalRowUtils.getCachedSchema(schema);
return new HoodieSparkLanceWriter(path, structType, instantTime,
taskContextSupplier, storage, populateMetaFields);
}
@@ -117,7 +117,7 @@ public class HoodieSparkFileWriterFactory extends
HoodieFileWriterFactory {
private static HoodieRowParquetWriteSupport
getHoodieRowParquetWriteSupport(StorageConfiguration<?> conf, HoodieSchema
schema,
HoodieConfig config, boolean enableBloomFilter) {
Option<BloomFilter> filter = enableBloomFilter ?
Option.of(createBloomFilter(config)) : Option.empty();
- StructType structType =
HoodieInternalRowUtils.getCachedSchema(schema.getAvroSchema());
+ StructType structType = HoodieInternalRowUtils.getCachedSchema(schema);
return
HoodieRowParquetWriteSupport.getHoodieRowParquetWriteSupport(conf.unwrapAs(Configuration.class),
structType, filter, config);
}
}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java
index af1e7a8c45f0..553d4c79f6cf 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java
@@ -36,7 +36,6 @@ import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
-import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.filter2.predicate.FilterApi;
@@ -46,7 +45,7 @@ import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.SchemaRepair;
import org.apache.spark.sql.HoodieInternalRowUtils;
-import org.apache.spark.sql.avro.HoodieSparkAvroSchemaConverters;
+import org.apache.spark.sql.avro.HoodieSparkSchemaConverters;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
@@ -82,7 +81,7 @@ public class HoodieSparkParquetReader implements
HoodieSparkFileReader {
private final List<ClosableIterator> readerIterators = new ArrayList<>();
private Option<MessageType> fileSchemaOption = Option.empty();
private Option<StructType> structTypeOption = Option.empty();
- private Option<Schema> schemaOption = Option.empty();
+ private Option<HoodieSchema> schemaOption = Option.empty();
public HoodieSparkParquetReader(HoodieStorage storage, StoragePath path) {
this.path = path;
@@ -141,7 +140,7 @@ public class HoodieSparkParquetReader implements
HoodieSparkFileReader {
*/
public ClosableIterator<UnsafeRow> getUnsafeRowIterator(HoodieSchema
requestedSchema, List<Filter> readFilters) throws IOException {
HoodieSchema nonNullSchema =
HoodieSchemaUtils.getNonNullTypeFromUnion(requestedSchema);
- StructType structSchema =
HoodieInternalRowUtils.getCachedSchema(nonNullSchema.toAvroSchema());
+ StructType structSchema =
HoodieInternalRowUtils.getCachedSchema(nonNullSchema);
Option<MessageType> messageSchema =
Option.of(getAvroSchemaConverter(storage.getConf().unwrapAs(Configuration.class)).convert(nonNullSchema.toAvroSchema()));
boolean enableTimestampFieldRepair =
storage.getConf().getBoolean(ENABLE_LOGICAL_TIMESTAMP_REPAIR, true);
StructType dataStructType = convertToStruct(enableTimestampFieldRepair ?
SchemaRepair.repairLogicalTypes(getFileSchema(), messageSchema) :
getFileSchema());
@@ -204,11 +203,10 @@ public class HoodieSparkParquetReader implements
HoodieSparkFileReader {
// and therefore if we convert to Avro directly we'll lose logical
type-info.
MessageType messageType = getFileSchema();
StructType structType = getStructSchema();
- schemaOption = Option.of(HoodieSparkAvroSchemaConverters.toAvroType(
+ schemaOption = Option.of(HoodieSparkSchemaConverters.toHoodieType(
structType, true, messageType.getName(), StringUtils.EMPTY_STRING));
}
- //TODO boundary to revisit using HoodieSchema directly
- return HoodieSchema.fromAvroSchema(schemaOption.get());
+ return schemaOption.get();
}
protected StructType getStructSchema() {
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/merge/SparkRecordMergingUtils.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/merge/SparkRecordMergingUtils.java
index 9dd1f5c71fc0..a838e1378d63 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/merge/SparkRecordMergingUtils.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/merge/SparkRecordMergingUtils.java
@@ -145,7 +145,7 @@ public class SparkRecordMergingUtils {
*/
public static Map<Integer, StructField>
getCachedFieldIdToFieldMapping(HoodieSchema providedSchema) {
return FIELD_ID_TO_FIELD_MAPPING_CACHE.computeIfAbsent(providedSchema,
schema -> {
- StructType structType =
HoodieInternalRowUtils.getCachedSchema(schema.toAvroSchema());
+ StructType structType = HoodieInternalRowUtils.getCachedSchema(schema);
Map<Integer, StructField> schemaFieldIdMapping = new HashMap<>();
int fieldId = 0;
@@ -164,7 +164,7 @@ public class SparkRecordMergingUtils {
*/
public static Map<String, Integer>
getCachedFieldNameToIdMapping(HoodieSchema providedSchema) {
return FIELD_NAME_TO_ID_MAPPING_CACHE.computeIfAbsent(providedSchema,
schema -> {
- StructType structType =
HoodieInternalRowUtils.getCachedSchema(schema.toAvroSchema());
+ StructType structType = HoodieInternalRowUtils.getCachedSchema(schema);
Map<String, Integer> schemaFieldIdMapping = new HashMap<>();
int fieldId = 0;
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/BaseBootstrapMetadataHandler.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/BaseBootstrapMetadataHandler.java
index a36111c83419..368c04aa89c9 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/BaseBootstrapMetadataHandler.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/BaseBootstrapMetadataHandler.java
@@ -24,6 +24,8 @@ import org.apache.hudi.avro.model.HoodiePath;
import org.apache.hudi.client.bootstrap.BootstrapWriteStatus;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.BootstrapFileMapping;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
@@ -32,7 +34,6 @@ import org.apache.hudi.keygen.KeyGeneratorInterface;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.HoodieTable;
-import org.apache.avro.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,11 +59,11 @@ public abstract class BaseBootstrapMetadataHandler
implements BootstrapMetadataH
HoodieBootstrapHandle<?, ?, ?, ?> bootstrapHandle = new
HoodieBootstrapHandle(config, HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS,
table, partitionPath, FSUtils.createNewFileIdPfx(),
table.getTaskContextSupplier());
try {
- Schema avroSchema = getAvroSchema(sourceFilePath);
+ HoodieSchema schema = getSchema(sourceFilePath);
List<String> recordKeyColumns =
keyGenerator.getRecordKeyFieldNames().stream()
.map(HoodieAvroUtils::getRootLevelFieldName)
.collect(Collectors.toList());
- Schema recordKeySchema =
HoodieAvroUtils.generateProjectionSchema(avroSchema, recordKeyColumns);
+ HoodieSchema recordKeySchema =
HoodieSchemaUtils.generateProjectionSchema(schema, recordKeyColumns);
LOG.info("Schema to be used for reading record keys: " +
recordKeySchema);
@@ -79,8 +80,8 @@ public abstract class BaseBootstrapMetadataHandler implements
BootstrapMetadataH
return writeStatus;
}
- abstract Schema getAvroSchema(StoragePath sourceFilePath) throws IOException;
+ abstract HoodieSchema getSchema(StoragePath sourceFilePath) throws
IOException;
abstract void executeBootstrap(HoodieBootstrapHandle<?, ?, ?, ?>
bootstrapHandle,
- StoragePath sourceFilePath,
KeyGeneratorInterface keyGenerator, String partitionPath, Schema avroSchema)
throws Exception;
+ StoragePath sourceFilePath,
KeyGeneratorInterface keyGenerator, String partitionPath, HoodieSchema schema)
throws Exception;
}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/OrcBootstrapMetadataHandler.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/OrcBootstrapMetadataHandler.java
index 1a02941c2daa..9e6475a090cb 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/OrcBootstrapMetadataHandler.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/OrcBootstrapMetadataHandler.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
+import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.util.AvroOrcUtils;
import org.apache.hudi.common.util.queue.HoodieExecutor;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -57,28 +58,29 @@ class OrcBootstrapMetadataHandler extends
BaseBootstrapMetadataHandler {
}
@Override
- Schema getAvroSchema(StoragePath sourceFilePath) throws IOException {
+ HoodieSchema getSchema(StoragePath sourceFilePath) throws IOException {
Reader orcReader = OrcFile.createReader(
new Path(sourceFilePath.toUri()),
OrcFile.readerOptions((Configuration) table.getStorageConf().unwrap()));
TypeDescription orcSchema = orcReader.getSchema();
- return AvroOrcUtils.createAvroSchema(orcSchema);
+ Schema schema = AvroOrcUtils.createAvroSchema(orcSchema);
+ return HoodieSchema.fromAvroSchema(schema);
}
@Override
void executeBootstrap(HoodieBootstrapHandle<?, ?, ?, ?> bootstrapHandle,
StoragePath sourceFilePath, KeyGeneratorInterface
keyGenerator,
- String partitionPath, Schema avroSchema) throws
Exception {
+ String partitionPath, HoodieSchema schema) throws
Exception {
// TODO support spark orc reader
if (config.getRecordMerger().getRecordType() == HoodieRecordType.SPARK) {
throw new UnsupportedOperationException();
}
Reader orcReader = OrcFile.createReader(
new Path(sourceFilePath.toUri()),
OrcFile.readerOptions((Configuration) table.getStorageConf().unwrap()));
- TypeDescription orcSchema = AvroOrcUtils.createOrcSchema(avroSchema);
+ TypeDescription orcSchema =
AvroOrcUtils.createOrcSchema(schema.getAvroSchema());
HoodieExecutor<Void> executor = null;
RecordReader reader = orcReader.rows(new Reader.Options((Configuration)
table.getStorageConf().unwrap()).schema(orcSchema));
try {
- executor = ExecutorFactory.create(config, new
OrcReaderIterator<GenericRecord>(reader, avroSchema, orcSchema),
+ executor = ExecutorFactory.create(config, new
OrcReaderIterator<GenericRecord>(reader, schema.getAvroSchema(), orcSchema),
new BootstrapRecordConsumer(bootstrapHandle), inp -> {
String recKey = keyGenerator.getKey(inp).getRecordKey();
GenericRecord gr = new
GenericData.Record(METADATA_BOOTSTRAP_RECORD_SCHEMA.toAvroSchema());
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java
index 5414305104fd..d0c75bd8ae8e 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java
@@ -18,6 +18,7 @@
package org.apache.hudi.table.action.bootstrap;
+import org.apache.avro.Schema;
import org.apache.hudi.avro.model.HoodieFileStatus;
import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieKey;
@@ -36,7 +37,6 @@ import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.util.ExecutorFactory;
-import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
@@ -66,12 +66,13 @@ class ParquetBootstrapMetadataHandler extends
BaseBootstrapMetadataHandler {
}
@Override
- Schema getAvroSchema(StoragePath sourceFilePath) throws IOException {
+ HoodieSchema getSchema(StoragePath sourceFilePath) throws IOException {
ParquetMetadata readFooter = ParquetFileReader.readFooter(
(Configuration) table.getStorageConf().unwrap(), new
Path(sourceFilePath.toUri()),
ParquetMetadataConverter.NO_FILTER);
MessageType parquetSchema = readFooter.getFileMetaData().getSchema();
- return getAvroSchemaConverter((Configuration)
table.getStorageConf().unwrap()).convert(parquetSchema);
+ Schema schema = getAvroSchemaConverter((Configuration)
table.getStorageConf().unwrap()).convert(parquetSchema);
+ return HoodieSchema.fromAvroSchema(schema);
}
@Override
@@ -79,7 +80,7 @@ class ParquetBootstrapMetadataHandler extends
BaseBootstrapMetadataHandler {
StoragePath sourceFilePath,
KeyGeneratorInterface keyGenerator,
String partitionPath,
- Schema schema) throws Exception {
+ HoodieSchema schema) throws Exception {
HoodieRecord.HoodieRecordType recordType =
table.getConfig().getRecordMerger().getRecordType();
HoodieFileReader reader =
getHoodieSparkIOFactory(table.getStorage()).getReaderFactory(recordType)
@@ -88,15 +89,14 @@ class ParquetBootstrapMetadataHandler extends
BaseBootstrapMetadataHandler {
HoodieExecutor<Void> executor = null;
try {
Function<HoodieRecord, HoodieRecord> transformer = record -> {
- String recordKey = record.getRecordKey(schema,
Option.of(keyGenerator));
+ String recordKey = record.getRecordKey(schema.getAvroSchema(),
Option.of(keyGenerator));
return createNewMetadataBootstrapRecord(recordKey, partitionPath,
recordType)
// NOTE: Record have to be cloned here to make sure if it holds
low-level engine-specific
// payload pointing into a shared, mutable (underlying)
buffer we get a clean copy of
// it since these records will be inserted into the queue
later.
.copy();
};
- //TODO boundary to reivisit in later pr to use HoodieSchema directly
- ClosableIterator<HoodieRecord> recordIterator =
reader.getRecordIterator(HoodieSchema.fromAvroSchema(schema));
+ ClosableIterator<HoodieRecord> recordIterator =
reader.getRecordIterator(schema);
executor = ExecutorFactory.create(config, recordIterator,
new BootstrapRecordConsumer(bootstrapHandle), transformer,
table.getPreExecuteRunnable());
executor.execute();
@@ -124,7 +124,7 @@ class ParquetBootstrapMetadataHandler extends
BaseBootstrapMetadataHandler {
return new HoodieAvroIndexedRecord(hoodieKey, avroRecord);
case SPARK:
- StructType schema =
HoodieInternalRowUtils$.MODULE$.getCachedSchema(METADATA_BOOTSTRAP_RECORD_SCHEMA.toAvroSchema());
+ StructType schema =
HoodieInternalRowUtils$.MODULE$.getCachedSchema(METADATA_BOOTSTRAP_RECORD_SCHEMA);
UnsafeProjection unsafeProjection =
HoodieInternalRowUtils$.MODULE$.getCachedUnsafeProjection(schema, schema);
GenericInternalRow row = new
GenericInternalRow(METADATA_BOOTSTRAP_RECORD_SCHEMA.getFields().size());
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRecordContext.java
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRecordContext.java
index 1af96a2bf335..6bd5420af014 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRecordContext.java
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRecordContext.java
@@ -31,7 +31,6 @@ import org.apache.hudi.common.table.read.BufferedRecord;
import org.apache.hudi.common.util.DefaultJavaTypeConverter;
import org.apache.hudi.util.OrderingValueEngineTypeConverter;
-import org.apache.avro.Schema;
import org.apache.spark.sql.HoodieInternalRowUtils;
import org.apache.spark.sql.HoodieUnsafeRowUtils;
import org.apache.spark.sql.catalyst.InternalRow;
@@ -65,15 +64,15 @@ public abstract class BaseSparkInternalRecordContext
extends RecordContext<Inter
super(new DefaultJavaTypeConverter());
}
- public static Object getFieldValueFromInternalRow(InternalRow row, Schema
recordSchema, String fieldName) {
+ public static Object getFieldValueFromInternalRow(InternalRow row,
HoodieSchema recordSchema, String fieldName) {
return getFieldValueFromInternalRowInternal(row, recordSchema, fieldName,
false);
}
- public static Object getFieldValueFromInternalRowAsJava(InternalRow row,
Schema recordSchema, String fieldName) {
+ public static Object getFieldValueFromInternalRowAsJava(InternalRow row,
HoodieSchema recordSchema, String fieldName) {
return getFieldValueFromInternalRowInternal(row, recordSchema, fieldName,
true);
}
- private static Object getFieldValueFromInternalRowInternal(InternalRow row,
Schema recordSchema, String fieldName, boolean convertToJavaType) {
+ private static Object getFieldValueFromInternalRowInternal(InternalRow row,
HoodieSchema recordSchema, String fieldName, boolean convertToJavaType) {
StructType structType = getCachedSchema(recordSchema);
scala.Option<HoodieUnsafeRowUtils.NestedFieldPath> cachedNestedFieldPath =
HoodieInternalRowUtils.getCachedPosList(structType, fieldName);
@@ -107,7 +106,7 @@ public abstract class BaseSparkInternalRecordContext
extends RecordContext<Inter
@Override
public Object getValue(InternalRow row, HoodieSchema schema, String
fieldName) {
- return getFieldValueFromInternalRow(row, schema.toAvroSchema(), fieldName);
+ return getFieldValueFromInternalRow(row, schema, fieldName);
}
@Override
@@ -128,7 +127,7 @@ public abstract class BaseSparkInternalRecordContext
extends RecordContext<Inter
HoodieSchema schema = getSchemaFromBufferRecord(bufferedRecord);
InternalRow row = bufferedRecord.getRecord();
- return new HoodieSparkRecord(hoodieKey, row,
HoodieInternalRowUtils.getCachedSchema(schema.toAvroSchema()),
+ return new HoodieSparkRecord(hoodieKey, row,
HoodieInternalRowUtils.getCachedSchema(schema),
false, bufferedRecord.getHoodieOperation(),
bufferedRecord.getOrderingValue(), bufferedRecord.isDelete());
}
@@ -223,7 +222,7 @@ public abstract class BaseSparkInternalRecordContext
extends RecordContext<Inter
@Override
public UnaryOperator<InternalRow> projectRecord(HoodieSchema from,
HoodieSchema to, Map<String, String> renamedColumns) {
Function1<InternalRow, UnsafeRow> unsafeRowWriter =
-
HoodieInternalRowUtils.getCachedUnsafeRowWriter(getCachedSchema(from.toAvroSchema()),
getCachedSchema(to.toAvroSchema()), renamedColumns, Collections.emptyMap());
+ HoodieInternalRowUtils.getCachedUnsafeRowWriter(getCachedSchema(from),
getCachedSchema(to), renamedColumns, Collections.emptyMap());
return row -> (InternalRow) unsafeRowWriter.apply(row);
}
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
index a7b3dd9a7421..e4a34ec60acf 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
@@ -90,7 +90,7 @@ public abstract class BaseSparkInternalRowReaderContext
extends HoodieReaderCont
Map<Integer, Object> partitionValuesByIndex =
partitionFieldAndValues.stream()
.collect(Collectors.toMap(pair ->
to.getField(pair.getKey()).orElseThrow(() -> new
IllegalArgumentException("Missing field: " + pair.getKey())).pos(),
Pair::getRight));
Function1<InternalRow, UnsafeRow> unsafeRowWriter =
-
HoodieInternalRowUtils.getCachedUnsafeRowWriter(getCachedSchema(from.toAvroSchema()),
getCachedSchema(to.toAvroSchema()), Collections.emptyMap(),
partitionValuesByIndex);
+ HoodieInternalRowUtils.getCachedUnsafeRowWriter(getCachedSchema(from),
getCachedSchema(to), Collections.emptyMap(), partitionValuesByIndex);
return row -> (InternalRow) unsafeRowWriter.apply(row);
}
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
index 24a37582399d..0ee2abd847a5 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
@@ -24,6 +24,7 @@ import org.apache.hudi.common.config.TypedProperties
import org.apache.hudi.common.data.HoodieData
import org.apache.hudi.common.engine.TaskContextSupplier
import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.common.schema.HoodieSchema
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.common.util.{OrderingValues, ReflectionUtils}
import org.apache.hudi.config.HoodieWriteConfig
@@ -36,13 +37,11 @@ import
org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
import org.apache.hudi.table.{BulkInsertPartitioner, HoodieTable}
import
org.apache.hudi.table.action.commit.{BucketBulkInsertDataInternalWriterHelper,
BulkInsertDataInternalWriterHelper,
ConsistentBucketBulkInsertDataInternalWriterHelper, ParallelismHelper}
import org.apache.hudi.util.JFunction.toJavaSerializableFunctionUnchecked
-
-import org.apache.avro.Schema
import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row}
-import org.apache.spark.sql.HoodieUnsafeRowUtils.{composeNestedFieldPath,
getNestedInternalRowValue, NestedFieldPath}
+import org.apache.spark.sql.HoodieUnsafeRowUtils.{NestedFieldPath,
composeNestedFieldPath, getNestedInternalRowValue}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
import org.apache.spark.sql.catalyst.plans.logical.Project
@@ -157,7 +156,7 @@ object HoodieDatasetBulkInsertHelper
writeConfig: HoodieWriteConfig,
arePartitionRecordsSorted: Boolean,
shouldPreserveHoodieMetadata: Boolean):
HoodieData[WriteStatus] = {
- val schema = AvroConversionUtils.alignFieldsNullability(dataset.schema,
new Schema.Parser().parse(writeConfig.getSchema))
+ val schema =
HoodieSchemaConversionUtils.alignFieldsNullability(dataset.schema,
HoodieSchema.parse(writeConfig.getSchema))
HoodieJavaRDD.of(
injectSQLConf(dataset.queryExecution.toRdd.mapPartitions(iter => {
val taskContextSupplier: TaskContextSupplier =
table.getTaskContextSupplier
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
index 153aca96a154..daf5a0a9417f 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
@@ -29,7 +29,6 @@ import org.apache.hudi.keygen.constant.KeyGeneratorType
import org.apache.hudi.storage.StoragePath
import org.apache.hudi.util.ExceptionWrappingIterator
-import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.fs.Path
import org.apache.spark.SPARK_VERSION
@@ -202,9 +201,9 @@ object HoodieSparkUtils extends SparkAdapterSupport with
SparkVersionsSupport wi
if (recs.isEmpty) {
Iterator.empty
} else {
- val schema = new Schema.Parser().parse(serializedTargetSchema)
+ val schema = HoodieSchema.parse(serializedTargetSchema)
val transform: GenericRecord => Either[GenericRecord, String] = record
=> try {
- Left(HoodieAvroUtils.rewriteRecordDeep(record, schema, true))
+ Left(HoodieAvroUtils.rewriteRecordDeep(record, schema.toAvroSchema,
true))
} catch {
case _: Throwable =>
Right(HoodieAvroUtils.safeAvroToJsonString(record))
}
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRecordContext.scala
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRecordContext.scala
index eac6bf6d0239..2d8a7276e681 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRecordContext.scala
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRecordContext.scala
@@ -19,7 +19,6 @@
package org.apache.hudi
-import org.apache.avro.Schema
import org.apache.avro.generic.{GenericRecord, IndexedRecord}
import org.apache.hudi.avro.AvroSchemaUtils.isNullable
import org.apache.hudi.common.engine.RecordContext
@@ -36,8 +35,8 @@ import scala.collection.mutable
trait SparkFileFormatInternalRecordContext extends
BaseSparkInternalRecordContext {
lazy val sparkAdapter: SparkAdapter = SparkAdapterSupport.sparkAdapter
- private val deserializerMap: mutable.Map[Schema, HoodieAvroDeserializer] =
mutable.Map()
- private val serializerMap: mutable.Map[Schema, HoodieAvroSerializer] =
mutable.Map()
+ private val deserializerMap: mutable.Map[HoodieSchema,
HoodieAvroDeserializer] = mutable.Map()
+ private val serializerMap: mutable.Map[HoodieSchema, HoodieAvroSerializer] =
mutable.Map()
override def supportsParquetRowIndex: Boolean = {
HoodieSparkUtils.gteqSpark3_5
@@ -50,17 +49,17 @@ trait SparkFileFormatInternalRecordContext extends
BaseSparkInternalRecordContex
* @return An [[InternalRow]].
*/
override def convertAvroRecord(avroRecord: IndexedRecord): InternalRow = {
- val schema = avroRecord.getSchema
+ val schema = HoodieSchema.fromAvroSchema(avroRecord.getSchema)
val structType = HoodieInternalRowUtils.getCachedSchema(schema)
val deserializer = deserializerMap.getOrElseUpdate(schema, {
- sparkAdapter.createAvroDeserializer(HoodieSchema.fromAvroSchema(schema),
structType)
+ sparkAdapter.createAvroDeserializer(schema, structType)
})
deserializer.deserialize(avroRecord).get.asInstanceOf[InternalRow]
}
override def convertToAvroRecord(record: InternalRow, schema: HoodieSchema):
GenericRecord = {
- val structType =
HoodieInternalRowUtils.getCachedSchema(schema.toAvroSchema)
- val serializer = serializerMap.getOrElseUpdate(schema.toAvroSchema, {
+ val structType = HoodieInternalRowUtils.getCachedSchema(schema)
+ val serializer = serializerMap.getOrElseUpdate(schema, {
sparkAdapter.createAvroSerializer(structType, schema, schema.isNullable)
})
serializer.serialize(record).asInstanceOf[GenericRecord]
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
index 96092fe22be8..5a0f796d377b 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
@@ -80,7 +80,7 @@ class SparkFileFormatInternalRowReaderContext(baseFileReader:
SparkColumnarFileR
if (hasRowIndexField) {
assert(getRecordContext.supportsParquetRowIndex())
}
- val structType =
HoodieInternalRowUtils.getCachedSchema(requiredSchema.toAvroSchema)
+ val structType = HoodieInternalRowUtils.getCachedSchema(requiredSchema)
val (readSchema, readFilters) = getSchemaAndFiltersForRead(structType,
hasRowIndexField)
if (FSUtils.isLogFile(filePath)) {
// NOTE: now only primary key based filtering is supported for log files
@@ -290,4 +290,4 @@ object SparkFileFormatInternalRowReaderContext {
field.name.equals(ROW_INDEX_TEMPORARY_COLUMN_NAME)
}
-}
\ No newline at end of file
+}
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala
index e2b782a424a8..3b0320d6877d 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala
@@ -18,11 +18,10 @@
package org.apache.spark.sql
-import org.apache.avro.Schema
-import org.apache.hudi.AvroConversionUtils.convertAvroSchemaToStructType
import org.apache.hudi.avro.HoodieAvroUtils.{createFullName, toJavaDate}
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.common.schema.HoodieSchema
+import
org.apache.hudi.HoodieSchemaConversionUtils.convertHoodieSchemaToStructType
import
org.apache.spark.sql.HoodieCatalystExpressionUtils.generateUnsafeProjection
import org.apache.spark.sql.HoodieUnsafeRowUtils.{NestedFieldPath,
composeNestedFieldPath}
import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow,
UnsafeArrayData, UnsafeProjection, UnsafeRow}
@@ -59,13 +58,13 @@ object HoodieInternalRowUtils {
new mutable.HashMap[(StructType, StructType), UnsafeProjection]
})
- private val identicalUnsafeProjectionThreadLocal:
ThreadLocal[mutable.HashMap[Schema, UnsafeProjection]] =
- ThreadLocal.withInitial(new Supplier[mutable.HashMap[Schema,
UnsafeProjection]] {
- override def get(): mutable.HashMap[Schema, UnsafeProjection] =
- new mutable.HashMap[Schema, UnsafeProjection]
+ private val identicalUnsafeProjectionThreadLocal:
ThreadLocal[mutable.HashMap[HoodieSchema, UnsafeProjection]] =
+ ThreadLocal.withInitial(new Supplier[mutable.HashMap[HoodieSchema,
UnsafeProjection]] {
+ override def get(): mutable.HashMap[HoodieSchema, UnsafeProjection] =
+ new mutable.HashMap[HoodieSchema, UnsafeProjection]
})
- private val schemaMap = new ConcurrentHashMap[Schema, StructType]
+ private val schemaMap = new ConcurrentHashMap[HoodieSchema, StructType]
private val orderPosListMap = new ConcurrentHashMap[(StructType, String),
Option[NestedFieldPath]]
/**
@@ -85,7 +84,7 @@ object HoodieInternalRowUtils {
*/
def getCachedUnsafeProjection(schema: HoodieSchema): UnsafeProjection = {
identicalUnsafeProjectionThreadLocal.get()
- .getOrElseUpdate(schema.getAvroSchema,
UnsafeProjection.create(getCachedSchema(schema.getAvroSchema)))
+ .getOrElseUpdate(schema,
UnsafeProjection.create(getCachedSchema(schema)))
}
/**
@@ -127,16 +126,16 @@ object HoodieInternalRowUtils {
* @param schema [[Schema]] to convert to [[StructType]], NOTE: It is best
that the schema passed in is cached through
[[org.apache.hudi.avro.AvroSchemaCache]], so that we can reduce the overhead of
schema lookup in the map
* @return [[StructType]] for provided [[Schema]]
*/
- def getCachedSchema(schema: Schema): StructType = {
+ def getCachedSchema(schema: HoodieSchema): StructType = {
val structType = schemaMap.get(schema)
// NOTE: This specifically designed to do 2 lookups (in case of
cache-miss) to avoid
// allocating the closure when using [[computeIfAbsent]] on more
frequent cache-hit path
if (structType != null) {
structType
} else {
- schemaMap.computeIfAbsent(schema, new JFunction[Schema, StructType] {
- override def apply(t: Schema): StructType =
- convertAvroSchemaToStructType(schema)
+ schemaMap.computeIfAbsent(schema, new JFunction[HoodieSchema,
StructType] {
+ override def apply(t: HoodieSchema): StructType =
+ convertHoodieSchemaToStructType(schema)
})
}
}
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/callback/TestHoodieClientInitCallback.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/callback/TestHoodieClientInitCallback.java
index 56a88a96861f..05814e446417 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/callback/TestHoodieClientInitCallback.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/callback/TestHoodieClientInitCallback.java
@@ -23,13 +23,13 @@ import
org.apache.hudi.callback.impl.HoodieWriteCommitHttpCallback;
import org.apache.hudi.client.BaseHoodieClient;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.execution.bulkinsert.NonSortPartitionerWithRows;
import org.apache.hudi.storage.StorageConfiguration;
-import org.apache.avro.Schema;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
@@ -98,20 +98,20 @@ public class TestHoodieClientInitCallback {
WRITE_SCHEMA_OVERRIDE.key(), TRIP_NESTED_EXAMPLE_SCHEMA))
.build(false);
assertFalse(config.contains(CUSTOM_CONFIG_KEY1));
- assertFalse(new Schema.Parser().parse(config.getWriteSchema())
+ assertFalse(HoodieSchema.parse(config.getWriteSchema())
.getObjectProps().containsKey(CUSTOM_CONFIG_KEY2));
try (SparkRDDWriteClient<Object> writeClient = new
SparkRDDWriteClient<>(engineContext, config)) {
HoodieWriteConfig updatedConfig = writeClient.getConfig();
assertFalse(updatedConfig.contains(CUSTOM_CONFIG_KEY1));
- Schema actualSchema = new
Schema.Parser().parse(updatedConfig.getWriteSchema());
+ HoodieSchema actualSchema =
HoodieSchema.parse(updatedConfig.getWriteSchema());
assertTrue(actualSchema.getObjectProps().containsKey(CUSTOM_CONFIG_KEY2));
assertEquals(CUSTOM_CONFIG_VALUE2,
actualSchema.getObjectProps().get(CUSTOM_CONFIG_KEY2));
updatedConfig = writeClient.getTableServiceClient().getConfig();
assertFalse(updatedConfig.contains(CUSTOM_CONFIG_KEY1));
- actualSchema = new Schema.Parser().parse(updatedConfig.getWriteSchema());
+ actualSchema = HoodieSchema.parse(updatedConfig.getWriteSchema());
assertTrue(actualSchema.getObjectProps().containsKey(CUSTOM_CONFIG_KEY2));
assertEquals(CUSTOM_CONFIG_VALUE2,
actualSchema.getObjectProps().get(CUSTOM_CONFIG_KEY2));
}
@@ -129,7 +129,7 @@ public class TestHoodieClientInitCallback {
WRITE_SCHEMA_OVERRIDE.key(), TRIP_NESTED_EXAMPLE_SCHEMA))
.build(false);
assertFalse(config.contains(CUSTOM_CONFIG_KEY1));
- assertFalse(new Schema.Parser().parse(config.getWriteSchema())
+ assertFalse(HoodieSchema.parse(config.getWriteSchema())
.getObjectProps().containsKey(CUSTOM_CONFIG_KEY2));
try (SparkRDDWriteClient<Object> writeClient = new
SparkRDDWriteClient<>(engineContext, config)) {
@@ -137,14 +137,14 @@ public class TestHoodieClientInitCallback {
HoodieWriteConfig updatedConfig = writeClient.getConfig();
assertTrue(updatedConfig.contains(CUSTOM_CONFIG_KEY1));
assertEquals(CUSTOM_CONFIG_VALUE1,
updatedConfig.getString(CUSTOM_CONFIG_KEY1));
- Schema actualSchema = new
Schema.Parser().parse(updatedConfig.getWriteSchema());
+ HoodieSchema actualSchema =
HoodieSchema.parse(updatedConfig.getWriteSchema());
assertTrue(actualSchema.getObjectProps().containsKey(CUSTOM_CONFIG_KEY2));
assertEquals(CUSTOM_CONFIG_VALUE2,
actualSchema.getObjectProps().get(CUSTOM_CONFIG_KEY2));
updatedConfig = writeClient.getTableServiceClient().getConfig();
assertTrue(updatedConfig.contains(CUSTOM_CONFIG_KEY1));
assertEquals(CUSTOM_CONFIG_VALUE1,
updatedConfig.getString(CUSTOM_CONFIG_KEY1));
- actualSchema = new Schema.Parser().parse(updatedConfig.getWriteSchema());
+ actualSchema = HoodieSchema.parse(updatedConfig.getWriteSchema());
assertTrue(actualSchema.getObjectProps().containsKey(CUSTOM_CONFIG_KEY2));
assertEquals(CUSTOM_CONFIG_VALUE2,
actualSchema.getObjectProps().get(CUSTOM_CONFIG_KEY2));
}
@@ -217,7 +217,7 @@ public class TestHoodieClientInitCallback {
@Override
public void call(BaseHoodieClient hoodieClient) {
HoodieWriteConfig config = hoodieClient.getConfig();
- Schema schema = new Schema.Parser().parse(config.getWriteSchema());
+ HoodieSchema schema = HoodieSchema.parse(config.getWriteSchema());
if (!schema.getObjectProps().containsKey(CUSTOM_CONFIG_KEY2)) {
schema.addProp(CUSTOM_CONFIG_KEY2, CUSTOM_CONFIG_VALUE2);
}
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
index 3593377ce1f8..e97d45fa88cc 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
@@ -41,7 +41,6 @@ import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.testutils.HoodieSparkClientTestHarness;
-import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
@@ -145,8 +144,8 @@ public class TestUpdateSchemaEvolution extends
HoodieSparkClientTestHarness impl
}
private List<HoodieRecord> buildUpdateRecords(String recordStr, String
insertFileId, String schema) throws IOException {
- Schema avroSchema = new Schema.Parser().parse(schema);
- GenericRecord data = new GenericData.Record(avroSchema);
+ HoodieSchema hoodieSchema = HoodieSchema.parse(schema);
+ GenericRecord data = new GenericData.Record(hoodieSchema.getAvroSchema());
Map<String, Object> json =
JsonUtils.getObjectMapper().readValue(recordStr, Map.class);
json.forEach(data::put);
String key = json.get("_row_key").toString();
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestExternalPathHandling.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestExternalPathHandling.java
index b348a1c33589..2eb39df74f01 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestExternalPathHandling.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestExternalPathHandling.java
@@ -34,6 +34,9 @@ import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieDeltaWriteStat;
import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -53,7 +56,6 @@ import org.apache.hudi.stats.ValueMetadata;
import org.apache.hudi.table.action.clean.CleanPlanner;
import org.apache.hudi.testutils.HoodieClientTestBase;
-import org.apache.avro.Schema;
import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.params.ParameterizedTest;
@@ -88,10 +90,10 @@ public class TestExternalPathHandling extends
HoodieClientTestBase {
metaClient = HoodieTableMetaClient.reload(metaClient);
Properties properties = new Properties();
properties.setProperty(HoodieMetadataConfig.AUTO_INITIALIZE.key(),
"false");
- List<Schema.Field> fields = new ArrayList<>();
- fields.add(new Schema.Field(FIELD_1, Schema.create(Schema.Type.STRING),
null, null));
- fields.add(new Schema.Field(FIELD_2, Schema.create(Schema.Type.STRING),
null, null));
- Schema simpleSchema = Schema.createRecord("simpleSchema", null, null,
false, fields);
+ List<HoodieSchemaField> fields = new ArrayList<>();
+ fields.add(HoodieSchemaField.of(FIELD_1,
HoodieSchema.create(HoodieSchemaType.STRING)));
+ fields.add(HoodieSchemaField.of(FIELD_2,
HoodieSchema.create(HoodieSchemaType.STRING)));
+ HoodieSchema simpleSchema = HoodieSchema.createRecord("simpleSchema",
null, null, false, fields);
writeConfig = HoodieWriteConfig.newBuilder()
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(INMEMORY).build())
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
index 62b7a2495557..974e717a211f 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
@@ -62,7 +62,6 @@ import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import lombok.extern.slf4j.Slf4j;
-import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.junit.jupiter.params.ParameterizedTest;
@@ -520,15 +519,15 @@ public class TestHoodieBackedTableMetadata extends
TestHoodieMetadataBase {
private void verifyMetadataRawRecords(HoodieTable table, List<HoodieLogFile>
logFiles) throws IOException {
for (HoodieLogFile logFile : logFiles) {
List<StoragePathInfo> pathInfoList =
storage.listDirectEntries(logFile.getPath());
- Schema writerSchema =
- TableSchemaResolver.readSchemaFromLogFile(storage,
logFile.getPath());
+ HoodieSchema writerSchema =
+
HoodieSchema.fromAvroSchema(TableSchemaResolver.readSchemaFromLogFile(storage,
logFile.getPath()));
if (writerSchema == null) {
// not a data block
continue;
}
try (HoodieLogFormat.Reader logFileReader =
HoodieLogFormat.newReader(storage,
- new HoodieLogFile(pathInfoList.get(0).getPath()),
HoodieSchema.fromAvroSchema(writerSchema))) {
+ new HoodieLogFile(pathInfoList.get(0).getPath()), writerSchema)) {
while (logFileReader.hasNext()) {
HoodieLogBlock logBlock = logFileReader.next();
if (logBlock instanceof HoodieDataBlock) {
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/KeyGeneratorTestUtilities.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/KeyGeneratorTestUtilities.java
index 29ee6af5a7a9..4be9fea94840 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/KeyGeneratorTestUtilities.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/KeyGeneratorTestUtilities.java
@@ -19,9 +19,9 @@
package org.apache.hudi.keygen;
+import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.util.JavaScalaConverters;
-import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8;
@@ -101,14 +101,14 @@ class KeyGeneratorTestUtilities {
}
public static GenericRecord getNestedColRecord(String prop1Value, Long
prop2Value) {
- GenericRecord nestedColRecord = new GenericData.Record(new
Schema.Parser().parse(NESTED_COL_SCHEMA));
+ GenericRecord nestedColRecord = new
GenericData.Record(HoodieSchema.parse(NESTED_COL_SCHEMA).getAvroSchema());
nestedColRecord.put("prop1", prop1Value);
nestedColRecord.put("prop2", prop2Value);
return nestedColRecord;
}
public static GenericRecord getRecord(GenericRecord nestedColRecord) {
- GenericRecord avroRecord = new GenericData.Record(new
Schema.Parser().parse(EXAMPLE_SCHEMA));
+ GenericRecord avroRecord = new
GenericData.Record(HoodieSchema.parse(EXAMPLE_SCHEMA).getAvroSchema());
avroRecord.put("timestamp", 4357686L);
avroRecord.put("_row_key", "key1");
avroRecord.put("ts_ms", "2020-03-21");
@@ -197,4 +197,4 @@ class KeyGeneratorTestUtilities {
}
return value;
}
-}
\ No newline at end of file
+}
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
index 9a67f723fdbe..ec2228be15c9 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
@@ -34,6 +34,7 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.SerializableIndexedRecord;
import org.apache.hudi.common.model.WriteConcurrencyMode;
+import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -63,7 +64,6 @@ import
org.apache.hudi.testutils.providers.HoodieWriteClientProvider;
import org.apache.hudi.testutils.providers.SparkProvider;
import org.apache.hudi.timeline.service.TimelineService;
-import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -424,7 +424,7 @@ public class SparkClientFunctionalTestHarness implements
SparkProvider, HoodieMe
.build());
}
- protected Dataset<Row> toDataset(List<HoodieRecord> records, Schema schema) {
+ protected Dataset<Row> toDataset(List<HoodieRecord> records, HoodieSchema
schema) {
List<GenericRecord> avroRecords = records.stream()
.map(r -> (GenericRecord) ((SerializableIndexedRecord)
r.getData()).getData())
.collect(Collectors.toList());
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala
index 25151cda0008..2798121af3d2 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala
@@ -167,8 +167,8 @@ object HoodieCreateRecordUtils {
case HoodieRecord.HoodieRecordType.SPARK =>
val dataFileSchema =
HoodieSchemaCache.intern(HoodieSchema.parse(dataFileSchemaStr))
- val dataFileStructType =
HoodieInternalRowUtils.getCachedSchema(dataFileSchema.getAvroSchema)
- val writerStructType =
HoodieInternalRowUtils.getCachedSchema(AvroSchemaCache.intern(writerSchema.getAvroSchema))
+ val dataFileStructType =
HoodieInternalRowUtils.getCachedSchema(dataFileSchema)
+ val writerStructType =
HoodieInternalRowUtils.getCachedSchema(HoodieSchemaCache.intern(writerSchema))
val sourceStructType = df.schema
df.queryExecution.toRdd.mapPartitions { it =>
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index a9fb6a287639..f17d641edb11 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -364,7 +364,7 @@ class HoodieSparkSqlWriterInternal {
// Avro's [[Schema]] we're preserving corresponding "record-name"
and "record-namespace" that
// play crucial role in establishing compatibility b/w schemas
val (avroRecordName, avroRecordNamespace) = latestTableSchemaOpt.map(s =>
- (s.getName, toScalaOption(s.getNamespace).orNull(null)))
+ (s.getName, toScalaOption(s.getNamespace).orNull))
.getOrElse(getRecordNameAndNamespace(tblName))
val sourceSchema = convertStructTypeToHoodieSchema(df.schema,
avroRecordName, avroRecordNamespace)
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala
index 8b95dbc27098..7585d9d93e65 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala
@@ -561,7 +561,7 @@ class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
*/
private def convertBufferedRecordToJsonString(record:
BufferedRecord[InternalRow]): UTF8String = {
internalRowToJsonStringConverterMap.getOrElseUpdate(record.getSchemaId,
- new
InternalRowToJsonStringConverter(HoodieInternalRowUtils.getCachedSchema(readerContext.getRecordContext.decodeAvroSchema(record.getSchemaId).toAvroSchema)))
+ new
InternalRowToJsonStringConverter(HoodieInternalRowUtils.getCachedSchema(readerContext.getRecordContext.decodeAvroSchema(record.getSchemaId))))
.convert(record.getRecord)
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBufferedRecordMerger.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBufferedRecordMerger.java
index 7bf3e2d3195f..a8b83671724b 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBufferedRecordMerger.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBufferedRecordMerger.java
@@ -931,7 +931,7 @@ class TestBufferedRecordMerger extends
SparkClientFunctionalTestHarness {
@Override
public Object getValue(InternalRow record, HoodieSchema schema, String
fieldName) {
- return getFieldValueFromInternalRow(record, schema.toAvroSchema(),
fieldName);
+ return getFieldValueFromInternalRow(record, schema, fieldName);
}
@Override
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
index eeceb209f0fc..74c547d991b6 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
@@ -399,7 +399,7 @@ class TestHoodieFileGroupReaderOnSpark extends
TestHoodieFileGroupReaderBase[Int
}
override def assertRecordMatchesSchema(schema: HoodieSchema, record:
InternalRow): Unit = {
- val structType =
HoodieInternalRowUtils.getCachedSchema(schema.toAvroSchema)
+ val structType = HoodieInternalRowUtils.getCachedSchema(schema)
assertRecordMatchesSchema(structType, record)
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
index e4e499a150c1..743a0734bfd4 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
@@ -340,7 +340,7 @@ class TestParquetColumnProjection extends
SparkClientFunctionalTestHarness with
else HoodieTestDataGenerator.HOODIE_SCHEMA
val records = dataGen.generateInserts("001", recordCount)
- val inputDF: Dataset[Row] = toDataset(records,
HoodieTestDataGenerator.HOODIE_SCHEMA.getAvroSchema)
+ val inputDF: Dataset[Row] = toDataset(records,
HoodieTestDataGenerator.HOODIE_SCHEMA)
inputDF.write.format("org.apache.hudi")
.options(opts)
@@ -372,7 +372,7 @@ class TestParquetColumnProjection extends
SparkClientFunctionalTestHarness with
val updatedRecords = dataGen.generateUpdates("002",
recordsToUpdate.asJava)
// Step 2: Update M records out of those (t/h update)
- val inputDF = toDataset(updatedRecords,
HoodieTestDataGenerator.AVRO_SCHEMA)
+ val inputDF = toDataset(updatedRecords,
HoodieTestDataGenerator.HOODIE_SCHEMA)
inputDF.write.format("org.apache.hudi")
.options(opts)
@@ -402,7 +402,7 @@ class TestParquetColumnProjection extends
SparkClientFunctionalTestHarness with
val updatedRecords = dataGen.generateUpdates("%03d".format(i),
recordsToUpdate.asJava)
// Step 2: Update M records out of those (t/h update)
- val inputDF = toDataset(updatedRecords,
HoodieTestDataGenerator.AVRO_SCHEMA)
+ val inputDF = toDataset(updatedRecords,
HoodieTestDataGenerator.HOODIE_SCHEMA)
val compactScheduleInline = if (inlineCompact) "false" else "true"
val compactInline = if (inlineCompact) "true" else "false"
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestHoodieInternalRowUtils.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestHoodieInternalRowUtils.scala
index 681e2fb6c4e2..b4ef5fe44a42 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestHoodieInternalRowUtils.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestHoodieInternalRowUtils.scala
@@ -311,8 +311,8 @@ class TestHoodieInternalRowUtils extends FunSuite with
Matchers with BeforeAndAf
val newRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(avroRecord,
newHoodieSchema.toAvroSchema, new HashMap[String, String])
assert(GenericData.get.validate(newHoodieSchema.toAvroSchema, newRecord))
// Convert avro to internalRow
- val structTypeSchema = HoodieInternalRowUtils.getCachedSchema(avroSchema)
- val newStructTypeSchema =
HoodieInternalRowUtils.getCachedSchema(newHoodieSchema.toAvroSchema)
+ val structTypeSchema =
HoodieInternalRowUtils.getCachedSchema(HoodieSchema.fromAvroSchema(avroSchema))
+ val newStructTypeSchema =
HoodieInternalRowUtils.getCachedSchema(newHoodieSchema)
val row = AvroConversionUtils.createAvroToInternalRowConverter(avroSchema,
structTypeSchema).apply(avroRecord).get
val newRowExpected =
AvroConversionUtils.createAvroToInternalRowConverter(newHoodieSchema.toAvroSchema,
newStructTypeSchema)
.apply(newRecord).get
@@ -367,8 +367,8 @@ class TestHoodieInternalRowUtils extends FunSuite with
Matchers with BeforeAndAf
// test the correctly of rewrite
assert(GenericData.get.validate(newAvroSchema, newAvroRecord))
// Convert avro to internalRow
- val structTypeSchema = HoodieInternalRowUtils.getCachedSchema(schema)
- val newStructTypeSchema =
HoodieInternalRowUtils.getCachedSchema(newAvroSchema)
+ val structTypeSchema =
HoodieInternalRowUtils.getCachedSchema(HoodieSchema.fromAvroSchema(schema))
+ val newStructTypeSchema =
HoodieInternalRowUtils.getCachedSchema(HoodieSchema.fromAvroSchema(newAvroSchema))
val row = AvroConversionUtils.createAvroToInternalRowConverter(schema,
structTypeSchema).apply(avroRecord).get
val newRowExpected =
AvroConversionUtils.createAvroToInternalRowConverter(newAvroSchema,
newStructTypeSchema).apply(newAvroRecord).get