nsivabalan commented on a change in pull request #1804:
URL: https://github.com/apache/hudi/pull/1804#discussion_r477545069



##########
File path: hudi-client/src/main/java/org/apache/hudi/config/HoodieStorageConfig.java
##########
@@ -94,6 +100,16 @@ public Builder parquetPageSize(int pageSize) {
       return this;
     }
 
+    public Builder hfileMaxFileSize(long maxFileSize) {
+      props.setProperty(HFILE_FILE_MAX_BYTES, String.valueOf(maxFileSize));

Review comment:
       @vinothchandar : wrt your comment on having two different configs: I see similar configs at other places too, e.g. bloom index parallelism, where we have one config per index type. Initially I thought we would have one config that is used by whichever index type is being initialized, but I saw that every index has its own set of configs and they don't share any.
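       For illustration, keeping separate per-format size configs in the builder would look roughly like this (a sketch only; the parquet-side property/method names here mirror the existing hfile one and are assumptions, not a prescription):
```java
// Sketch only: per-format file size limits, following the existing builder pattern.
public Builder limitFileSize(long maxFileSize) {
  // assumed parquet-side property, analogous to HFILE_FILE_MAX_BYTES
  props.setProperty(PARQUET_FILE_MAX_BYTES, String.valueOf(maxFileSize));
  return this;
}

public Builder hfileMaxFileSize(long maxFileSize) {
  props.setProperty(HFILE_FILE_MAX_BYTES, String.valueOf(maxFileSize));
  return this;
}
```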

##########
File path: hudi-client/src/main/java/org/apache/hudi/table/action/compact/HoodieMergeOnReadTableCompactor.java
##########
@@ -135,15 +135,14 @@
 
     // Compacting is very similar to applying updates to existing file
     Iterator<List<WriteStatus>> result;
-    // If the dataFile is present, there is a base parquet file present, perform updates else perform inserts into a
-    // new base parquet file.
+    // If the dataFile is present, perform updates else perform inserts into a new base file.
     if (oldDataFileOpt.isPresent()) {
      result = hoodieCopyOnWriteTable.handleUpdate(instantTime, operation.getPartitionPath(),
               operation.getFileId(), scanner.getRecords(),
           oldDataFileOpt.get());
     } else {
      result = hoodieCopyOnWriteTable.handleInsert(instantTime, operation.getPartitionPath(), operation.getFileId(),
-          scanner.iterator());
+          scanner.getRecords());

Review comment:
       If we have two overloaded methods, then we could use table.requireSortedRecords() and call the appropriate one, either w/ the iterator or with the records.
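       Roughly, the branch in the compactor could then look like this (just a sketch; requireSortedRecords() is the proposed method, not an existing API):
```java
// Sketch of the suggested dispatch inside HoodieMergeOnReadTableCompactor.
if (oldDataFileOpt.isPresent()) {
  result = hoodieCopyOnWriteTable.handleUpdate(instantTime, operation.getPartitionPath(),
      operation.getFileId(), scanner.getRecords(), oldDataFileOpt.get());
} else if (hoodieCopyOnWriteTable.requireSortedRecords()) {
  // HFile-style base files need keys in order, so hand over the full record map.
  result = hoodieCopyOnWriteTable.handleInsert(instantTime, operation.getPartitionPath(),
      operation.getFileId(), scanner.getRecords());
} else {
  // Parquet-style base files can keep streaming records through the iterator.
  result = hoodieCopyOnWriteTable.handleInsert(instantTime, operation.getPartitionPath(),
      operation.getFileId(), scanner.iterator());
}
```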

##########
File path: hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieHFileConfig.java
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hudi.common.bloom.BloomFilter;
+
+public class HoodieHFileConfig {
+
+  private Compression.Algorithm compressionAlgorithm;
+  private int blockSize;
+  private long maxFileSize;
+  private Configuration hadoopConf;
+  private BloomFilter bloomFilter;
+
+  public HoodieHFileConfig(Compression.Algorithm compressionAlgorithm, int blockSize, long maxFileSize,

Review comment:
       These are the 3 configs. May I know where these are added?
   ```
       conf.set(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, "true");
       conf.set("CACHE_DATA_IN_L1","true");
       conf.set("hbase.hfile.drop.behind.compaction", "false");
   ```
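       Just to make the question concrete, a hedged sketch of one place they could be applied, on the Configuration the HFile writer side uses before building its CacheConfig (placement here is an assumption, not necessarily what this PR does):
```java
// Sketch only: cache-related HBase settings applied before CacheConfig is created.
Configuration conf = new Configuration(hadoopConf);
conf.set(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, "true");
conf.set("CACHE_DATA_IN_L1", "true");
conf.set("hbase.hfile.drop.behind.compaction", "false");
CacheConfig cacheConfig = new CacheConfig(conf);
```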

##########
File path: hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
##########
@@ -90,9 +91,10 @@ public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTa
    * Called by the compactor code path.
    */
   public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T> hoodieTable,
-      String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordIterator, SparkTaskContextSupplier sparkTaskContextSupplier) {
+      String partitionPath, String fileId, Map<String, HoodieRecord<T>> recordMap,

Review comment:
       Not sure if we can leak the type of the base file to the compactor. But did you think about having two overloaded methods here? For the parquet compaction path, the iterator would be passed in, whereas for hfile compaction, the record map would be passed in.
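       Something along these lines (illustrative sketch; parameter lists trimmed to what this hunk shows, constructor bodies elided):
```java
// Parquet compaction path: records streamed via an iterator.
public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T> hoodieTable,
    String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordIterator,
    SparkTaskContextSupplier sparkTaskContextSupplier) {
  // existing create-handle initialization
}

// HFile compaction path: records handed over as a keyed map so they can be written in key order.
public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T> hoodieTable,
    String partitionPath, String fileId, Map<String, HoodieRecord<T>> recordMap,
    SparkTaskContextSupplier sparkTaskContextSupplier) {
  // existing create-handle initialization
}
```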

##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileInputFormat.java
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
+import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * HoodieInputFormat for HUDI datasets which store data in HFile base file format.
+ */
+@UseFileSplitsFromInputFormat
+public class HoodieHFileInputFormat extends FileInputFormat<NullWritable, ArrayWritable> implements Configurable {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieHFileInputFormat.class);
+
+  protected Configuration conf;
+
+  protected HoodieDefaultTimeline filterInstantsTimeline(HoodieDefaultTimeline timeline) {
+    return HoodieInputFormatUtils.filterInstantsTimeline(timeline);
+  }
+
+  @Override
+  public FileStatus[] listStatus(JobConf job) throws IOException {

Review comment:
       Yes, that's what I meant, by using some helper class.
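       i.e. roughly something like this (hedged sketch; the helper method name on HoodieInputFormatUtils is hypothetical):
```java
// Hypothetical shared helper, so the parquet and hfile input formats both delegate here.
@Override
public FileStatus[] listStatus(JobConf job) throws IOException {
  FileStatus[] fileStatuses = super.listStatus(job);
  return HoodieInputFormatUtils.filterFileStatusForSnapshotMode(job, fileStatuses, HoodieFileFormat.HFILE);
}
```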

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.log.block;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.io.storage.HoodieHFileReader;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.util.Pair;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+import javax.annotation.Nonnull;
+
+/**
+ * HoodieHFileDataBlock contains a list of records stored inside an HFile format. It is used with the HFile
+ * base file format.
+ */
+public class HoodieHFileDataBlock extends HoodieDataBlock {
+  private static final Logger LOG = LogManager.getLogger(HoodieHFileDataBlock.class);
+  private static Compression.Algorithm compressionAlgorithm = Compression.Algorithm.GZ;
+  private static int blockSize = 1 * 1024 * 1024;
+
+  public HoodieHFileDataBlock(@Nonnull Map<HeaderMetadataType, String> logBlockHeader,
+       @Nonnull Map<HeaderMetadataType, String> logBlockFooter,
+       @Nonnull Option<HoodieLogBlockContentLocation> blockContentLocation, @Nonnull Option<byte[]> content,
+       FSDataInputStream inputStream, boolean readBlockLazily) {
+    super(logBlockHeader, logBlockFooter, blockContentLocation, content, inputStream, readBlockLazily);
+  }
+
+  public HoodieHFileDataBlock(HoodieLogFile logFile, FSDataInputStream inputStream, Option<byte[]> content,
+       boolean readBlockLazily, long position, long blockSize, long blockEndpos, Schema readerSchema,
+       Map<HeaderMetadataType, String> header, Map<HeaderMetadataType, String> footer) {
+    super(content, inputStream, readBlockLazily,
+          Option.of(new HoodieLogBlockContentLocation(logFile, position, blockSize, blockEndpos)), readerSchema, header,
+          footer);
+  }
+
+  public HoodieHFileDataBlock(@Nonnull List<IndexedRecord> records, @Nonnull Map<HeaderMetadataType, String> header) {
+    super(records, header, new HashMap<>());
+  }
+
+  @Override
+  public HoodieLogBlockType getBlockType() {
+    return HoodieLogBlockType.HFILE_DATA_BLOCK;
+  }
+
+  @Override
+  protected byte[] serializeRecords() throws IOException {
+    HFileContext context = new HFileContextBuilder().withBlockSize(blockSize).withCompression(compressionAlgorithm)
+        .build();
+    Configuration conf = new Configuration();
+    CacheConfig cacheConfig = new CacheConfig(conf);
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    FSDataOutputStream ostream = new FSDataOutputStream(baos, null);
+
+    HFile.Writer writer = HFile.getWriterFactory(conf, cacheConfig)
+        .withOutputStream(ostream).withFileContext(context).create();
+
+    // Serialize records into bytes
+    Map<String, byte[]> sortedRecordsMap = new TreeMap<>();

Review comment:
       Minor: why call this sortedRecordsMap? I don't see any sorting actually.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

