nsivabalan commented on a change in pull request #4333:
URL: https://github.com/apache/hudi/pull/4333#discussion_r771044754



##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/fs/inline/InLineFSUtils.java
##########
@@ -74,40 +75,50 @@ public static Path getInlineFilePath(Path outerPath, String 
origScheme, long inL
    * @return Outer file Path from the InLineFS Path
    */
   public static Path getOuterFilePathFromInlinePath(Path inlineFSPath) {
-    final String scheme = inlineFSPath.getParent().getName();
+    assertInlineFSPath(inlineFSPath);
+
+    final String baseFileScheme = inlineFSPath.getParent().getName();
     final Path basePath = inlineFSPath.getParent().getParent();
-    
ValidationUtils.checkArgument(basePath.toString().contains(SCHEME_SEPARATOR),
-        "Invalid InLineFSPath: " + inlineFSPath);
+    checkArgument(
+        basePath.toString().contains(SCHEME_SEPARATOR),
+        "Invalid InLineFS path: " + inlineFSPath);
 
     final String pathExceptScheme = 
basePath.toString().substring(basePath.toString().indexOf(SCHEME_SEPARATOR) + 
1);
-    final String fullPath = scheme + SCHEME_SEPARATOR
-        + (scheme.equals(LOCAL_FILESYSTEM_SCHEME) ? PATH_SEPARATOR : "")
+    final String fullPath = baseFileScheme + SCHEME_SEPARATOR
+        + (baseFileScheme.equals(LOCAL_FILESYSTEM_SCHEME) ? PATH_SEPARATOR : 
"")
         + pathExceptScheme;
     return new Path(fullPath);
   }
 
   /**
-   * Eg input : "inlinefs://file1/s3a/?start_offset=20&length=40".
-   * output: 20
+   * Returns start offset w/in the base for the block identified by the given 
InlineFS path
    *
-   * @param inlinePath
-   * @return
+   * input: "inlinefs://file1/s3a/?start_offset=20&length=40".
+   * output: 20
    */
-  public static int startOffset(Path inlinePath) {
-    String[] slices = inlinePath.toString().split("[?&=]");
+  public static int startOffset(Path inlineFSPath) {
+    assertInlineFSPath(inlineFSPath);
+
+    String[] slices = inlineFSPath.toString().split("[?&=]");
     return Integer.parseInt(slices[slices.length - 3]);
   }
 
   /**
-   * Eg input : "inlinefs:/file1/s3a/?start_offset=20&length=40".
-   * Output: 40
+   * Returns length of the block (embedded w/in the base file) identified by 
the given InlineFS path
    *
-   * @param inlinePath
-   * @return
+   * input: "inlinefs:/file1/s3a/?start_offset=20&length=40".
+   * output: 40
    */
   public static int length(Path inlinePath) {
+    assertInlineFSPath(inlinePath);

Review comment:
       same here.

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
##########
@@ -110,59 +136,94 @@ public static HoodieLogBlock getBlock(HoodieLogBlockType 
logDataBlockFormat, Lis
   @Override
   public byte[] getContentBytes() throws IOException {
     // In case this method is called before realizing records from content
-    if (getContent().isPresent()) {
-      return getContent().get();
-    } else if (readBlockLazily && !getContent().isPresent() && records == 
null) {
-      // read block lazily
-      createRecordsFromContentBytes();
+    Option<byte[]> content = getContent();
+
+    checkState(content.isPresent() || records != null, "Block is in invalid 
state");

Review comment:
       Shouldn't the condition be AND instead of OR?

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
##########
@@ -208,71 +207,85 @@ private HoodieLogBlock readBlock() throws IOException {
     HoodieLogFormat.LogFormatVersion nextBlockVersion = readVersion();
 
     // 3. Read the block type for a log block
-    if (nextBlockVersion.getVersion() != 
HoodieLogFormatVersion.DEFAULT_VERSION) {
-      type = inputStream.readInt();
-
-      ValidationUtils.checkArgument(type < HoodieLogBlockType.values().length, 
"Invalid block byte type found " + type);
-      blockType = HoodieLogBlockType.values()[type];
-    }
+    HoodieLogBlockType blockType = tryReadBlockType(nextBlockVersion);
 
     // 4. Read the header for a log block, if present
-    if (nextBlockVersion.hasHeader()) {
-      header = HoodieLogBlock.getLogMetadata(inputStream);
-    }
 
-    int contentLength = blocksize;
+    Map<HeaderMetadataType, String> header =
+        nextBlockVersion.hasHeader() ? 
HoodieLogBlock.getLogMetadata(inputStream) : null;
+
     // 5. Read the content length for the content
-    if (nextBlockVersion.getVersion() != 
HoodieLogFormatVersion.DEFAULT_VERSION) {
-      contentLength = (int) inputStream.readLong();
-    }
+    // Fallback to full-block size if no content-length
+    // TODO replace w/ hasContentLength
+    int contentLength =
+        nextBlockVersion.getVersion() != 
HoodieLogFormatVersion.DEFAULT_VERSION ? (int) inputStream.readLong() : 
blockSize;
 
     // 6. Read the content or skip content based on IO vs Memory trade-off by 
client
-    // TODO - have a max block size and reuse this buffer in the ByteBuffer
-    // (hard to guess max block size for now)
     long contentPosition = inputStream.getPos();
-    byte[] content = HoodieLogBlock.readOrSkipContent(inputStream, 
contentLength, readBlockLazily);
+    boolean shouldReadLazily = readBlockLazily && 
nextBlockVersion.getVersion() != HoodieLogFormatVersion.DEFAULT_VERSION;

Review comment:
       Why this change? Previously we just had readBlockLazily, if I am not 
wrong. Can you shed some light on this?

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.log.block;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hudi.avro.HoodieAvroWriteSupport;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ParquetReaderIterator;
+import org.apache.hudi.io.storage.HoodieAvroParquetConfig;
+import org.apache.hudi.io.storage.HoodieParquetStreamWriter;
+import org.apache.hudi.parquet.io.ByteBufferBackedInputFile;
+import org.apache.parquet.avro.AvroParquetReader;
+import org.apache.parquet.avro.AvroReadSupport;
+import org.apache.parquet.avro.AvroSchemaConverter;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.io.InputFile;
+
+import javax.annotation.Nonnull;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * HoodieParquetDataBlock contains a list of records serialized using Parquet.
+ */
+public class HoodieParquetDataBlock extends HoodieDataBlock {
+
+  public HoodieParquetDataBlock(
+      HoodieLogFile logFile,
+      FSDataInputStream inputStream,
+      Option<byte[]> content,
+      boolean readBlockLazily, long position, long blockSize, long blockEndpos,
+      Option<Schema> readerSchema,
+      Map<HeaderMetadataType, String> header,
+      Map<HeaderMetadataType, String> footer,
+      String keyField
+  ) {
+    super(
+        content,
+        inputStream,
+        readBlockLazily,
+        Option.of(new HoodieLogBlockContentLocation(logFile, position, 
blockSize, blockEndpos)),
+        readerSchema,
+        header,
+        footer,
+        keyField,
+        false);
+  }
+
+  public HoodieParquetDataBlock(
+      @Nonnull List<IndexedRecord> records,
+      @Nonnull Map<HeaderMetadataType, String> header,
+      @Nonnull String keyField
+  ) {
+    super(records, header, new HashMap<>(), keyField);
+  }
+
+  public HoodieParquetDataBlock(
+      @Nonnull List<IndexedRecord> records,
+      @Nonnull Map<HeaderMetadataType, String> header
+  ) {
+    super(records, header, new HashMap<>(), 
HoodieRecord.RECORD_KEY_METADATA_FIELD);
+  }
+
+  @Override
+  public HoodieLogBlockType getBlockType() {
+    return HoodieLogBlockType.PARQUET_DATA_BLOCK;
+  }
+
+  @Override
+  protected byte[] serializeRecords(List<IndexedRecord> records) throws 
IOException {
+    if (records.size() == 0) {
+      return new byte[0];
+    }
+
+    Schema writerSchema = new 
Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
+
+    HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(
+        new AvroSchemaConverter().convert(writerSchema), writerSchema, 
Option.empty());
+
+    HoodieAvroParquetConfig avroParquetConfig =
+        new HoodieAvroParquetConfig(
+            writeSupport,
+            // TODO fetch compression codec from the config
+            CompressionCodecName.GZIP,

Review comment:
       Can we file a tracking ticket? We need to introduce configs for these.

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/fs/inline/InLineFSUtils.java
##########
@@ -74,40 +75,50 @@ public static Path getInlineFilePath(Path outerPath, String 
origScheme, long inL
    * @return Outer file Path from the InLineFS Path
    */
   public static Path getOuterFilePathFromInlinePath(Path inlineFSPath) {
-    final String scheme = inlineFSPath.getParent().getName();
+    assertInlineFSPath(inlineFSPath);
+
+    final String baseFileScheme = inlineFSPath.getParent().getName();
     final Path basePath = inlineFSPath.getParent().getParent();
-    
ValidationUtils.checkArgument(basePath.toString().contains(SCHEME_SEPARATOR),
-        "Invalid InLineFSPath: " + inlineFSPath);
+    checkArgument(
+        basePath.toString().contains(SCHEME_SEPARATOR),
+        "Invalid InLineFS path: " + inlineFSPath);
 
     final String pathExceptScheme = 
basePath.toString().substring(basePath.toString().indexOf(SCHEME_SEPARATOR) + 
1);
-    final String fullPath = scheme + SCHEME_SEPARATOR
-        + (scheme.equals(LOCAL_FILESYSTEM_SCHEME) ? PATH_SEPARATOR : "")
+    final String fullPath = baseFileScheme + SCHEME_SEPARATOR
+        + (baseFileScheme.equals(LOCAL_FILESYSTEM_SCHEME) ? PATH_SEPARATOR : 
"")
         + pathExceptScheme;
     return new Path(fullPath);
   }
 
   /**
-   * Eg input : "inlinefs://file1/s3a/?start_offset=20&length=40".
-   * output: 20
+   * Returns start offset w/in the base for the block identified by the given 
InlineFS path
    *
-   * @param inlinePath
-   * @return
+   * input: "inlinefs://file1/s3a/?start_offset=20&length=40".
+   * output: 20
    */
-  public static int startOffset(Path inlinePath) {
-    String[] slices = inlinePath.toString().split("[?&=]");
+  public static int startOffset(Path inlineFSPath) {
+    assertInlineFSPath(inlineFSPath);

Review comment:
       Do we need to assert for every such call?

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
##########
@@ -36,50 +29,63 @@
 import org.apache.avro.io.Encoder;
 import org.apache.avro.io.EncoderFactory;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.fs.SizeAwareDataInputStream;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieIOException;
 
+import javax.annotation.Nonnull;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
-import javax.annotation.Nonnull;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
 
 /**
  * HoodieAvroDataBlock contains a list of records serialized using Avro. It is 
used with the Parquet base file format.
  */
 public class HoodieAvroDataBlock extends HoodieDataBlock {
 
-  private ThreadLocal<BinaryEncoder> encoderCache = new ThreadLocal<>();
-  private ThreadLocal<BinaryDecoder> decoderCache = new ThreadLocal<>();
-
-  public HoodieAvroDataBlock(@Nonnull Map<HeaderMetadataType, String> 
logBlockHeader,
-                             @Nonnull Map<HeaderMetadataType, String> 
logBlockFooter,
-                             @Nonnull Option<HoodieLogBlockContentLocation> 
blockContentLocation, @Nonnull Option<byte[]> content,
-                             FSDataInputStream inputStream, boolean 
readBlockLazily) {
-    super(logBlockHeader, logBlockFooter, blockContentLocation, content, 
inputStream, readBlockLazily);
-  }
-
-  public HoodieAvroDataBlock(HoodieLogFile logFile, FSDataInputStream 
inputStream, Option<byte[]> content,
-                             boolean readBlockLazily, long position, long 
blockSize, long blockEndpos, Schema readerSchema,
-                             Map<HeaderMetadataType, String> header, 
Map<HeaderMetadataType, String> footer, String keyField) {
+  private final ThreadLocal<BinaryEncoder> encoderCache = new ThreadLocal<>();
+  private final ThreadLocal<BinaryDecoder> decoderCache = new ThreadLocal<>();
+
+  public HoodieAvroDataBlock(
+      HoodieLogFile logFile,

Review comment:
       I am wondering whether the code style has been applied correctly here.

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
##########
@@ -110,59 +136,94 @@ public static HoodieLogBlock getBlock(HoodieLogBlockType 
logDataBlockFormat, Lis
   @Override
   public byte[] getContentBytes() throws IOException {
     // In case this method is called before realizing records from content
-    if (getContent().isPresent()) {
-      return getContent().get();
-    } else if (readBlockLazily && !getContent().isPresent() && records == 
null) {
-      // read block lazily
-      createRecordsFromContentBytes();
+    Option<byte[]> content = getContent();
+
+    checkState(content.isPresent() || records != null, "Block is in invalid 
state");
+
+    if (content.isPresent()) {
+      return content.get();
     }
 
-    return serializeRecords();

Review comment:
       Lazy block reading was present in this method prior to this patch, right? 
Was it removed intentionally?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to