This is an automated email from the ASF dual-hosted git repository.

jonvex pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new e42217d2368  [HUDI-7350] Make Hudi reader and writer factory APIs Hadoop-independent (#11163)
e42217d2368 is described below

commit e42217d2368db493bc94930b511ec05c79fd9cdc
Author: Jon Vexler <jbvex...@gmail.com>
AuthorDate: Thu May 9 20:45:36 2024 -0400

    [HUDI-7350] Make Hudi reader and writer factory APIs Hadoop-independent (#11163)

    Abstract io reader and writer to de-hadoop
    ---------

    Co-authored-by: Jonathan Vexler <=>
---
 .../hudi/client/timeline/LSMTimelineWriter.java    |  2 +-
 .../hudi/avro/TestHoodieAvroParquetWriter.java     |  4 +-
 .../hudi/testutils/HoodieWriteableTestTable.java   | 10 ++--
 .../row/HoodieRowDataFileWriterFactory.java        |  3 +-
 .../io/storage/row/HoodieRowDataParquetWriter.java |  2 +-
 .../io/storage/HoodieSparkFileWriterFactory.java   |  5 +-
 .../hudi/io/storage/HoodieSparkParquetWriter.java  |  1 +
 .../row/HoodieInternalRowFileWriterFactory.java    |  3 +-
 .../row/HoodieInternalRowParquetWriter.java        |  2 +-
 .../io/storage/row/HoodieRowParquetConfig.java     |  8 ++-
 .../storage/TestHoodieAvroFileWriterFactory.java   |  3 +
 .../hudi/common/table/TableSchemaResolver.java     |  6 +-
 .../table/log/block/HoodieParquetDataBlock.java    | 48 ++++++----------
 .../table/timeline/HoodieArchivedTimeline.java     |  4 +-
 .../hudi/io/storage/HoodieAvroFileReader.java      | 28 +++++++++-
 .../hudi/io/storage/HoodieAvroFileReaderBase.java  | 49 ----------------
 .../io/storage/HoodieAvroHFileReaderImplBase.java  |  4 +-
 .../hudi/io/storage/HoodieFileReaderFactory.java   | 11 +++-
 .../hudi/io/storage/HoodieFileWriterFactory.java   | 21 ++++---
 .../apache/hudi/io/storage/HoodieOrcConfig.java    | 15 ++---
 .../hudi/io/storage/HoodieParquetConfig.java       | 15 ++---
 .../testutils/reader/HoodieFileSliceTestUtils.java | 65 ++++++++------------
 .../testutils/reader/HoodieTestReaderContext.java  |  9 ++-
 .../io/storage/TestHoodieReaderWriterUtils.java    |  2 +-
 .../io/hadoop}/HoodieAvroFileReaderFactory.java    | 20 ++++---
 .../io/hadoop}/HoodieAvroFileWriterFactory.java    | 45 ++++++++------
 .../{storage => hadoop}/HoodieAvroHFileWriter.java | 17 +++---
 .../hudi/io/hadoop}/HoodieAvroOrcReader.java       | 19 ++++---
 .../{storage => hadoop}/HoodieAvroOrcWriter.java   | 19 ++++---
 .../hudi/io/hadoop}/HoodieAvroParquetReader.java   | 19 ++++---
 .../HoodieAvroParquetWriter.java                   | 17 +++---
 .../HoodieBaseParquetWriter.java                   | 23 ++++----
 .../apache/hudi/io/hadoop}/HoodieHFileConfig.java  | 16 +++---
 .../hudi/io/hadoop}/HoodieParquetStreamWriter.java | 19 ++++---
 .../parquet/io/OutputStreamBackedOutputFile.java   |  0
 .../TestHoodieAvroFileReaderFactory.java           | 17 +++---
 .../TestHoodieBaseParquetWriter.java               | 23 ++++----
 .../TestHoodieHBaseHFileReaderWriter.java          | 19 ++++---
 .../TestHoodieHFileReaderWriter.java               | 18 +++---
 .../TestHoodieHFileReaderWriterBase.java           |  7 ++-
 .../TestHoodieOrcReaderWriter.java                 | 21 ++++---
 .../TestHoodieReaderWriterBase.java                |  7 ++-
 .../org/apache/spark/sql/hudi/SparkHelpers.scala   |  7 ++-
 .../org/apache/hudi/functional/TestBootstrap.java  |  2 +-
 .../row/TestHoodieInternalRowParquetWriter.java    |  3 +-
 45 files changed, 354 insertions(+), 304 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/LSMTimelineWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/LSMTimelineWriter.java
index ccf2332699c..2ea57939427 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/LSMTimelineWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/LSMTimelineWriter.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/LSMTimelineWriter.java @@ -38,7 +38,7 @@ import org.apache.hudi.common.util.collection.ClosableIterator; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieCommitException; import org.apache.hudi.exception.HoodieException; -import org.apache.hudi.io.storage.HoodieAvroParquetReader; +import org.apache.hudi.io.hadoop.HoodieAvroParquetReader; import org.apache.hudi.io.storage.HoodieFileReaderFactory; import org.apache.hudi.io.storage.HoodieFileWriter; import org.apache.hudi.io.storage.HoodieFileWriterFactory; diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroParquetWriter.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroParquetWriter.java index 091d1d7195a..bff523f7f21 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroParquetWriter.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroParquetWriter.java @@ -25,7 +25,7 @@ import org.apache.hudi.common.bloom.BloomFilterTypeCode; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ParquetUtils; -import org.apache.hudi.io.storage.HoodieAvroParquetWriter; +import org.apache.hudi.io.hadoop.HoodieAvroParquetWriter; import org.apache.hudi.io.storage.HoodieParquetConfig; import org.apache.hudi.storage.StorageConfiguration; import org.apache.hudi.storage.StoragePath; @@ -71,7 +71,7 @@ public class TestHoodieAvroParquetWriter { HoodieParquetConfig<HoodieAvroWriteSupport> parquetConfig = new HoodieParquetConfig(writeSupport, CompressionCodecName.GZIP, ParquetWriter.DEFAULT_BLOCK_SIZE, - ParquetWriter.DEFAULT_PAGE_SIZE, 1024 * 1024 * 1024, storageConf.unwrap(), 0.1, true); + ParquetWriter.DEFAULT_PAGE_SIZE, 1024 * 1024 * 1024, storageConf, 0.1, true); StoragePath filePath = new StoragePath(tmpDir.resolve("test.parquet").toAbsolutePath().toString()); diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java index a135a2b22e4..b45d80b1ee6 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java @@ -39,18 +39,18 @@ import org.apache.hudi.common.testutils.FileCreateUtils; import org.apache.hudi.common.testutils.HoodieMetadataTestTable; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; -import org.apache.hudi.io.storage.HoodieAvroOrcWriter; -import org.apache.hudi.io.storage.HoodieAvroParquetWriter; +import org.apache.hudi.io.hadoop.HoodieAvroOrcWriter; +import org.apache.hudi.io.hadoop.HoodieAvroParquetWriter; import org.apache.hudi.io.storage.HoodieOrcConfig; import org.apache.hudi.io.storage.HoodieParquetConfig; import org.apache.hudi.metadata.HoodieTableMetadataWriter; import org.apache.hudi.storage.HoodieStorage; +import org.apache.hudi.storage.StorageConfiguration; import org.apache.hudi.storage.StoragePath; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; -import org.apache.hadoop.conf.Configuration; import org.apache.orc.CompressionKind; import 
org.apache.parquet.avro.AvroSchemaConverter; import org.apache.parquet.hadoop.ParquetWriter; @@ -124,7 +124,7 @@ public class HoodieWriteableTestTable extends HoodieMetadataTestTable { new AvroSchemaConverter().convert(schema), schema, Option.of(filter), new Properties()); HoodieParquetConfig<HoodieAvroWriteSupport> config = new HoodieParquetConfig<>(writeSupport, CompressionCodecName.GZIP, ParquetWriter.DEFAULT_BLOCK_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE, 120 * 1024 * 1024, - new Configuration(), Double.parseDouble(HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION.defaultValue()), true); + storage.getConf(), Double.parseDouble(HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION.defaultValue()), true); try (HoodieAvroParquetWriter writer = new HoodieAvroParquetWriter( new StoragePath(Paths.get(basePath, partition, fileName).toString()), config, currentInstantTime, contextSupplier, populateMetaFields)) { @@ -142,7 +142,7 @@ public class HoodieWriteableTestTable extends HoodieMetadataTestTable { } } } else if (HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().equals(HoodieFileFormat.ORC)) { - Configuration conf = new Configuration(); + StorageConfiguration conf = storage.getConf().newInstance(); int orcStripSize = Integer.parseInt(HoodieStorageConfig.ORC_STRIPE_SIZE.defaultValue()); int orcBlockSize = Integer.parseInt(HoodieStorageConfig.ORC_BLOCK_SIZE.defaultValue()); int maxFileSize = Integer.parseInt(HoodieStorageConfig.ORC_FILE_MAX_SIZE.defaultValue()); diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriterFactory.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriterFactory.java index 072bde04756..e9bc86b4a76 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriterFactory.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriterFactory.java @@ -24,6 +24,7 @@ import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.io.storage.HoodieParquetConfig; import org.apache.hudi.storage.StoragePath; +import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration; import org.apache.hudi.table.HoodieTable; import org.apache.flink.table.types.logical.RowType; @@ -76,7 +77,7 @@ public class HoodieRowDataFileWriterFactory { writeConfig.getParquetBlockSize(), writeConfig.getParquetPageSize(), writeConfig.getParquetMaxFileSize(), - writeSupport.getHadoopConf(), + new HadoopStorageConfiguration(writeSupport.getHadoopConf()), writeConfig.getParquetCompressionRatio(), writeConfig.parquetDictionaryEnabled())); } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriter.java index 8acd1ef9dd1..200662cc138 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriter.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriter.java @@ -18,7 +18,7 @@ package org.apache.hudi.io.storage.row; -import org.apache.hudi.io.storage.HoodieBaseParquetWriter; +import org.apache.hudi.io.hadoop.HoodieBaseParquetWriter; import org.apache.hudi.io.storage.HoodieParquetConfig; import org.apache.hudi.storage.StoragePath; diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriterFactory.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriterFactory.java index ee98ff322a3..ff17b48bf0c 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriterFactory.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriterFactory.java @@ -38,6 +38,7 @@ import org.apache.spark.sql.HoodieInternalRowUtils; import org.apache.spark.sql.types.StructType; import java.io.IOException; +import java.io.OutputStream; public class HoodieSparkFileWriterFactory extends HoodieFileWriterFactory { @@ -67,7 +68,7 @@ public class HoodieSparkFileWriterFactory extends HoodieFileWriterFactory { } protected HoodieFileWriter newParquetFileWriter( - FSDataOutputStream outputStream, StorageConfiguration<?> conf, HoodieConfig config, Schema schema) throws IOException { + OutputStream outputStream, StorageConfiguration<?> conf, HoodieConfig config, Schema schema) throws IOException { boolean enableBloomFilter = false; HoodieRowParquetWriteSupport writeSupport = getHoodieRowParquetWriteSupport(conf, schema, config, enableBloomFilter); String compressionCodecName = config.getStringOrDefault(HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME); @@ -83,7 +84,7 @@ public class HoodieSparkFileWriterFactory extends HoodieFileWriterFactory { writeSupport.getHadoopConf(), config.getDouble(HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION), config.getBooleanOrDefault(HoodieStorageConfig.PARQUET_DICTIONARY_ENABLED)); parquetConfig.getHadoopConf().addResource(writeSupport.getHadoopConf()); - return new HoodieSparkParquetStreamWriter(outputStream, parquetConfig); + return new HoodieSparkParquetStreamWriter(new FSDataOutputStream(outputStream, null), parquetConfig); } @Override diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetWriter.java index 09f8d8dbe1c..ba4ab63006d 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetWriter.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetWriter.java @@ -21,6 +21,7 @@ package org.apache.hudi.io.storage; import org.apache.hudi.common.engine.TaskContextSupplier; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.io.hadoop.HoodieBaseParquetWriter; import org.apache.hudi.io.storage.row.HoodieRowParquetConfig; import org.apache.hudi.io.storage.row.HoodieRowParquetWriteSupport; import org.apache.hudi.storage.StoragePath; diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowFileWriterFactory.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowFileWriterFactory.java index ffad5a895cb..8e7287a7024 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowFileWriterFactory.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowFileWriterFactory.java @@ -25,6 +25,7 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.io.storage.HoodieParquetConfig; import org.apache.hudi.storage.StoragePath; 
+import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration; import org.apache.hudi.table.HoodieTable; import org.apache.hadoop.conf.Configuration; @@ -79,7 +80,7 @@ public class HoodieInternalRowFileWriterFactory { writeConfig.getParquetBlockSize(), writeConfig.getParquetPageSize(), writeConfig.getParquetMaxFileSize(), - writeSupport.getHadoopConf(), + new HadoopStorageConfiguration(writeSupport.getHadoopConf()), writeConfig.getParquetCompressionRatio(), writeConfig.parquetDictionaryEnabled() )); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowParquetWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowParquetWriter.java index dcb1f197a04..f7ad33d2cbb 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowParquetWriter.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowParquetWriter.java @@ -18,7 +18,7 @@ package org.apache.hudi.io.storage.row; -import org.apache.hudi.io.storage.HoodieBaseParquetWriter; +import org.apache.hudi.io.hadoop.HoodieBaseParquetWriter; import org.apache.hudi.io.storage.HoodieParquetConfig; import org.apache.hudi.storage.StoragePath; diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetConfig.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetConfig.java index f5f6d7b0a5b..f3b0f34b929 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetConfig.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetConfig.java @@ -19,6 +19,7 @@ package org.apache.hudi.io.storage.row; import org.apache.hudi.io.storage.HoodieParquetConfig; +import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration; import org.apache.hadoop.conf.Configuration; import org.apache.parquet.hadoop.metadata.CompressionCodecName; @@ -31,6 +32,11 @@ public class HoodieRowParquetConfig extends HoodieParquetConfig<HoodieRowParquet public HoodieRowParquetConfig(HoodieRowParquetWriteSupport writeSupport, CompressionCodecName compressionCodecName, int blockSize, int pageSize, long maxFileSize, Configuration hadoopConf, double compressionRatio, boolean enableDictionary) { - super(writeSupport, compressionCodecName, blockSize, pageSize, maxFileSize, hadoopConf, compressionRatio, enableDictionary); + super(writeSupport, compressionCodecName, blockSize, pageSize, maxFileSize, + new HadoopStorageConfiguration(hadoopConf), compressionRatio, enableDictionary); + } + + public Configuration getHadoopConf() { + return getStorageConf().unwrapAs(Configuration.class); } } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/TestHoodieAvroFileWriterFactory.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/TestHoodieAvroFileWriterFactory.java index 4a13c77b629..74826c6f39b 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/TestHoodieAvroFileWriterFactory.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/TestHoodieAvroFileWriterFactory.java @@ -24,6 +24,9 @@ import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.index.HoodieIndex.IndexType; +import 
org.apache.hudi.io.hadoop.HoodieAvroHFileWriter; +import org.apache.hudi.io.hadoop.HoodieAvroOrcWriter; +import org.apache.hudi.io.hadoop.HoodieAvroParquetWriter; import org.apache.hudi.storage.StoragePath; import org.apache.hudi.table.HoodieSparkTable; import org.apache.hudi.table.HoodieTable; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java index 6857513a2bb..f3b2bc69dc5 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java @@ -43,7 +43,6 @@ import org.apache.hudi.internal.schema.HoodieSchemaException; import org.apache.hudi.internal.schema.InternalSchema; import org.apache.hudi.internal.schema.io.FileBasedInternalSchemaStorageManager; import org.apache.hudi.internal.schema.utils.SerDeHelper; -import org.apache.hudi.io.storage.HoodieAvroOrcReader; import org.apache.hudi.io.storage.HoodieFileReader; import org.apache.hudi.io.storage.HoodieFileReaderFactory; import org.apache.hudi.storage.HoodieStorage; @@ -356,8 +355,9 @@ public class TableSchemaResolver { private MessageType readSchemaFromORCBaseFile(StoragePath orcFilePath) throws IOException { LOG.info("Reading schema from {}", orcFilePath); - - HoodieAvroOrcReader orcReader = new HoodieAvroOrcReader(metaClient.getRawHoodieStorage().getConf(), orcFilePath); + HoodieFileReader orcReader = HoodieFileReaderFactory.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO) + .getFileReader(metaClient.getTableConfig(), metaClient.getRawHoodieStorage().getConf(), orcFilePath, + HoodieFileFormat.ORC, Option.empty()); return convertAvroSchemaToParquet(orcReader.getSchema()); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java index 4d7f3f838f2..6c2e6802769 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java @@ -36,7 +36,6 @@ import org.apache.hudi.storage.inline.InLineFSUtils; import org.apache.avro.Schema; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.parquet.hadoop.ParquetWriter; import org.apache.parquet.hadoop.metadata.CompressionCodecName; @@ -107,38 +106,25 @@ public class HoodieParquetDataBlock extends HoodieDataBlock { } Schema writerSchema = new Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA)); - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - try (FSDataOutputStream outputStream = new FSDataOutputStream(baos, null)) { - HoodieFileWriter parquetWriter = null; - HoodieConfig config = new HoodieConfig(); - config.setValue(PARQUET_COMPRESSION_CODEC_NAME.key(), compressionCodecName.get().name()); - config.setValue(PARQUET_BLOCK_SIZE.key(), String.valueOf(ParquetWriter.DEFAULT_BLOCK_SIZE)); - config.setValue(PARQUET_PAGE_SIZE.key(), String.valueOf(ParquetWriter.DEFAULT_PAGE_SIZE)); - config.setValue(PARQUET_MAX_FILE_SIZE.key(), String.valueOf(1024 * 1024 * 1024)); - config.setValue(PARQUET_COMPRESSION_RATIO_FRACTION.key(), String.valueOf(expectedCompressionRatio.get())); - config.setValue(PARQUET_DICTIONARY_ENABLED, String.valueOf(useDictionaryEncoding.get())); - HoodieRecordType recordType = 
records.iterator().next().getRecordType(); - try { - parquetWriter = HoodieFileWriterFactory.getFileWriter( - HoodieFileFormat.PARQUET, - outputStream, - HoodieStorageUtils.getStorageConf(new Configuration()), - config, - writerSchema, - recordType); - for (HoodieRecord<?> record : records) { - String recordKey = getRecordKey(record).orElse(null); - parquetWriter.write(recordKey, record, writerSchema); - } - outputStream.flush(); - } finally { - if (parquetWriter != null) { - parquetWriter.close(); - } + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + HoodieConfig config = new HoodieConfig(); + config.setValue(PARQUET_COMPRESSION_CODEC_NAME.key(), compressionCodecName.get().name()); + config.setValue(PARQUET_BLOCK_SIZE.key(), String.valueOf(ParquetWriter.DEFAULT_BLOCK_SIZE)); + config.setValue(PARQUET_PAGE_SIZE.key(), String.valueOf(ParquetWriter.DEFAULT_PAGE_SIZE)); + config.setValue(PARQUET_MAX_FILE_SIZE.key(), String.valueOf(1024 * 1024 * 1024)); + config.setValue(PARQUET_COMPRESSION_RATIO_FRACTION.key(), String.valueOf(expectedCompressionRatio.get())); + config.setValue(PARQUET_DICTIONARY_ENABLED, String.valueOf(useDictionaryEncoding.get())); + HoodieRecordType recordType = records.iterator().next().getRecordType(); + try (HoodieFileWriter parquetWriter = HoodieFileWriterFactory.getFileWriter( + HoodieFileFormat.PARQUET, outputStream, HoodieStorageUtils.getStorageConf(new Configuration()), + config, writerSchema, recordType)) { + for (HoodieRecord<?> record : records) { + String recordKey = getRecordKey(record).orElse(null); + parquetWriter.write(recordKey, record, writerSchema); } + outputStream.flush(); } - - return baos.toByteArray(); + return outputStream.toByteArray(); } /** diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java index 58ec48ea630..42f8a6a2753 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java @@ -25,7 +25,7 @@ import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.ClosableIterator; import org.apache.hudi.exception.HoodieIOException; -import org.apache.hudi.io.storage.HoodieAvroParquetReader; +import org.apache.hudi.io.storage.HoodieAvroFileReader; import org.apache.hudi.io.storage.HoodieFileReaderFactory; import org.apache.hudi.storage.StoragePath; @@ -266,7 +266,7 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline { .filter(fileName -> filter == null || LSMTimeline.isFileInRange(filter, fileName)) .parallel().forEach(fileName -> { // Read the archived file - try (HoodieAvroParquetReader reader = (HoodieAvroParquetReader) HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO) + try (HoodieAvroFileReader reader = (HoodieAvroFileReader) HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO) .getFileReader(DEFAULT_HUDI_CONFIG_FOR_READER, metaClient.getStorageConf(), new StoragePath(metaClient.getArchivePath(), fileName))) { try (ClosableIterator<IndexedRecord> iterator = reader.getIndexedRecordIterator(HoodieLSMTimelineInstant.getClassSchema(), readSchema)) { while (iterator.hasNext()) { diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReader.java 
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReader.java index a829880d5f9..9b49fa871e2 100644 --- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReader.java @@ -18,10 +18,32 @@ package org.apache.hudi.io.storage; +import org.apache.hudi.common.model.HoodieAvroIndexedRecord; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.util.collection.ClosableIterator; +import org.apache.hudi.common.util.collection.CloseableMappingIterator; + +import org.apache.avro.Schema; import org.apache.avro.generic.IndexedRecord; +import java.io.IOException; + +import static org.apache.hudi.common.util.TypeUtils.unsafeCast; + /** - * Marker interface for every {@link HoodieFileReader} reading in Avro (ie - * producing {@link IndexedRecord}s) + * Base class for every Avro file reader */ -public interface HoodieAvroFileReader extends HoodieFileReader<IndexedRecord> {} +public abstract class HoodieAvroFileReader implements HoodieFileReader<IndexedRecord> { + + @Override + public ClosableIterator<HoodieRecord<IndexedRecord>> getRecordIterator(Schema readerSchema, Schema requestedSchema) throws IOException { + ClosableIterator<IndexedRecord> iterator = getIndexedRecordIterator(readerSchema, requestedSchema); + return new CloseableMappingIterator<>(iterator, data -> unsafeCast(new HoodieAvroIndexedRecord(data))); + } + + protected ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema readerSchema) throws IOException { + return getIndexedRecordIterator(readerSchema, readerSchema); + } + + public abstract ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema readerSchema, Schema requestedSchema) throws IOException; +} diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReaderBase.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReaderBase.java deleted file mode 100644 index b15ce11fd53..00000000000 --- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReaderBase.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hudi.io.storage; - -import org.apache.hudi.common.model.HoodieAvroIndexedRecord; -import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.util.collection.ClosableIterator; -import org.apache.hudi.common.util.collection.CloseableMappingIterator; - -import org.apache.avro.Schema; -import org.apache.avro.generic.IndexedRecord; - -import java.io.IOException; - -import static org.apache.hudi.common.util.TypeUtils.unsafeCast; - -/** - * Base class for every {@link HoodieAvroFileReader} - */ -abstract class HoodieAvroFileReaderBase implements HoodieAvroFileReader { - - @Override - public ClosableIterator<HoodieRecord<IndexedRecord>> getRecordIterator(Schema readerSchema, Schema requestedSchema) throws IOException { - ClosableIterator<IndexedRecord> iterator = getIndexedRecordIterator(readerSchema, requestedSchema); - return new CloseableMappingIterator<>(iterator, data -> unsafeCast(new HoodieAvroIndexedRecord(data))); - } - - protected ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema readerSchema) throws IOException { - return getIndexedRecordIterator(readerSchema, readerSchema); - } - - public abstract ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema readerSchema, Schema requestedSchema) throws IOException; -} diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileReaderImplBase.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileReaderImplBase.java index 5e1a260e158..dd28d5f5589 100644 --- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileReaderImplBase.java +++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileReaderImplBase.java @@ -38,7 +38,7 @@ import java.util.stream.Collectors; import static org.apache.hudi.common.util.CollectionUtils.toStream; import static org.apache.hudi.common.util.StringUtils.fromUTF8Bytes; -public abstract class HoodieAvroHFileReaderImplBase extends HoodieAvroFileReaderBase +public abstract class HoodieAvroHFileReaderImplBase extends HoodieAvroFileReader implements HoodieSeekingFileReader<IndexedRecord> { // TODO HoodieHFileReader right now tightly coupled to MT, we should break that coupling public static final String SCHEMA_KEY = "schema"; @@ -54,7 +54,7 @@ public abstract class HoodieAvroHFileReaderImplBase extends HoodieAvroFileReader * <p> * Reads all the records with given schema */ - public static List<IndexedRecord> readAllRecords(HoodieAvroFileReaderBase reader) + public static List<IndexedRecord> readAllRecords(HoodieAvroFileReader reader) throws IOException { Schema schema = reader.getSchema(); return toStream(reader.getIndexedRecordIterator(schema)) diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java index fe075ccdc8f..c285f04a2b2 100644 --- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java +++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java @@ -46,14 +46,21 @@ public class HoodieFileReaderFactory { public static HoodieFileReaderFactory getReaderFactory(HoodieRecord.HoodieRecordType recordType) { switch (recordType) { case AVRO: - return new HoodieAvroFileReaderFactory(); + + try { + Class<?> clazz = + ReflectionUtils.getClass("org.apache.hudi.io.hadoop.HoodieAvroFileReaderFactory"); + return (HoodieFileReaderFactory) clazz.newInstance(); + } catch (IllegalArgumentException | 
IllegalAccessException | InstantiationException e) { + throw new HoodieException("Unable to create HoodieAvroFileReaderFactory", e); + } case SPARK: try { Class<?> clazz = ReflectionUtils.getClass("org.apache.hudi.io.storage.HoodieSparkFileReaderFactory"); return (HoodieFileReaderFactory) clazz.newInstance(); } catch (IllegalArgumentException | IllegalAccessException | InstantiationException e) { - throw new HoodieException("Unable to create hoodie spark file writer factory", e); + throw new HoodieException("Unable to create HoodieSparkFileReaderFactory", e); } default: throw new UnsupportedOperationException(recordType + " record type not supported yet."); diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java index d57dd55fcd5..69a8924f508 100644 --- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java +++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java @@ -33,9 +33,9 @@ import org.apache.hudi.storage.StorageConfiguration; import org.apache.hudi.storage.StoragePath; import org.apache.avro.Schema; -import org.apache.hadoop.fs.FSDataOutputStream; import java.io.IOException; +import java.io.OutputStream; import static org.apache.hudi.common.model.HoodieFileFormat.HFILE; import static org.apache.hudi.common.model.HoodieFileFormat.ORC; @@ -46,13 +46,18 @@ public class HoodieFileWriterFactory { private static HoodieFileWriterFactory getWriterFactory(HoodieRecord.HoodieRecordType recordType) { switch (recordType) { case AVRO: - return new HoodieAvroFileWriterFactory(); + try { + Class<?> clazz = ReflectionUtils.getClass("org.apache.hudi.io.hadoop.HoodieAvroFileWriterFactory"); + return (HoodieFileWriterFactory) clazz.newInstance(); + } catch (IllegalAccessException | IllegalArgumentException | InstantiationException e) { + throw new HoodieException("Unable to create HoodieAvroFileWriterFactory", e); + } case SPARK: try { Class<?> clazz = ReflectionUtils.getClass("org.apache.hudi.io.storage.HoodieSparkFileWriterFactory"); return (HoodieFileWriterFactory) clazz.newInstance(); } catch (IllegalAccessException | IllegalArgumentException | InstantiationException e) { - throw new HoodieException("Unable to create hoodie spark file writer factory", e); + throw new HoodieException("Unable to create HoodieSparkFileWriterFactory", e); } default: throw new UnsupportedOperationException(recordType + " record type not supported yet."); @@ -67,8 +72,8 @@ public class HoodieFileWriterFactory { return factory.getFileWriterByFormat(extension, instantTime, path, conf, config, schema, taskContextSupplier); } - public static <T, I, K, O> HoodieFileWriter getFileWriter(HoodieFileFormat format, - FSDataOutputStream outputStream, StorageConfiguration<?> conf, HoodieConfig config, Schema schema, HoodieRecordType recordType) + public static <T, I, K, O> HoodieFileWriter getFileWriter(HoodieFileFormat format, OutputStream outputStream, + StorageConfiguration<?> conf, HoodieConfig config, Schema schema, HoodieRecordType recordType) throws IOException { HoodieFileWriterFactory factory = getWriterFactory(recordType); return factory.getFileWriterByFormat(format, outputStream, conf, config, schema); @@ -89,8 +94,8 @@ public class HoodieFileWriterFactory { throw new UnsupportedOperationException(extension + " format not supported yet."); } - protected <T, I, K, O> HoodieFileWriter getFileWriterByFormat(HoodieFileFormat format, - 
FSDataOutputStream outputStream, StorageConfiguration<?> conf, HoodieConfig config, Schema schema) throws IOException { + protected <T, I, K, O> HoodieFileWriter getFileWriterByFormat(HoodieFileFormat format, OutputStream outputStream, + StorageConfiguration<?> conf, HoodieConfig config, Schema schema) throws IOException { switch (format) { case PARQUET: return newParquetFileWriter(outputStream, conf, config, schema); @@ -106,7 +111,7 @@ public class HoodieFileWriterFactory { } protected HoodieFileWriter newParquetFileWriter( - FSDataOutputStream outputStream, StorageConfiguration<?> conf, HoodieConfig config, Schema schema) throws IOException { + OutputStream outputStream, StorageConfiguration<?> conf, HoodieConfig config, Schema schema) throws IOException { throw new UnsupportedOperationException(); } diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieOrcConfig.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieOrcConfig.java index c45e02452e3..7cac57fa919 100644 --- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieOrcConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieOrcConfig.java @@ -18,23 +18,24 @@ package org.apache.hudi.io.storage; -import org.apache.hadoop.conf.Configuration; import org.apache.hudi.common.bloom.BloomFilter; +import org.apache.hudi.storage.StorageConfiguration; + import org.apache.orc.CompressionKind; public class HoodieOrcConfig { - static final String AVRO_SCHEMA_METADATA_KEY = "orc.avro.schema"; + public static final String AVRO_SCHEMA_METADATA_KEY = "orc.avro.schema"; private final CompressionKind compressionKind; private final int stripeSize; private final int blockSize; private final long maxFileSize; - private final Configuration hadoopConf; + private final StorageConfiguration<?> storageConf; private final BloomFilter bloomFilter; - public HoodieOrcConfig(Configuration hadoopConf, CompressionKind compressionKind, int stripeSize, + public HoodieOrcConfig(StorageConfiguration<?> storageConf, CompressionKind compressionKind, int stripeSize, int blockSize, long maxFileSize, BloomFilter bloomFilter) { - this.hadoopConf = hadoopConf; + this.storageConf = storageConf; this.compressionKind = compressionKind; this.stripeSize = stripeSize; this.blockSize = blockSize; @@ -42,8 +43,8 @@ public class HoodieOrcConfig { this.bloomFilter = bloomFilter; } - public Configuration getHadoopConf() { - return hadoopConf; + public StorageConfiguration<?> getStorageConf() { + return storageConf; } public CompressionKind getCompressionKind() { diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetConfig.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetConfig.java index b5e567b7644..e17a017d679 100644 --- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetConfig.java @@ -18,7 +18,8 @@ package org.apache.hudi.io.storage; -import org.apache.hadoop.conf.Configuration; +import org.apache.hudi.storage.StorageConfiguration; + import org.apache.parquet.hadoop.metadata.CompressionCodecName; /** @@ -31,18 +32,18 @@ public class HoodieParquetConfig<T> { private final int blockSize; private final int pageSize; private final long maxFileSize; - private final Configuration hadoopConf; + private final StorageConfiguration<?> storageConf; private final double compressionRatio; private final boolean dictionaryEnabled; - public HoodieParquetConfig(T writeSupport, 
CompressionCodecName compressionCodecName, int blockSize, - int pageSize, long maxFileSize, Configuration hadoopConf, double compressionRatio, boolean dictionaryEnabled) { + public HoodieParquetConfig(T writeSupport, CompressionCodecName compressionCodecName, int blockSize, int pageSize, + long maxFileSize, StorageConfiguration<?> storageConf, double compressionRatio, boolean dictionaryEnabled) { this.writeSupport = writeSupport; this.compressionCodecName = compressionCodecName; this.blockSize = blockSize; this.pageSize = pageSize; this.maxFileSize = maxFileSize; - this.hadoopConf = hadoopConf; + this.storageConf = storageConf; this.compressionRatio = compressionRatio; this.dictionaryEnabled = dictionaryEnabled; } @@ -63,8 +64,8 @@ public class HoodieParquetConfig<T> { return maxFileSize; } - public Configuration getHadoopConf() { - return hadoopConf; + public StorageConfiguration<?> getStorageConf() { + return storageConf; } public double getCompressionRatio() { diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileSliceTestUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileSliceTestUtils.java index 43002a723ef..01052d4b00f 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileSliceTestUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileSliceTestUtils.java @@ -19,14 +19,12 @@ package org.apache.hudi.common.testutils.reader; -import org.apache.hudi.avro.HoodieAvroWriteSupport; -import org.apache.hudi.common.bloom.BloomFilter; -import org.apache.hudi.common.bloom.BloomFilterFactory; import org.apache.hudi.common.bloom.BloomFilterTypeCode; +import org.apache.hudi.common.config.HoodieConfig; import org.apache.hudi.common.config.HoodieReaderConfig; +import org.apache.hudi.common.config.HoodieStorageConfig; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.engine.LocalTaskContextSupplier; -import org.apache.hudi.common.engine.TaskContextSupplier; import org.apache.hudi.common.model.DeleteRecord; import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.HoodieAvroIndexedRecord; @@ -35,6 +33,7 @@ import org.apache.hudi.common.model.HoodieFileGroupId; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.log.HoodieLogFormat; import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock; import org.apache.hudi.common.table.log.block.HoodieCDCDataBlock; @@ -44,12 +43,13 @@ import org.apache.hudi.common.table.log.block.HoodieHFileDataBlock; import org.apache.hudi.common.table.log.block.HoodieLogBlock; import org.apache.hudi.common.table.log.block.HoodieParquetDataBlock; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; +import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.io.storage.HoodieAvroFileWriter; -import org.apache.hudi.io.storage.HoodieParquetConfig; +import org.apache.hudi.io.storage.HoodieFileWriterFactory; import org.apache.hudi.storage.HoodieStorage; +import org.apache.hudi.storage.StorageConfiguration; import org.apache.hudi.storage.StoragePath; import org.apache.avro.Schema; @@ -57,7 +57,6 @@ import 
org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.io.compress.Compression; -import org.apache.parquet.avro.AvroSchemaConverter; import org.apache.parquet.hadoop.ParquetWriter; import org.apache.parquet.hadoop.metadata.CompressionCodecName; @@ -78,7 +77,6 @@ import static org.apache.hudi.common.testutils.FileCreateUtils.logFileName; import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.AVRO_SCHEMA; import static org.apache.hudi.common.testutils.reader.DataGenerationPlan.OperationType.DELETE; import static org.apache.hudi.common.testutils.reader.DataGenerationPlan.OperationType.INSERT; -import static org.apache.hudi.io.storage.HoodieAvroFileWriterFactory.HOODIE_AVRO_PARQUET_WRITER; public class HoodieFileSliceTestUtils { public static final String FORWARD_SLASH = "/"; @@ -247,36 +245,31 @@ public class HoodieFileSliceTestUtils { Schema schema, String baseInstantTime ) throws IOException { - Configuration hadoopConf = new Configuration(); + StorageConfiguration<Configuration> conf = HoodieTestUtils.getDefaultStorageConfWithDefaults(); // TODO: Optimize these hard-coded parameters for test purpose. (HUDI-7214) - BloomFilter filter = BloomFilterFactory.createBloomFilter( - 1000, - 0.0001, - 10000, - BloomFilterTypeCode.DYNAMIC_V0.name()); - HoodieAvroWriteSupport<IndexedRecord> writeSupport = new HoodieAvroWriteSupport<>( - new AvroSchemaConverter().convert(schema), - schema, - Option.of(filter), - new Properties()); - HoodieParquetConfig<HoodieAvroWriteSupport> parquetConfig = new HoodieParquetConfig( - writeSupport, - CompressionCodecName.GZIP, - ParquetWriter.DEFAULT_BLOCK_SIZE, - ParquetWriter.DEFAULT_PAGE_SIZE, - 1024 * 1024 * 1024, - hadoopConf, - 0.1, - true); - - try (HoodieAvroFileWriter writer = (HoodieAvroFileWriter) ReflectionUtils.loadClass(HOODIE_AVRO_PARQUET_WRITER, - new Class<?>[] {StoragePath.class, HoodieParquetConfig.class, String.class, TaskContextSupplier.class, boolean.class}, - new StoragePath(baseFilePath), - parquetConfig, - baseInstantTime, - new LocalTaskContextSupplier(), - true)) { + HoodieConfig cfg = new HoodieConfig(); + //enable bloom filter + cfg.setValue(HoodieTableConfig.POPULATE_META_FIELDS.key(), "true"); + cfg.setValue(HoodieStorageConfig.PARQUET_WITH_BLOOM_FILTER_ENABLED.key(), "true"); + + //set bloom filter values + cfg.setValue(HoodieStorageConfig.BLOOM_FILTER_NUM_ENTRIES_VALUE.key(), String.valueOf(1000)); + cfg.setValue(HoodieStorageConfig.BLOOM_FILTER_FPP_VALUE.key(), String.valueOf(0.00001)); + cfg.setValue(HoodieStorageConfig.BLOOM_FILTER_DYNAMIC_MAX_ENTRIES.key(), String.valueOf(10000)); + cfg.setValue(HoodieStorageConfig.BLOOM_FILTER_TYPE.key(), BloomFilterTypeCode.DYNAMIC_V0.name()); + + //set parquet config values + cfg.setValue(HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME.key(), CompressionCodecName.GZIP.name()); + cfg.setValue(HoodieStorageConfig.PARQUET_BLOCK_SIZE.key(), String.valueOf(ParquetWriter.DEFAULT_BLOCK_SIZE)); + cfg.setValue(HoodieStorageConfig.PARQUET_PAGE_SIZE.key(), String.valueOf(ParquetWriter.DEFAULT_PAGE_SIZE)); + cfg.setValue(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key(), String.valueOf(1024 * 1024 * 1024)); + cfg.setValue(HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION.key(), String.valueOf(0.1)); + cfg.setValue(HoodieStorageConfig.PARQUET_DICTIONARY_ENABLED.key(), "true"); + + try (HoodieAvroFileWriter writer = (HoodieAvroFileWriter) HoodieFileWriterFactory + .getFileWriter(baseInstantTime, 
new StoragePath(baseFilePath), conf, cfg, + schema, new LocalTaskContextSupplier(), HoodieRecord.HoodieRecordType.AVRO)) { for (IndexedRecord record : records) { writer.writeAvro( (String) record.get(schema.getField(ROW_KEY).pos()), record); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieTestReaderContext.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieTestReaderContext.java index b2d7306d6ab..6eb6733b04b 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieTestReaderContext.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieTestReaderContext.java @@ -20,11 +20,13 @@ package org.apache.hudi.common.testutils.reader; import org.apache.hudi.avro.model.HoodieDeleteRecord; +import org.apache.hudi.common.config.HoodieConfig; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.engine.HoodieReaderContext; import org.apache.hudi.common.model.DefaultHoodieRecordPayload; import org.apache.hudi.common.model.HoodieAvroIndexedRecord; import org.apache.hudi.common.model.HoodieAvroRecordMerger; +import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordMerger; import org.apache.hudi.common.util.ConfigUtils; @@ -32,7 +34,8 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.SpillableMapUtils; import org.apache.hudi.common.util.collection.ClosableIterator; import org.apache.hudi.exception.HoodieException; -import org.apache.hudi.io.storage.HoodieAvroParquetReader; +import org.apache.hudi.io.storage.HoodieAvroFileReader; +import org.apache.hudi.io.storage.HoodieFileReaderFactory; import org.apache.hudi.storage.HoodieStorage; import org.apache.hudi.storage.HoodieStorageUtils; import org.apache.hudi.storage.StorageConfiguration; @@ -78,7 +81,9 @@ public class HoodieTestReaderContext extends HoodieReaderContext<IndexedRecord> Schema requiredSchema, StorageConfiguration<?> conf ) throws IOException { - HoodieAvroParquetReader reader = new HoodieAvroParquetReader(conf, new StoragePath(filePath.toUri())); + HoodieAvroFileReader reader = (HoodieAvroFileReader) HoodieFileReaderFactory + .getReaderFactory(HoodieRecord.HoodieRecordType.AVRO).getFileReader(new HoodieConfig(), + conf, filePath, HoodieFileFormat.PARQUET, Option.empty()); return reader.getIndexedRecordIterator(dataSchema, requiredSchema); } diff --git a/hudi-common/src/test/java/org/apache/hudi/io/storage/TestHoodieReaderWriterUtils.java b/hudi-common/src/test/java/org/apache/hudi/io/storage/TestHoodieReaderWriterUtils.java index a0ec0dfdb89..2fc38c156a3 100644 --- a/hudi-common/src/test/java/org/apache/hudi/io/storage/TestHoodieReaderWriterUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/io/storage/TestHoodieReaderWriterUtils.java @@ -44,7 +44,7 @@ import static org.apache.hudi.io.hfile.TestHFileReader.DUMMY_BLOOM_FILTER; * Utils for reader and writer tests. 
*/ public class TestHoodieReaderWriterUtils { - static void writeHFileForTesting(String fileLocation, + public static void writeHFileForTesting(String fileLocation, int blockSize, Compression.Algorithm compressionAlgo, int numEntries, diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReaderFactory.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroFileReaderFactory.java similarity index 81% rename from hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReaderFactory.java rename to hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroFileReaderFactory.java index 6a6b0b67aa5..3a4d0b910ab 100644 --- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReaderFactory.java +++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroFileReaderFactory.java @@ -7,19 +7,25 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ -package org.apache.hudi.io.storage; +package org.apache.hudi.io.hadoop; import org.apache.hudi.common.config.HoodieConfig; import org.apache.hudi.common.util.Option; +import org.apache.hudi.io.storage.HoodieAvroBootstrapFileReader; +import org.apache.hudi.io.storage.HoodieFileReader; +import org.apache.hudi.io.storage.HoodieFileReaderFactory; +import org.apache.hudi.io.storage.HoodieHBaseAvroHFileReader; +import org.apache.hudi.io.storage.HoodieNativeAvroHFileReader; import org.apache.hudi.storage.HoodieStorage; import org.apache.hudi.storage.HoodieStorageUtils; import org.apache.hudi.storage.StorageConfiguration; diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileWriterFactory.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroFileWriterFactory.java similarity index 80% rename from hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileWriterFactory.java rename to hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroFileWriterFactory.java index 2a727158e17..d0b8faa7589 100644 --- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileWriterFactory.java +++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroFileWriterFactory.java @@ -7,16 +7,17 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */

-package org.apache.hudi.io.storage;
+package org.apache.hudi.io.hadoop;

 import org.apache.hudi.avro.HoodieAvroWriteSupport;
 import org.apache.hudi.common.bloom.BloomFilter;
@@ -27,6 +28,11 @@ import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase;
+import org.apache.hudi.io.storage.HoodieFileWriter;
+import org.apache.hudi.io.storage.HoodieFileWriterFactory;
+import org.apache.hudi.io.storage.HoodieOrcConfig;
+import org.apache.hudi.io.storage.HoodieParquetConfig;
 import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
@@ -40,18 +46,19 @@ import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.parquet.schema.MessageType;

 import java.io.IOException;
+import java.io.OutputStream;
 import java.util.Properties;

-import static org.apache.hudi.io.storage.HoodieHFileConfig.CACHE_DATA_IN_L1;
-import static org.apache.hudi.io.storage.HoodieHFileConfig.DROP_BEHIND_CACHE_COMPACTION;
-import static org.apache.hudi.io.storage.HoodieHFileConfig.HFILE_COMPARATOR;
-import static org.apache.hudi.io.storage.HoodieHFileConfig.PREFETCH_ON_OPEN;
+import static org.apache.hudi.io.hadoop.HoodieHFileConfig.CACHE_DATA_IN_L1;
+import static org.apache.hudi.io.hadoop.HoodieHFileConfig.DROP_BEHIND_CACHE_COMPACTION;
+import static org.apache.hudi.io.hadoop.HoodieHFileConfig.HFILE_COMPARATOR;
+import static org.apache.hudi.io.hadoop.HoodieHFileConfig.PREFETCH_ON_OPEN;

 public class HoodieAvroFileWriterFactory extends HoodieFileWriterFactory {
   //hardcoded classes to remove at a later time
-  public static final String HOODIE_AVRO_PARQUET_WRITER = "org.apache.hudi.io.storage.HoodieAvroParquetWriter";
-  public static final String HOODIE_AVRO_HFILE_WRITER = "org.apache.hudi.io.storage.HoodieAvroHFileWriter";
-  public static final String HOODIE_AVRO_ORC_WRITER = "org.apache.hudi.io.storage.HoodieAvroOrcWriter";
+  public static final String HOODIE_AVRO_PARQUET_WRITER = "org.apache.hudi.io.hadoop.HoodieAvroParquetWriter";
+  public static final String HOODIE_AVRO_HFILE_WRITER = "org.apache.hudi.io.hadoop.HoodieAvroHFileWriter";
+  public static final String HOODIE_AVRO_ORC_WRITER = "org.apache.hudi.io.hadoop.HoodieAvroOrcWriter";

   @Override
   protected HoodieFileWriter newParquetFileWriter(
@@ -70,7 +77,7 @@ public class HoodieAvroFileWriterFactory extends HoodieFileWriterFactory {
         config.getIntOrDefault(HoodieStorageConfig.PARQUET_BLOCK_SIZE),
         config.getIntOrDefault(HoodieStorageConfig.PARQUET_PAGE_SIZE),
         config.getLongOrDefault(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE),
-        conf.unwrapAs(Configuration.class), config.getDoubleOrDefault(HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION),
+        conf, config.getDoubleOrDefault(HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION),
         config.getBooleanOrDefault(HoodieStorageConfig.PARQUET_DICTIONARY_ENABLED));
     try {
       return (HoodieFileWriter) ReflectionUtils.loadClass(HOODIE_AVRO_PARQUET_WRITER,
@@ -83,16 +90,16 @@ public class HoodieAvroFileWriterFactory extends HoodieFileWriterFactory {
   }

   protected HoodieFileWriter newParquetFileWriter(
-      FSDataOutputStream outputStream, StorageConfiguration<?> conf, HoodieConfig config, Schema schema) throws IOException {
+      OutputStream outputStream, StorageConfiguration<?> conf, HoodieConfig config, Schema schema) throws IOException {
     HoodieAvroWriteSupport writeSupport = getHoodieAvroWriteSupport(conf, schema, config, false);
     HoodieParquetConfig<HoodieAvroWriteSupport> parquetConfig = new HoodieParquetConfig<>(writeSupport,
         CompressionCodecName.fromConf(config.getString(HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME)),
         config.getInt(HoodieStorageConfig.PARQUET_BLOCK_SIZE),
         config.getInt(HoodieStorageConfig.PARQUET_PAGE_SIZE),
         config.getLong(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE), // todo: 1024*1024*1024
-        conf.unwrapAs(Configuration.class), config.getDouble(HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION),
+        conf, config.getDouble(HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION),
         config.getBoolean(HoodieStorageConfig.PARQUET_DICTIONARY_ENABLED));
-    return new HoodieParquetStreamWriter(outputStream, parquetConfig);
+    return new HoodieParquetStreamWriter(new FSDataOutputStream(outputStream, null), parquetConfig);
   }

   protected HoodieFileWriter newHFileFileWriter(
@@ -120,7 +127,7 @@ public class HoodieAvroFileWriterFactory extends HoodieFileWriterFactory {
       String instantTime, StoragePath path, StorageConfiguration<?> conf, HoodieConfig config, Schema schema,
       TaskContextSupplier taskContextSupplier) throws IOException {
     BloomFilter filter = createBloomFilter(config);
-    HoodieOrcConfig orcConfig = new HoodieOrcConfig(conf.unwrapAs(Configuration.class),
+    HoodieOrcConfig orcConfig = new HoodieOrcConfig(conf,
         CompressionKind.valueOf(config.getString(HoodieStorageConfig.ORC_COMPRESSION_CODEC_NAME)),
         config.getInt(HoodieStorageConfig.ORC_STRIPE_SIZE),
         config.getInt(HoodieStorageConfig.ORC_BLOCK_SIZE),
diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileWriter.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroHFileWriter.java
similarity index 93%
rename from hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileWriter.java
rename to hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroHFileWriter.java
index 6de6b24868b..379df6e97b9 100644
--- a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileWriter.java
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroHFileWriter.java
@@ -7,16 +7,17 @@
  * "License"); you may not use this file except in compliance
  * with the License. You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */

-package org.apache.hudi.io.storage;
+package org.apache.hudi.io.hadoop;

 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.bloom.BloomFilter;
@@ -26,6 +27,8 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieDuplicateKeyException;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.hadoop.fs.HoodieWrapperFileSystem;
+import org.apache.hudi.io.storage.HoodieAvroFileWriter;
+import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase;
 import org.apache.hudi.metadata.MetadataPartitionType;
 import org.apache.hudi.storage.StoragePath;
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroOrcReader.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroOrcReader.java
similarity index 86%
rename from hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroOrcReader.java
rename to hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroOrcReader.java
index 4d90590d953..c1f5b79c227 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroOrcReader.java
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroOrcReader.java
@@ -7,16 +7,17 @@
  * "License"); you may not use this file except in compliance
  * with the License. You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */

-package org.apache.hudi.io.storage;
+package org.apache.hudi.io.hadoop;

 import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.model.HoodieFileFormat;
@@ -26,6 +27,8 @@ import org.apache.hudi.common.util.OrcReaderIterator;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.io.storage.HoodieAvroFileReader;
+import org.apache.hudi.io.storage.HoodieFileReader;
 import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
@@ -47,7 +50,7 @@ import java.util.Set;

 /**
  * {@link HoodieFileReader} implementation for ORC format.
  */
-public class HoodieAvroOrcReader extends HoodieAvroFileReaderBase {
+public class HoodieAvroOrcReader extends HoodieAvroFileReader {

   private final StoragePath path;
   private final StorageConfiguration<?> conf;
diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroOrcWriter.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroOrcWriter.java
similarity index 91%
rename from hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroOrcWriter.java
rename to hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroOrcWriter.java
index 07e7bc7f122..40e37fa145f 100644
--- a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroOrcWriter.java
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroOrcWriter.java
@@ -7,16 +7,17 @@
  * "License"); you may not use this file except in compliance
  * with the License. You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */

-package org.apache.hudi.io.storage;
+package org.apache.hudi.io.hadoop;

 import org.apache.hudi.avro.HoodieAvroWriteSupport;
 import org.apache.hudi.avro.HoodieBloomFilterWriteSupport;
@@ -27,6 +28,8 @@ import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.util.AvroOrcUtils;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.hadoop.fs.HoodieWrapperFileSystem;
+import org.apache.hudi.io.storage.HoodieAvroFileWriter;
+import org.apache.hudi.io.storage.HoodieOrcConfig;
 import org.apache.hudi.storage.StoragePath;

 import org.apache.avro.Schema;
@@ -70,7 +73,7 @@ public class HoodieAvroOrcWriter implements HoodieAvroFileWriter, Closeable {

   public HoodieAvroOrcWriter(String instantTime, StoragePath file, HoodieOrcConfig config, Schema schema,
                              TaskContextSupplier taskContextSupplier) throws IOException {
-    Configuration conf = HadoopFSUtils.registerFileSystem(file, config.getHadoopConf());
+    Configuration conf = HadoopFSUtils.registerFileSystem(file, config.getStorageConf().unwrapAs(Configuration.class));
     this.file = HoodieWrapperFileSystem.convertToHoodiePath(file, conf);
     this.fs = (HoodieWrapperFileSystem) this.file.getFileSystem(conf);
     this.instantTime = instantTime;
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetReader.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReader.java
similarity index 93%
rename from hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetReader.java
rename to hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReader.java
index 66ff6b483e0..d75660a9a7e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetReader.java
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReader.java
@@ -7,16 +7,17 @@
  * "License"); you may not use this file except in compliance
  * with the License. You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */

-package org.apache.hudi.io.storage;
+package org.apache.hudi.io.hadoop;

 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.bloom.BloomFilter;
@@ -29,6 +30,8 @@ import org.apache.hudi.common.util.ParquetReaderIterator;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.CloseableMappingIterator;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.io.storage.HoodieAvroFileReader;
+import org.apache.hudi.io.storage.HoodieFileReader;
 import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
@@ -52,7 +55,7 @@ import static org.apache.hudi.common.util.TypeUtils.unsafeCast;

 /**
  * {@link HoodieFileReader} implementation for parquet format.
  */
-public class HoodieAvroParquetReader extends HoodieAvroFileReaderBase {
+public class HoodieAvroParquetReader extends HoodieAvroFileReader {

   private final StoragePath path;
   private final StorageConfiguration<?> conf;
diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetWriter.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetWriter.java
similarity index 84%
rename from hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetWriter.java
rename to hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetWriter.java
index 4269e6513a2..f8f9a8ccea0 100644
--- a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetWriter.java
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetWriter.java
@@ -7,20 +7,23 @@
  * "License"); you may not use this file except in compliance
  * with the License. You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */

-package org.apache.hudi.io.storage;
+package org.apache.hudi.io.hadoop;

 import org.apache.hudi.avro.HoodieAvroWriteSupport;
 import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.io.storage.HoodieAvroFileWriter;
+import org.apache.hudi.io.storage.HoodieParquetConfig;
 import org.apache.hudi.storage.StoragePath;

 import org.apache.avro.generic.IndexedRecord;
diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/HoodieBaseParquetWriter.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieBaseParquetWriter.java
similarity index 90%
rename from hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/HoodieBaseParquetWriter.java
rename to hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieBaseParquetWriter.java
index 06f1e513055..8f17fa0fa1e 100644
--- a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/HoodieBaseParquetWriter.java
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieBaseParquetWriter.java
@@ -7,20 +7,22 @@
  * "License"); you may not use this file except in compliance
  * with the License. You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */

-package org.apache.hudi.io.storage;
+package org.apache.hudi.io.hadoop;

 import org.apache.hudi.common.util.VisibleForTesting;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.hadoop.fs.HoodieWrapperFileSystem;
+import org.apache.hudi.io.storage.HoodieParquetConfig;
 import org.apache.hudi.storage.StoragePath;

 import org.apache.hadoop.conf.Configuration;
@@ -52,8 +54,9 @@ public abstract class HoodieBaseParquetWriter<R> implements Closeable {

   public HoodieBaseParquetWriter(StoragePath file, HoodieParquetConfig<? extends WriteSupport<R>> parquetConfig) throws IOException {
+    Configuration hadoopConf = parquetConfig.getStorageConf().unwrapAs(Configuration.class);
     ParquetWriter.Builder parquetWriterbuilder = new ParquetWriter.Builder(
-        HoodieWrapperFileSystem.convertToHoodiePath(file, parquetConfig.getHadoopConf())) {
+        HoodieWrapperFileSystem.convertToHoodiePath(file, hadoopConf)) {
       @Override
       protected ParquetWriter.Builder self() {
         return this;
@@ -73,8 +76,8 @@ public abstract class HoodieBaseParquetWriter<R> implements Closeable {
     parquetWriterbuilder.withDictionaryEncoding(parquetConfig.dictionaryEnabled());
     parquetWriterbuilder.withValidation(ParquetWriter.DEFAULT_IS_VALIDATING_ENABLED);
     parquetWriterbuilder.withWriterVersion(ParquetWriter.DEFAULT_WRITER_VERSION);
-    parquetWriterbuilder.withConf(HadoopFSUtils.registerFileSystem(file, parquetConfig.getHadoopConf()));
-    handleParquetBloomFilters(parquetWriterbuilder, parquetConfig.getHadoopConf());
+    parquetWriterbuilder.withConf(HadoopFSUtils.registerFileSystem(file, hadoopConf));
+    handleParquetBloomFilters(parquetWriterbuilder, hadoopConf);
     parquetWriter = parquetWriterbuilder.build();

     // We cannot accurately measure the snappy compressed output file size. We are choosing a
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileConfig.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieHFileConfig.java
similarity index 87%
rename from hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileConfig.java
rename to hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieHFileConfig.java
index 64cc607ef63..83b659a6be0 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileConfig.java
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieHFileConfig.java
@@ -7,18 +7,20 @@
  * "License"); you may not use this file except in compliance
  * with the License. You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */

-package org.apache.hudi.io.storage;
+package org.apache.hudi.io.hadoop;

 import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.io.storage.HoodieHBaseKVComparator;

 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CellComparator;
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetStreamWriter.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieParquetStreamWriter.java
similarity index 84%
rename from hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetStreamWriter.java
rename to hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieParquetStreamWriter.java
index 226266bf6cf..5fdd6505733 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetStreamWriter.java
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieParquetStreamWriter.java
@@ -7,19 +7,22 @@
  * "License"); you may not use this file except in compliance
  * with the License. You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */

-package org.apache.hudi.io.storage;
+package org.apache.hudi.io.hadoop;

 import org.apache.hudi.avro.HoodieAvroWriteSupport;
 import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.io.storage.HoodieAvroFileWriter;
+import org.apache.hudi.io.storage.HoodieParquetConfig;
 import org.apache.hudi.parquet.io.OutputStreamBackedOutputFile;

 import org.apache.avro.generic.IndexedRecord;
@@ -54,7 +57,7 @@ public class HoodieParquetStreamWriter implements HoodieAvroFileWriter, AutoClos
         .withDictionaryPageSize(parquetConfig.getPageSize())
         .withDictionaryEncoding(parquetConfig.dictionaryEnabled())
         .withWriterVersion(ParquetWriter.DEFAULT_WRITER_VERSION)
-        .withConf(parquetConfig.getHadoopConf())
+        .withConf(parquetConfig.getStorageConf().unwrapAs(Configuration.class))
         .build();
   }
diff --git a/hudi-common/src/main/java/org/apache/hudi/parquet/io/OutputStreamBackedOutputFile.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/parquet/io/OutputStreamBackedOutputFile.java
similarity index 100%
rename from hudi-common/src/main/java/org/apache/hudi/parquet/io/OutputStreamBackedOutputFile.java
rename to hudi-hadoop-common/src/main/java/org/apache/hudi/parquet/io/OutputStreamBackedOutputFile.java
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieAvroFileReaderFactory.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieAvroFileReaderFactory.java
similarity index 83%
rename from hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieAvroFileReaderFactory.java
rename to hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieAvroFileReaderFactory.java
index 96b8ea9e6b3..7faf84a1ee5 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieAvroFileReaderFactory.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieAvroFileReaderFactory.java
@@ -7,19 +7,22 @@
  * "License"); you may not use this file except in compliance
  * with the License. You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */

-package org.apache.hudi.io.storage;
+package org.apache.hudi.io.hadoop;

 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.io.storage.HoodieFileReader;
+import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieBaseParquetWriter.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieBaseParquetWriter.java
similarity index 86%
rename from hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieBaseParquetWriter.java
rename to hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieBaseParquetWriter.java
index f9909b0f5f2..82a80b1ce26 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieBaseParquetWriter.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieBaseParquetWriter.java
@@ -7,28 +7,31 @@
  * "License"); you may not use this file except in compliance
  * with the License. You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */

-package org.apache.hudi.io.storage;
+package org.apache.hudi.io.hadoop;

 import org.apache.hudi.avro.HoodieAvroWriteSupport;
 import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.bloom.BloomFilterFactory;
 import org.apache.hudi.common.bloom.BloomFilterTypeCode;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.io.storage.HoodieParquetConfig;
+import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;

 import org.apache.avro.Schema;
 import org.apache.avro.generic.IndexedRecord;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.parquet.avro.AvroSchemaConverter;
 import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
@@ -83,7 +86,7 @@ public class TestHoodieBaseParquetWriter {
   public void testCanWrite() throws IOException {
     BloomFilter filter = BloomFilterFactory.createBloomFilter(1000, 0.0001, 10000,
         BloomFilterTypeCode.DYNAMIC_V0.name());
-    Configuration hadoopConf = new Configuration();
+    StorageConfiguration conf = HoodieTestUtils.getDefaultStorageConfWithDefaults();
     Schema schema = new Schema.Parser().parse(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA);
     HoodieAvroWriteSupport writeSupport =
         new HoodieAvroWriteSupport(new AvroSchemaConverter().convert(schema),
@@ -92,7 +95,7 @@ public class TestHoodieBaseParquetWriter {
     long maxFileSize = 2 * 1024 * 1024;
     HoodieParquetConfig<HoodieAvroWriteSupport> parquetConfig =
         new HoodieParquetConfig<>(writeSupport, CompressionCodecName.GZIP, ParquetWriter.DEFAULT_BLOCK_SIZE,
-            ParquetWriter.DEFAULT_PAGE_SIZE, maxFileSize, hadoopConf, 0, true);
+            ParquetWriter.DEFAULT_PAGE_SIZE, maxFileSize, conf, 0, true);

     StoragePath filePath = new StoragePath(
         new StoragePath(tempDir.toUri()), "test_fileSize.parquet");
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHBaseHFileReaderWriter.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHBaseHFileReaderWriter.java
similarity index 93%
rename from hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHBaseHFileReaderWriter.java
rename to hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHBaseHFileReaderWriter.java
index 48fa1ddc501..8c227b88e0f 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHBaseHFileReaderWriter.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHBaseHFileReaderWriter.java
@@ -7,19 +7,24 @@
  * "License"); you may not use this file except in compliance
  * with the License. You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */

-package org.apache.hudi.io.storage;
+package org.apache.hudi.io.hadoop;

 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.io.storage.HoodieAvroFileReader;
+import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase;
+import org.apache.hudi.io.storage.HoodieHBaseAvroHFileReader;
+import org.apache.hudi.io.storage.HoodieHFileUtils;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.HoodieStorageUtils;
 import org.apache.hudi.storage.StorageConfiguration;
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHFileReaderWriter.java
similarity index 85%
rename from hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java
rename to hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHFileReaderWriter.java
index 6fe0e2ffea5..b87af2c8371 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHFileReaderWriter.java
@@ -7,19 +7,23 @@
  * "License"); you may not use this file except in compliance
  * with the License. You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */

-package org.apache.hudi.io.storage;
+package org.apache.hudi.io.hadoop;

 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.io.storage.HoodieAvroFileReader;
+import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase;
+import org.apache.hudi.io.storage.HoodieNativeAvroHFileReader;
 import org.apache.hudi.storage.StorageConfiguration;

 import org.apache.avro.Schema;
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriterBase.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHFileReaderWriterBase.java
similarity index 98%
rename from hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriterBase.java
rename to hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHFileReaderWriterBase.java
index dcd791956c5..3bdd561a282 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriterBase.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHFileReaderWriterBase.java
@@ -17,7 +17,7 @@
  * under the License.
  */

-package org.apache.hudi.io.storage;
+package org.apache.hudi.io.hadoop;

 import org.apache.hudi.common.bootstrap.index.HFileBootstrapIndex;
 import org.apache.hudi.common.config.HoodieStorageConfig;
@@ -29,6 +29,9 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.util.FileIOUtils;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.io.storage.HoodieAvroFileReader;
+import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase;
+import org.apache.hudi.io.storage.HoodieFileWriterFactory;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.HoodieStorageUtils;
 import org.apache.hudi.storage.StorageConfiguration;
@@ -75,7 +78,7 @@ import static org.apache.hudi.io.hfile.TestHFileReader.BOOTSTRAP_INDEX_HFILE_SUF
 import static org.apache.hudi.io.hfile.TestHFileReader.COMPLEX_SCHEMA_HFILE_SUFFIX;
 import static org.apache.hudi.io.hfile.TestHFileReader.SIMPLE_SCHEMA_HFILE_SUFFIX;
 import static org.apache.hudi.io.hfile.TestHFileReader.readHFileFromResources;
-import static org.apache.hudi.io.storage.HoodieHFileConfig.HFILE_COMPARATOR;
+import static org.apache.hudi.io.hadoop.HoodieHFileConfig.HFILE_COMPARATOR;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieOrcReaderWriter.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieOrcReaderWriter.java
similarity index 87%
rename from hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieOrcReaderWriter.java
rename to hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieOrcReaderWriter.java
index bc719be8bc8..6a94a32ed3c 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieOrcReaderWriter.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieOrcReaderWriter.java
@@ -7,16 +7,17 @@
  * "License"); you may not use this file except in compliance
  * with the License. You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */

-package org.apache.hudi.io.storage;
+package org.apache.hudi.io.hadoop;

 import org.apache.hudi.avro.HoodieBloomFilterWriteSupport;
 import org.apache.hudi.common.bloom.BloomFilter;
@@ -25,6 +26,10 @@ import org.apache.hudi.common.bloom.BloomFilterTypeCode;
 import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.io.storage.HoodieAvroFileReader;
+import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.io.storage.HoodieOrcConfig;
 import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
@@ -57,7 +62,7 @@ public class TestHoodieOrcReaderWriter extends TestHoodieReaderWriterBase {
   protected HoodieAvroOrcWriter createWriter(
       Schema avroSchema, boolean populateMetaFields) throws Exception {
     BloomFilter filter = BloomFilterFactory.createBloomFilter(1000, 0.00001, -1, BloomFilterTypeCode.SIMPLE.name());
-    Configuration conf = new Configuration();
+    StorageConfiguration conf = HoodieTestUtils.getDefaultStorageConfWithDefaults();
     int orcStripSize = Integer.parseInt(HoodieStorageConfig.ORC_STRIPE_SIZE.defaultValue());
     int orcBlockSize = Integer.parseInt(HoodieStorageConfig.ORC_BLOCK_SIZE.defaultValue());
     int maxFileSize = Integer.parseInt(HoodieStorageConfig.ORC_FILE_MAX_SIZE.defaultValue());
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieReaderWriterBase.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieReaderWriterBase.java
similarity index 97%
rename from hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieReaderWriterBase.java
rename to hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieReaderWriterBase.java
index 226cf10f97e..3fd0ad80319 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieReaderWriterBase.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieReaderWriterBase.java
@@ -17,7 +17,7 @@
  * under the License.
  */

-package org.apache.hudi.io.storage;
+package org.apache.hudi.io.hadoop;

 import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
@@ -26,6 +26,11 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.io.storage.HoodieAvroFileReader;
+import org.apache.hudi.io.storage.HoodieAvroFileWriter;
+import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase;
+import org.apache.hudi.io.storage.HoodieFileReader;
+import org.apache.hudi.io.storage.HoodieFileWriter;
 import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/SparkHelpers.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/SparkHelpers.scala
index 4d925d3d4ed..791435f4bb7 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/SparkHelpers.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/SparkHelpers.scala
@@ -24,12 +24,13 @@ import org.apache.hudi.common.config.HoodieStorageConfig
 import org.apache.hudi.common.config.HoodieStorageConfig.{BLOOM_FILTER_DYNAMIC_MAX_ENTRIES, BLOOM_FILTER_FPP_VALUE, BLOOM_FILTER_NUM_ENTRIES_VALUE, BLOOM_FILTER_TYPE}
 import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord}
 import org.apache.hudi.common.util.{BaseFileUtils, Option}
-import org.apache.hudi.io.storage.{HoodieAvroParquetWriter, HoodieParquetConfig}
+import org.apache.hudi.io.storage.HoodieParquetConfig
 import org.apache.hudi.storage.{HoodieStorage, StorageConfiguration, StoragePath}

 import org.apache.avro.Schema
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.FileSystem
+import org.apache.hudi.io.hadoop.HoodieAvroParquetWriter
 import org.apache.parquet.avro.AvroSchemaConverter
 import org.apache.parquet.hadoop.metadata.CompressionCodecName
 import org.apache.spark.sql.{DataFrame, SQLContext}
@@ -61,12 +62,12 @@ object SparkHelpers {
       HoodieStorageConfig.PARQUET_BLOCK_SIZE.defaultValue.toInt,
       HoodieStorageConfig.PARQUET_PAGE_SIZE.defaultValue.toInt,
       HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.defaultValue.toInt,
-      conf.unwrap(),
+      conf,
       HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION.defaultValue.toDouble,
       HoodieStorageConfig.PARQUET_DICTIONARY_ENABLED.defaultValue)

     // Add current classLoad for config, if not will throw classNotFound of 'HoodieWrapperFileSystem'.
-    parquetConfig.getHadoopConf().setClassLoader(Thread.currentThread.getContextClassLoader)
+    conf.unwrap().setClassLoader(Thread.currentThread.getContextClassLoader)
     val writer = new HoodieAvroParquetWriter(destinationFile, parquetConfig, instantTime, new SparkTaskContextSupplier(), true)
     for (rec <- sourceRecords) {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
index d39be52dd22..225cab39286 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
@@ -55,7 +55,7 @@ import org.apache.hudi.hadoop.HoodieParquetInputFormat;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
 import org.apache.hudi.index.HoodieIndex.IndexType;
-import org.apache.hudi.io.storage.HoodieAvroParquetReader;
+import org.apache.hudi.io.hadoop.HoodieAvroParquetReader;
 import org.apache.hudi.keygen.NonpartitionedKeyGenerator;
 import org.apache.hudi.keygen.SimpleKeyGenerator;
 import org.apache.hudi.storage.StoragePath;
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/row/TestHoodieInternalRowParquetWriter.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/row/TestHoodieInternalRowParquetWriter.java
index 65d140da8b3..95f151336c7 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/row/TestHoodieInternalRowParquetWriter.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/row/TestHoodieInternalRowParquetWriter.java
@@ -29,6 +29,7 @@ import org.apache.hudi.common.util.ParquetUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.io.storage.HoodieParquetConfig;
 import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
 import org.apache.hudi.testutils.HoodieSparkClientTestHarness;
 import org.apache.hudi.testutils.SparkDatasetTestUtils;
@@ -89,7 +90,7 @@ public class TestHoodieInternalRowParquetWriter extends HoodieSparkClientTestHar
     HoodieWriteConfig cfg = writeConfigBuilder.build();
     HoodieParquetConfig<HoodieRowParquetWriteSupport> parquetConfig = new HoodieParquetConfig<>(writeSupport,
         CompressionCodecName.SNAPPY, cfg.getParquetBlockSize(), cfg.getParquetPageSize(), cfg.getParquetMaxFileSize(),
-        writeSupport.getHadoopConf(), cfg.getParquetCompressionRatio(), cfg.parquetDictionaryEnabled());
+        new HadoopStorageConfiguration(writeSupport.getHadoopConf()), cfg.getParquetCompressionRatio(), cfg.parquetDictionaryEnabled());

     StoragePath filePath = new StoragePath(basePath + "/internal_row_writer.parquet");
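[Editor's note, not part of the patch] The recurring change across these hunks is that HoodieParquetConfig and HoodieOrcConfig now carry the engine-neutral StorageConfiguration<?> instead of a Hadoop Configuration, and only code under hudi-hadoop-common unwraps it at the Hadoop boundary. Below is a minimal sketch of that pattern, using only the constructor and accessors visible in the hunks above; the surrounding class name and the concrete sizes/flags are illustrative, not taken from the commit.

import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.avro.HoodieAvroWriteSupport;
import org.apache.hudi.io.storage.HoodieParquetConfig;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

class StorageConfPatternSketch {

  // Engine-neutral side: the config is built against StorageConfiguration<?>,
  // so callers in hudi-common code paths never touch a Hadoop type.
  static HoodieParquetConfig<HoodieAvroWriteSupport> buildConfig(
      HoodieAvroWriteSupport writeSupport, StorageConfiguration<?> storageConf) {
    return new HoodieParquetConfig<>(writeSupport, CompressionCodecName.GZIP,
        ParquetWriter.DEFAULT_BLOCK_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE,
        2 * 1024 * 1024,       // illustrative max file size, as in the test hunk
        storageConf, 0, true); // compression ratio and dictionary flag, same shape as the test hunk
  }

  // Hadoop side: only hudi-hadoop-common unwraps, mirroring HoodieBaseParquetWriter above.
  static Configuration atHadoopBoundary(HoodieParquetConfig<?> parquetConfig) {
    return parquetConfig.getStorageConf().unwrapAs(Configuration.class);
  }
}

The same shape appears in the ORC and Spark hunks: HoodieAvroOrcWriter calls config.getStorageConf().unwrapAs(Configuration.class) before registering the file system, and the Spark row-writer test wraps its existing Hadoop conf with new HadoopStorageConfiguration(writeSupport.getHadoopConf()) so it can hand the factory the engine-neutral type.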