This is an automated email from the ASF dual-hosted git repository.

jonvex pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new e42217d2368 [HUDI-7350] Make Hudi reader and writer factory APIs Hadoop-independent (#11163)
e42217d2368 is described below

commit e42217d2368db493bc94930b511ec05c79fd9cdc
Author: Jon Vexler <jbvex...@gmail.com>
AuthorDate: Thu May 9 20:45:36 2024 -0400

    [HUDI-7350] Make Hudi reader and writer factory APIs Hadoop-independent (#11163)
    
    Abstract the I/O readers and writers so the factory APIs no longer depend on Hadoop
    ---------
    
    Co-authored-by: Jonathan Vexler <=>
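    
    For illustration, a minimal Java sketch of driving the reworked writer
    factory with no Hadoop types on the caller side; it mirrors the updated
    HoodieParquetDataBlock below, and assumes storageConf, writerSchema, and
    records are already in scope:
    
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        HoodieConfig config = new HoodieConfig();
        config.setValue(PARQUET_COMPRESSION_CODEC_NAME.key(), CompressionCodecName.GZIP.name());
        // The factory now accepts a plain java.io.OutputStream and a
        // StorageConfiguration instead of FSDataOutputStream and Hadoop's Configuration.
        try (HoodieFileWriter writer = HoodieFileWriterFactory.getFileWriter(
            HoodieFileFormat.PARQUET, out, storageConf, config, writerSchema,
            HoodieRecordType.AVRO)) {
          for (HoodieRecord<?> record : records) {
            writer.write(null, record, writerSchema); // null key: no record key derived here
          }
          out.flush();
        }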
---
 .../hudi/client/timeline/LSMTimelineWriter.java    |  2 +-
 .../hudi/avro/TestHoodieAvroParquetWriter.java     |  4 +-
 .../hudi/testutils/HoodieWriteableTestTable.java   | 10 ++--
 .../row/HoodieRowDataFileWriterFactory.java        |  3 +-
 .../io/storage/row/HoodieRowDataParquetWriter.java |  2 +-
 .../io/storage/HoodieSparkFileWriterFactory.java   |  5 +-
 .../hudi/io/storage/HoodieSparkParquetWriter.java  |  1 +
 .../row/HoodieInternalRowFileWriterFactory.java    |  3 +-
 .../row/HoodieInternalRowParquetWriter.java        |  2 +-
 .../io/storage/row/HoodieRowParquetConfig.java     |  8 ++-
 .../storage/TestHoodieAvroFileWriterFactory.java   |  3 +
 .../hudi/common/table/TableSchemaResolver.java     |  6 +-
 .../table/log/block/HoodieParquetDataBlock.java    | 48 ++++++----------
 .../table/timeline/HoodieArchivedTimeline.java     |  4 +-
 .../hudi/io/storage/HoodieAvroFileReader.java      | 28 +++++++++-
 .../hudi/io/storage/HoodieAvroFileReaderBase.java  | 49 ----------------
 .../io/storage/HoodieAvroHFileReaderImplBase.java  |  4 +-
 .../hudi/io/storage/HoodieFileReaderFactory.java   | 11 +++-
 .../hudi/io/storage/HoodieFileWriterFactory.java   | 21 ++++---
 .../apache/hudi/io/storage/HoodieOrcConfig.java    | 15 ++---
 .../hudi/io/storage/HoodieParquetConfig.java       | 15 ++---
 .../testutils/reader/HoodieFileSliceTestUtils.java | 65 ++++++++++------------
 .../testutils/reader/HoodieTestReaderContext.java  |  9 ++-
 .../io/storage/TestHoodieReaderWriterUtils.java    |  2 +-
 .../io/hadoop}/HoodieAvroFileReaderFactory.java    | 20 ++++---
 .../io/hadoop}/HoodieAvroFileWriterFactory.java    | 45 ++++++++-------
 .../{storage => hadoop}/HoodieAvroHFileWriter.java | 17 +++---
 .../hudi/io/hadoop}/HoodieAvroOrcReader.java       | 19 ++++---
 .../{storage => hadoop}/HoodieAvroOrcWriter.java   | 19 ++++---
 .../hudi/io/hadoop}/HoodieAvroParquetReader.java   | 19 ++++---
 .../HoodieAvroParquetWriter.java                   | 17 +++---
 .../HoodieBaseParquetWriter.java                   | 23 ++++----
 .../apache/hudi/io/hadoop}/HoodieHFileConfig.java  | 16 +++---
 .../hudi/io/hadoop}/HoodieParquetStreamWriter.java | 19 ++++---
 .../parquet/io/OutputStreamBackedOutputFile.java   |  0
 .../TestHoodieAvroFileReaderFactory.java           | 17 +++---
 .../TestHoodieBaseParquetWriter.java               | 23 ++++----
 .../TestHoodieHBaseHFileReaderWriter.java          | 19 ++++---
 .../TestHoodieHFileReaderWriter.java               | 18 +++---
 .../TestHoodieHFileReaderWriterBase.java           |  7 ++-
 .../TestHoodieOrcReaderWriter.java                 | 21 ++++---
 .../TestHoodieReaderWriterBase.java                |  7 ++-
 .../org/apache/spark/sql/hudi/SparkHelpers.scala   |  7 ++-
 .../org/apache/hudi/functional/TestBootstrap.java  |  2 +-
 .../row/TestHoodieInternalRowParquetWriter.java    |  3 +-
 45 files changed, 354 insertions(+), 304 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/LSMTimelineWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/LSMTimelineWriter.java
index ccf2332699c..2ea57939427 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/LSMTimelineWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/LSMTimelineWriter.java
@@ -38,7 +38,7 @@ import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieCommitException;
 import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.io.storage.HoodieAvroParquetReader;
+import org.apache.hudi.io.hadoop.HoodieAvroParquetReader;
 import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.hudi.io.storage.HoodieFileWriter;
 import org.apache.hudi.io.storage.HoodieFileWriterFactory;
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroParquetWriter.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroParquetWriter.java
index 091d1d7195a..bff523f7f21 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroParquetWriter.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroParquetWriter.java
@@ -25,7 +25,7 @@ import org.apache.hudi.common.bloom.BloomFilterTypeCode;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ParquetUtils;
-import org.apache.hudi.io.storage.HoodieAvroParquetWriter;
+import org.apache.hudi.io.hadoop.HoodieAvroParquetWriter;
 import org.apache.hudi.io.storage.HoodieParquetConfig;
 import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
@@ -71,7 +71,7 @@ public class TestHoodieAvroParquetWriter {
 
     HoodieParquetConfig<HoodieAvroWriteSupport> parquetConfig =
         new HoodieParquetConfig(writeSupport, CompressionCodecName.GZIP, ParquetWriter.DEFAULT_BLOCK_SIZE,
-            ParquetWriter.DEFAULT_PAGE_SIZE, 1024 * 1024 * 1024, storageConf.unwrap(), 0.1, true);
+            ParquetWriter.DEFAULT_PAGE_SIZE, 1024 * 1024 * 1024, storageConf, 0.1, true);
 
     StoragePath filePath = new StoragePath(tmpDir.resolve("test.parquet").toAbsolutePath().toString());
 
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java
index a135a2b22e4..b45d80b1ee6 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java
@@ -39,18 +39,18 @@ import org.apache.hudi.common.testutils.FileCreateUtils;
 import org.apache.hudi.common.testutils.HoodieMetadataTestTable;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.io.storage.HoodieAvroOrcWriter;
-import org.apache.hudi.io.storage.HoodieAvroParquetWriter;
+import org.apache.hudi.io.hadoop.HoodieAvroOrcWriter;
+import org.apache.hudi.io.hadoop.HoodieAvroParquetWriter;
 import org.apache.hudi.io.storage.HoodieOrcConfig;
 import org.apache.hudi.io.storage.HoodieParquetConfig;
 import org.apache.hudi.metadata.HoodieTableMetadataWriter;
 import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.orc.CompressionKind;
 import org.apache.parquet.avro.AvroSchemaConverter;
 import org.apache.parquet.hadoop.ParquetWriter;
@@ -124,7 +124,7 @@ public class HoodieWriteableTestTable extends HoodieMetadataTestTable {
           new AvroSchemaConverter().convert(schema), schema, Option.of(filter), new Properties());
       HoodieParquetConfig<HoodieAvroWriteSupport> config = new HoodieParquetConfig<>(writeSupport, CompressionCodecName.GZIP,
           ParquetWriter.DEFAULT_BLOCK_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE, 120 * 1024 * 1024,
-          new Configuration(), Double.parseDouble(HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION.defaultValue()), true);
+          storage.getConf(), Double.parseDouble(HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION.defaultValue()), true);
       try (HoodieAvroParquetWriter writer = new HoodieAvroParquetWriter(
           new StoragePath(Paths.get(basePath, partition, fileName).toString()), config, currentInstantTime,
           contextSupplier, populateMetaFields)) {
@@ -142,7 +142,7 @@ public class HoodieWriteableTestTable extends HoodieMetadataTestTable {
         }
       }
     } else if (HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().equals(HoodieFileFormat.ORC)) {
-      Configuration conf = new Configuration();
+      StorageConfiguration conf = storage.getConf().newInstance();
       int orcStripSize = Integer.parseInt(HoodieStorageConfig.ORC_STRIPE_SIZE.defaultValue());
       int orcBlockSize = Integer.parseInt(HoodieStorageConfig.ORC_BLOCK_SIZE.defaultValue());
       int maxFileSize = Integer.parseInt(HoodieStorageConfig.ORC_FILE_MAX_SIZE.defaultValue());
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriterFactory.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriterFactory.java
index 072bde04756..e9bc86b4a76 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriterFactory.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriterFactory.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.io.storage.HoodieParquetConfig;
 import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
 import org.apache.hudi.table.HoodieTable;
 
 import org.apache.flink.table.types.logical.RowType;
@@ -76,7 +77,7 @@ public class HoodieRowDataFileWriterFactory {
         writeConfig.getParquetBlockSize(),
         writeConfig.getParquetPageSize(),
         writeConfig.getParquetMaxFileSize(),
-        writeSupport.getHadoopConf(),
+        new HadoopStorageConfiguration(writeSupport.getHadoopConf()),
         writeConfig.getParquetCompressionRatio(),
         writeConfig.parquetDictionaryEnabled()));
   }
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriter.java
index 8acd1ef9dd1..200662cc138 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriter.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriter.java
@@ -18,7 +18,7 @@
 
 package org.apache.hudi.io.storage.row;
 
-import org.apache.hudi.io.storage.HoodieBaseParquetWriter;
+import org.apache.hudi.io.hadoop.HoodieBaseParquetWriter;
 import org.apache.hudi.io.storage.HoodieParquetConfig;
 import org.apache.hudi.storage.StoragePath;
 
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriterFactory.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriterFactory.java
index ee98ff322a3..ff17b48bf0c 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriterFactory.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriterFactory.java
@@ -38,6 +38,7 @@ import org.apache.spark.sql.HoodieInternalRowUtils;
 import org.apache.spark.sql.types.StructType;
 
 import java.io.IOException;
+import java.io.OutputStream;
 
 public class HoodieSparkFileWriterFactory extends HoodieFileWriterFactory {
 
@@ -67,7 +68,7 @@ public class HoodieSparkFileWriterFactory extends HoodieFileWriterFactory {
   }
 
   protected HoodieFileWriter newParquetFileWriter(
-      FSDataOutputStream outputStream, StorageConfiguration<?> conf, HoodieConfig config, Schema schema) throws IOException {
+      OutputStream outputStream, StorageConfiguration<?> conf, HoodieConfig config, Schema schema) throws IOException {
     boolean enableBloomFilter = false;
     HoodieRowParquetWriteSupport writeSupport = getHoodieRowParquetWriteSupport(conf, schema, config, enableBloomFilter);
     String compressionCodecName = config.getStringOrDefault(HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME);
@@ -83,7 +84,7 @@ public class HoodieSparkFileWriterFactory extends HoodieFileWriterFactory {
         writeSupport.getHadoopConf(), config.getDouble(HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION),
         config.getBooleanOrDefault(HoodieStorageConfig.PARQUET_DICTIONARY_ENABLED));
     parquetConfig.getHadoopConf().addResource(writeSupport.getHadoopConf());
-    return new HoodieSparkParquetStreamWriter(outputStream, parquetConfig);
+    return new HoodieSparkParquetStreamWriter(new FSDataOutputStream(outputStream, null), parquetConfig);
   }
 
   @Override
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetWriter.java
index 09f8d8dbe1c..ba4ab63006d 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetWriter.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetWriter.java
@@ -21,6 +21,7 @@ package org.apache.hudi.io.storage;
 import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.io.hadoop.HoodieBaseParquetWriter;
 import org.apache.hudi.io.storage.row.HoodieRowParquetConfig;
 import org.apache.hudi.io.storage.row.HoodieRowParquetWriteSupport;
 import org.apache.hudi.storage.StoragePath;
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowFileWriterFactory.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowFileWriterFactory.java
index ffad5a895cb..8e7287a7024 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowFileWriterFactory.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowFileWriterFactory.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.io.storage.HoodieParquetConfig;
 import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
 import org.apache.hudi.table.HoodieTable;
 
 import org.apache.hadoop.conf.Configuration;
@@ -79,7 +80,7 @@ public class HoodieInternalRowFileWriterFactory {
             writeConfig.getParquetBlockSize(),
             writeConfig.getParquetPageSize(),
             writeConfig.getParquetMaxFileSize(),
-            writeSupport.getHadoopConf(),
+            new HadoopStorageConfiguration(writeSupport.getHadoopConf()),
             writeConfig.getParquetCompressionRatio(),
             writeConfig.parquetDictionaryEnabled()
         ));
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowParquetWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowParquetWriter.java
index dcb1f197a04..f7ad33d2cbb 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowParquetWriter.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowParquetWriter.java
@@ -18,7 +18,7 @@
 
 package org.apache.hudi.io.storage.row;
 
-import org.apache.hudi.io.storage.HoodieBaseParquetWriter;
+import org.apache.hudi.io.hadoop.HoodieBaseParquetWriter;
 import org.apache.hudi.io.storage.HoodieParquetConfig;
 import org.apache.hudi.storage.StoragePath;
 
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetConfig.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetConfig.java
index f5f6d7b0a5b..f3b0f34b929 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetConfig.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetConfig.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.io.storage.row;
 
 import org.apache.hudi.io.storage.HoodieParquetConfig;
+import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
@@ -31,6 +32,11 @@ public class HoodieRowParquetConfig extends HoodieParquetConfig<HoodieRowParquet
   public HoodieRowParquetConfig(HoodieRowParquetWriteSupport writeSupport, CompressionCodecName compressionCodecName,
       int blockSize, int pageSize, long maxFileSize, Configuration hadoopConf,
       double compressionRatio, boolean enableDictionary) {
-    super(writeSupport, compressionCodecName, blockSize, pageSize, maxFileSize, hadoopConf, compressionRatio, enableDictionary);
+    super(writeSupport, compressionCodecName, blockSize, pageSize, maxFileSize,
+        new HadoopStorageConfiguration(hadoopConf), compressionRatio, enableDictionary);
+  }
+
+  public Configuration getHadoopConf() {
+    return getStorageConf().unwrapAs(Configuration.class);
   }
 }
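
A brief sketch of the wrap/unwrap round trip this config now performs at the
Hadoop boundary (hadoopConf is illustrative; both calls appear in the diff
above): engine code wraps its Hadoop Configuration once, and Hadoop-aware code
recovers it the way getHadoopConf() does.

    Configuration hadoopConf = new Configuration();
    // Wrap once at the engine/storage boundary...
    StorageConfiguration<?> storageConf = new HadoopStorageConfiguration(hadoopConf);
    // ...and unwrap only inside Hadoop-specific code paths.
    Configuration recovered = storageConf.unwrapAs(Configuration.class);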
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/TestHoodieAvroFileWriterFactory.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/TestHoodieAvroFileWriterFactory.java
index 4a13c77b629..74826c6f39b 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/TestHoodieAvroFileWriterFactory.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/TestHoodieAvroFileWriterFactory.java
@@ -24,6 +24,9 @@ import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.index.HoodieIndex.IndexType;
+import org.apache.hudi.io.hadoop.HoodieAvroHFileWriter;
+import org.apache.hudi.io.hadoop.HoodieAvroOrcWriter;
+import org.apache.hudi.io.hadoop.HoodieAvroParquetWriter;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.table.HoodieSparkTable;
 import org.apache.hudi.table.HoodieTable;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
index 6857513a2bb..f3b2bc69dc5 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
@@ -43,7 +43,6 @@ import org.apache.hudi.internal.schema.HoodieSchemaException;
 import org.apache.hudi.internal.schema.InternalSchema;
 import org.apache.hudi.internal.schema.io.FileBasedInternalSchemaStorageManager;
 import org.apache.hudi.internal.schema.utils.SerDeHelper;
-import org.apache.hudi.io.storage.HoodieAvroOrcReader;
 import org.apache.hudi.io.storage.HoodieFileReader;
 import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.hudi.storage.HoodieStorage;
@@ -356,8 +355,9 @@ public class TableSchemaResolver {
 
   private MessageType readSchemaFromORCBaseFile(StoragePath orcFilePath) throws IOException {
     LOG.info("Reading schema from {}", orcFilePath);
-
-    HoodieAvroOrcReader orcReader = new HoodieAvroOrcReader(metaClient.getRawHoodieStorage().getConf(), orcFilePath);
+    HoodieFileReader orcReader = HoodieFileReaderFactory.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO)
+        .getFileReader(metaClient.getTableConfig(), metaClient.getRawHoodieStorage().getConf(), orcFilePath,
+            HoodieFileFormat.ORC, Option.empty());
     return convertAvroSchemaToParquet(orcReader.getSchema());
   }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
index 4d7f3f838f2..6c2e6802769 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
@@ -36,7 +36,6 @@ import org.apache.hudi.storage.inline.InLineFSUtils;
 
 import org.apache.avro.Schema;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 
@@ -107,38 +106,25 @@ public class HoodieParquetDataBlock extends HoodieDataBlock {
     }
 
     Schema writerSchema = new Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
-    ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    try (FSDataOutputStream outputStream = new FSDataOutputStream(baos, null)) {
-      HoodieFileWriter parquetWriter = null;
-      HoodieConfig config = new HoodieConfig();
-      config.setValue(PARQUET_COMPRESSION_CODEC_NAME.key(), compressionCodecName.get().name());
-      config.setValue(PARQUET_BLOCK_SIZE.key(), String.valueOf(ParquetWriter.DEFAULT_BLOCK_SIZE));
-      config.setValue(PARQUET_PAGE_SIZE.key(), String.valueOf(ParquetWriter.DEFAULT_PAGE_SIZE));
-      config.setValue(PARQUET_MAX_FILE_SIZE.key(), String.valueOf(1024 * 1024 * 1024));
-      config.setValue(PARQUET_COMPRESSION_RATIO_FRACTION.key(), String.valueOf(expectedCompressionRatio.get()));
-      config.setValue(PARQUET_DICTIONARY_ENABLED, String.valueOf(useDictionaryEncoding.get()));
-      HoodieRecordType recordType = records.iterator().next().getRecordType();
-      try {
-        parquetWriter = HoodieFileWriterFactory.getFileWriter(
-            HoodieFileFormat.PARQUET,
-            outputStream,
-            HoodieStorageUtils.getStorageConf(new Configuration()),
-            config,
-            writerSchema,
-            recordType);
-        for (HoodieRecord<?> record : records) {
-          String recordKey = getRecordKey(record).orElse(null);
-          parquetWriter.write(recordKey, record, writerSchema);
-        }
-        outputStream.flush();
-      } finally {
-        if (parquetWriter != null) {
-          parquetWriter.close();
-        }
+    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+    HoodieConfig config = new HoodieConfig();
+    config.setValue(PARQUET_COMPRESSION_CODEC_NAME.key(), compressionCodecName.get().name());
+    config.setValue(PARQUET_BLOCK_SIZE.key(), String.valueOf(ParquetWriter.DEFAULT_BLOCK_SIZE));
+    config.setValue(PARQUET_PAGE_SIZE.key(), String.valueOf(ParquetWriter.DEFAULT_PAGE_SIZE));
+    config.setValue(PARQUET_MAX_FILE_SIZE.key(), String.valueOf(1024 * 1024 * 1024));
+    config.setValue(PARQUET_COMPRESSION_RATIO_FRACTION.key(), String.valueOf(expectedCompressionRatio.get()));
+    config.setValue(PARQUET_DICTIONARY_ENABLED, String.valueOf(useDictionaryEncoding.get()));
+    HoodieRecordType recordType = records.iterator().next().getRecordType();
+    try (HoodieFileWriter parquetWriter = HoodieFileWriterFactory.getFileWriter(
+        HoodieFileFormat.PARQUET, outputStream, HoodieStorageUtils.getStorageConf(new Configuration()),
+        config, writerSchema, recordType)) {
+      for (HoodieRecord<?> record : records) {
+        String recordKey = getRecordKey(record).orElse(null);
+        parquetWriter.write(recordKey, record, writerSchema);
       }
+      outputStream.flush();
     }
-
-    return baos.toByteArray();
+    return outputStream.toByteArray();
   }
 
   /**
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
index 58ec48ea630..42f8a6a2753 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
@@ -25,7 +25,7 @@ import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.io.storage.HoodieAvroParquetReader;
+import org.apache.hudi.io.storage.HoodieAvroFileReader;
 import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.hudi.storage.StoragePath;
 
@@ -266,7 +266,7 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
           .filter(fileName -> filter == null || LSMTimeline.isFileInRange(filter, fileName))
           .parallel().forEach(fileName -> {
             // Read the archived file
-            try (HoodieAvroParquetReader reader = (HoodieAvroParquetReader) HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO)
+            try (HoodieAvroFileReader reader = (HoodieAvroFileReader) HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO)
                 .getFileReader(DEFAULT_HUDI_CONFIG_FOR_READER, metaClient.getStorageConf(), new StoragePath(metaClient.getArchivePath(), fileName))) {
               try (ClosableIterator<IndexedRecord> iterator = reader.getIndexedRecordIterator(HoodieLSMTimelineInstant.getClassSchema(), readSchema)) {
                 while (iterator.hasNext()) {
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReader.java
index a829880d5f9..9b49fa871e2 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReader.java
@@ -18,10 +18,32 @@
 
 package org.apache.hudi.io.storage;
 
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.CloseableMappingIterator;
+
+import org.apache.avro.Schema;
 import org.apache.avro.generic.IndexedRecord;
 
+import java.io.IOException;
+
+import static org.apache.hudi.common.util.TypeUtils.unsafeCast;
+
 /**
- * Marker interface for every {@link HoodieFileReader} reading in Avro (ie
- * producing {@link IndexedRecord}s)
+ * Base class for every Avro file reader
  */
-public interface HoodieAvroFileReader extends HoodieFileReader<IndexedRecord> {}
+public abstract class HoodieAvroFileReader implements HoodieFileReader<IndexedRecord> {
+
+  @Override
+  public ClosableIterator<HoodieRecord<IndexedRecord>> getRecordIterator(Schema readerSchema, Schema requestedSchema) throws IOException {
+    ClosableIterator<IndexedRecord> iterator = getIndexedRecordIterator(readerSchema, requestedSchema);
+    return new CloseableMappingIterator<>(iterator, data -> unsafeCast(new HoodieAvroIndexedRecord(data)));
+  }
+
+  protected ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema readerSchema) throws IOException {
+    return getIndexedRecordIterator(readerSchema, readerSchema);
+  }
+
+  public abstract ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema readerSchema, Schema requestedSchema) throws IOException;
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReaderBase.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReaderBase.java
deleted file mode 100644
index b15ce11fd53..00000000000
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReaderBase.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.io.storage;
-
-import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.util.collection.ClosableIterator;
-import org.apache.hudi.common.util.collection.CloseableMappingIterator;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.IndexedRecord;
-
-import java.io.IOException;
-
-import static org.apache.hudi.common.util.TypeUtils.unsafeCast;
-
-/**
- * Base class for every {@link HoodieAvroFileReader}
- */
-abstract class HoodieAvroFileReaderBase implements HoodieAvroFileReader {
-
-  @Override
-  public ClosableIterator<HoodieRecord<IndexedRecord>> getRecordIterator(Schema readerSchema, Schema requestedSchema) throws IOException {
-    ClosableIterator<IndexedRecord> iterator = getIndexedRecordIterator(readerSchema, requestedSchema);
-    return new CloseableMappingIterator<>(iterator, data -> unsafeCast(new HoodieAvroIndexedRecord(data)));
-  }
-
-  protected ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema readerSchema) throws IOException {
-    return getIndexedRecordIterator(readerSchema, readerSchema);
-  }
-
-  public abstract ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema readerSchema, Schema requestedSchema) throws IOException;
-}
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileReaderImplBase.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileReaderImplBase.java
index 5e1a260e158..dd28d5f5589 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileReaderImplBase.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileReaderImplBase.java
@@ -38,7 +38,7 @@ import java.util.stream.Collectors;
 import static org.apache.hudi.common.util.CollectionUtils.toStream;
 import static org.apache.hudi.common.util.StringUtils.fromUTF8Bytes;
 
-public abstract class HoodieAvroHFileReaderImplBase extends HoodieAvroFileReaderBase
+public abstract class HoodieAvroHFileReaderImplBase extends HoodieAvroFileReader
     implements HoodieSeekingFileReader<IndexedRecord> {
   // TODO HoodieHFileReader right now tightly coupled to MT, we should break that coupling
   public static final String SCHEMA_KEY = "schema";
@@ -54,7 +54,7 @@ public abstract class HoodieAvroHFileReaderImplBase extends HoodieAvroFileReader
    * <p>
    * Reads all the records with given schema
    */
-  public static List<IndexedRecord> readAllRecords(HoodieAvroFileReaderBase reader)
+  public static List<IndexedRecord> readAllRecords(HoodieAvroFileReader reader)
       throws IOException {
     Schema schema = reader.getSchema();
     return toStream(reader.getIndexedRecordIterator(schema))
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java
index fe075ccdc8f..c285f04a2b2 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java
@@ -46,14 +46,21 @@ public class HoodieFileReaderFactory {
   public static HoodieFileReaderFactory getReaderFactory(HoodieRecord.HoodieRecordType recordType) {
     switch (recordType) {
       case AVRO:
-        return new HoodieAvroFileReaderFactory();
+
+        try {
+          Class<?> clazz =
+              ReflectionUtils.getClass("org.apache.hudi.io.hadoop.HoodieAvroFileReaderFactory");
+          return (HoodieFileReaderFactory) clazz.newInstance();
+        } catch (IllegalArgumentException | IllegalAccessException | InstantiationException e) {
+          throw new HoodieException("Unable to create HoodieAvroFileReaderFactory", e);
+        }
       case SPARK:
         try {
           Class<?> clazz =
               ReflectionUtils.getClass("org.apache.hudi.io.storage.HoodieSparkFileReaderFactory");
           return (HoodieFileReaderFactory) clazz.newInstance();
         } catch (IllegalArgumentException | IllegalAccessException | InstantiationException e) {
-          throw new HoodieException("Unable to create hoodie spark file writer factory", e);
+          throw new HoodieException("Unable to create HoodieSparkFileReaderFactory", e);
         }
       default:
         throw new UnsupportedOperationException(recordType + " record type not supported yet.");
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
index d57dd55fcd5..69a8924f508 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
@@ -33,9 +33,9 @@ import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
 
 import org.apache.avro.Schema;
-import org.apache.hadoop.fs.FSDataOutputStream;
 
 import java.io.IOException;
+import java.io.OutputStream;
 
 import static org.apache.hudi.common.model.HoodieFileFormat.HFILE;
 import static org.apache.hudi.common.model.HoodieFileFormat.ORC;
@@ -46,13 +46,18 @@ public class HoodieFileWriterFactory {
   private static HoodieFileWriterFactory getWriterFactory(HoodieRecord.HoodieRecordType recordType) {
     switch (recordType) {
       case AVRO:
-        return new HoodieAvroFileWriterFactory();
+        try {
+          Class<?> clazz = ReflectionUtils.getClass("org.apache.hudi.io.hadoop.HoodieAvroFileWriterFactory");
+          return (HoodieFileWriterFactory) clazz.newInstance();
+        } catch (IllegalAccessException | IllegalArgumentException | InstantiationException e) {
+          throw new HoodieException("Unable to create HoodieAvroFileWriterFactory", e);
+        }
       case SPARK:
         try {
           Class<?> clazz = ReflectionUtils.getClass("org.apache.hudi.io.storage.HoodieSparkFileWriterFactory");
           return (HoodieFileWriterFactory) clazz.newInstance();
         } catch (IllegalAccessException | IllegalArgumentException | InstantiationException e) {
-          throw new HoodieException("Unable to create hoodie spark file writer factory", e);
+          throw new HoodieException("Unable to create HoodieSparkFileWriterFactory", e);
         }
       default:
         throw new UnsupportedOperationException(recordType + " record type not supported yet.");
@@ -67,8 +72,8 @@ public class HoodieFileWriterFactory {
     return factory.getFileWriterByFormat(extension, instantTime, path, conf, config, schema, taskContextSupplier);
   }
 
-  public static <T, I, K, O> HoodieFileWriter getFileWriter(HoodieFileFormat format,
-                                                            FSDataOutputStream outputStream, StorageConfiguration<?> conf, HoodieConfig config, Schema schema, HoodieRecordType recordType)
+  public static <T, I, K, O> HoodieFileWriter getFileWriter(HoodieFileFormat format, OutputStream outputStream,
+                                                            StorageConfiguration<?> conf, HoodieConfig config, Schema schema, HoodieRecordType recordType)
       throws IOException {
     HoodieFileWriterFactory factory = getWriterFactory(recordType);
     return factory.getFileWriterByFormat(format, outputStream, conf, config, schema);
@@ -89,8 +94,8 @@ public class HoodieFileWriterFactory {
     throw new UnsupportedOperationException(extension + " format not supported yet.");
   }
 
-  protected <T, I, K, O> HoodieFileWriter getFileWriterByFormat(HoodieFileFormat format,
-                                                                FSDataOutputStream outputStream, StorageConfiguration<?> conf, HoodieConfig config, Schema schema) throws IOException {
+  protected <T, I, K, O> HoodieFileWriter getFileWriterByFormat(HoodieFileFormat format, OutputStream outputStream,
+                                                                StorageConfiguration<?> conf, HoodieConfig config, Schema schema) throws IOException {
     switch (format) {
       case PARQUET:
         return newParquetFileWriter(outputStream, conf, config, schema);
@@ -106,7 +111,7 @@ public class HoodieFileWriterFactory {
   }
 
   protected HoodieFileWriter newParquetFileWriter(
-      FSDataOutputStream outputStream, StorageConfiguration<?> conf, HoodieConfig config, Schema schema) throws IOException {
+      OutputStream outputStream, StorageConfiguration<?> conf, HoodieConfig config, Schema schema) throws IOException {
     throw new UnsupportedOperationException();
   }
 
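A condensed sketch of the reflection pattern that getReaderFactory and
getWriterFactory above both use (the helper name loadFactory is hypothetical;
it assumes the named class, e.g. the Avro factory from hudi-hadoop-common, is
on the classpath):

    // hudi-common no longer links against the Hadoop-backed factories directly,
    // so it instantiates them by class name at runtime.
    private static HoodieFileWriterFactory loadFactory(String className) {
      try {
        Class<?> clazz = ReflectionUtils.getClass(className);
        return (HoodieFileWriterFactory) clazz.newInstance();
      } catch (IllegalAccessException | IllegalArgumentException | InstantiationException e) {
        throw new HoodieException("Unable to create " + className, e);
      }
    }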
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieOrcConfig.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieOrcConfig.java
index c45e02452e3..7cac57fa919 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieOrcConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieOrcConfig.java
@@ -18,23 +18,24 @@
 
 package org.apache.hudi.io.storage;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.storage.StorageConfiguration;
+
 import org.apache.orc.CompressionKind;
 
 public class HoodieOrcConfig {
-  static final String AVRO_SCHEMA_METADATA_KEY = "orc.avro.schema";
+  public static final String AVRO_SCHEMA_METADATA_KEY = "orc.avro.schema";
 
   private final CompressionKind compressionKind;
   private final int stripeSize;
   private final int blockSize;
   private final long maxFileSize;
-  private final Configuration hadoopConf;
+  private final StorageConfiguration<?> storageConf;
   private final BloomFilter bloomFilter;
 
-  public HoodieOrcConfig(Configuration hadoopConf, CompressionKind compressionKind, int stripeSize,
+  public HoodieOrcConfig(StorageConfiguration<?> storageConf, CompressionKind compressionKind, int stripeSize,
       int blockSize, long maxFileSize, BloomFilter bloomFilter) {
-    this.hadoopConf = hadoopConf;
+    this.storageConf = storageConf;
     this.compressionKind = compressionKind;
     this.stripeSize = stripeSize;
     this.blockSize = blockSize;
@@ -42,8 +43,8 @@ public class HoodieOrcConfig {
     this.bloomFilter = bloomFilter;
   }
 
-  public Configuration getHadoopConf() {
-    return hadoopConf;
+  public StorageConfiguration<?> getStorageConf() {
+    return storageConf;
   }
 
   public CompressionKind getCompressionKind() {
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetConfig.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetConfig.java
index b5e567b7644..e17a017d679 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetConfig.java
@@ -18,7 +18,8 @@
 
 package org.apache.hudi.io.storage;
 
-import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.storage.StorageConfiguration;
+
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 
 /**
@@ -31,18 +32,18 @@ public class HoodieParquetConfig<T> {
   private final int blockSize;
   private final int pageSize;
   private final long maxFileSize;
-  private final Configuration hadoopConf;
+  private final StorageConfiguration<?> storageConf;
   private final double compressionRatio;
   private final boolean dictionaryEnabled;
 
-  public HoodieParquetConfig(T writeSupport, CompressionCodecName compressionCodecName, int blockSize,
-                             int pageSize, long maxFileSize, Configuration hadoopConf, double compressionRatio, boolean dictionaryEnabled) {
+  public HoodieParquetConfig(T writeSupport, CompressionCodecName compressionCodecName, int blockSize, int pageSize,
+                             long maxFileSize, StorageConfiguration<?> storageConf, double compressionRatio, boolean dictionaryEnabled) {
     this.writeSupport = writeSupport;
     this.compressionCodecName = compressionCodecName;
     this.blockSize = blockSize;
     this.pageSize = pageSize;
     this.maxFileSize = maxFileSize;
-    this.hadoopConf = hadoopConf;
+    this.storageConf = storageConf;
     this.compressionRatio = compressionRatio;
     this.dictionaryEnabled = dictionaryEnabled;
   }
@@ -63,8 +64,8 @@ public class HoodieParquetConfig<T> {
     return maxFileSize;
   }
 
-  public Configuration getHadoopConf() {
-    return hadoopConf;
+  public StorageConfiguration<?> getStorageConf() {
+    return storageConf;
   }
 
   public double getCompressionRatio() {
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileSliceTestUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileSliceTestUtils.java
index 43002a723ef..01052d4b00f 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileSliceTestUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileSliceTestUtils.java
@@ -19,14 +19,12 @@
 
 package org.apache.hudi.common.testutils.reader;
 
-import org.apache.hudi.avro.HoodieAvroWriteSupport;
-import org.apache.hudi.common.bloom.BloomFilter;
-import org.apache.hudi.common.bloom.BloomFilterFactory;
 import org.apache.hudi.common.bloom.BloomFilterTypeCode;
+import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.config.HoodieReaderConfig;
+import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.LocalTaskContextSupplier;
-import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.model.DeleteRecord;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
@@ -35,6 +33,7 @@ import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.log.HoodieLogFormat;
 import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
 import org.apache.hudi.common.table.log.block.HoodieCDCDataBlock;
@@ -44,12 +43,13 @@ import org.apache.hudi.common.table.log.block.HoodieHFileDataBlock;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import org.apache.hudi.common.table.log.block.HoodieParquetDataBlock;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.io.storage.HoodieAvroFileWriter;
-import org.apache.hudi.io.storage.HoodieParquetConfig;
+import org.apache.hudi.io.storage.HoodieFileWriterFactory;
 import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
 
 import org.apache.avro.Schema;
@@ -57,7 +57,6 @@ import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.io.compress.Compression;
-import org.apache.parquet.avro.AvroSchemaConverter;
 import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 
@@ -78,7 +77,6 @@ import static org.apache.hudi.common.testutils.FileCreateUtils.logFileName;
 import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.AVRO_SCHEMA;
 import static org.apache.hudi.common.testutils.reader.DataGenerationPlan.OperationType.DELETE;
 import static org.apache.hudi.common.testutils.reader.DataGenerationPlan.OperationType.INSERT;
-import static org.apache.hudi.io.storage.HoodieAvroFileWriterFactory.HOODIE_AVRO_PARQUET_WRITER;
 
 public class HoodieFileSliceTestUtils {
   public static final String FORWARD_SLASH = "/";
@@ -247,36 +245,31 @@ public class HoodieFileSliceTestUtils {
       Schema schema,
       String baseInstantTime
   ) throws IOException {
-    Configuration hadoopConf = new Configuration();
+    StorageConfiguration<Configuration> conf = HoodieTestUtils.getDefaultStorageConfWithDefaults();
 
     // TODO: Optimize these hard-coded parameters for test purpose. (HUDI-7214)
-    BloomFilter filter = BloomFilterFactory.createBloomFilter(
-        1000,
-        0.0001,
-        10000,
-        BloomFilterTypeCode.DYNAMIC_V0.name());
-    HoodieAvroWriteSupport<IndexedRecord> writeSupport = new HoodieAvroWriteSupport<>(
-        new AvroSchemaConverter().convert(schema),
-        schema,
-        Option.of(filter),
-        new Properties());
-    HoodieParquetConfig<HoodieAvroWriteSupport> parquetConfig = new HoodieParquetConfig(
-        writeSupport,
-        CompressionCodecName.GZIP,
-        ParquetWriter.DEFAULT_BLOCK_SIZE,
-        ParquetWriter.DEFAULT_PAGE_SIZE,
-        1024 * 1024 * 1024,
-        hadoopConf,
-        0.1,
-        true);
-
-    try (HoodieAvroFileWriter writer = (HoodieAvroFileWriter) ReflectionUtils.loadClass(HOODIE_AVRO_PARQUET_WRITER,
-        new Class<?>[] {StoragePath.class, HoodieParquetConfig.class, String.class, TaskContextSupplier.class, boolean.class},
-        new StoragePath(baseFilePath),
-        parquetConfig,
-        baseInstantTime,
-        new LocalTaskContextSupplier(),
-        true)) {
+    HoodieConfig cfg = new HoodieConfig();
+    //enable bloom filter
+    cfg.setValue(HoodieTableConfig.POPULATE_META_FIELDS.key(), "true");
+    cfg.setValue(HoodieStorageConfig.PARQUET_WITH_BLOOM_FILTER_ENABLED.key(), "true");
+
+    //set bloom filter values
+    cfg.setValue(HoodieStorageConfig.BLOOM_FILTER_NUM_ENTRIES_VALUE.key(), String.valueOf(1000));
+    cfg.setValue(HoodieStorageConfig.BLOOM_FILTER_FPP_VALUE.key(), String.valueOf(0.00001));
+    cfg.setValue(HoodieStorageConfig.BLOOM_FILTER_DYNAMIC_MAX_ENTRIES.key(), String.valueOf(10000));
+    cfg.setValue(HoodieStorageConfig.BLOOM_FILTER_TYPE.key(), BloomFilterTypeCode.DYNAMIC_V0.name());
+
+    //set parquet config values
+    cfg.setValue(HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME.key(), CompressionCodecName.GZIP.name());
+    cfg.setValue(HoodieStorageConfig.PARQUET_BLOCK_SIZE.key(), String.valueOf(ParquetWriter.DEFAULT_BLOCK_SIZE));
+    cfg.setValue(HoodieStorageConfig.PARQUET_PAGE_SIZE.key(), String.valueOf(ParquetWriter.DEFAULT_PAGE_SIZE));
+    cfg.setValue(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key(), String.valueOf(1024 * 1024 * 1024));
+    cfg.setValue(HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION.key(), String.valueOf(0.1));
+    cfg.setValue(HoodieStorageConfig.PARQUET_DICTIONARY_ENABLED.key(), "true");
+
+    try (HoodieAvroFileWriter writer = (HoodieAvroFileWriter) HoodieFileWriterFactory
+        .getFileWriter(baseInstantTime, new StoragePath(baseFilePath), conf, cfg,
+            schema, new LocalTaskContextSupplier(), HoodieRecord.HoodieRecordType.AVRO)) {
       for (IndexedRecord record : records) {
         writer.writeAvro(
             (String) record.get(schema.getField(ROW_KEY).pos()), record);
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieTestReaderContext.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieTestReaderContext.java
index b2d7306d6ab..6eb6733b04b 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieTestReaderContext.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieTestReaderContext.java
@@ -20,11 +20,13 @@
 package org.apache.hudi.common.testutils.reader;
 
 import org.apache.hudi.avro.model.HoodieDeleteRecord;
+import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.HoodieReaderContext;
 import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
 import org.apache.hudi.common.model.HoodieAvroRecordMerger;
+import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.util.ConfigUtils;
@@ -32,7 +34,8 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.SpillableMapUtils;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.io.storage.HoodieAvroParquetReader;
+import org.apache.hudi.io.storage.HoodieAvroFileReader;
+import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.HoodieStorageUtils;
 import org.apache.hudi.storage.StorageConfiguration;
@@ -78,7 +81,9 @@ public class HoodieTestReaderContext extends HoodieReaderContext<IndexedRecord>
       Schema requiredSchema,
       StorageConfiguration<?> conf
   ) throws IOException {
-    HoodieAvroParquetReader reader = new HoodieAvroParquetReader(conf, new StoragePath(filePath.toUri()));
+    HoodieAvroFileReader reader = (HoodieAvroFileReader) HoodieFileReaderFactory
+        .getReaderFactory(HoodieRecord.HoodieRecordType.AVRO).getFileReader(new HoodieConfig(),
+            conf, filePath, HoodieFileFormat.PARQUET, Option.empty());
     return reader.getIndexedRecordIterator(dataSchema, requiredSchema);
   }
 
diff --git a/hudi-common/src/test/java/org/apache/hudi/io/storage/TestHoodieReaderWriterUtils.java b/hudi-common/src/test/java/org/apache/hudi/io/storage/TestHoodieReaderWriterUtils.java
index a0ec0dfdb89..2fc38c156a3 100644
--- a/hudi-common/src/test/java/org/apache/hudi/io/storage/TestHoodieReaderWriterUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/io/storage/TestHoodieReaderWriterUtils.java
@@ -44,7 +44,7 @@ import static org.apache.hudi.io.hfile.TestHFileReader.DUMMY_BLOOM_FILTER;
  * Utils for reader and writer tests.
  */
 public class TestHoodieReaderWriterUtils {
-  static void writeHFileForTesting(String fileLocation,
+  public static void writeHFileForTesting(String fileLocation,
                                    int blockSize,
                                    Compression.Algorithm compressionAlgo,
                                    int numEntries,
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReaderFactory.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroFileReaderFactory.java
similarity index 81%
rename from hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReaderFactory.java
rename to hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroFileReaderFactory.java
index 6a6b0b67aa5..3a4d0b910ab 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReaderFactory.java
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroFileReaderFactory.java
@@ -7,19 +7,25 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 
-package org.apache.hudi.io.storage;
+package org.apache.hudi.io.hadoop;
 
 import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.io.storage.HoodieAvroBootstrapFileReader;
+import org.apache.hudi.io.storage.HoodieFileReader;
+import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.io.storage.HoodieHBaseAvroHFileReader;
+import org.apache.hudi.io.storage.HoodieNativeAvroHFileReader;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.HoodieStorageUtils;
 import org.apache.hudi.storage.StorageConfiguration;
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileWriterFactory.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroFileWriterFactory.java
similarity index 80%
rename from hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileWriterFactory.java
rename to hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroFileWriterFactory.java
index 2a727158e17..d0b8faa7589 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileWriterFactory.java
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroFileWriterFactory.java
@@ -7,16 +7,17 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 
-package org.apache.hudi.io.storage;
+package org.apache.hudi.io.hadoop;
 
 import org.apache.hudi.avro.HoodieAvroWriteSupport;
 import org.apache.hudi.common.bloom.BloomFilter;
@@ -27,6 +28,11 @@ import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase;
+import org.apache.hudi.io.storage.HoodieFileWriter;
+import org.apache.hudi.io.storage.HoodieFileWriterFactory;
+import org.apache.hudi.io.storage.HoodieOrcConfig;
+import org.apache.hudi.io.storage.HoodieParquetConfig;
 import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
 
@@ -40,18 +46,19 @@ import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.parquet.schema.MessageType;
 
 import java.io.IOException;
+import java.io.OutputStream;
 import java.util.Properties;
 
-import static org.apache.hudi.io.storage.HoodieHFileConfig.CACHE_DATA_IN_L1;
-import static org.apache.hudi.io.storage.HoodieHFileConfig.DROP_BEHIND_CACHE_COMPACTION;
-import static org.apache.hudi.io.storage.HoodieHFileConfig.HFILE_COMPARATOR;
-import static org.apache.hudi.io.storage.HoodieHFileConfig.PREFETCH_ON_OPEN;
+import static org.apache.hudi.io.hadoop.HoodieHFileConfig.CACHE_DATA_IN_L1;
+import static org.apache.hudi.io.hadoop.HoodieHFileConfig.DROP_BEHIND_CACHE_COMPACTION;
+import static org.apache.hudi.io.hadoop.HoodieHFileConfig.HFILE_COMPARATOR;
+import static org.apache.hudi.io.hadoop.HoodieHFileConfig.PREFETCH_ON_OPEN;
 
 public class HoodieAvroFileWriterFactory extends HoodieFileWriterFactory {
   //hardcoded classes to remove at a later time
-  public static final String HOODIE_AVRO_PARQUET_WRITER = "org.apache.hudi.io.storage.HoodieAvroParquetWriter";
-  public static final String HOODIE_AVRO_HFILE_WRITER = "org.apache.hudi.io.storage.HoodieAvroHFileWriter";
-  public static final String HOODIE_AVRO_ORC_WRITER = "org.apache.hudi.io.storage.HoodieAvroOrcWriter";
+  public static final String HOODIE_AVRO_PARQUET_WRITER = "org.apache.hudi.io.hadoop.HoodieAvroParquetWriter";
+  public static final String HOODIE_AVRO_HFILE_WRITER = "org.apache.hudi.io.hadoop.HoodieAvroHFileWriter";
+  public static final String HOODIE_AVRO_ORC_WRITER = "org.apache.hudi.io.hadoop.HoodieAvroOrcWriter";
 
   @Override
   protected HoodieFileWriter newParquetFileWriter(
@@ -70,7 +77,7 @@ public class HoodieAvroFileWriterFactory extends HoodieFileWriterFactory {
         config.getIntOrDefault(HoodieStorageConfig.PARQUET_BLOCK_SIZE),
         config.getIntOrDefault(HoodieStorageConfig.PARQUET_PAGE_SIZE),
         config.getLongOrDefault(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE),
-        conf.unwrapAs(Configuration.class), config.getDoubleOrDefault(HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION),
+        conf, config.getDoubleOrDefault(HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION),
         config.getBooleanOrDefault(HoodieStorageConfig.PARQUET_DICTIONARY_ENABLED));
     try {
       return (HoodieFileWriter) ReflectionUtils.loadClass(HOODIE_AVRO_PARQUET_WRITER,
@@ -83,16 +90,16 @@ public class HoodieAvroFileWriterFactory extends HoodieFileWriterFactory {
   }
 
   protected HoodieFileWriter newParquetFileWriter(
-      FSDataOutputStream outputStream, StorageConfiguration<?> conf, HoodieConfig config, Schema schema) throws IOException {
+      OutputStream outputStream, StorageConfiguration<?> conf, HoodieConfig config, Schema schema) throws IOException {
     HoodieAvroWriteSupport writeSupport = getHoodieAvroWriteSupport(conf, schema, config, false);
     HoodieParquetConfig<HoodieAvroWriteSupport> parquetConfig = new HoodieParquetConfig<>(writeSupport,
         CompressionCodecName.fromConf(config.getString(HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME)),
         config.getInt(HoodieStorageConfig.PARQUET_BLOCK_SIZE),
         config.getInt(HoodieStorageConfig.PARQUET_PAGE_SIZE),
         config.getLong(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE), // todo: 1024*1024*1024
-        conf.unwrapAs(Configuration.class), config.getDouble(HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION),
+        conf, config.getDouble(HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION),
         config.getBoolean(HoodieStorageConfig.PARQUET_DICTIONARY_ENABLED));
-    return new HoodieParquetStreamWriter(outputStream, parquetConfig);
+    return new HoodieParquetStreamWriter(new FSDataOutputStream(outputStream, null), parquetConfig);
   }
 
   protected HoodieFileWriter newHFileFileWriter(
@@ -120,7 +127,7 @@ public class HoodieAvroFileWriterFactory extends HoodieFileWriterFactory {
       String instantTime, StoragePath path, StorageConfiguration<?> conf, HoodieConfig config, Schema schema,
       TaskContextSupplier taskContextSupplier) throws IOException {
     BloomFilter filter = createBloomFilter(config);
-    HoodieOrcConfig orcConfig = new HoodieOrcConfig(conf.unwrapAs(Configuration.class),
+    HoodieOrcConfig orcConfig = new HoodieOrcConfig(conf,
         CompressionKind.valueOf(config.getString(HoodieStorageConfig.ORC_COMPRESSION_CODEC_NAME)),
         config.getInt(HoodieStorageConfig.ORC_STRIPE_SIZE),
         config.getInt(HoodieStorageConfig.ORC_BLOCK_SIZE),
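
The two factory hunks above carry the core of the de-Hadoop change: HoodieParquetConfig and HoodieOrcConfig now accept the engine-agnostic StorageConfiguration<?> where they previously required a raw Hadoop Configuration, and the stream-based Parquet writer path now takes a plain java.io.OutputStream. A minimal sketch of the new call shape, assuming a writeSupport built elsewhere (the codec, sizes, and ratio below are hypothetical placeholders, not values from this patch):

    // Sketch only: building a Parquet config against the Hadoop-free API.
    StorageConfiguration<?> conf = HoodieTestUtils.getDefaultStorageConfWithDefaults();
    HoodieParquetConfig<HoodieAvroWriteSupport> parquetConfig = new HoodieParquetConfig<>(
        writeSupport,                      // HoodieAvroWriteSupport (placeholder)
        CompressionCodecName.GZIP,         // placeholder codec
        ParquetWriter.DEFAULT_BLOCK_SIZE,
        ParquetWriter.DEFAULT_PAGE_SIZE,
        2 * 1024 * 1024,                   // placeholder max file size
        conf,                              // StorageConfiguration, no unwrapAs(...) needed here
        0.1,                               // placeholder compression ratio fraction
        true);                             // enable dictionary encoding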
diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileWriter.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroHFileWriter.java
similarity index 93%
rename from hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileWriter.java
rename to hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroHFileWriter.java
index 6de6b24868b..379df6e97b9 100644
--- a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileWriter.java
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroHFileWriter.java
@@ -7,16 +7,17 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 
-package org.apache.hudi.io.storage;
+package org.apache.hudi.io.hadoop;
 
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.bloom.BloomFilter;
@@ -26,6 +27,8 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieDuplicateKeyException;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.hadoop.fs.HoodieWrapperFileSystem;
+import org.apache.hudi.io.storage.HoodieAvroFileWriter;
+import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase;
 import org.apache.hudi.metadata.MetadataPartitionType;
 import org.apache.hudi.storage.StoragePath;
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroOrcReader.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroOrcReader.java
similarity index 86%
rename from hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroOrcReader.java
rename to hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroOrcReader.java
index 4d90590d953..c1f5b79c227 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroOrcReader.java
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroOrcReader.java
@@ -7,16 +7,17 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 
-package org.apache.hudi.io.storage;
+package org.apache.hudi.io.hadoop;
 
 import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.model.HoodieFileFormat;
@@ -26,6 +27,8 @@ import org.apache.hudi.common.util.OrcReaderIterator;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.io.storage.HoodieAvroFileReader;
+import org.apache.hudi.io.storage.HoodieFileReader;
 import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
 
@@ -47,7 +50,7 @@ import java.util.Set;
 /**
  * {@link HoodieFileReader} implementation for ORC format.
  */
-public class HoodieAvroOrcReader extends HoodieAvroFileReaderBase {
+public class HoodieAvroOrcReader extends HoodieAvroFileReader {
 
   private final StoragePath path;
   private final StorageConfiguration<?> conf;
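
Note the base-class change in this hunk as well: HoodieAvroFileReaderBase is removed (its logic presumably folded into HoodieAvroFileReader in hudi-common), so format-specific readers now extend HoodieAvroFileReader directly. Schematically, per the hunk above:

    // Before this patch:
    //   public class HoodieAvroOrcReader extends HoodieAvroFileReaderBase { ... }
    // After this patch:
    //   public class HoodieAvroOrcReader extends HoodieAvroFileReader { ... }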
diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroOrcWriter.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroOrcWriter.java
similarity index 91%
rename from hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroOrcWriter.java
rename to hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroOrcWriter.java
index 07e7bc7f122..40e37fa145f 100644
--- a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroOrcWriter.java
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroOrcWriter.java
@@ -7,16 +7,17 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 
-package org.apache.hudi.io.storage;
+package org.apache.hudi.io.hadoop;
 
 import org.apache.hudi.avro.HoodieAvroWriteSupport;
 import org.apache.hudi.avro.HoodieBloomFilterWriteSupport;
@@ -27,6 +28,8 @@ import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.util.AvroOrcUtils;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.hadoop.fs.HoodieWrapperFileSystem;
+import org.apache.hudi.io.storage.HoodieAvroFileWriter;
+import org.apache.hudi.io.storage.HoodieOrcConfig;
 import org.apache.hudi.storage.StoragePath;
 
 import org.apache.avro.Schema;
@@ -70,7 +73,7 @@ public class HoodieAvroOrcWriter implements HoodieAvroFileWriter, Closeable {
   public HoodieAvroOrcWriter(String instantTime, StoragePath file, HoodieOrcConfig config, Schema schema,
                              TaskContextSupplier taskContextSupplier) throws IOException {
 
-    Configuration conf = HadoopFSUtils.registerFileSystem(file, config.getHadoopConf());
+    Configuration conf = HadoopFSUtils.registerFileSystem(file, config.getStorageConf().unwrapAs(Configuration.class));
     this.file = HoodieWrapperFileSystem.convertToHoodiePath(file, conf);
     this.fs = (HoodieWrapperFileSystem) this.file.getFileSystem(conf);
     this.instantTime = instantTime;
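
The ORC writer shows the unwrap-at-the-boundary pattern that recurs through this patch: HoodieOrcConfig now carries a StorageConfiguration, and Hadoop-specific code unwraps it only where a genuine Configuration is required. A minimal sketch, using the two calls visible in the hunk above:

    // Keep the engine-agnostic config; unwrap only at the Hadoop seam.
    Configuration hadoopConf = config.getStorageConf().unwrapAs(Configuration.class);
    Configuration registered = HadoopFSUtils.registerFileSystem(file, hadoopConf);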
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetReader.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReader.java
similarity index 93%
rename from hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetReader.java
rename to hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReader.java
index 66ff6b483e0..d75660a9a7e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetReader.java
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReader.java
@@ -7,16 +7,17 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 
-package org.apache.hudi.io.storage;
+package org.apache.hudi.io.hadoop;
 
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.bloom.BloomFilter;
@@ -29,6 +30,8 @@ import org.apache.hudi.common.util.ParquetReaderIterator;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.CloseableMappingIterator;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.io.storage.HoodieAvroFileReader;
+import org.apache.hudi.io.storage.HoodieFileReader;
 import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
 
@@ -52,7 +55,7 @@ import static org.apache.hudi.common.util.TypeUtils.unsafeCast;
 /**
  * {@link HoodieFileReader} implementation for parquet format.
  */
-public class HoodieAvroParquetReader extends HoodieAvroFileReaderBase {
+public class HoodieAvroParquetReader extends HoodieAvroFileReader {
 
   private final StoragePath path;
   private final StorageConfiguration<?> conf;
diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetWriter.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetWriter.java
similarity index 84%
rename from hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetWriter.java
rename to hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetWriter.java
index 4269e6513a2..f8f9a8ccea0 100644
--- a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetWriter.java
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetWriter.java
@@ -7,20 +7,23 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 
-package org.apache.hudi.io.storage;
+package org.apache.hudi.io.hadoop;
 
 import org.apache.hudi.avro.HoodieAvroWriteSupport;
 import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.io.storage.HoodieAvroFileWriter;
+import org.apache.hudi.io.storage.HoodieParquetConfig;
 import org.apache.hudi.storage.StoragePath;
 
 import org.apache.avro.generic.IndexedRecord;
diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/HoodieBaseParquetWriter.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieBaseParquetWriter.java
similarity index 90%
rename from hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/HoodieBaseParquetWriter.java
rename to hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieBaseParquetWriter.java
index 06f1e513055..8f17fa0fa1e 100644
--- a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/HoodieBaseParquetWriter.java
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieBaseParquetWriter.java
@@ -7,20 +7,22 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 
-package org.apache.hudi.io.storage;
+package org.apache.hudi.io.hadoop;
 
 import org.apache.hudi.common.util.VisibleForTesting;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.hadoop.fs.HoodieWrapperFileSystem;
+import org.apache.hudi.io.storage.HoodieParquetConfig;
 import org.apache.hudi.storage.StoragePath;
 
 import org.apache.hadoop.conf.Configuration;
@@ -52,8 +54,9 @@ public abstract class HoodieBaseParquetWriter<R> implements Closeable {
 
   public HoodieBaseParquetWriter(StoragePath file,
                                  HoodieParquetConfig<? extends WriteSupport<R>> parquetConfig) throws IOException {
+    Configuration hadoopConf = parquetConfig.getStorageConf().unwrapAs(Configuration.class);
     ParquetWriter.Builder parquetWriterbuilder = new ParquetWriter.Builder(
-        HoodieWrapperFileSystem.convertToHoodiePath(file, parquetConfig.getHadoopConf())) {
+        HoodieWrapperFileSystem.convertToHoodiePath(file, hadoopConf)) {
       @Override
       protected ParquetWriter.Builder self() {
         return this;
@@ -73,8 +76,8 @@ public abstract class HoodieBaseParquetWriter<R> implements Closeable {
     parquetWriterbuilder.withDictionaryEncoding(parquetConfig.dictionaryEnabled());
     parquetWriterbuilder.withValidation(ParquetWriter.DEFAULT_IS_VALIDATING_ENABLED);
     parquetWriterbuilder.withWriterVersion(ParquetWriter.DEFAULT_WRITER_VERSION);
-    parquetWriterbuilder.withConf(HadoopFSUtils.registerFileSystem(file, parquetConfig.getHadoopConf()));
-    handleParquetBloomFilters(parquetWriterbuilder, parquetConfig.getHadoopConf());
+    parquetWriterbuilder.withConf(HadoopFSUtils.registerFileSystem(file, hadoopConf));
+    handleParquetBloomFilters(parquetWriterbuilder, hadoopConf);
 
     parquetWriter = parquetWriterbuilder.build();
     // We cannot accurately measure the snappy compressed output file size. We are choosing a
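
HoodieBaseParquetWriter applies the same pattern but unwraps once in the constructor and reuses the local variable, rather than unwrapping at each call site:

    // Unwrap once, then reuse for every Hadoop-facing call (as in the hunks above).
    Configuration hadoopConf = parquetConfig.getStorageConf().unwrapAs(Configuration.class);
    parquetWriterbuilder.withConf(HadoopFSUtils.registerFileSystem(file, hadoopConf));
    handleParquetBloomFilters(parquetWriterbuilder, hadoopConf);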
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileConfig.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieHFileConfig.java
similarity index 87%
rename from hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileConfig.java
rename to hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieHFileConfig.java
index 64cc607ef63..83b659a6be0 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileConfig.java
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieHFileConfig.java
@@ -7,18 +7,20 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 
-package org.apache.hudi.io.storage;
+package org.apache.hudi.io.hadoop;
 
 import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.io.storage.HoodieHBaseKVComparator;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CellComparator;
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetStreamWriter.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieParquetStreamWriter.java
similarity index 84%
rename from hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetStreamWriter.java
rename to hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieParquetStreamWriter.java
index 226266bf6cf..5fdd6505733 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetStreamWriter.java
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieParquetStreamWriter.java
@@ -7,19 +7,22 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 
-package org.apache.hudi.io.storage;
+package org.apache.hudi.io.hadoop;
 
 import org.apache.hudi.avro.HoodieAvroWriteSupport;
 import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.io.storage.HoodieAvroFileWriter;
+import org.apache.hudi.io.storage.HoodieParquetConfig;
 import org.apache.hudi.parquet.io.OutputStreamBackedOutputFile;
 
 import org.apache.avro.generic.IndexedRecord;
@@ -54,7 +57,7 @@ public class HoodieParquetStreamWriter implements HoodieAvroFileWriter, AutoClos
         .withDictionaryPageSize(parquetConfig.getPageSize())
         .withDictionaryEncoding(parquetConfig.dictionaryEnabled())
         .withWriterVersion(ParquetWriter.DEFAULT_WRITER_VERSION)
-        .withConf(parquetConfig.getHadoopConf())
+        .withConf(parquetConfig.getStorageConf().unwrapAs(Configuration.class))
         .build();
   }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/parquet/io/OutputStreamBackedOutputFile.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/parquet/io/OutputStreamBackedOutputFile.java
similarity index 100%
rename from hudi-common/src/main/java/org/apache/hudi/parquet/io/OutputStreamBackedOutputFile.java
rename to hudi-hadoop-common/src/main/java/org/apache/hudi/parquet/io/OutputStreamBackedOutputFile.java
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieAvroFileReaderFactory.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieAvroFileReaderFactory.java
similarity index 83%
rename from hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieAvroFileReaderFactory.java
rename to hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieAvroFileReaderFactory.java
index 96b8ea9e6b3..7faf84a1ee5 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieAvroFileReaderFactory.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieAvroFileReaderFactory.java
@@ -7,19 +7,22 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 
-package org.apache.hudi.io.storage;
+package org.apache.hudi.io.hadoop;
 
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.io.storage.HoodieFileReader;
+import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
 
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieBaseParquetWriter.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieBaseParquetWriter.java
similarity index 86%
rename from hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieBaseParquetWriter.java
rename to hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieBaseParquetWriter.java
index f9909b0f5f2..82a80b1ce26 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieBaseParquetWriter.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieBaseParquetWriter.java
@@ -7,28 +7,31 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 
-package org.apache.hudi.io.storage;
+package org.apache.hudi.io.hadoop;
 
 import org.apache.hudi.avro.HoodieAvroWriteSupport;
 import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.bloom.BloomFilterFactory;
 import org.apache.hudi.common.bloom.BloomFilterTypeCode;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.io.storage.HoodieParquetConfig;
+import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.IndexedRecord;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.parquet.avro.AvroSchemaConverter;
 import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
@@ -83,7 +86,7 @@ public class TestHoodieBaseParquetWriter {
   public void testCanWrite() throws IOException {
     BloomFilter filter = BloomFilterFactory.createBloomFilter(1000, 0.0001, 10000,
         BloomFilterTypeCode.DYNAMIC_V0.name());
-    Configuration hadoopConf = new Configuration();
+    StorageConfiguration conf = HoodieTestUtils.getDefaultStorageConfWithDefaults();
 
     Schema schema = new Schema.Parser().parse(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA);
     HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(new AvroSchemaConverter().convert(schema),
@@ -92,7 +95,7 @@ public class TestHoodieBaseParquetWriter {
     long maxFileSize = 2 * 1024 * 1024;
     HoodieParquetConfig<HoodieAvroWriteSupport> parquetConfig =
         new HoodieParquetConfig<>(writeSupport, CompressionCodecName.GZIP, ParquetWriter.DEFAULT_BLOCK_SIZE,
-            ParquetWriter.DEFAULT_PAGE_SIZE, maxFileSize, hadoopConf, 0, true);
+            ParquetWriter.DEFAULT_PAGE_SIZE, maxFileSize, conf, 0, true);
 
     StoragePath filePath = new StoragePath(
         new StoragePath(tempDir.toUri()), "test_fileSize.parquet");
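
On the test side, the recurring substitution is new Configuration() giving way to the default StorageConfiguration helper, so the same tests now exercise the Hadoop-free surface:

    // Old, Hadoop-coupled setup:
    //   Configuration hadoopConf = new Configuration();
    // New, engine-agnostic setup used throughout these test hunks:
    StorageConfiguration conf = HoodieTestUtils.getDefaultStorageConfWithDefaults();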
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHBaseHFileReaderWriter.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHBaseHFileReaderWriter.java
similarity index 93%
rename from hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHBaseHFileReaderWriter.java
rename to hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHBaseHFileReaderWriter.java
index 48fa1ddc501..8c227b88e0f 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHBaseHFileReaderWriter.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHBaseHFileReaderWriter.java
@@ -7,19 +7,24 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 
-package org.apache.hudi.io.storage;
+package org.apache.hudi.io.hadoop;
 
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.io.storage.HoodieAvroFileReader;
+import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase;
+import org.apache.hudi.io.storage.HoodieHBaseAvroHFileReader;
+import org.apache.hudi.io.storage.HoodieHFileUtils;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.HoodieStorageUtils;
 import org.apache.hudi.storage.StorageConfiguration;
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHFileReaderWriter.java
similarity index 85%
rename from hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java
rename to hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHFileReaderWriter.java
index 6fe0e2ffea5..b87af2c8371 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHFileReaderWriter.java
@@ -7,19 +7,23 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 
-package org.apache.hudi.io.storage;
+package org.apache.hudi.io.hadoop;
 
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.io.storage.HoodieAvroFileReader;
+import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase;
+import org.apache.hudi.io.storage.HoodieNativeAvroHFileReader;
 import org.apache.hudi.storage.StorageConfiguration;
 
 import org.apache.avro.Schema;
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriterBase.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHFileReaderWriterBase.java
similarity index 98%
rename from hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriterBase.java
rename to hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHFileReaderWriterBase.java
index dcd791956c5..3bdd561a282 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriterBase.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHFileReaderWriterBase.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.hudi.io.storage;
+package org.apache.hudi.io.hadoop;
 
 import org.apache.hudi.common.bootstrap.index.HFileBootstrapIndex;
 import org.apache.hudi.common.config.HoodieStorageConfig;
@@ -29,6 +29,9 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.util.FileIOUtils;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.io.storage.HoodieAvroFileReader;
+import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase;
+import org.apache.hudi.io.storage.HoodieFileWriterFactory;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.HoodieStorageUtils;
 import org.apache.hudi.storage.StorageConfiguration;
@@ -75,7 +78,7 @@ import static org.apache.hudi.io.hfile.TestHFileReader.BOOTSTRAP_INDEX_HFILE_SUF
 import static org.apache.hudi.io.hfile.TestHFileReader.COMPLEX_SCHEMA_HFILE_SUFFIX;
 import static org.apache.hudi.io.hfile.TestHFileReader.SIMPLE_SCHEMA_HFILE_SUFFIX;
 import static org.apache.hudi.io.hfile.TestHFileReader.readHFileFromResources;
-import static org.apache.hudi.io.storage.HoodieHFileConfig.HFILE_COMPARATOR;
+import static org.apache.hudi.io.hadoop.HoodieHFileConfig.HFILE_COMPARATOR;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieOrcReaderWriter.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieOrcReaderWriter.java
similarity index 87%
rename from hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieOrcReaderWriter.java
rename to hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieOrcReaderWriter.java
index bc719be8bc8..6a94a32ed3c 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieOrcReaderWriter.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieOrcReaderWriter.java
@@ -7,16 +7,17 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 
-package org.apache.hudi.io.storage;
+package org.apache.hudi.io.hadoop;
 
 import org.apache.hudi.avro.HoodieBloomFilterWriteSupport;
 import org.apache.hudi.common.bloom.BloomFilter;
@@ -25,6 +26,10 @@ import org.apache.hudi.common.bloom.BloomFilterTypeCode;
 import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.io.storage.HoodieAvroFileReader;
+import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.io.storage.HoodieOrcConfig;
 import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
 
@@ -57,7 +62,7 @@ public class TestHoodieOrcReaderWriter extends TestHoodieReaderWriterBase {
   protected HoodieAvroOrcWriter createWriter(
       Schema avroSchema, boolean populateMetaFields) throws Exception {
     BloomFilter filter = BloomFilterFactory.createBloomFilter(1000, 0.00001, -1, BloomFilterTypeCode.SIMPLE.name());
-    Configuration conf = new Configuration();
+    StorageConfiguration conf = HoodieTestUtils.getDefaultStorageConfWithDefaults();
     int orcStripSize = Integer.parseInt(HoodieStorageConfig.ORC_STRIPE_SIZE.defaultValue());
     int orcBlockSize = Integer.parseInt(HoodieStorageConfig.ORC_BLOCK_SIZE.defaultValue());
     int maxFileSize = Integer.parseInt(HoodieStorageConfig.ORC_FILE_MAX_SIZE.defaultValue());
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieReaderWriterBase.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieReaderWriterBase.java
similarity index 97%
rename from hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieReaderWriterBase.java
rename to hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieReaderWriterBase.java
index 226cf10f97e..3fd0ad80319 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/storage/TestHoodieReaderWriterBase.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieReaderWriterBase.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.hudi.io.storage;
+package org.apache.hudi.io.hadoop;
 
 import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
@@ -26,6 +26,11 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.io.storage.HoodieAvroFileReader;
+import org.apache.hudi.io.storage.HoodieAvroFileWriter;
+import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase;
+import org.apache.hudi.io.storage.HoodieFileReader;
+import org.apache.hudi.io.storage.HoodieFileWriter;
 import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
 
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/SparkHelpers.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/SparkHelpers.scala
index 4d925d3d4ed..791435f4bb7 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/SparkHelpers.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/SparkHelpers.scala
@@ -24,12 +24,13 @@ import org.apache.hudi.common.config.HoodieStorageConfig
 import org.apache.hudi.common.config.HoodieStorageConfig.{BLOOM_FILTER_DYNAMIC_MAX_ENTRIES, BLOOM_FILTER_FPP_VALUE, BLOOM_FILTER_NUM_ENTRIES_VALUE, BLOOM_FILTER_TYPE}
 import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord}
 import org.apache.hudi.common.util.{BaseFileUtils, Option}
-import org.apache.hudi.io.storage.{HoodieAvroParquetWriter, HoodieParquetConfig}
+import org.apache.hudi.io.storage.HoodieParquetConfig
 import org.apache.hudi.storage.{HoodieStorage, StorageConfiguration, StoragePath}
 
 import org.apache.avro.Schema
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.FileSystem
+import org.apache.hudi.io.hadoop.HoodieAvroParquetWriter
 import org.apache.parquet.avro.AvroSchemaConverter
 import org.apache.parquet.hadoop.metadata.CompressionCodecName
 import org.apache.spark.sql.{DataFrame, SQLContext}
@@ -61,12 +62,12 @@ object SparkHelpers {
         HoodieStorageConfig.PARQUET_BLOCK_SIZE.defaultValue.toInt,
         HoodieStorageConfig.PARQUET_PAGE_SIZE.defaultValue.toInt,
         HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.defaultValue.toInt,
-        conf.unwrap(),
+        conf,
         HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION.defaultValue.toDouble,
         HoodieStorageConfig.PARQUET_DICTIONARY_ENABLED.defaultValue)
 
     // Add current classLoad for config, if not will throw classNotFound of 'HoodieWrapperFileSystem'.
-    parquetConfig.getHadoopConf().setClassLoader(Thread.currentThread.getContextClassLoader)
+    conf.unwrap().setClassLoader(Thread.currentThread.getContextClassLoader)
 
     val writer = new HoodieAvroParquetWriter(destinationFile, parquetConfig, instantTime, new SparkTaskContextSupplier(), true)
     for (rec <- sourceRecords) {
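
SparkHelpers keeps its class-loader workaround but now reaches the underlying Hadoop Configuration through StorageConfiguration.unwrap() rather than a getHadoopConf() accessor on the Parquet config. The equivalent in Java (the hunk above is Scala):

    // Ensure 'HoodieWrapperFileSystem' stays resolvable under the context class loader.
    conf.unwrap().setClassLoader(Thread.currentThread().getContextClassLoader());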
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
index d39be52dd22..225cab39286 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
@@ -55,7 +55,7 @@ import org.apache.hudi.hadoop.HoodieParquetInputFormat;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
 import org.apache.hudi.index.HoodieIndex.IndexType;
-import org.apache.hudi.io.storage.HoodieAvroParquetReader;
+import org.apache.hudi.io.hadoop.HoodieAvroParquetReader;
 import org.apache.hudi.keygen.NonpartitionedKeyGenerator;
 import org.apache.hudi.keygen.SimpleKeyGenerator;
 import org.apache.hudi.storage.StoragePath;
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/row/TestHoodieInternalRowParquetWriter.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/row/TestHoodieInternalRowParquetWriter.java
index 65d140da8b3..95f151336c7 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/row/TestHoodieInternalRowParquetWriter.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/row/TestHoodieInternalRowParquetWriter.java
@@ -29,6 +29,7 @@ import org.apache.hudi.common.util.ParquetUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.io.storage.HoodieParquetConfig;
 import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
 import org.apache.hudi.testutils.HoodieSparkClientTestHarness;
 import org.apache.hudi.testutils.SparkDatasetTestUtils;
 
@@ -89,7 +90,7 @@ public class TestHoodieInternalRowParquetWriter extends HoodieSparkClientTestHar
     HoodieWriteConfig cfg = writeConfigBuilder.build();
     HoodieParquetConfig<HoodieRowParquetWriteSupport> parquetConfig = new HoodieParquetConfig<>(writeSupport,
         CompressionCodecName.SNAPPY, cfg.getParquetBlockSize(), cfg.getParquetPageSize(), cfg.getParquetMaxFileSize(),
-        writeSupport.getHadoopConf(), cfg.getParquetCompressionRatio(), cfg.parquetDictionaryEnabled());
+        new HadoopStorageConfiguration(writeSupport.getHadoopConf()), cfg.getParquetCompressionRatio(), cfg.parquetDictionaryEnabled());
 
     StoragePath filePath = new StoragePath(basePath + "/internal_row_writer.parquet");
 
