nsivabalan commented on a change in pull request #1804: URL: https://github.com/apache/hudi/pull/1804#discussion_r477545069
########## File path: hudi-client/src/main/java/org/apache/hudi/config/HoodieStorageConfig.java ##########

@@ -94,6 +100,16 @@ public Builder parquetPageSize(int pageSize) {
     return this;
   }

+  public Builder hfileMaxFileSize(long maxFileSize) {
+    props.setProperty(HFILE_FILE_MAX_BYTES, String.valueOf(maxFileSize));

Review comment:
@vinothchandar : wrt your comment on having two different configs: I see similar configs in other places too. For example, for bloom index parallelism we have one config per index type. Initially I thought we would have a single config used by whichever index type is being initialized, but I saw that every index has its own set of configs and they don't share any.

########## File path: hudi-client/src/main/java/org/apache/hudi/table/action/compact/HoodieMergeOnReadTableCompactor.java ##########

@@ -135,15 +135,14 @@
   // Compacting is very similar to applying updates to existing file
   Iterator<List<WriteStatus>> result;
-  // If the dataFile is present, there is a base parquet file present, perform updates else perform inserts into a
-  // new base parquet file.
+  // If the dataFile is present, perform updates else perform inserts into a new base file.
   if (oldDataFileOpt.isPresent()) {
     result = hoodieCopyOnWriteTable.handleUpdate(instantTime, operation.getPartitionPath(), operation.getFileId(),
         scanner.getRecords(), oldDataFileOpt.get());
   } else {
     result = hoodieCopyOnWriteTable.handleInsert(instantTime, operation.getPartitionPath(), operation.getFileId(),
-        scanner.iterator());
+        scanner.getRecords());

Review comment:
If we have two overloaded methods, then we could use table.requireSortedRecords() and call the appropriate method, either with the iterator or with the records.

########## File path: hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieHFileConfig.java ##########

@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hudi.common.bloom.BloomFilter;
+
+public class HoodieHFileConfig {
+
+  private Compression.Algorithm compressionAlgorithm;
+  private int blockSize;
+  private long maxFileSize;
+  private Configuration hadoopConf;
+  private BloomFilter bloomFilter;
+
+  public HoodieHFileConfig(Compression.Algorithm compressionAlgorithm, int blockSize, long maxFileSize,

Review comment:
These are the 3 configs. May I know where these are added?
```
conf.set(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, "true");
conf.set("CACHE_DATA_IN_L1", "true");
conf.set("hbase.hfile.drop.behind.compaction", "false");
```

########## File path: hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java ##########

@@ -90,9 +91,10 @@ public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTa
   * Called by the compactor code path.
  */
  public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T> hoodieTable,
-     String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordIterator, SparkTaskContextSupplier sparkTaskContextSupplier) {
+     String partitionPath, String fileId, Map<String, HoodieRecord<T>> recordMap,

Review comment:
Not sure if we can leak the type of the base file to the compactor. But did you think about having two overloaded methods here? For the parquet compaction path an iterator would be passed in, whereas for hfile compaction a record map would be passed in.

########## File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileInputFormat.java ##########

@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
+import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * HoodieInputFormat for HUDI datasets which store data in HFile base file format.
+ */
+@UseFileSplitsFromInputFormat
+public class HoodieHFileInputFormat extends FileInputFormat<NullWritable, ArrayWritable> implements Configurable {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieHFileInputFormat.class);
+
+  protected Configuration conf;
+
+  protected HoodieDefaultTimeline filterInstantsTimeline(HoodieDefaultTimeline timeline) {
+    return HoodieInputFormatUtils.filterInstantsTimeline(timeline);
+  }
+
+  @Override
+  public FileStatus[] listStatus(JobConf job) throws IOException {

Review comment:
Yes, that's what I meant: by using some helper class.
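To make the helper-class idea above concrete, here is a minimal, hypothetical sketch. None of these names exist in Hudi as written; the real shared logic would presumably live in something like HoodieInputFormatUtils and operate on Hadoop FileStatus objects. The point is only that both the parquet and HFile input formats could delegate common listStatus filtering to one static helper and differ only in the base file extension they ask for:

```java
import java.util.List;
import java.util.stream.Collectors;

// Illustrative stand-in for a shared input-format helper: both the parquet
// and HFile input formats call the same filtering routine, parameterized by
// the base file extension of the table's format.
public class InputFormatHelperSketch {

  // Shared helper: keep only files of the requested base file format.
  static List<String> filterByExtension(List<String> paths, String extension) {
    return paths.stream()
        .filter(p -> p.endsWith(extension))
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<String> paths = List.of("f1.parquet", "f2.hfile", "f3.parquet");
    // A HoodieParquetInputFormat-style caller:
    System.out.println(filterByExtension(paths, ".parquet"));  // [f1.parquet, f3.parquet]
    // A HoodieHFileInputFormat-style caller:
    System.out.println(filterByExtension(paths, ".hfile"));    // [f2.hfile]
  }
}
```

The extension filter is just a placeholder for whatever filtering the two listStatus implementations actually share (timeline filtering, latest-commit selection, and so on).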
########## File path: hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java ##########

@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.log.block;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.io.storage.HoodieHFileReader;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.util.Pair;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+import javax.annotation.Nonnull;
+
+/**
+ * HoodieHFileDataBlock contains a list of records stored inside an HFile format. It is used with the HFile
+ * base file format.
+ */
+public class HoodieHFileDataBlock extends HoodieDataBlock {
+  private static final Logger LOG = LogManager.getLogger(HoodieHFileDataBlock.class);
+  private static Compression.Algorithm compressionAlgorithm = Compression.Algorithm.GZ;
+  private static int blockSize = 1 * 1024 * 1024;
+
+  public HoodieHFileDataBlock(@Nonnull Map<HeaderMetadataType, String> logBlockHeader,
+      @Nonnull Map<HeaderMetadataType, String> logBlockFooter,
+      @Nonnull Option<HoodieLogBlockContentLocation> blockContentLocation, @Nonnull Option<byte[]> content,
+      FSDataInputStream inputStream, boolean readBlockLazily) {
+    super(logBlockHeader, logBlockFooter, blockContentLocation, content, inputStream, readBlockLazily);
+  }
+
+  public HoodieHFileDataBlock(HoodieLogFile logFile, FSDataInputStream inputStream, Option<byte[]> content,
+      boolean readBlockLazily, long position, long blockSize, long blockEndpos, Schema readerSchema,
+      Map<HeaderMetadataType, String> header, Map<HeaderMetadataType, String> footer) {
+    super(content, inputStream, readBlockLazily,
+        Option.of(new HoodieLogBlockContentLocation(logFile, position, blockSize, blockEndpos)), readerSchema, header,
+        footer);
+  }
+
+  public HoodieHFileDataBlock(@Nonnull List<IndexedRecord> records, @Nonnull Map<HeaderMetadataType, String> header) {
+    super(records, header, new HashMap<>());
+  }
+
+  @Override
+  public HoodieLogBlockType getBlockType() {
+    return HoodieLogBlockType.HFILE_DATA_BLOCK;
+  }
+
+  @Override
+  protected byte[] serializeRecords() throws IOException {
+    HFileContext context = new HFileContextBuilder().withBlockSize(blockSize).withCompression(compressionAlgorithm)
+        .build();
+    Configuration conf = new Configuration();
+    CacheConfig cacheConfig = new CacheConfig(conf);
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    FSDataOutputStream ostream = new FSDataOutputStream(baos, null);
+
+    HFile.Writer writer = HFile.getWriterFactory(conf, cacheConfig)
+        .withOutputStream(ostream).withFileContext(context).create();
+
+    // Serialize records into bytes
+    Map<String, byte[]> sortedRecordsMap = new TreeMap<>();

Review comment:
Minor: why call this sortedRecordsMap? I don't see any sorting actually.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
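As an aside on the sortedRecordsMap naming questioned above: java.util.TreeMap does iterate its entries in ascending key order regardless of insertion order, so the sorting is implicit in the data structure rather than in an explicit sort call. This matters here because HFile requires cells to be written in sorted key order. A small self-contained demonstration, with placeholder keys and empty byte arrays standing in for serialized records:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// A TreeMap yields its keys in ascending order no matter what order they
// were inserted in, which is why a map feeding an HFile writer (which
// requires keys in sorted order) can be backed by a TreeMap with no
// explicit sort step.
public class TreeMapOrderSketch {
  public static void main(String[] args) {
    Map<String, byte[]> sortedRecordsMap = new TreeMap<>();
    sortedRecordsMap.put("key3", new byte[0]);
    sortedRecordsMap.put("key1", new byte[0]);
    sortedRecordsMap.put("key2", new byte[0]);

    List<String> keys = new ArrayList<>(sortedRecordsMap.keySet());
    System.out.println(keys);  // [key1, key2, key3]
  }
}
```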