This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 8bf44c01b56  [HUDI-6821] Support multiple base file formats in Hudi table (#9761)
8bf44c01b56 is described below

commit 8bf44c01b56dd3afe5323dc7566971cee2e46d50
Author: Sagar Sumit <sagarsumi...@gmail.com>
AuthorDate: Thu Oct 26 09:27:02 2023 +0530

    [HUDI-6821] Support multiple base file formats in Hudi table (#9761)
---
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  11 +-
 .../java/org/apache/hudi/io/HoodieWriteHandle.java |   3 +-
 .../java/org/apache/hudi/table/HoodieTable.java    |  10 +-
 .../table/action/bootstrap/BootstrapUtils.java     |   9 +-
 ...sistentHashingBucketClusteringPlanStrategy.java |   4 +-
 .../rollback/ListingBasedRollbackStrategy.java     |   6 +-
 .../table/upgrade/ZeroToOneUpgradeHandler.java     |   7 +-
 .../io/storage/row/HoodieRowDataCreateHandle.java  |   4 +-
 .../client/TestHoodieJavaWriteClientInsert.java    |   4 +-
 .../hudi/client/TestJavaHoodieBackedMetadata.java  |   5 -
 .../TestHoodieJavaClientOnCopyOnWriteStorage.java  |   3 +-
 .../commit/TestJavaCopyOnWriteActionExecutor.java  |   4 +-
 .../testutils/HoodieJavaClientTestHarness.java     |   4 +
 .../SparkBootstrapCommitActionExecutor.java        |   2 +-
 .../TestHoodieClientOnCopyOnWriteStorage.java      |  14 +-
 .../table/action/bootstrap/TestBootstrapUtils.java |  12 +-
 .../commit/TestCopyOnWriteActionExecutor.java      |   5 +-
 .../TestHoodieSparkMergeOnReadTableRollback.java   |   2 +-
 .../hudi/testutils/HoodieClientTestBase.java       |   5 +
 .../testutils/HoodieSparkClientTestHarness.java    |   5 -
 .../apache/hudi/common/model/HoodieFileFormat.java |   9 +
 .../hudi/common/table/HoodieTableConfig.java       |  10 +
 .../hudi/common/table/HoodieTableMetaClient.java   |  19 +-
 .../org/apache/hudi/common/util/BaseFileUtils.java |   5 -
 .../org/apache/hudi/common/fs/TestFSUtils.java     |  27 ++
 .../hudi/common/testutils/HoodieTestTable.java     |   3 +-
 .../org/apache/hudi/BaseFileOnlyRelation.scala     |   4 +-
 .../main/scala/org/apache/hudi/DefaultSource.scala |  52 ++--
 .../scala/org/apache/hudi/HoodieBaseRelation.scala | 107 +++++++-
 ...tils.scala => HoodieSparkFileFormatUtils.scala} |  35 +--
 .../scala/org/apache/hudi/HoodieWriterUtils.scala  |   9 +-
 .../hudi/MergeOnReadIncrementalRelation.scala      |   4 +-
 .../apache/hudi/MergeOnReadSnapshotRelation.scala  |  92 ------
 .../sql/catalyst/catalog/HoodieCatalogTable.scala  |   4 +-
 .../datasources/HoodieMultipleBaseFileFormat.scala | 278 +++++++++++++++++++++
 .../spark/sql/hudi/ProvidesHoodieConfig.scala      |   2 +-
 .../RepairMigratePartitionMetaProcedure.scala      |   2 +-
 .../org/apache/hudi/functional/TestBootstrap.java  |   8 +-
 .../apache/hudi/functional/TestOrcBootstrap.java   |   8 +-
 .../apache/hudi/testutils/DataSourceTestUtils.java |  20 +-
 .../TestHoodieMultipleBaseFileFormat.scala         | 123 +++++++++
 .../datasources/Spark32NestedSchemaPruning.scala   |   3 +-
 .../hudi/utilities/streamer/HoodieStreamer.java    |  10 +-
 43 files changed, 712 insertions(+), 241 deletions(-)
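The change splits format selection across two knobs: the table-level flag hoodie.table.multiple.base.file.formats.enable (added below in HoodieTableConfig) opts a table into mixed formats, while the write-config property hoodie.base.file.format (renamed below from hoodie.table.base.file.format) picks the format for a particular writer. A minimal usage sketch, assuming a table that already has the flag set; basePath and schemaStr are illustrative placeholders, not from the commit:

    HoodieWriteConfig orcConfig = HoodieWriteConfig.newBuilder()
        .withPath(basePath)         // placeholder variable
        .withSchema(schemaStr)      // placeholder variable
        .withBaseFileFormat("ORC")  // Builder method added by this commit
        .build();
    // A later writer could pass "PARQUET" instead, landing both formats in the same table.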
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index cc3876338cc..5ae7ab25fbd 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -219,7 +219,7 @@ public class HoodieWriteConfig extends HoodieConfig {
       + "the timeline as an immutable log relying only on atomic writes for object storage.");
 
   public static final ConfigProperty<HoodieFileFormat> BASE_FILE_FORMAT = ConfigProperty
-      .key("hoodie.table.base.file.format")
+      .key("hoodie.base.file.format")
       .defaultValue(HoodieFileFormat.PARQUET)
       .withValidValues(HoodieFileFormat.PARQUET.name(), HoodieFileFormat.ORC.name(), HoodieFileFormat.HFILE.name())
       .withAlternatives("hoodie.table.ro.file.format")
@@ -1198,6 +1198,10 @@ public class HoodieWriteConfig extends HoodieConfig {
     return getString(BASE_PATH);
   }
 
+  public HoodieFileFormat getBaseFileFormat() {
+    return HoodieFileFormat.valueOf(getStringOrDefault(BASE_FILE_FORMAT));
+  }
+
   public HoodieRecordMerger getRecordMerger() {
     List<String> mergers = StringUtils.split(getStringOrDefault(RECORD_MERGER_IMPLS), ",").stream()
         .map(String::trim)
@@ -2705,6 +2709,11 @@ public class HoodieWriteConfig extends HoodieConfig {
       return this;
     }
 
+    public Builder withBaseFileFormat(String baseFileFormat) {
+      writeConfig.setValue(BASE_FILE_FORMAT, HoodieFileFormat.valueOf(baseFileFormat).name());
+      return this;
+    }
+
     public Builder withSchema(String schemaStr) {
       writeConfig.setValue(AVRO_SCHEMA_STRING, schemaStr);
       return this;
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
index 8c76e322b09..9d1bb6d511e 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
@@ -122,8 +122,7 @@ public abstract class HoodieWriteHandle<T, I, K, O> extends HoodieIOHandle<T, I,
       throw new HoodieIOException("Failed to make dir " + path, e);
     }
 
-    return new Path(path.toString(), FSUtils.makeBaseFileName(instantTime, writeToken, fileId,
-        hoodieTable.getMetaClient().getTableConfig().getBaseFileFormat().getFileExtension()));
+    return new Path(path.toString(), FSUtils.makeBaseFileName(instantTime, writeToken, fileId, hoodieTable.getBaseFileExtension()));
   }
 
   /**
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index 8cc1dcf924c..36a5e6de21a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -217,7 +217,7 @@ public abstract class HoodieTable<T, I, K, O> implements Serializable {
   public abstract HoodieWriteMetadata<O> delete(HoodieEngineContext context, String instantTime, K keys);
 
   /**
-   * Delete records from Hoodie table based on {@link HoodieKey} and {@link HoodieRecordLocation} specified in
+   * Delete records from Hoodie table based on {@link HoodieKey} and {@link org.apache.hudi.common.model.HoodieRecordLocation} specified in
    * preppedRecords.
    *
    * @param context {@link HoodieEngineContext}.
@@ -874,13 +874,13 @@ public abstract class HoodieTable<T, I, K, O> implements Serializable {
   }
 
   public HoodieFileFormat getBaseFileFormat() {
+    HoodieTableConfig tableConfig = metaClient.getTableConfig();
+    if (tableConfig.isMultipleBaseFileFormatsEnabled() && config.contains(HoodieWriteConfig.BASE_FILE_FORMAT)) {
+      return config.getBaseFileFormat();
+    }
     return metaClient.getTableConfig().getBaseFileFormat();
   }
 
-  public HoodieFileFormat getLogFileFormat() {
-    return metaClient.getTableConfig().getLogFileFormat();
-  }
-
   public Option<HoodieFileFormat> getPartitionMetafileFormat() {
     return metaClient.getTableConfig().getPartitionMetafileFormat();
   }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapUtils.java
index 3e9e6b42a61..05f71454ed0 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapUtils.java
@@ -22,6 +22,7 @@ import org.apache.hudi.avro.model.HoodieFileStatus;
 import org.apache.hudi.common.bootstrap.FileStatusUtils;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.collection.Pair;
 
@@ -45,16 +46,16 @@ public class BootstrapUtils {
 
   /**
    * Returns leaf folders with files under a path.
-   * @param metaClient Hoodie table metadata client
+   * @param baseFileFormat Hoodie base file format
    * @param fs File System
    * @param context HoodieEngineContext
    * @return list of partition paths with files under them.
   * @throws IOException
   */
-  public static List<Pair<String, List<HoodieFileStatus>>> getAllLeafFoldersWithFiles(HoodieTableMetaClient metaClient,
-                                                                                      FileSystem fs, String basePathStr, HoodieEngineContext context) throws IOException {
+  public static List<Pair<String, List<HoodieFileStatus>>> getAllLeafFoldersWithFiles(HoodieFileFormat baseFileFormat,
+                                                                                      FileSystem fs, String basePathStr, HoodieEngineContext context) throws IOException {
     final Path basePath = new Path(basePathStr);
-    final String baseFileExtension = metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
+    final String baseFileExtension = baseFileFormat.getFileExtension();
     final Map<Integer, List<String>> levelToPartitions = new HashMap<>();
     final Map<String, List<HoodieFileStatus>> partitionToFiles = new HashMap<>();
     PathFilter filePathFilter = getFilePathFilter(baseFileExtension);
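With the new signature above, bootstrap callers pass the base file format explicitly instead of a meta-client. A hedged calling sketch; fs, basePathStr, and context stand in for whatever the caller already has in scope:

    List<Pair<String, List<HoodieFileStatus>>> leafFolders =
        BootstrapUtils.getAllLeafFoldersWithFiles(
            HoodieFileFormat.PARQUET, fs, basePathStr, context);
    // Only files carrying the .parquet extension are collected under each leaf partition.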
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/BaseConsistentHashingBucketClusteringPlanStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/BaseConsistentHashingBucketClusteringPlanStrategy.java
index af3c00d3d8e..27fea59fa9f 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/BaseConsistentHashingBucketClusteringPlanStrategy.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/BaseConsistentHashingBucketClusteringPlanStrategy.java
@@ -329,12 +329,12 @@ public abstract class BaseConsistentHashingBucketClusteringPlanStrategy<T extend
   }
 
   private long getSplitSize() {
-    HoodieFileFormat format = getHoodieTable().getMetaClient().getTableConfig().getBaseFileFormat();
+    HoodieFileFormat format = getHoodieTable().getBaseFileFormat();
     return (long) (getWriteConfig().getMaxFileSize(format) * getWriteConfig().getBucketSplitThreshold());
   }
 
   private long getMergeSize() {
-    HoodieFileFormat format = getHoodieTable().getMetaClient().getTableConfig().getBaseFileFormat();
+    HoodieFileFormat format = getHoodieTable().getBaseFileFormat();
     return (long) (getWriteConfig().getMaxFileSize(format) * getWriteConfig().getBucketMergeThreshold());
   }
 }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
index 74e60b35bd0..2b383c1d246 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
@@ -95,7 +95,7 @@ public class ListingBasedRollbackStrategy implements BaseRollbackPlanActionExecu
       context.setJobStatus(this.getClass().getSimpleName(), "Creating Listing Rollback Plan: " + config.getTableName());
 
       HoodieTableType tableType = table.getMetaClient().getTableType();
-      String baseFileExtension = getBaseFileExtension(metaClient);
+      String baseFileExtension = table.getBaseFileExtension();
       Option<HoodieCommitMetadata> commitMetadataOptional = getHoodieCommitMetadata(metaClient, instantToRollback);
       Boolean isCommitMetadataCompleted = checkCommitMetadataCompleted(instantToRollback, commitMetadataOptional);
       AtomicBoolean isCompaction = new AtomicBoolean(false);
@@ -191,10 +191,6 @@ public class ListingBasedRollbackStrategy implements BaseRollbackPlanActionExecu
     return metaClient.getFs().listStatus(FSUtils.getPartitionPath(config.getBasePath(), partitionPath), filter);
   }
 
-  private String getBaseFileExtension(HoodieTableMetaClient metaClient) {
-    return metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
-  }
-
   @NotNull
   private List<HoodieRollbackRequest> getHoodieRollbackRequests(String partitionPath, FileStatus[] filesToDeletedStatus) {
     return Arrays.stream(filesToDeletedStatus)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
index 831e11efae7..772afe71b02 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
@@ -18,8 +18,6 @@
 
 package org.apache.hudi.table.upgrade;
 
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.model.HoodieRollbackRequest;
 import org.apache.hudi.common.HoodieRollbackStat;
 import org.apache.hudi.common.config.ConfigProperty;
@@ -39,6 +37,9 @@ import org.apache.hudi.table.action.rollback.ListingBasedRollbackStrategy;
 import org.apache.hudi.table.marker.WriteMarkers;
 import org.apache.hudi.table.marker.WriteMarkersFactory;
 
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -137,6 +138,6 @@ public class ZeroToOneUpgradeHandler implements UpgradeHandler {
     String deltaInstant = FSUtils.getDeltaCommitTimeFromLogPath(logPath);
     String writeToken = FSUtils.getWriteTokenFromLogPath(logPath);
 
-    return FSUtils.makeBaseFileName(deltaInstant, writeToken, fileId, table.getBaseFileFormat().getFileExtension());
+    return FSUtils.makeBaseFileName(deltaInstant, writeToken, fileId, table.getBaseFileExtension());
   }
 }
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java
index 6cff94068d6..475d0efc582 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java
@@ -28,7 +28,6 @@ import org.apache.hudi.common.model.HoodieRecordDelegate;
 import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.IOType;
-import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -195,9 +194,8 @@ public class HoodieRowDataCreateHandle implements Serializable {
     } catch (IOException e) {
       throw new HoodieIOException("Failed to make dir " + path, e);
     }
-    HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig();
     return new Path(path.toString(), FSUtils.makeBaseFileName(instantTime, getWriteToken(), fileId,
-        tableConfig.getBaseFileFormat().getFileExtension()));
+        table.getBaseFileExtension()));
   }
 
   /**
diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestHoodieJavaWriteClientInsert.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestHoodieJavaWriteClientInsert.java
index 02c407ba02d..ea13939ad2e 100644
--- a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestHoodieJavaWriteClientInsert.java
+++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestHoodieJavaWriteClientInsert.java
@@ -148,7 +148,7 @@ public class TestHoodieJavaWriteClientInsert extends HoodieJavaClientTestHarness
     HoodieJavaWriteClient writeClient = getHoodieWriteClient(config);
     metaClient = HoodieTableMetaClient.reload(metaClient);
 
-    BaseFileUtils fileUtils = BaseFileUtils.getInstance(metaClient);
+    BaseFileUtils fileUtils = getFileUtilsInstance(metaClient);
 
     // Get some records belong to the same partition (2021/09/11)
     String insertRecordStr1 = "{\"_row_key\":\"1\","
@@ -222,7 +222,7 @@ public class TestHoodieJavaWriteClientInsert extends HoodieJavaClientTestHarness
     HoodieJavaWriteClient writeClient = getHoodieWriteClient(config);
     metaClient = HoodieTableMetaClient.reload(metaClient);
 
-    BaseFileUtils fileUtils = BaseFileUtils.getInstance(metaClient);
+    BaseFileUtils fileUtils = getFileUtilsInstance(metaClient);
 
     String partitionPath = "2021/09/11";
     HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new String[]{partitionPath});
diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
index 1e8f5149d37..06446ae9138 100644
--- a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
+++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
@@ -39,7 +39,6 @@ import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
-import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieFileGroup;
 import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieKey;
@@ -2763,10 +2762,6 @@ public class TestJavaHoodieBackedMetadata extends TestHoodieMetadataBase {
     // Metadata table is MOR
     assertEquals(metadataMetaClient.getTableType(), HoodieTableType.MERGE_ON_READ, "Metadata Table should be MOR");
 
-    // Metadata table is HFile format
-    assertEquals(metadataMetaClient.getTableConfig().getBaseFileFormat(), HoodieFileFormat.HFILE,
-        "Metadata Table base file format should be HFile");
-
     // Metadata table has a fixed number of partitions
     // Cannot use FSUtils.getAllFoldersWithPartitionMetaFile for this as that function filters all directory
     // in the .hoodie folder.
diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/functional/TestHoodieJavaClientOnCopyOnWriteStorage.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/functional/TestHoodieJavaClientOnCopyOnWriteStorage.java
index 3330c5c7eed..a591134517f 100644
--- a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/functional/TestHoodieJavaClientOnCopyOnWriteStorage.java
+++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/functional/TestHoodieJavaClientOnCopyOnWriteStorage.java
@@ -62,7 +62,6 @@ import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.testutils.HoodieTestTable;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.testutils.RawTripTestPayload;
-import org.apache.hudi.common.util.BaseFileUtils;
 import org.apache.hudi.common.util.ClusteringUtils;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.MarkerUtils;
@@ -1021,7 +1020,7 @@ public class TestHoodieJavaClientOnCopyOnWriteStorage extends HoodieJavaClientTe
   private Set<String> verifyRecordKeys(List<HoodieRecord> expectedRecords, List<WriteStatus> allStatus, List<GenericRecord> records) {
     for (WriteStatus status : allStatus) {
       Path filePath = new Path(basePath, status.getStat().getPath());
-      records.addAll(BaseFileUtils.getInstance(metaClient).readAvroRecords(hadoopConf, filePath));
+      records.addAll(getFileUtilsInstance(metaClient).readAvroRecords(hadoopConf, filePath));
     }
     Set<String> expectedKeys = recordsToRecordKeySet(expectedRecords);
     assertEquals(records.size(), expectedKeys.size());
diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/table/action/commit/TestJavaCopyOnWriteActionExecutor.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/table/action/commit/TestJavaCopyOnWriteActionExecutor.java
index a3a233cb743..bda362931c7 100644
--- a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/table/action/commit/TestJavaCopyOnWriteActionExecutor.java
+++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/table/action/commit/TestJavaCopyOnWriteActionExecutor.java
@@ -129,7 +129,7 @@ public class TestJavaCopyOnWriteActionExecutor extends HoodieJavaClientTestHarne
     HoodieJavaWriteClient writeClient = getHoodieWriteClient(config);
     writeClient.startCommitWithTime(firstCommitTime);
     metaClient = HoodieTableMetaClient.reload(metaClient);
-    BaseFileUtils fileUtils = BaseFileUtils.getInstance(metaClient);
+    BaseFileUtils fileUtils = getFileUtilsInstance(metaClient);
 
     String partitionPath = "2016/01/31";
@@ -476,7 +476,7 @@ public class TestJavaCopyOnWriteActionExecutor extends HoodieJavaClientTestHarne
     HoodieJavaWriteClient writeClient = getHoodieWriteClient(config);
     writeClient.startCommitWithTime(firstCommitTime);
     metaClient = HoodieTableMetaClient.reload(metaClient);
-    BaseFileUtils fileUtils = BaseFileUtils.getInstance(metaClient);
+    BaseFileUtils fileUtils = getFileUtilsInstance(metaClient);
 
     String partitionPath = "2022/04/09";
diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java
index 7a373f093c0..09687e73a89 100644
--- a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java
+++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java
@@ -1046,4 +1046,8 @@ public abstract class HoodieJavaClientTestHarness extends HoodieWriterClientTest
     }
     return builder;
   }
+
+  public static BaseFileUtils getFileUtilsInstance(HoodieTableMetaClient metaClient) {
+    return BaseFileUtils.getInstance(metaClient.getTableConfig().getBaseFileFormat());
+  }
 }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java
index 92bee7ab141..884a7a6ab44 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java
@@ -266,7 +266,7 @@ public class SparkBootstrapCommitActionExecutor<T>
    */
   private Map<BootstrapMode, List<Pair<String, List<HoodieFileStatus>>>> listAndProcessSourcePartitions() throws IOException {
     List<Pair<String, List<HoodieFileStatus>>> folders = BootstrapUtils.getAllLeafFoldersWithFiles(
-        table.getMetaClient(), bootstrapSourceFileSystem, config.getBootstrapSourceBasePath(), context);
+        table.getBaseFileFormat(), bootstrapSourceFileSystem, config.getBootstrapSourceBasePath(), context);
 
     LOG.info("Fetching Bootstrap Schema !!");
     HoodieBootstrapSchemaProvider sourceSchemaProvider = new HoodieSparkBootstrapSchemaProvider(config);
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
index 8f13e0cea48..44105a41983 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
@@ -1200,7 +1200,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
     dataGen = new HoodieTestDataGenerator(new String[] {testPartitionPath});
     SparkRDDWriteClient client = getHoodieWriteClient(config);
 
-    BaseFileUtils fileUtils = BaseFileUtils.getInstance(metaClient);
+    BaseFileUtils fileUtils = getFileUtilsInstance(metaClient);
 
     // Inserts => will write file1
     String commitTime1 = "001";
@@ -1313,7 +1313,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
     HoodieWriteConfig config = getSmallInsertWriteConfig(insertSplitLimit, false, mergeAllowDuplicateInserts); // hold upto 200 records max
     dataGen = new HoodieTestDataGenerator(new String[] {testPartitionPath});
     SparkRDDWriteClient client = getHoodieWriteClient(config);
-    BaseFileUtils fileUtils = BaseFileUtils.getInstance(metaClient);
+    BaseFileUtils fileUtils = getFileUtilsInstance(metaClient);
 
     // Inserts => will write file1
     String commitTime1 = "001";
@@ -1410,9 +1410,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
     assertEquals(1, statuses.size(), "Just 1 file needs to be added.");
     String file1 = statuses.get(0).getFileId();
 
-    assertEquals(100,
-        BaseFileUtils.getInstance(metaClient).readRowKeys(hadoopConf, new Path(basePath, statuses.get(0).getStat().getPath()))
-            .size(), "file should contain 100 records");
+    assertEquals(100, getFileUtilsInstance(metaClient).readRowKeys(hadoopConf, new Path(basePath, statuses.get(0).getStat().getPath())).size(), "file should contain 100 records");
 
     // Delete 20 among 100 inserted
     testDeletes(client, inserts1, 20, file1, "002", 80, keysSoFar);
@@ -2091,7 +2089,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
   private Set<String> verifyRecordKeys(List<HoodieRecord> expectedRecords, List<WriteStatus> allStatus, List<GenericRecord> records) {
     for (WriteStatus status : allStatus) {
       Path filePath = new Path(basePath, status.getStat().getPath());
-      records.addAll(BaseFileUtils.getInstance(metaClient).readAvroRecords(jsc.hadoopConfiguration(), filePath));
+      records.addAll(getFileUtilsInstance(metaClient).readAvroRecords(jsc.hadoopConfiguration(), filePath));
     }
     Set<String> expectedKeys = recordsToRecordKeySet(expectedRecords);
     assertEquals(records.size(), expectedKeys.size());
@@ -2180,10 +2178,10 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
     Path newFile = new Path(basePath, statuses.get(0).getStat().getPath());
     assertEquals(expectedRecords,
-        BaseFileUtils.getInstance(metaClient).readRowKeys(hadoopConf, newFile).size(),
+        getFileUtilsInstance(metaClient).readRowKeys(hadoopConf, newFile).size(),
         "file should contain 110 records");
 
-    List<GenericRecord> records = BaseFileUtils.getInstance(metaClient).readAvroRecords(hadoopConf, newFile);
+    List<GenericRecord> records = getFileUtilsInstance(metaClient).readAvroRecords(hadoopConf, newFile);
     for (GenericRecord record : records) {
       String recordKey = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
       assertTrue(keys.contains(recordKey), "key expected to be part of " + instantTime);
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/bootstrap/TestBootstrapUtils.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/bootstrap/TestBootstrapUtils.java
index 83a6caecd19..cda4fa38d40 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/bootstrap/TestBootstrapUtils.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/bootstrap/TestBootstrapUtils.java
@@ -67,18 +67,14 @@ public class TestBootstrapUtils extends HoodieClientTestBase {
       }
     });
 
-    List<Pair<String, List<HoodieFileStatus>>> collected = BootstrapUtils.getAllLeafFoldersWithFiles(metaClient,
+    List<Pair<String, List<HoodieFileStatus>>> collected = BootstrapUtils.getAllLeafFoldersWithFiles(metaClient.getTableConfig().getBaseFileFormat(),
         metaClient.getFs(), basePath, context);
     assertEquals(3, collected.size());
-    collected.stream().forEach(k -> {
-      assertEquals(2, k.getRight().size());
-    });
+    collected.forEach(k -> assertEquals(2, k.getRight().size()));
 
     // Simulate reading from un-partitioned dataset
-    collected = BootstrapUtils.getAllLeafFoldersWithFiles(metaClient, metaClient.getFs(), basePath + "/" + folders.get(0), context);
+    collected = BootstrapUtils.getAllLeafFoldersWithFiles(metaClient.getTableConfig().getBaseFileFormat(), metaClient.getFs(), basePath + "/" + folders.get(0), context);
     assertEquals(1, collected.size());
-    collected.stream().forEach(k -> {
-      assertEquals(2, k.getRight().size());
-    });
+    collected.forEach(k -> assertEquals(2, k.getRight().size()));
   }
 }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java
index 24b66911613..4574b34393d 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java
@@ -244,8 +244,7 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestBase implemen
 
     // Check whether the record has been updated
     Path updatedFilePath = allFiles[0].getPath();
-    BloomFilter updatedFilter =
-        BaseFileUtils.getInstance(metaClient).readBloomFilterFromMetadata(hadoopConf, updatedFilePath);
+    BloomFilter updatedFilter = getFileUtilsInstance(metaClient).readBloomFilterFromMetadata(hadoopConf, updatedFilePath);
     for (HoodieRecord record : records) {
       // No change to the _row_key
       assertTrue(updatedFilter.mightContain(record.getRecordKey()));
@@ -542,7 +541,7 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestBase implemen
     Option<Path> metafilePath = HoodiePartitionMetadata.getPartitionMetafilePath(fs, partitionPath);
     if (partitionMetafileUseBaseFormat) {
       // Extension should be the same as the data file format of the table
-      assertTrue(metafilePath.get().toString().endsWith(table.getBaseFileFormat().getFileExtension()));
+      assertTrue(metafilePath.get().toString().endsWith(table.getBaseFileExtension()));
     } else {
       // No extension as it is in properties file format
       assertTrue(metafilePath.get().toString().endsWith(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX));
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
index 01cfcd047b4..92f2b2e1438 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
@@ -950,7 +950,7 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction
     return records;
   }
 
-  private long doCompaction(SparkRDDWriteClient client, HoodieTableMetaClient metaClient, HoodieWriteConfig cfg, long numLogFiles) throws IOException {
+  private long doCompaction(SparkRDDWriteClient client, HoodieTableMetaClient metaClient, HoodieWriteConfig cfg, long numLogFiles) {
     // Do a compaction
     String instantTime = client.scheduleCompaction(Option.empty()).get().toString();
     HoodieWriteMetadata<JavaRDD<WriteStatus>> compactionMetadata = client.compact(instantTime);
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
index c4a150e7f8f..39c77de3f26 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
@@ -31,6 +31,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.view.SyncableFileSystemView;
+import org.apache.hudi.common.util.BaseFileUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.index.HoodieIndex;
@@ -622,4 +623,8 @@ public class HoodieClientTestBase extends HoodieSparkClientTestHarness {
   public HoodieCleanStat getCleanStat(List<HoodieCleanStat> hoodieCleanStatsTwo, String partitionPath) {
     return hoodieCleanStatsTwo.stream().filter(e -> e.getPartitionPath().equals(partitionPath)).findFirst().orElse(null);
   }
+
+  public static BaseFileUtils getFileUtilsInstance(HoodieTableMetaClient metaClient) {
+    return BaseFileUtils.getInstance(metaClient.getTableConfig().getBaseFileFormat());
+  }
 }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieSparkClientTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieSparkClientTestHarness.java
index 54b4972880f..2a83baa018c 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieSparkClientTestHarness.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieSparkClientTestHarness.java
@@ -31,7 +31,6 @@ import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieCleaningPolicy;
-import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieFileGroup;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordLocation;
@@ -602,10 +601,6 @@ public abstract class HoodieSparkClientTestHarness extends HoodieWriterClientTes
     // Metadata table is MOR
     assertEquals(metadataMetaClient.getTableType(), HoodieTableType.MERGE_ON_READ, "Metadata Table should be MOR");
 
-    // Metadata table is HFile format
-    assertEquals(metadataMetaClient.getTableConfig().getBaseFileFormat(), HoodieFileFormat.HFILE,
-        "Metadata Table base file format should be HFile");
-
     // Metadata table has a fixed number of partitions
     // Cannot use FSUtils.getAllFoldersWithPartitionMetaFile for this as that function filters all directory
     // in the .hoodie folder.
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileFormat.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileFormat.java
index c8c94e5db3d..d7c25b82fad 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileFormat.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileFormat.java
@@ -64,4 +64,13 @@ public enum HoodieFileFormat {
   public String getFileExtension() {
     return extension;
   }
+
+  public static HoodieFileFormat fromFileExtension(String extension) {
+    for (HoodieFileFormat format : HoodieFileFormat.values()) {
+      if (format.getFileExtension().equals(extension)) {
+        return format;
+      }
+    }
+    throw new IllegalArgumentException("Unknown file extension :" + extension);
+  }
 }
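A quick sketch of the new helper's contract; extensions carry the leading dot:

    HoodieFileFormat.fromFileExtension(".parquet"); // -> PARQUET
    HoodieFileFormat.fromFileExtension(".orc");     // -> ORC
    HoodieFileFormat.fromFileExtension(".txt");     // throws IllegalArgumentException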
By default, false."); + public static final ConfigProperty<Boolean> MULTIPLE_BASE_FILE_FORMATS_ENABLE = ConfigProperty + .key("hoodie.table.multiple.base.file.formats.enable") + .defaultValue(false) + .sinceVersion("1.0.0") + .withDocumentation("When set to true, the table can support reading and writing multiple base file formats."); + public static final ConfigProperty<String> URL_ENCODE_PARTITIONING = KeyGeneratorOptions.URL_ENCODE_PARTITIONING; public static final ConfigProperty<String> HIVE_STYLE_PARTITIONING_ENABLE = KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE; @@ -747,6 +753,10 @@ public class HoodieTableConfig extends HoodieConfig { return getBooleanOrDefault(DROP_PARTITION_COLUMNS); } + public boolean isMultipleBaseFileFormatsEnabled() { + return getBooleanOrDefault(MULTIPLE_BASE_FILE_FORMATS_ENABLE); + } + /** * Read the table checksum. */ diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java index cee950592b4..2a989764120 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java @@ -158,8 +158,7 @@ public class HoodieTableMetaClient implements Serializable { } this.timelineLayoutVersion = layoutVersion.orElseGet(() -> tableConfig.getTimelineLayoutVersion().get()); this.loadActiveTimelineOnLoad = loadActiveTimelineOnLoad; - LOG.info("Finished Loading Table of type " + tableType + "(version=" + timelineLayoutVersion + ", baseFileFormat=" - + this.tableConfig.getBaseFileFormat() + ") from " + basePath); + LOG.info("Finished Loading Table of type " + tableType + "(version=" + timelineLayoutVersion + ") from " + basePath); if (loadActiveTimelineOnLoad) { LOG.info("Loading Active commit timeline for " + basePath); getActiveTimeline(); @@ -867,6 +866,7 @@ public class HoodieTableMetaClient implements Serializable { private String metadataPartitions; private String inflightMetadataPartitions; private String secondaryIndexesMetadata; + private Boolean multipleBaseFileFormatsEnabled; /** * Persist the configs that is written at the first time, and should not be changed. 
@@ -1031,6 +1031,15 @@ public class HoodieTableMetaClient implements Serializable { return this; } + public PropertyBuilder setMultipleBaseFileFormatsEnabled(Boolean multipleBaseFileFormatsEnabled) { + this.multipleBaseFileFormatsEnabled = multipleBaseFileFormatsEnabled; + return this; + } + + public PropertyBuilder setBaseFileFormats(String baseFileFormats) { + return this; + } + public PropertyBuilder set(Map<String, Object> props) { for (ConfigProperty<String> configProperty : HoodieTableConfig.PERSISTED_CONFIG_LIST) { if (containsConfigProperty(props, configProperty)) { @@ -1155,6 +1164,9 @@ public class HoodieTableMetaClient implements Serializable { if (hoodieConfig.contains(HoodieTableConfig.SECONDARY_INDEXES_METADATA)) { setSecondaryIndexesMetadata(hoodieConfig.getString(HoodieTableConfig.SECONDARY_INDEXES_METADATA)); } + if (hoodieConfig.contains(HoodieTableConfig.MULTIPLE_BASE_FILE_FORMATS_ENABLE)) { + setMultipleBaseFileFormatsEnabled(hoodieConfig.getBoolean(HoodieTableConfig.MULTIPLE_BASE_FILE_FORMATS_ENABLE)); + } return this; } @@ -1263,6 +1275,9 @@ public class HoodieTableMetaClient implements Serializable { if (null != secondaryIndexesMetadata) { tableConfig.setValue(HoodieTableConfig.SECONDARY_INDEXES_METADATA, secondaryIndexesMetadata); } + if (null != multipleBaseFileFormatsEnabled) { + tableConfig.setValue(HoodieTableConfig.MULTIPLE_BASE_FILE_FORMATS_ENABLE, Boolean.toString(multipleBaseFileFormatsEnabled)); + } return tableConfig.getProps(); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java index be41857a38e..278729f3d78 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java @@ -25,7 +25,6 @@ import org.apache.hudi.common.bloom.BloomFilterFactory; import org.apache.hudi.common.bloom.BloomFilterTypeCode; import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.HoodieKey; -import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.util.collection.ClosableIterator; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieException; @@ -65,10 +64,6 @@ public abstract class BaseFileUtils { throw new UnsupportedOperationException(fileFormat.name() + " format not supported yet."); } - public static BaseFileUtils getInstance(HoodieTableMetaClient metaClient) { - return getInstance(metaClient.getTableConfig().getBaseFileFormat()); - } - /** * Read the rowKey list from the given data file. 
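Given the builder plumbing above, the flag can be set once at table-creation time and is then persisted into hoodie.properties. A hedged sketch; the table type, table name, hadoopConf, and basePath values are illustrative:

    HoodieTableMetaClient.withPropertyBuilder()
        .setTableType(HoodieTableType.COPY_ON_WRITE)  // illustrative
        .setTableName("trips")                        // illustrative
        .setMultipleBaseFileFormatsEnabled(true)      // new setter from this commit
        .initTable(hadoopConf, basePath);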
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java
index be41857a38e..278729f3d78 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java
@@ -25,7 +25,6 @@ import org.apache.hudi.common.bloom.BloomFilterFactory;
 import org.apache.hudi.common.bloom.BloomFilterTypeCode;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
@@ -65,10 +64,6 @@ public abstract class BaseFileUtils {
     throw new UnsupportedOperationException(fileFormat.name() + " format not supported yet.");
   }
 
-  public static BaseFileUtils getInstance(HoodieTableMetaClient metaClient) {
-    return getInstance(metaClient.getTableConfig().getBaseFileFormat());
-  }
-
   /**
    * Read the rowKey list from the given data file.
    *
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java
index b5f4ea5726f..612929bc8a6 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java
@@ -545,6 +545,33 @@ public class TestFSUtils extends HoodieCommonTestHarness {
         .collect(Collectors.toSet()));
   }
 
+  @Test
+  public void testGetFileExtension() {
+    String pathWithExtension = "/path/to/some/file/sample.parquet";
+    String pathWithoutExtension = "/path/to/some/file/sample";
+    String justFileNameWithExtension = "sample.orc";
+    String justFileNameWithoutExtension = "sample";
+
+    // file with extension
+    String result1 = FSUtils.getFileExtension(pathWithExtension);
+    assertEquals(".parquet", result1);
+
+    // file without extension
+    String result2 = FSUtils.getFileExtension(pathWithoutExtension);
+    assertEquals("", result2);
+
+    // just a file name with extension
+    String result3 = FSUtils.getFileExtension(justFileNameWithExtension);
+    assertEquals(".orc", result3);
+
+    // just a file name without extension
+    String result4 = FSUtils.getFileExtension(justFileNameWithoutExtension);
+    assertEquals("", result4);
+
+    // null input
+    assertThrows(NullPointerException.class, () -> FSUtils.getFileExtension(null));
+  }
+
   private Path getHoodieTempDir() {
     return new Path(baseUri.toString(), ".hoodie/.temp");
   }
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
index 81e7d993d55..202827ce0c7 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
@@ -50,7 +50,6 @@ import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.IOType;
 import org.apache.hudi.common.model.WriteOperationType;
-import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.log.HoodieLogFormat;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
@@ -789,7 +788,7 @@ public class HoodieTestTable {
   }
 
   public FileStatus[] listAllBaseFiles() throws IOException {
-    return listAllBaseFiles(HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().getFileExtension());
+    return listAllBaseFiles(HoodieFileFormat.PARQUET.getFileExtension());
   }
 
   public FileStatus[] listAllBaseFiles(String fileExtension) throws IOException {
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
index f3b32b84017..65bb8881455 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
@@ -90,9 +90,7 @@ case class BaseFileOnlyRelation(override val sqlContext: SQLContext,
                                 requiredSchema: HoodieTableSchema,
                                 requestedColumns: Array[String],
                                 filters: Array[Filter]): RDD[InternalRow] = {
-    val (partitionSchema, dataSchema, requiredDataSchema) =
-      tryPrunePartitionColumns(tableSchema, requiredSchema)
-
+    val (partitionSchema, dataSchema, requiredDataSchema) = tryPrunePartitionColumns(tableSchema, requiredSchema)
     val baseFileReader = createBaseFileReader(
       spark = sparkSession,
       partitionSchema = partitionSchema,
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
index f982fb1e1c3..965340c637a 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -23,8 +23,7 @@ import org.apache.hudi.DataSourceWriteOptions.{BOOTSTRAP_OPERATION_OPT_VAL, OPER
 import org.apache.hudi.cdc.CDCRelation
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE, MERGE_ON_READ}
-import org.apache.hudi.common.model.WriteConcurrencyMode
-import org.apache.hudi.common.table.timeline.HoodieInstant
+import org.apache.hudi.common.model.{HoodieTableType, WriteConcurrencyMode}
 import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.common.util.ConfigUtils
 import org.apache.hudi.common.util.ValidationUtils.checkState
@@ -118,11 +117,6 @@ class DefaultSource extends RelationProvider
     DefaultSource.createRelation(sqlContext, metaClient, schema, globPaths, parameters)
   }
 
-  def getValidCommits(metaClient: HoodieTableMetaClient): String = {
-    metaClient
-      .getCommitsAndCompactionTimeline.filterCompletedInstants.getInstantsAsStream.toArray().map(_.asInstanceOf[HoodieInstant].getFileName).mkString(",")
-  }
-
   /**
    * This DataSource API is used for writing the DataFrame at the destination. For now, we are returning a dummy
    * relation here because Spark does not really make use of the relation returned, and just returns an empty
@@ -227,6 +221,7 @@ object DefaultSource {
     val queryType = parameters(QUERY_TYPE.key)
     val isCdcQuery = queryType == QUERY_TYPE_INCREMENTAL_OPT_VAL &&
       parameters.get(INCREMENTAL_FORMAT.key).contains(INCREMENTAL_FORMAT_CDC_VAL)
+    val isMultipleBaseFileFormatsEnabled = metaClient.getTableConfig.isMultipleBaseFileFormatsEnabled
 
     log.info(s"Is bootstrapped table => $isBootstrappedTable, tableType is: $tableType, queryType is: $queryType")
 
@@ -245,16 +240,24 @@ object DefaultSource {
     } else if (isCdcQuery) {
       CDCRelation.getCDCRelation(sqlContext, metaClient, parameters)
     } else {
-      lazy val newHudiFileFormatUtils = if (parameters.getOrElse(USE_NEW_HUDI_PARQUET_FILE_FORMAT.key,
-        USE_NEW_HUDI_PARQUET_FILE_FORMAT.defaultValue).toBoolean && (globPaths == null || globPaths.isEmpty)
+      lazy val fileFormatUtils = if ((isMultipleBaseFileFormatsEnabled && !isBootstrappedTable)
+        || (parameters.getOrElse(USE_NEW_HUDI_PARQUET_FILE_FORMAT.key, USE_NEW_HUDI_PARQUET_FILE_FORMAT.defaultValue).toBoolean
+        && (globPaths == null || globPaths.isEmpty)
         && parameters.getOrElse(REALTIME_MERGE.key(), REALTIME_MERGE.defaultValue())
-        .equalsIgnoreCase(REALTIME_PAYLOAD_COMBINE_OPT_VAL)) {
-        val formatUtils = new NewHoodieParquetFileFormatUtils(sqlContext, metaClient, parameters, userSchema)
+        .equalsIgnoreCase(REALTIME_PAYLOAD_COMBINE_OPT_VAL))) {
+        val formatUtils = new HoodieSparkFileFormatUtils(sqlContext, metaClient, parameters, userSchema)
         if (formatUtils.hasSchemaOnRead) Option.empty else Some(formatUtils)
       } else {
         Option.empty
       }
 
+      if (isMultipleBaseFileFormatsEnabled) {
+        if (isBootstrappedTable) {
+          throw new HoodieException(s"Multiple base file formats are not supported for bootstrapped table")
+        }
+        return resolveMultiFileFormatRelation(tableType, queryType, fileFormatUtils.get)
+      }
+
       (tableType, queryType, isBootstrappedTable) match {
         case (COPY_ON_WRITE, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) |
              (COPY_ON_WRITE, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) |
@@ -265,27 +268,27 @@ object DefaultSource {
           new IncrementalRelation(sqlContext, parameters, userSchema, metaClient)
 
         case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) =>
-          if (newHudiFileFormatUtils.isEmpty) {
+          if (fileFormatUtils.isEmpty) {
            new MergeOnReadSnapshotRelation(sqlContext, parameters, metaClient, globPaths, userSchema)
          } else {
-            newHudiFileFormatUtils.get.getHadoopFsRelation(isMOR = true, isBootstrap = false)
+            fileFormatUtils.get.getHadoopFsRelation(isMOR = true, isBootstrap = false)
          }
 
         case (MERGE_ON_READ, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) =>
           new MergeOnReadIncrementalRelation(sqlContext, parameters, metaClient, userSchema)
 
         case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL, true) =>
-          if (newHudiFileFormatUtils.isEmpty) {
+          if (fileFormatUtils.isEmpty) {
            new HoodieBootstrapMORRelation(sqlContext, userSchema, globPaths, metaClient, parameters)
          } else {
-            newHudiFileFormatUtils.get.getHadoopFsRelation(isMOR = true, isBootstrap = true)
+            fileFormatUtils.get.getHadoopFsRelation(isMOR = true, isBootstrap = true)
          }
 
         case (_, _, true) =>
-          if (newHudiFileFormatUtils.isEmpty) {
+          if (fileFormatUtils.isEmpty) {
            resolveHoodieBootstrapRelation(sqlContext, globPaths, userSchema, metaClient, parameters)
          } else {
-            newHudiFileFormatUtils.get.getHadoopFsRelation(isMOR = false, isBootstrap = true)
+            fileFormatUtils.get.getHadoopFsRelation(isMOR = false, isBootstrap = true)
          }
 
         case (_, _, _) =>
@@ -332,6 +335,21 @@ object DefaultSource {
     }
   }
 
+  private def resolveMultiFileFormatRelation(tableType: HoodieTableType,
+                                             queryType: String,
+                                             fileFormatUtils: HoodieSparkFileFormatUtils): BaseRelation = {
+    (tableType, queryType) match {
+      case (COPY_ON_WRITE, QUERY_TYPE_SNAPSHOT_OPT_VAL) |
+           (COPY_ON_WRITE, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL) =>
+        fileFormatUtils.getHadoopFsRelation(isMOR = false, isBootstrap = false)
+      case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL) |
+           (MERGE_ON_READ, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL) =>
+        fileFormatUtils.getHadoopFsRelation(isMOR = true, isBootstrap = false)
+      case (_, _) =>
+        throw new HoodieException(s"Multiple base file formats not supported for query type: $queryType for tableType: $tableType")
+    }
+  }
+
   private def resolveSchema(metaClient: HoodieTableMetaClient,
                             parameters: Map[String, String],
                             schema: Option[StructType]): StructType = {
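On the read side nothing changes for callers: once the table flag is set, snapshot and read-optimized queries are routed through resolveMultiFileFormatRelation above. A hedged read sketch in Java; spark and basePath are assumed to be in scope:

    Dataset<Row> df = spark.read().format("hudi")
        .option("hoodie.datasource.query.type", "snapshot")
        .load(basePath);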
sqlContext: SQLContext, */ def updatePrunedDataSchema(prunedSchema: StructType): Relation + protected def createBaseFileReaders(tableSchema: HoodieTableSchema, + requiredSchema: HoodieTableSchema, + requestedColumns: Array[String], + requiredFilters: Seq[Filter], + optionalFilters: Seq[Filter] = Seq.empty, + baseFileFormat: HoodieFileFormat = tableConfig.getBaseFileFormat): HoodieMergeOnReadBaseFileReaders = { + val (partitionSchema, dataSchema, requiredDataSchema) = + tryPrunePartitionColumns(tableSchema, requiredSchema) + + val fullSchemaReader = createBaseFileReader( + spark = sqlContext.sparkSession, + partitionSchema = partitionSchema, + dataSchema = dataSchema, + requiredDataSchema = dataSchema, + // This file-reader is used to read base file records, subsequently merging them with the records + // stored in delta-log files. As such, we have to read _all_ records from the base file, while avoiding + // applying any filtering _before_ we complete combining them w/ delta-log records (to make sure that + // we combine them correctly); + // As such only required filters could be pushed-down to such reader + filters = requiredFilters, + options = optParams, + // NOTE: We have to fork the Hadoop Config here as Spark will be modifying it + // to configure Parquet reader appropriately + hadoopConf = embedInternalSchema(new Configuration(conf), internalSchemaOpt), + baseFileFormat = baseFileFormat + ) + + val requiredSchemaReader = createBaseFileReader( + spark = sqlContext.sparkSession, + partitionSchema = partitionSchema, + dataSchema = dataSchema, + requiredDataSchema = requiredDataSchema, + // This file-reader is used to read base file records, subsequently merging them with the records + // stored in delta-log files. As such, we have to read _all_ records from the base file, while avoiding + // applying any filtering _before_ we complete combining them w/ delta-log records (to make sure that + // we combine them correctly); + // As such only required filters could be pushed-down to such reader + filters = requiredFilters, + options = optParams, + // NOTE: We have to fork the Hadoop Config here as Spark will be modifying it + // to configure Parquet reader appropriately + hadoopConf = embedInternalSchema(new Configuration(conf), requiredDataSchema.internalSchema), + baseFileFormat = baseFileFormat + ) + + // Check whether fields required for merging were also requested to be fetched + // by the query: + // - In case they were, there's no optimization we could apply here (we will have + // to fetch such fields) + // - In case they were not, we will provide 2 separate file-readers + // a) One which would be applied to file-groups w/ delta-logs (merging) + // b) One which would be applied to file-groups w/ no delta-logs or + // in case query-mode is skipping merging + val mandatoryColumns = mandatoryFields.map(HoodieAvroUtils.getRootLevelFieldName) + if (mandatoryColumns.forall(requestedColumns.contains)) { + HoodieMergeOnReadBaseFileReaders( + fullSchemaReader = fullSchemaReader, + requiredSchemaReader = requiredSchemaReader, + requiredSchemaReaderSkipMerging = requiredSchemaReader + ) + } else { + val prunedRequiredSchema = { + val unusedMandatoryColumnNames = mandatoryColumns.filterNot(requestedColumns.contains) + val prunedStructSchema = + StructType(requiredDataSchema.structTypeSchema.fields + .filterNot(f => unusedMandatoryColumnNames.contains(f.name))) + + HoodieTableSchema(prunedStructSchema, convertToAvroSchema(prunedStructSchema, tableName).toString) + } + + val 
requiredSchemaReaderSkipMerging = createBaseFileReader( + spark = sqlContext.sparkSession, + partitionSchema = partitionSchema, + dataSchema = dataSchema, + requiredDataSchema = prunedRequiredSchema, + // This file-reader is only used in cases when no merging is performed, therefore it's safe to push + // down these filters to the base file readers + filters = requiredFilters ++ optionalFilters, + options = optParams, + // NOTE: We have to fork the Hadoop Config here as Spark will be modifying it + // to configure Parquet reader appropriately + hadoopConf = embedInternalSchema(new Configuration(conf), requiredDataSchema.internalSchema), + baseFileFormat = baseFileFormat + ) + + HoodieMergeOnReadBaseFileReaders( + fullSchemaReader = fullSchemaReader, + requiredSchemaReader = requiredSchemaReader, + requiredSchemaReaderSkipMerging = requiredSchemaReaderSkipMerging + ) + } + } + /** * Returns file-reader routine accepting [[PartitionedFile]] and returning an [[Iterator]] * over [[InternalRow]] @@ -527,16 +619,15 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, filters: Seq[Filter], options: Map[String, String], hadoopConf: Configuration, - shouldAppendPartitionValuesOverride: Option[Boolean] = None): BaseFileReader = { - val tableBaseFileFormat = tableConfig.getBaseFileFormat - + shouldAppendPartitionValuesOverride: Option[Boolean] = None, + baseFileFormat: HoodieFileFormat = tableConfig.getBaseFileFormat): BaseFileReader = { // NOTE: PLEASE READ CAREFULLY // Lambda returned from this method is going to be invoked on the executor, and therefore // we have to eagerly initialize all of the readers even though only one specific to the type // of the file being read will be used. This is required to avoid serialization of the whole // relation (containing file-index for ex) and passing it to the executor val (read: (PartitionedFile => Iterator[InternalRow]), schema: StructType) = - tableBaseFileFormat match { + baseFileFormat match { case HoodieFileFormat.PARQUET => val parquetReader = HoodieDataSourceHelper.buildHoodieParquetReader( sparkSession = spark, @@ -571,17 +662,17 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, (hfileReader, requiredDataSchema.structTypeSchema) - case _ => throw new UnsupportedOperationException(s"Base file format is not currently supported ($tableBaseFileFormat)") + case _ => throw new UnsupportedOperationException(s"Base file format is not currently supported ($baseFileFormat)") } BaseFileReader( read = partitionedFile => { val filePathString = sparkAdapter.getSparkPartitionedFileUtils.getStringPathFromPartitionedFile(partitionedFile) val extension = FSUtils.getFileExtension(filePathString) - if (tableBaseFileFormat.getFileExtension.equals(extension)) { + if (baseFileFormat.getFileExtension.equals(extension)) { read(partitionedFile) } else { - throw new UnsupportedOperationException(s"Invalid base-file format ($extension), expected ($tableBaseFileFormat)") + throw new UnsupportedOperationException(s"Invalid base-file format ($extension), expected ($baseFileFormat)") } }, schema = schema diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/NewHoodieParquetFileFormatUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkFileFormatUtils.scala similarity index 87% rename from hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/NewHoodieParquetFileFormatUtils.scala rename to 
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkFileFormatUtils.scala index a76d4bfc77f..e66b248e0ab 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/NewHoodieParquetFileFormatUtils.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkFileFormatUtils.scala @@ -24,9 +24,7 @@ import org.apache.hadoop.mapred.JobConf import org.apache.hudi.HoodieBaseRelation._ import org.apache.hudi.HoodieConversionUtils.toScalaOption import org.apache.hudi.common.config.{ConfigProperty, HoodieReaderConfig} -import org.apache.hudi.common.fs.FSUtils import org.apache.hudi.common.model.HoodieRecord -import org.apache.hudi.common.model.HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX import org.apache.hudi.common.table.timeline.HoodieTimeline import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver} import org.apache.hudi.common.util.ValidationUtils.checkState @@ -37,7 +35,7 @@ import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter import org.apache.hudi.internal.schema.{HoodieSchemaException, InternalSchema} import org.apache.spark.sql.catalyst.analysis.Resolver import org.apache.spark.sql.execution.datasources.parquet.{HoodieFileGroupReaderBasedParquetFileFormat, NewHoodieParquetFileFormat} -import org.apache.spark.sql.execution.datasources.{FileStatusCache, HadoopFsRelation} +import org.apache.spark.sql.execution.datasources.{FileStatusCache, HadoopFsRelation, HoodieMultipleBaseFileFormat} import org.apache.spark.sql.hudi.HoodieSqlCommonUtils import org.apache.spark.sql.sources.BaseRelation import org.apache.spark.sql.types.StructType @@ -46,10 +44,10 @@ import org.apache.spark.sql.{SQLContext, SparkSession} import scala.collection.JavaConverters._ import scala.util.{Failure, Success, Try} -class NewHoodieParquetFileFormatUtils(val sqlContext: SQLContext, - val metaClient: HoodieTableMetaClient, - val optParamsInput: Map[String, String], - private val schemaSpec: Option[StructType]) extends SparkAdapterSupport { +class HoodieSparkFileFormatUtils(val sqlContext: SQLContext, + val metaClient: HoodieTableMetaClient, + val optParamsInput: Map[String, String], + private val schemaSpec: Option[StructType]) extends SparkAdapterSupport { protected val sparkSession: SparkSession = sqlContext.sparkSession protected val optParams: Map[String, String] = optParamsInput.filter(kv => !kv._1.equals(DATA_QUERIES_ONLY.key())) @@ -208,18 +206,27 @@ class NewHoodieParquetFileFormatUtils(val sqlContext: SQLContext, Seq.empty } fileIndex.shouldEmbedFileSlices = true - val fileGroupReaderBasedFileFormat = new HoodieFileGroupReaderBasedParquetFileFormat( - tableState, HoodieTableSchema(tableStructSchema, tableAvroSchema.toString, internalSchemaOpt), - metaClient.getTableConfig.getTableName, mergeType, mandatoryFields, isMOR, isBootstrap, shouldUseRecordPosition) - val newHoodieParquetFileFormat = new NewHoodieParquetFileFormat(sparkSession.sparkContext.broadcast(tableState), - sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema, tableAvroSchema.toString, internalSchemaOpt)), - metaClient.getTableConfig.getTableName, mergeType, mandatoryFields, isMOR, isBootstrap) + + val fileFormat = if (fileGroupReaderEnabled) { + new HoodieFileGroupReaderBasedParquetFileFormat( + tableState, HoodieTableSchema(tableStructSchema, tableAvroSchema.toString, internalSchemaOpt), + metaClient.getTableConfig.getTableName, mergeType, mandatoryFields, isMOR, 
isBootstrap, shouldUseRecordPosition) + } else if (metaClient.getTableConfig.isMultipleBaseFileFormatsEnabled && !isBootstrap) { + new HoodieMultipleBaseFileFormat(sparkSession.sparkContext.broadcast(tableState), + sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema, tableAvroSchema.toString, internalSchemaOpt)), + metaClient.getTableConfig.getTableName, mergeType, mandatoryFields, isMOR) + } else { + new NewHoodieParquetFileFormat(sparkSession.sparkContext.broadcast(tableState), + sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema, tableAvroSchema.toString, internalSchemaOpt)), + metaClient.getTableConfig.getTableName, mergeType, mandatoryFields, isMOR, isBootstrap) + } + HadoopFsRelation( location = fileIndex, partitionSchema = fileIndex.partitionSchema, dataSchema = fileIndex.dataSchema, bucketSpec = None, - fileFormat = if (fileGroupReaderEnabled) fileGroupReaderBasedFileFormat else newHoodieParquetFileFormat, + fileFormat = fileFormat, optParams)(sparkSession) } } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala index b2c44cc3330..e2c5ad88d7f 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala @@ -169,9 +169,12 @@ object HoodieWriterUtils { val resolver = spark.sessionState.conf.resolver val diffConfigs = StringBuilder.newBuilder params.foreach { case (key, value) => - val existingValue = getStringFromTableConfigWithAlternatives(tableConfig, key) - if (null != existingValue && !resolver(existingValue, value)) { - diffConfigs.append(s"$key:\t$value\t${tableConfig.getString(key)}\n") + // Base file format can change between writes, so ignore it. 
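+ // (e.g. an initial PARQUET write followed by an ORC write to the same table once multiple base file formats are enabled)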
+ if (!HoodieTableConfig.BASE_FILE_FORMAT.key.equals(key)) { + val existingValue = getStringFromTableConfigWithAlternatives(tableConfig, key) + if (null != existingValue && !resolver(existingValue, value)) { + diffConfigs.append(s"$key:\t$value\t${tableConfig.getString(key)}\n") + } } } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala index d80ce1a9cba..4dda08c2e28 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala @@ -17,7 +17,7 @@ package org.apache.hudi -import org.apache.hadoop.fs.{FileStatus, GlobPattern, Path} +import org.apache.hadoop.fs.{FileStatus, GlobPattern} import org.apache.hudi.HoodieConversionUtils.toScalaOption import org.apache.hudi.HoodieSparkConfUtils.getHollowCommitHandling import org.apache.hudi.common.model.{FileSlice, HoodieRecord} @@ -181,7 +181,7 @@ trait HoodieIncrementalRelationTrait extends HoodieBaseRelation { protected lazy val commitsMetadata = includedCommits.map(getCommitMetadata(_, super.timeline)).asJava protected lazy val affectedFilesInCommits: Array[FileStatus] = { - listAffectedFilesForCommits(conf, new Path(metaClient.getBasePath), commitsMetadata) + listAffectedFilesForCommits(conf, metaClient.getBasePathV2, commitsMetadata) } protected lazy val (includeStartTime, startTs) = if (startInstantArchived) { diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala index 31d64d50e45..8808d73ae1a 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala @@ -18,12 +18,9 @@ package org.apache.hudi -import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path -import org.apache.hudi.HoodieBaseRelation.convertToAvroSchema import org.apache.hudi.HoodieConversionUtils.toScalaOption import org.apache.hudi.MergeOnReadSnapshotRelation.{createPartitionedFile, isProjectionCompatible} -import org.apache.hudi.avro.HoodieAvroUtils import org.apache.hudi.common.model.{FileSlice, HoodieLogFile, OverwriteWithLatestAvroPayload} import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.spark.rdd.RDD @@ -125,95 +122,6 @@ abstract class BaseMergeOnReadSnapshotRelation(sqlContext: SQLContext, fileSplits = fileSplits) } - protected def createBaseFileReaders(tableSchema: HoodieTableSchema, - requiredSchema: HoodieTableSchema, - requestedColumns: Array[String], - requiredFilters: Seq[Filter], - optionalFilters: Seq[Filter] = Seq.empty): HoodieMergeOnReadBaseFileReaders = { - val (partitionSchema, dataSchema, requiredDataSchema) = - tryPrunePartitionColumns(tableSchema, requiredSchema) - - val fullSchemaReader = createBaseFileReader( - spark = sqlContext.sparkSession, - partitionSchema = partitionSchema, - dataSchema = dataSchema, - requiredDataSchema = dataSchema, - // This file-reader is used to read base file records, subsequently merging them with the records - // stored in delta-log files. 
As such, we have to read _all_ records from the base file, while avoiding - // applying any filtering _before_ we complete combining them w/ delta-log records (to make sure that - // we combine them correctly); - // As such only required filters could be pushed-down to such reader - filters = requiredFilters, - options = optParams, - // NOTE: We have to fork the Hadoop Config here as Spark will be modifying it - // to configure Parquet reader appropriately - hadoopConf = embedInternalSchema(new Configuration(conf), internalSchemaOpt) - ) - - val requiredSchemaReader = createBaseFileReader( - spark = sqlContext.sparkSession, - partitionSchema = partitionSchema, - dataSchema = dataSchema, - requiredDataSchema = requiredDataSchema, - // This file-reader is used to read base file records, subsequently merging them with the records - // stored in delta-log files. As such, we have to read _all_ records from the base file, while avoiding - // applying any filtering _before_ we complete combining them w/ delta-log records (to make sure that - // we combine them correctly); - // As such only required filters could be pushed-down to such reader - filters = requiredFilters, - options = optParams, - // NOTE: We have to fork the Hadoop Config here as Spark will be modifying it - // to configure Parquet reader appropriately - hadoopConf = embedInternalSchema(new Configuration(conf), requiredDataSchema.internalSchema) - ) - - // Check whether fields required for merging were also requested to be fetched - // by the query: - // - In case they were, there's no optimization we could apply here (we will have - // to fetch such fields) - // - In case they were not, we will provide 2 separate file-readers - // a) One which would be applied to file-groups w/ delta-logs (merging) - // b) One which would be applied to file-groups w/ no delta-logs or - // in case query-mode is skipping merging - val mandatoryColumns = mandatoryFieldsForMerging.map(HoodieAvroUtils.getRootLevelFieldName) - if (mandatoryColumns.forall(requestedColumns.contains)) { - HoodieMergeOnReadBaseFileReaders( - fullSchemaReader = fullSchemaReader, - requiredSchemaReader = requiredSchemaReader, - requiredSchemaReaderSkipMerging = requiredSchemaReader - ) - } else { - val prunedRequiredSchema = { - val unusedMandatoryColumnNames = mandatoryColumns.filterNot(requestedColumns.contains) - val prunedStructSchema = - StructType(requiredDataSchema.structTypeSchema.fields - .filterNot(f => unusedMandatoryColumnNames.contains(f.name))) - - HoodieTableSchema(prunedStructSchema, convertToAvroSchema(prunedStructSchema, tableName).toString) - } - - val requiredSchemaReaderSkipMerging = createBaseFileReader( - spark = sqlContext.sparkSession, - partitionSchema = partitionSchema, - dataSchema = dataSchema, - requiredDataSchema = prunedRequiredSchema, - // This file-reader is only used in cases when no merging is performed, therefore it's safe to push - // down these filters to the base file readers - filters = requiredFilters ++ optionalFilters, - options = optParams, - // NOTE: We have to fork the Hadoop Config here as Spark will be modifying it - // to configure Parquet reader appropriately - hadoopConf = embedInternalSchema(new Configuration(conf), requiredDataSchema.internalSchema) - ) - - HoodieMergeOnReadBaseFileReaders( - fullSchemaReader = fullSchemaReader, - requiredSchemaReader = requiredSchemaReader, - requiredSchemaReaderSkipMerging = requiredSchemaReaderSkipMerging - ) - } - } - protected override def collectFileSplits(partitionFilters: 
Seq[Expression], dataFilters: Seq[Expression]): List[HoodieMergeOnReadFileSplit] = { val convertedPartitionFilters = HoodieFileIndex.convertFilterForTimestampKeyGenerator(metaClient, partitionFilters) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala index 772dd27e279..01fa4f7e39b 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala @@ -126,9 +126,9 @@ class HoodieCatalogTable(val spark: SparkSession, var table: CatalogTable) exten lazy val partitionFields: Array[String] = tableConfig.getPartitionFields.orElse(Array.empty) /** - * BaseFileFormat + * Whether multiple base file formats are enabled for this table */ - lazy val baseFileFormat: String = metaClient.getTableConfig.getBaseFileFormat.name() + lazy val isMultipleBaseFileFormatsEnabled: Boolean = tableConfig.isMultipleBaseFileFormatsEnabled /** * Firstly try to load table schema from meta directory on filesystem. diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/HoodieMultipleBaseFileFormat.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/HoodieMultipleBaseFileFormat.scala new file mode 100644 index 00000000000..c250a875f2b --- /dev/null +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/HoodieMultipleBaseFileFormat.scala @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.spark.sql.execution.datasources + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.hadoop.mapreduce.Job +import org.apache.hudi.DataSourceReadOptions.{REALTIME_PAYLOAD_COMBINE_OPT_VAL, REALTIME_SKIP_MERGE_OPT_VAL} +import org.apache.hudi.MergeOnReadSnapshotRelation.createPartitionedFile +import org.apache.hudi.common.fs.FSUtils +import org.apache.hudi.common.model.{FileSlice, HoodieLogFile} +import org.apache.hudi.{HoodieBaseRelation, HoodieTableSchema, HoodieTableState, LogFileIterator, MergeOnReadSnapshotRelation, PartitionFileSliceMapping, RecordMergingFileIterator, SparkAdapterSupport} +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.sql.HoodieCatalystExpressionUtils.generateUnsafeProjection +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.JoinedRow +import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat +import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat +import org.apache.spark.sql.sources.Filter +import org.apache.spark.sql.types.{StructField, StructType} +import org.apache.spark.util.SerializableConfiguration + +import scala.collection.mutable +import scala.jdk.CollectionConverters.asScalaIteratorConverter + +/** + * File format that supports reading multiple base file formats in a table. + */ +class HoodieMultipleBaseFileFormat(tableState: Broadcast[HoodieTableState], + tableSchema: Broadcast[HoodieTableSchema], + tableName: String, + mergeType: String, + mandatoryFields: Seq[String], + isMOR: Boolean) extends FileFormat with SparkAdapterSupport { + private val parquetFormat = new ParquetFileFormat() + private val orcFormat = new OrcFileFormat() + + override def inferSchema(sparkSession: SparkSession, + options: Map[String, String], + files: Seq[FileStatus]): Option[StructType] = { + // This is a simple heuristic assuming all files have the same extension. 
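+ // Only the first file is sampled; mixed-format tables rely on the Parquet and ORC files sharing one table schema.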
+ val fileFormat = detectFileFormat(files.head.getPath.toString) + + fileFormat match { + case "parquet" => parquetFormat.inferSchema(sparkSession, options, files) + case "orc" => orcFormat.inferSchema(sparkSession, options, files) + case _ => throw new UnsupportedOperationException(s"File format $fileFormat is not supported.") + } + } + + override def isSplitable(sparkSession: SparkSession, options: Map[String, String], path: Path): Boolean = { + false + } + + // Used so that the planner only projects once and does not stack overflow + var isProjected = false + + /** + * supportBatch needs to return a consistent result across calls, even if one base file format + * supports vectorized reads while the other does not + */ + private var supportBatchCalled = false + private var supportBatchResult = false + + override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = { + if (!supportBatchCalled) { + supportBatchCalled = true + supportBatchResult = + !isMOR && parquetFormat.supportBatch(sparkSession, schema) && orcFormat.supportBatch(sparkSession, schema) + } + supportBatchResult + } + + override def prepareWrite(sparkSession: SparkSession, + job: Job, + options: Map[String, String], + dataSchema: StructType): OutputWriterFactory = { + throw new UnsupportedOperationException("Write operations are not supported by HoodieMultipleBaseFileFormat.") + } + + override def buildReaderWithPartitionValues(sparkSession: SparkSession, + dataSchema: StructType, + partitionSchema: StructType, + requiredSchema: StructType, + filters: Seq[Filter], + options: Map[String, String], + hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = { + val outputSchema = StructType(requiredSchema.fields ++ partitionSchema.fields) + val requiredSchemaWithMandatory = if (!isMOR || MergeOnReadSnapshotRelation.isProjectionCompatible(tableState.value)) { + // add mandatory fields to required schema + val added: mutable.Buffer[StructField] = mutable.Buffer[StructField]() + for (field <- mandatoryFields) { + if (requiredSchema.getFieldIndex(field).isEmpty) { + val fieldToAdd = dataSchema.fields(dataSchema.getFieldIndex(field).get) + added.append(fieldToAdd) + } + } + val addedFields = StructType(added.toArray) + StructType(requiredSchema.toArray ++ addedFields.fields) + } else { + dataSchema + } + + val (parquetBaseFileReader, orcBaseFileReader, preMergeParquetBaseFileReader, preMergeOrcBaseFileReader) = buildFileReaders( + sparkSession, dataSchema, partitionSchema, requiredSchema, filters, options, hadoopConf, requiredSchemaWithMandatory) + + val broadcastedHadoopConf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) + (file: PartitionedFile) => { + val filePath = sparkAdapter.getSparkPartitionedFileUtils.getPathFromPartitionedFile(file) + val fileFormat = detectFileFormat(filePath.toString) + file.partitionValues match { + case fileSliceMapping: PartitionFileSliceMapping => + if (FSUtils.isLogFile(filePath)) { + // no base file + val fileSlice = fileSliceMapping.getSlice(FSUtils.getFileId(filePath.getName).substring(1)).get + val logFiles = getLogFilesFromSlice(fileSlice) + val outputAvroSchema = HoodieBaseRelation.convertToAvroSchema(outputSchema, tableName) + new LogFileIterator(logFiles, filePath.getParent, tableSchema.value, outputSchema, outputAvroSchema, + tableState.value, broadcastedHadoopConf.value.value) + } else { + // We do not broadcast the slice if it has no log files + fileSliceMapping.getSlice(FSUtils.getFileId(filePath.getName)) match { + case Some(fileSlice) => + val
hoodieBaseFile = fileSlice.getBaseFile.get() + val baseFileFormat = detectFileFormat(hoodieBaseFile.getFileName) + val partitionValues = fileSliceMapping.getInternalRow + val logFiles = getLogFilesFromSlice(fileSlice) + if (requiredSchemaWithMandatory.isEmpty) { + val baseFile = createPartitionedFile(partitionValues, hoodieBaseFile.getHadoopPath, 0, hoodieBaseFile.getFileLen) + baseFileFormat match { + case "parquet" => parquetBaseFileReader(baseFile) + case "orc" => orcBaseFileReader(baseFile) + case _ => throw new UnsupportedOperationException(s"Base file format $baseFileFormat is not supported.") + } + } else { + if (logFiles.nonEmpty) { + val baseFile = createPartitionedFile(InternalRow.empty, hoodieBaseFile.getHadoopPath, 0, hoodieBaseFile.getFileLen) + buildMergeOnReadIterator( + baseFileFormat match { + case "parquet" => preMergeParquetBaseFileReader(baseFile) + case "orc" => preMergeOrcBaseFileReader(baseFile) + case _ => throw new UnsupportedOperationException(s"Base file format $baseFileFormat is not supported.") + }, + logFiles, + filePath.getParent, + requiredSchemaWithMandatory, + requiredSchemaWithMandatory, + outputSchema, + partitionSchema, + partitionValues, + broadcastedHadoopConf.value.value) + } else { + throw new IllegalStateException("File slice should not have been broadcast: it has a base file but no log files") + } + } + case _ => fileFormat match { + case "parquet" => parquetBaseFileReader(file) + case "orc" => orcBaseFileReader(file) + case _ => throw new UnsupportedOperationException(s"Base file format $fileFormat is not supported.") + } + } + } + case _ => fileFormat match { + case "parquet" => parquetBaseFileReader(file) + case "orc" => orcBaseFileReader(file) + case _ => throw new UnsupportedOperationException(s"Base file format $fileFormat is not supported.") + } + } + } + } + + /** + * Build file readers to read individual physical files + */ + protected def buildFileReaders(sparkSession: SparkSession, dataSchema: StructType, partitionSchema: StructType, + requiredSchema: StructType, filters: Seq[Filter], options: Map[String, String], + hadoopConf: Configuration, requiredSchemaWithMandatory: StructType): + (PartitionedFile => Iterator[InternalRow], + PartitionedFile => Iterator[InternalRow], + PartitionedFile => Iterator[InternalRow], + PartitionedFile => Iterator[InternalRow]) = { + val parquetBaseFileReader = parquetFormat.buildReaderWithPartitionValues(sparkSession, dataSchema, partitionSchema, requiredSchema, + filters, options, new Configuration(hadoopConf)) + val orcBaseFileReader = orcFormat.buildReaderWithPartitionValues(sparkSession, dataSchema, partitionSchema, requiredSchema, + filters, options, new Configuration(hadoopConf)) + + val preMergeParquetBaseFileReader = if (isMOR) { + parquetFormat.buildReaderWithPartitionValues(sparkSession, dataSchema, StructType(Seq.empty), + requiredSchemaWithMandatory, Seq.empty, options, new Configuration(hadoopConf)) + } else { + _: PartitionedFile => Iterator.empty + } + + val preMergeOrcBaseFileReader = if (isMOR) { + orcFormat.buildReaderWithPartitionValues(sparkSession, dataSchema, StructType(Seq.empty), + requiredSchemaWithMandatory, Seq.empty, options, new Configuration(hadoopConf)) + } else { + _: PartitionedFile => Iterator.empty + } + + (parquetBaseFileReader, orcBaseFileReader, preMergeParquetBaseFileReader, preMergeOrcBaseFileReader) + } + + /** + * Create iterator for a file slice that has log files + */ + protected def buildMergeOnReadIterator(iter: Iterator[InternalRow],
logFiles: List[HoodieLogFile], + partitionPath: Path, inputSchema: StructType, requiredSchemaWithMandatory: StructType, + outputSchema: StructType, partitionSchema: StructType, partitionValues: InternalRow, + hadoopConf: Configuration): Iterator[InternalRow] = { + + val requiredAvroSchema = HoodieBaseRelation.convertToAvroSchema(requiredSchemaWithMandatory, tableName) + val morIterator = mergeType match { + case REALTIME_SKIP_MERGE_OPT_VAL => throw new UnsupportedOperationException("Skip merge is not currently " + + "implemented for HoodieMultipleBaseFileFormat") + //new SkipMergeIterator(logFiles, partitionPath, iter, inputSchema, tableSchema.value, + // requiredSchemaWithMandatory, requiredAvroSchema, tableState.value, hadoopConf) + case REALTIME_PAYLOAD_COMBINE_OPT_VAL => + new RecordMergingFileIterator(logFiles, partitionPath, iter, inputSchema, tableSchema.value, + requiredSchemaWithMandatory, requiredAvroSchema, tableState.value, hadoopConf) + } + appendPartitionAndProject(morIterator, requiredSchemaWithMandatory, partitionSchema, + outputSchema, partitionValues) + } + + /** + * Append partition values to rows and project to output schema + */ + protected def appendPartitionAndProject(iter: Iterator[InternalRow], + inputSchema: StructType, + partitionSchema: StructType, + to: StructType, + partitionValues: InternalRow): Iterator[InternalRow] = { + if (partitionSchema.isEmpty) { + projectSchema(iter, inputSchema, to) + } else { + val unsafeProjection = generateUnsafeProjection(StructType(inputSchema.fields ++ partitionSchema.fields), to) + val joinedRow = new JoinedRow() + iter.map(d => unsafeProjection(joinedRow(d, partitionValues))) + } + } + + protected def projectSchema(iter: Iterator[InternalRow], + from: StructType, + to: StructType): Iterator[InternalRow] = { + val unsafeProjection = generateUnsafeProjection(from, to) + iter.map(d => unsafeProjection(d)) + } + + protected def getLogFilesFromSlice(fileSlice: FileSlice): List[HoodieLogFile] = { + fileSlice.getLogFiles.sorted(HoodieLogFile.getLogFileComparator).iterator().asScala.toList + } + + private def detectFileFormat(filePath: String): String = { + // Detect the file format from the file extension (content-based detection is not implemented).
+ if (filePath.endsWith(".parquet")) "parquet" + else if (filePath.endsWith(".orc")) "orc" + else "" + } +} diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala index a34a6dfb052..5492d12d5fb 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala @@ -456,7 +456,7 @@ trait ProvidesHoodieConfig extends Logging { hiveSyncConfig.setValue(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key, enableHive.toString) hiveSyncConfig.setValue(HiveSyncConfigHolder.HIVE_SYNC_MODE.key, props.getString(HiveSyncConfigHolder.HIVE_SYNC_MODE.key, HiveSyncMode.HMS.name())) hiveSyncConfig.setValue(HoodieSyncConfig.META_SYNC_BASE_PATH, hoodieCatalogTable.tableLocation) - hiveSyncConfig.setValue(HoodieSyncConfig.META_SYNC_BASE_FILE_FORMAT, hoodieCatalogTable.baseFileFormat) + hiveSyncConfig.setValue(HoodieSyncConfig.META_SYNC_BASE_FILE_FORMAT, props.getString(HoodieSyncConfig.META_SYNC_BASE_FILE_FORMAT.key, HoodieSyncConfig.META_SYNC_BASE_FILE_FORMAT.defaultValue)) hiveSyncConfig.setValue(HoodieSyncConfig.META_SYNC_DATABASE_NAME, hoodieCatalogTable.table.identifier.database.getOrElse("default")) hiveSyncConfig.setDefaultValue(HoodieSyncConfig.META_SYNC_TABLE_NAME, hoodieCatalogTable.table.identifier.table) if (props.get(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key) != null) { diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairMigratePartitionMetaProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairMigratePartitionMetaProcedure.scala index 5804d36ba09..b12c694ce56 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairMigratePartitionMetaProcedure.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairMigratePartitionMetaProcedure.scala @@ -74,7 +74,7 @@ class RepairMigratePartitionMetaProcedure extends BaseProcedure with ProcedureBu if (!dryRun) { if (!baseFormatFile.isPresent) { val partitionMetadata: HoodiePartitionMetadata = new HoodiePartitionMetadata(metaClient.getFs, latestCommit, - basePath, partition, Option.of(metaClient.getTableConfig.getBaseFileFormat)) + basePath, partition, Option.of(getWriteConfig(basePath.toString).getBaseFileFormat)) partitionMetadata.trySave(0) } // delete it, in case we failed midway last time. 
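For readers following the new format's dispatch logic: the per-file branching in buildReaderWithPartitionValues above reduces to a reader-per-extension lookup. A minimal standalone sketch of that pattern (the name pickReader and the readers map are illustrative, not part of this patch):

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.execution.datasources.PartitionedFile

    // Readers for every supported format are built eagerly up front (as in
    // buildFileReaders above); per-file dispatch then only inspects the extension.
    def pickReader(readers: Map[String, PartitionedFile => Iterator[InternalRow]],
                   path: String): PartitionedFile => Iterator[InternalRow] = {
      val extension = path.substring(path.lastIndexOf('.') + 1) // e.g. "parquet" or "orc"
      readers.getOrElse(extension,
        throw new UnsupportedOperationException(s"Base file format $extension is not supported."))
    }

With readers = Map("parquet" -> parquetBaseFileReader, "orc" -> orcBaseFileReader), pickReader is equivalent to the repeated match blocks above while keeping the set of supported formats in one place.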
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java index c2afc73ebac..8df34768909 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java @@ -169,7 +169,7 @@ public class TestBootstrap extends HoodieSparkClientTestBase { } else { df.write().format("parquet").mode(SaveMode.Overwrite).save(srcPath); } - String filePath = FileStatusUtils.toPath(BootstrapUtils.getAllLeafFoldersWithFiles(metaClient, metaClient.getFs(), + String filePath = FileStatusUtils.toPath(BootstrapUtils.getAllLeafFoldersWithFiles(getConfig().getBaseFileFormat(), metaClient.getFs(), srcPath, context).stream().findAny().map(p -> p.getValue().stream().findAny()) .orElse(null).get().getPath()).toString(); HoodieAvroParquetReader parquetReader = new HoodieAvroParquetReader(metaClient.getHadoopConf(), new Path(filePath)); @@ -273,7 +273,7 @@ public class TestBootstrap extends HoodieSparkClientTestBase { client.getTableServiceClient().rollbackFailedBootstrap(); metaClient.reloadActiveTimeline(); assertEquals(0, metaClient.getCommitsTimeline().countInstants()); - assertEquals(0L, BootstrapUtils.getAllLeafFoldersWithFiles(metaClient, metaClient.getFs(), basePath, context) + assertEquals(0L, BootstrapUtils.getAllLeafFoldersWithFiles(config.getBaseFileFormat(), metaClient.getFs(), basePath, context) .stream().mapToLong(f -> f.getValue().size()).sum()); BootstrapIndex index = BootstrapIndex.getBootstrapIndex(metaClient); @@ -300,7 +300,7 @@ public class TestBootstrap extends HoodieSparkClientTestBase { String updateSPath = tmpFolder.toAbsolutePath() + "/data2"; generateNewDataSetAndReturnSchema(updateTimestamp, totalRecords, partitions, updateSPath); JavaRDD<HoodieRecord> updateBatch = - generateInputBatch(jsc, BootstrapUtils.getAllLeafFoldersWithFiles(metaClient, metaClient.getFs(), updateSPath, context), + generateInputBatch(jsc, BootstrapUtils.getAllLeafFoldersWithFiles(config.getBaseFileFormat(), metaClient.getFs(), updateSPath, context), schema); String newInstantTs = client.startCommit(); client.upsert(updateBatch, newInstantTs); @@ -373,7 +373,7 @@ public class TestBootstrap extends HoodieSparkClientTestBase { bootstrapped.registerTempTable("bootstrapped"); original.registerTempTable("original"); if (checkNumRawFiles) { - List<HoodieFileStatus> files = BootstrapUtils.getAllLeafFoldersWithFiles(metaClient, metaClient.getFs(), + List<HoodieFileStatus> files = BootstrapUtils.getAllLeafFoldersWithFiles(getConfig().getBaseFileFormat(), metaClient.getFs(), bootstrapBasePath, context).stream().flatMap(x -> x.getValue().stream()).collect(Collectors.toList()); assertEquals(files.size() * numVersions, sqlContext.sql("select distinct _hoodie_file_name from bootstrapped").count()); diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestOrcBootstrap.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestOrcBootstrap.java index 54857e78eb7..abbbd78d064 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestOrcBootstrap.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestOrcBootstrap.java @@ -151,7 +151,7 @@ public class TestOrcBootstrap extends HoodieSparkClientTestBase { } else { 
df.write().format("orc").mode(SaveMode.Overwrite).save(srcPath); } - String filePath = FileStatusUtils.toPath(BootstrapUtils.getAllLeafFoldersWithFiles(metaClient, metaClient.getFs(), + String filePath = FileStatusUtils.toPath(BootstrapUtils.getAllLeafFoldersWithFiles(metaClient.getTableConfig().getBaseFileFormat(), metaClient.getFs(), srcPath, context).stream().findAny().map(p -> p.getValue().stream().findAny()) .orElse(null).get().getPath()).toString(); Reader orcReader = OrcFile.createReader(new Path(filePath), OrcFile.readerOptions(metaClient.getHadoopConf())); @@ -262,7 +262,7 @@ public class TestOrcBootstrap extends HoodieSparkClientTestBase { client.getTableServiceClient().rollbackFailedBootstrap(); metaClient.reloadActiveTimeline(); assertEquals(0, metaClient.getCommitsTimeline().countInstants()); - assertEquals(0L, BootstrapUtils.getAllLeafFoldersWithFiles(metaClient, metaClient.getFs(), basePath, context) + assertEquals(0L, BootstrapUtils.getAllLeafFoldersWithFiles(metaClient.getTableConfig().getBaseFileFormat(), metaClient.getFs(), basePath, context) .stream().flatMap(f -> f.getValue().stream()).count()); BootstrapIndex index = BootstrapIndex.getBootstrapIndex(metaClient); @@ -289,7 +289,7 @@ public class TestOrcBootstrap extends HoodieSparkClientTestBase { String updateSPath = tmpFolder.toAbsolutePath().toString() + "/data2"; generateNewDataSetAndReturnSchema(updateTimestamp, totalRecords, partitions, updateSPath); JavaRDD<HoodieRecord> updateBatch = - generateInputBatch(jsc, BootstrapUtils.getAllLeafFoldersWithFiles(metaClient, metaClient.getFs(), updateSPath, context), + generateInputBatch(jsc, BootstrapUtils.getAllLeafFoldersWithFiles(metaClient.getTableConfig().getBaseFileFormat(), metaClient.getFs(), updateSPath, context), schema); String newInstantTs = client.startCommit(); client.upsert(updateBatch, newInstantTs); @@ -361,7 +361,7 @@ public class TestOrcBootstrap extends HoodieSparkClientTestBase { bootstrapped.registerTempTable("bootstrapped"); original.registerTempTable("original"); if (checkNumRawFiles) { - List<HoodieFileStatus> files = BootstrapUtils.getAllLeafFoldersWithFiles(metaClient, metaClient.getFs(), + List<HoodieFileStatus> files = BootstrapUtils.getAllLeafFoldersWithFiles(metaClient.getTableConfig().getBaseFileFormat(), metaClient.getFs(), bootstrapBasePath, context).stream().flatMap(x -> x.getValue().stream()).collect(Collectors.toList()); assertEquals(files.size() * numVersions, sqlContext.sql("select distinct _hoodie_file_name from bootstrapped").count()); diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java index 4a93245dc8d..28c8df82e8e 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java @@ -18,16 +18,17 @@ package org.apache.hudi.testutils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.LocatedFileStatus; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.util.FileIOUtils; import org.apache.avro.Schema; +import 
org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.RowFactory; @@ -130,17 +131,14 @@ public class DataSourceTestUtils { */ public static boolean isLogFileOnly(String basePath) throws IOException { Configuration conf = new Configuration(); - HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder() - .setConf(conf).setBasePath(basePath) - .build(); - String baseDataFormat = metaClient.getTableConfig().getBaseFileFormat().getFileExtension(); Path path = new Path(basePath); FileSystem fs = path.getFileSystem(conf); RemoteIterator<LocatedFileStatus> files = fs.listFiles(path, true); while (files.hasNext()) { LocatedFileStatus file = files.next(); - if (file.isFile()) { - if (file.getPath().toString().endsWith(baseDataFormat)) { + // skip meta folder + if (file.isFile() && !file.getPath().toString().contains(HoodieTableMetaClient.METAFOLDER_NAME + Path.SEPARATOR)) { + if (FSUtils.isBaseFile(file.getPath())) { return false; } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestHoodieMultipleBaseFileFormat.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestHoodieMultipleBaseFileFormat.scala new file mode 100644 index 00000000000..8995f7ec883 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestHoodieMultipleBaseFileFormat.scala @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.functional + +import org.apache.hudi.common.config.HoodieStorageConfig +import org.apache.hudi.common.model.{HoodieFileFormat, HoodieTableType} +import org.apache.hudi.common.table.HoodieTableConfig +import org.apache.hudi.common.testutils.HoodieTestDataGenerator.{DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH} +import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings +import org.apache.hudi.config.HoodieWriteConfig +import org.apache.hudi.testutils.HoodieSparkClientTestBase +import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkRecordMerger, SparkDatasetMixin} +import org.apache.spark.sql.{Dataset, Row, SaveMode, SparkSession} +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} +import org.slf4j.LoggerFactory + +import scala.collection.JavaConverters._ + +/** + * Test cases on multiple base file format support for COW and MOR table types. 
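+ * Each test writes one partition with Parquet base files and a second partition with ORC base files, then snapshot-reads the table.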
+ */ +class TestHoodieMultipleBaseFileFormat extends HoodieSparkClientTestBase with SparkDatasetMixin { + + var spark: SparkSession = null + private val log = LoggerFactory.getLogger(classOf[TestHoodieMultipleBaseFileFormat]) + val commonOpts = Map( + "hoodie.insert.shuffle.parallelism" -> "4", + "hoodie.upsert.shuffle.parallelism" -> "4", + HoodieTableConfig.MULTIPLE_BASE_FILE_FORMATS_ENABLE.key -> "true", + DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key", + DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition", + DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp", + HoodieWriteConfig.TBL_NAME.key -> "hoodie_test" + ) + val sparkOpts = Map( + HoodieWriteConfig.RECORD_MERGER_IMPLS.key -> classOf[HoodieSparkRecordMerger].getName, + HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key -> "parquet" + ) + + val verificationCol: String = "driver" + val updatedVerificationVal: String = "driver_update" + + @BeforeEach override def setUp() { + setTableName("hoodie_test") + initPath() + initSparkContexts() + spark = sqlContext.sparkSession + initTestDataGenerator() + initFileSystem() + } + + @AfterEach override def tearDown() = { + cleanupSparkContexts() + cleanupTestDataGenerator() + cleanupFileSystem() + } + + @Test + def testMultiFileFormatForCOWTableType(): Unit = { + insertAndValidateSnapshot(basePath, HoodieTableType.COPY_ON_WRITE.name()) + } + + @Test + def testMultiFileFormatForMORTableType(): Unit = { + insertAndValidateSnapshot(basePath, HoodieTableType.MERGE_ON_READ.name()) + } + + def insertAndValidateSnapshot(basePath: String, tableType: String): Unit = { + // Insert records in Parquet format to one of the partitions. + val records1 = recordsToStrings(dataGen.generateInsertsForPartition("001", 10, DEFAULT_FIRST_PARTITION_PATH)).asScala + val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2)) + inputDF1.write.format("hudi") + .options(commonOpts) + .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType) + .mode(SaveMode.Overwrite) + .save(basePath) + + // Insert records to a new partition in ORC format. + val records2 = recordsToStrings(dataGen.generateInsertsForPartition("002", 10, DEFAULT_SECOND_PARTITION_PATH)).asScala + val inputDF2: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records2, 2)) + inputDF2.write.format("hudi") + .options(commonOpts) + .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType) + .option(HoodieTableConfig.BASE_FILE_FORMAT.key, HoodieFileFormat.ORC.name()) + .mode(SaveMode.Append) + .save(basePath) + + // Snapshot read the table and verify all 20 inserted records (10 Parquet + 10 ORC) are visible. + val hudiDf = spark.read.format("hudi").load(basePath + "/*") + assertEquals(20, hudiDf.count()) + + // Update and generate new slice across partitions.
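+ // (generateUniqueUpdates picks keys from the earlier batches, so both the Parquet and the ORC partition can receive new file slices)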
+ val records3 = recordsToStrings(dataGen.generateUniqueUpdates("003", 10)).asScala + val inputDF3: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records3, 2)) + inputDF3.write.format("hudi") + .options(commonOpts) + .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType) + .mode(SaveMode.Append) + .save(basePath) + + // Snapshot read the table again; updates create new slices but the record count stays at 20. + val hudiDfAfterUpdate = spark.read.format("hudi").load(basePath + "/*") + assertEquals(20, hudiDfAfterUpdate.count()) + } +} diff --git a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/execution/datasources/Spark32NestedSchemaPruning.scala b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/execution/datasources/Spark32NestedSchemaPruning.scala index 7a6cb20c849..861fd43be85 100644 --- a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/execution/datasources/Spark32NestedSchemaPruning.scala +++ b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/execution/datasources/Spark32NestedSchemaPruning.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources import org.apache.hudi.{HoodieBaseRelation, SparkAdapterSupport} import org.apache.spark.sql.HoodieSpark3CatalystPlanUtils +import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, Expression, NamedExpression, ProjectionOverSchema} import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} @@ -52,7 +53,7 @@ class Spark32NestedSchemaPruning extends Rule[LogicalPlan] { // NOTE: This is modified to accommodate for Hudi's custom relations, given that original // [[NestedSchemaPruning]] rule is tightly coupled w/ [[HadoopFsRelation]] // TODO generalize to any file-based relation - l @ LogicalRelation(relation: HoodieBaseRelation, _, _, _)) + l @ LogicalRelation(relation: HoodieBaseRelation, _, catalogTable: Option[HoodieCatalogTable], _)) if relation.canPruneRelationSchema => prunePhysicalColumns(l.output, projects, filters, relation.dataSchema, diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java index 0626ac3960f..95534c5533f 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java @@ -671,13 +671,9 @@ public class HoodieStreamer implements Serializable { // This will guarantee there is no surprise with table type checkArgument(tableType.equals(HoodieTableType.valueOf(cfg.tableType)), "Hoodie table is of type " + tableType + " but passed in CLI argument is " + cfg.tableType); - // Load base file format - // This will guarantee there is no surprise with base file type - String baseFileFormat = meta.getTableConfig().getBaseFileFormat().toString(); - checkArgument(baseFileFormat.equals(cfg.baseFileFormat) || cfg.baseFileFormat == null, - format("Hoodie table's base file format is of type %s but passed in CLI argument is %s", baseFileFormat, cfg.baseFileFormat)); - cfg.baseFileFormat = baseFileFormat; - this.cfg.baseFileFormat = baseFileFormat; + if (cfg.baseFileFormat == null) { + cfg.baseFileFormat = "PARQUET"; // default for backward compatibility + } Map<String, String> propsToValidate = new HashMap<>(); properties.get().forEach((k, v) ->
propsToValidate.put(k.toString(), v.toString())); HoodieWriterUtils.validateTableConfig(this.sparkSession, org.apache.hudi.HoodieConversionUtils.mapAsScalaImmutableMap(propsToValidate), meta.getTableConfig());
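For anyone trying the feature end to end, the new test above boils down to this write/read pattern (a minimal sketch: the path, table name, and field names are placeholders mirroring the test, and df1/df2 stand for arbitrary DataFrames with those fields):

    import org.apache.hudi.common.model.HoodieFileFormat
    import org.apache.hudi.common.table.HoodieTableConfig

    val opts = Map(
      "hoodie.table.name" -> "hoodie_test",
      "hoodie.datasource.write.recordkey.field" -> "_row_key",
      "hoodie.datasource.write.partitionpath.field" -> "partition",
      "hoodie.datasource.write.precombine.field" -> "timestamp",
      // Allow base files of different formats to coexist in one table
      HoodieTableConfig.MULTIPLE_BASE_FILE_FORMATS_ENABLE.key -> "true")

    // First write creates the table; base files default to Parquet.
    df1.write.format("hudi").options(opts).mode("overwrite").save("/tmp/hoodie_test")

    // A later write switches the base file format for the file groups it creates.
    df2.write.format("hudi").options(opts)
      .option(HoodieTableConfig.BASE_FILE_FORMAT.key, HoodieFileFormat.ORC.name())
      .mode("append").save("/tmp/hoodie_test")

    // Snapshot queries read the Parquet and ORC file groups transparently.
    spark.read.format("hudi").load("/tmp/hoodie_test").count()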