parisni commented on code in PR #8716:
URL: https://github.com/apache/hudi/pull/8716#discussion_r1196389959
##########
hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBaseParquetWriter.java:
##########
@@ -18,46 +18,68 @@
 package org.apache.hudi.io.storage;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
 import org.apache.hudi.common.util.VisibleForTesting;
+
 import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.ParquetOutputFormat;
 import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.parquet.hadoop.api.WriteSupport;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.util.concurrent.atomic.AtomicLong;
 
 import static org.apache.parquet.column.ParquetProperties.DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK;
 import static org.apache.parquet.column.ParquetProperties.DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK;
+import static org.apache.parquet.hadoop.ParquetOutputFormat.BLOOM_FILTER_ENABLED;
+import static org.apache.parquet.hadoop.ParquetOutputFormat.BLOOM_FILTER_EXPECTED_NDV;
+import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_IS_VALIDATING_ENABLED;
+import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_WRITER_VERSION;
 
 /**
  * Base class of Hudi's custom {@link ParquetWriter} implementations
  *
  * @param <R> target type of the object being written into Parquet files (for ex,
- *           {@code IndexedRecord}, {@code InternalRow})
+ *            {@code IndexedRecord}, {@code InternalRow})
  */
-public abstract class HoodieBaseParquetWriter<R> extends ParquetWriter<R> {
+public abstract class HoodieBaseParquetWriter<R> implements Closeable {
 
   private final AtomicLong writtenRecordCount = new AtomicLong(0);
   private final long maxFileSize;
   private long recordCountForNextSizeCheck;
+  private final ParquetWriter parquetWriter;
 
   public HoodieBaseParquetWriter(Path file,
                                  HoodieParquetConfig<? extends WriteSupport<R>> parquetConfig) throws IOException {
-    super(HoodieWrapperFileSystem.convertToHoodiePath(file, parquetConfig.getHadoopConf()),
-        ParquetFileWriter.Mode.CREATE,
-        parquetConfig.getWriteSupport(),
-        parquetConfig.getCompressionCodecName(),
-        parquetConfig.getBlockSize(),
-        parquetConfig.getPageSize(),
-        parquetConfig.getPageSize(),
-        parquetConfig.dictionaryEnabled(),
-        DEFAULT_IS_VALIDATING_ENABLED,
-        DEFAULT_WRITER_VERSION,
-        FSUtils.registerFileSystem(file, parquetConfig.getHadoopConf()));
+    ParquetWriter.Builder parquetWriterbuilder = new ParquetWriter.Builder(HoodieWrapperFileSystem.convertToHoodiePath(file, parquetConfig.getHadoopConf())) {
+      @Override
+      protected ParquetWriter.Builder self() {

Review Comment:
please see my notes for the lineage on spark: it basically uses another parquet class to write, which gets the hadoop conf directly (i.e. ParquetOutputFormat), while hudi uses ParquetWriter directly:
- [ParquetOutputWriter in spark](https://github.com/apache/spark/blob/c8e85eab3fca0e4e5f4bdf9d1d6d1702ecf3fd07/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOutputWriter.scala#L31-L36) asks for parquet's ParquetOutputFormat, which picks up the bloom configs
- [ParquetUtils in spark](https://github.com/apache/spark/blob/c8e85eab3fca0e4e5f4bdf9d1d6d1702ecf3fd07/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala#L414C7-L492) has a prepareWrite function, which propagates to ParquetOutputWriter
- [ParquetWrite in spark](https://github.com/apache/spark/blob/c8e85eab3fca0e4e5f4bdf9d1d6d1702ecf3fd07/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetWrite.scala#L35-L41) has a prepareWrite function, which propagates to ParquetUtils.prepareWrite
- [ParquetTable in spark](https://github.com/apache/spark/blob/c8e85eab3fca0e4e5f4bdf9d1d6d1702ecf3fd07/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetTable.scala#L46-L50) uses ParquetWrite
- [ParquetDataSourceV2 in spark](https://github.com/apache/spark/blob/c8e85eab3fca0e4e5f4bdf9d1d6d1702ecf3fd07/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetDataSourceV2.scala#L32-L37) uses ParquetTable in getTable (then for both read and write)
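Since the diff above is cut off at `self()`, here is a minimal sketch (not the PR's final code) of how the builder-based constructor could translate the bloom conf keys into explicit builder calls: ParquetWriter, unlike ParquetOutputFormat, never reads them from the hadoop conf on its own. It assumes the imports added at the top of the diff, Parquet 1.12+ Builder APIs, and a hypothetical column name `record_key`:

```java
// Sketch of the constructor body, continuing where the diff is truncated.
// Assumptions: per-column keys follow ParquetOutputFormat's "<key>#<column>"
// convention; "record_key" is a hypothetical column name.
Configuration hadoopConf = parquetConfig.getHadoopConf();
ParquetWriter.Builder parquetWriterbuilder =
    new ParquetWriter.Builder(HoodieWrapperFileSystem.convertToHoodiePath(file, hadoopConf)) {
      @Override
      protected ParquetWriter.Builder self() {
        return this;
      }

      @Override
      protected WriteSupport getWriteSupport(Configuration conf) {
        return parquetConfig.getWriteSupport();
      }
    };

parquetWriterbuilder
    .withWriteMode(ParquetFileWriter.Mode.CREATE)
    .withCompressionCodec(parquetConfig.getCompressionCodecName())
    .withRowGroupSize((int) parquetConfig.getBlockSize()) // older builders only expose the int overload
    .withPageSize(parquetConfig.getPageSize())
    .withDictionaryEncoding(parquetConfig.dictionaryEnabled())
    .withValidation(DEFAULT_IS_VALIDATING_ENABLED)
    .withWriterVersion(DEFAULT_WRITER_VERSION)
    .withConf(FSUtils.registerFileSystem(file, hadoopConf));

// ParquetWriter ignores these conf keys unless they are mapped onto builder
// calls; ParquetOutputFormat does exactly this mapping for Spark.
if (hadoopConf.getBoolean(BLOOM_FILTER_ENABLED + "#record_key", false)) {
  parquetWriterbuilder.withBloomFilterEnabled("record_key", true);
}
long ndv = hadoopConf.getLong(BLOOM_FILTER_EXPECTED_NDV + "#record_key", -1L);
if (ndv > 0) {
  parquetWriterbuilder.withBloomFilterNDV("record_key", ndv);
}

this.parquetWriter = parquetWriterbuilder.build();
```

Composing over a `ParquetWriter` built this way (instead of extending `ParquetWriter`, as before the diff) is what makes these per-writer knobs reachable at all, since hudi never goes through the conf-driven ParquetOutputFormat path.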
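On the spark side, the practical effect of that lineage is that plain writer options flow through prepareWrite into ParquetOutputFormat. A hedged illustration (recent Spark with parquet 1.12+, Java API, assuming an existing SparkSession `spark`; the paths and the `record_key` column are made up):

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Writer options become hadoop conf entries, which the prepareWrite chain
// hands to ParquetOutputFormat; the keys are BLOOM_FILTER_ENABLED and
// BLOOM_FILTER_EXPECTED_NDV with a "#<column>" suffix.
Dataset<Row> df = spark.read().parquet("/tmp/in");
df.write()
    .option("parquet.bloom.filter.enabled#record_key", "true")
    .option("parquet.bloom.filter.expected.ndv#record_key", "1000000")
    .parquet("/tmp/out");
```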