[GitHub] [hudi] alexeykudinkin commented on a change in pull request #4333: [HUDI-431] Adding support for Parquet in MOR `LogBlock`s

2022-02-01 Thread GitBox


alexeykudinkin commented on a change in pull request #4333:
URL: https://github.com/apache/hudi/pull/4333#discussion_r797123944



##
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
##
@@ -481,4 +447,63 @@ public long moveToPrev() throws IOException {
   public void remove() {
    throw new UnsupportedOperationException("Remove not supported for HoodieLogFileReader");
   }
+
+  private static Path makeQualified(FileSystem fs, Path path) {
+return path.makeQualified(fs.getUri(), fs.getWorkingDirectory());
+  }
+
+  /**
+   * Fetch the right {@link FSDataInputStream} to be used by wrapping with required input streams.
+   * @param fs instance of {@link FileSystem} in use.
+   * @param bufferSize buffer size to be used.
+   * @return the right {@link FSDataInputStream} as required.
+   */
+  private static FSDataInputStream getFSDataInputStream(FileSystem fs,

Review comment:
   This didn't change
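   For reference, `makeQualified` just stamps the FS scheme/authority onto a raw path; a minimal sketch of the behavior (the URI below is illustrative, not from the PR):
   
   // Given fs.getUri() == "hdfs://namenode:8020" and a scheme-less path:
   Path raw = new Path("/tmp/hoodie/1.log");
   Path qualified = raw.makeQualified(fs.getUri(), fs.getWorkingDirectory());
   // qualified.toString() -> "hdfs://namenode:8020/tmp/hoodie/1.log"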




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] alexeykudinkin commented on a change in pull request #4333: [HUDI-431] Adding support for Parquet in MOR `LogBlock`s

2022-02-01 Thread GitBox


alexeykudinkin commented on a change in pull request #4333:
URL: https://github.com/apache/hudi/pull/4333#discussion_r797123287



##
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
##
@@ -18,151 +18,173 @@
 
 package org.apache.hudi.common.table.log.block;
 
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.exception.HoodieIOException;
-
 import org.apache.avro.Schema;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.fs.FSDataInputStream;
-
-import javax.annotation.Nonnull;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieIOException;
 
 import java.io.IOException;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
 
 /**
 * DataBlock contains a list of records serialized using formats compatible with the base file format.
  * For each base file format there is a corresponding DataBlock format.
- *
+ * 
  * The Datablock contains:
  *   1. Data Block version
  *   2. Total number of records in the block
  *   3. Actual serialized content of the records
  */
 public abstract class HoodieDataBlock extends HoodieLogBlock {
 
-  protected List<IndexedRecord> records;
-  protected Schema schema;
-  protected String keyField;
+  // TODO rebase records/content to leverage Either to warrant
+  //  that they are mutex (used by read/write flows respectively)
+  private Option<List<IndexedRecord>> records;
 
-  public HoodieDataBlock(@Nonnull Map<HeaderMetadataType, String> logBlockHeader,
-                         @Nonnull Map<HeaderMetadataType, String> logBlockFooter,
-                         @Nonnull Option<HoodieLogBlockContentLocation> blockContentLocation, @Nonnull Option<byte[]> content,
-                         FSDataInputStream inputStream, boolean readBlockLazily) {
-    super(logBlockHeader, logBlockFooter, blockContentLocation, content, inputStream, readBlockLazily);
-this.keyField = HoodieRecord.RECORD_KEY_METADATA_FIELD;
-  }
+  /**
+   * Dot-path notation reference to the key field w/in the record's schema
+   */
+  private final String keyFieldRef;

Review comment:
   Double-checked: this is indeed expected to be just a field name (w/in the record's schema), not dot-path notation.
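   To illustrate, resolving a plain field name against the record's schema would look roughly like this (a sketch; `record` and `writerSchema` are assumed to be in scope):
   
   Schema.Field keyField = writerSchema.getField(keyFieldRef);
   checkState(keyField != null, "Key field '" + keyFieldRef + "' not found in the schema");
   String recordKey = String.valueOf(record.get(keyField.pos()));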








[GitHub] [hudi] alexeykudinkin commented on a change in pull request #4333: [HUDI-431] Adding support for Parquet in MOR `LogBlock`s

2022-02-01 Thread GitBox


alexeykudinkin commented on a change in pull request #4333:
URL: https://github.com/apache/hudi/pull/4333#discussion_r797120652



##
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
##
@@ -224,6 +217,33 @@ public static HoodieAvroDataBlock getBlock(byte[] content, Schema readerSchema)
 return new HoodieAvroDataBlock(records, readerSchema);
   }
 
+  private static byte[] compress(String text) {

Review comment:
   This has just been moved down to make it easier to analyze the actually overridden methods.
   
   > can we move this to IOUtils or some common utils class?
   
   These are actually deprecated methods.
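   For context, the deprecated helper is essentially a Deflater-based utility; a minimal sketch of that shape (not the exact Hudi code):
   
   private static byte[] compress(String text) {
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     try (OutputStream out = new DeflaterOutputStream(baos)) {
       out.write(text.getBytes(StandardCharsets.UTF_8));
     } catch (IOException e) {
       throw new HoodieIOException("IOException while compressing text " + text, e);
     }
     return baos.toByteArray();
   }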








[GitHub] [hudi] alexeykudinkin commented on a change in pull request #4333: [HUDI-431] Adding support for Parquet in MOR `LogBlock`s

2022-02-01 Thread GitBox


alexeykudinkin commented on a change in pull request #4333:
URL: https://github.com/apache/hudi/pull/4333#discussion_r796909001



##
File path: 
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieLogFile.java
##
@@ -60,7 +59,7 @@ public HoodieLogFile(FileStatus fileStatus) {
   public HoodieLogFile(Path logPath) {
 this.fileStatus = null;
 this.pathStr = logPath.toString();
-this.fileLen = 0;
+this.fileLen = -1;

Review comment:
   A file length of 0 is confusing, since 0 could be a legitimate length; I had to go and check whether the file was indeed of length 0.
   
   Setting it to -1 makes it clear that the length is not initialized and can't be used as is.
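   In other words, -1 acts as an explicit "unknown" sentinel that callers can guard on; e.g. a hypothetical accessor:
   
   public long getFileSize() {
     // -1 means the length was never resolved from a FileStatus
     checkState(fileLen != -1, "File length has not been initialized");
     return fileLen;
   }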








[GitHub] [hudi] alexeykudinkin commented on a change in pull request #4333: [HUDI-431] Adding support for Parquet in MOR `LogBlock`s

2022-01-13 Thread GitBox


alexeykudinkin commented on a change in pull request #4333:
URL: https://github.com/apache/hudi/pull/4333#discussion_r784457956



##
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
##
@@ -497,4 +489,32 @@ private void flushToDiskIfRequired(HoodieRecord record) {
   numberOfRecords = 0;
 }
   }
+
+  protected void appendDataAndDeleteBlocks(Map<HeaderMetadataType, String> header) {

Review comment:
   Frankly, I can't understand why this method got moved. Nothing should have changed here. Let me revert this.

##
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
##
@@ -497,4 +489,32 @@ private void flushToDiskIfRequired(HoodieRecord record) {
   numberOfRecords = 0;
 }
   }
+
+  protected void appendDataAndDeleteBlocks(Map<HeaderMetadataType, String> header) {

Review comment:
   Oh, I've found the root cause of those inexplicable changes -- I guess after an IDEA upgrade the "Rearrange code" checkbox got ticked automatically (I never ticked it myself), and I've confirmed it is the root cause of those random re-orderings.
   
   https://user-images.githubusercontent.com/428277/149439483-0d67b659-a61c-4ffe-807e-70a07e9d7d0e.png









[GitHub] [hudi] alexeykudinkin commented on a change in pull request #4333: [HUDI-431] Adding support for Parquet in MOR `LogBlock`s

2022-01-04 Thread GitBox


alexeykudinkin commented on a change in pull request #4333:
URL: https://github.com/apache/hudi/pull/4333#discussion_r778577147



##
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
##
@@ -208,71 +155,85 @@ private HoodieLogBlock readBlock() throws IOException {
 HoodieLogFormat.LogFormatVersion nextBlockVersion = readVersion();
 
 // 3. Read the block type for a log block
-    if (nextBlockVersion.getVersion() != HoodieLogFormatVersion.DEFAULT_VERSION) {
-      type = inputStream.readInt();
-
-      ValidationUtils.checkArgument(type < HoodieLogBlockType.values().length, "Invalid block byte type found " + type);
-      blockType = HoodieLogBlockType.values()[type];
-    }
+    HoodieLogBlockType blockType = tryReadBlockType(nextBlockVersion);
 
 // 4. Read the header for a log block, if present
-if (nextBlockVersion.hasHeader()) {
-  header = HoodieLogBlock.getLogMetadata(inputStream);
-}
 
-int contentLength = blocksize;
+    Map<HeaderMetadataType, String> header =
+        nextBlockVersion.hasHeader() ? HoodieLogBlock.getLogMetadata(inputStream) : null;
+
 // 5. Read the content length for the content
-    if (nextBlockVersion.getVersion() != HoodieLogFormatVersion.DEFAULT_VERSION) {
-      contentLength = (int) inputStream.readLong();
-    }
+    // Fallback to full-block size if no content-length
+    // TODO replace w/ hasContentLength
+    int contentLength =
+        nextBlockVersion.getVersion() != HoodieLogFormatVersion.DEFAULT_VERSION ? (int) inputStream.readLong() : blockSize;
 
     // 6. Read the content or skip content based on IO vs Memory trade-off by client
-    // TODO - have a max block size and reuse this buffer in the ByteBuffer
-    // (hard to guess max block size for now)
     long contentPosition = inputStream.getPos();
-    byte[] content = HoodieLogBlock.readOrSkipContent(inputStream, contentLength, readBlockLazily);
+    boolean shouldReadLazily = readBlockLazily && nextBlockVersion.getVersion() != HoodieLogFormatVersion.DEFAULT_VERSION;
+    Option<byte[]> content = HoodieLogBlock.tryReadContent(inputStream, contentLength, shouldReadLazily);
 
 // 7. Read footer if any
-    Map<HeaderMetadataType, String> footer = null;
-    if (nextBlockVersion.hasFooter()) {
-      footer = HoodieLogBlock.getLogMetadata(inputStream);
-    }
+    Map<HeaderMetadataType, String> footer =
+        nextBlockVersion.hasFooter() ? HoodieLogBlock.getLogMetadata(inputStream) : null;
 
     // 8. Read log block length, if present. This acts as a reverse pointer when traversing a
     // log file in reverse
-@SuppressWarnings("unused")
-long logBlockLength = 0;
 if (nextBlockVersion.hasLogBlockLength()) {
-  logBlockLength = inputStream.readLong();
+  inputStream.readLong();
 }
 
 // 9. Read the log block end position in the log file
 long blockEndPos = inputStream.getPos();
 
 switch (Objects.requireNonNull(blockType)) {
-  // based on type read the block
   case AVRO_DATA_BLOCK:
         if (nextBlockVersion.getVersion() == HoodieLogFormatVersion.DEFAULT_VERSION) {
-          return HoodieAvroDataBlock.getBlock(content, readerSchema);
+          return HoodieAvroDataBlock.getBlock(content.get(), readerSchema);
         } else {
-          return new HoodieAvroDataBlock(logFile, inputStream, Option.ofNullable(content), readBlockLazily,
-              contentPosition, contentLength, blockEndPos, readerSchema, header, footer, keyField);
+          return new HoodieAvroDataBlock(logFile, inputStream, content, readBlockLazily,
+              contentPosition, contentLength, blockEndPos, Option.ofNullable(readerSchema), header, footer, keyField);
         }
+
   case HFILE_DATA_BLOCK:
-        return new HoodieHFileDataBlock(logFile, inputStream, Option.ofNullable(content), readBlockLazily,
-            contentPosition, contentLength, blockEndPos, readerSchema,
-            header, footer, enableInlineReading, keyField);
+        checkState(nextBlockVersion.getVersion() != HoodieLogFormatVersion.DEFAULT_VERSION,
+            String.format("HFile block could not be of version (%d)", HoodieLogFormatVersion.DEFAULT_VERSION));
+
+        return new HoodieHFileDataBlock(logFile, inputStream, content, readBlockLazily,
+            contentPosition, contentLength, blockEndPos, Option.ofNullable(readerSchema),
+            header, footer, keyField, enableRecordLookups);
+
+      case PARQUET_DATA_BLOCK:
+        checkState(nextBlockVersion.getVersion() != HoodieLogFormatVersion.DEFAULT_VERSION,
+            String.format("Parquet block could not be of version (%d)", HoodieLogFormatVersion.DEFAULT_VERSION));
+
+        return new HoodieParquetDataBlock(logFile, inputStream, content, readBlockLazily,
+            contentPosition, contentLength, blockEndPos, Option.ofNullable(readerSchema), header, footer, keyField);
+
   case DELETE_BLOCK:
-return HoodieDeleteBlock.getBlock(logFile, inputStream, 
Option.ofNullable(content

[GitHub] [hudi] alexeykudinkin commented on a change in pull request #4333: [HUDI-431] Adding support for Parquet in MOR `LogBlock`s

2022-01-04 Thread GitBox


alexeykudinkin commented on a change in pull request #4333:
URL: https://github.com/apache/hudi/pull/4333#discussion_r778523226



##
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
##
@@ -88,76 +92,24 @@ public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema
   }
 
   public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize,
-                             boolean readBlockLazily, boolean reverseReader, boolean enableInlineReading,
+                             boolean readBlockLazily, boolean reverseReader, boolean enableRecordLookups,
                              String keyField) throws IOException {
-    FSDataInputStream fsDataInputStream = fs.open(logFile.getPath(), bufferSize);
-    this.logFile = logFile;
-    this.inputStream = getFSDataInputStream(fsDataInputStream, fs, bufferSize);
+    // NOTE: We repackage {@code HoodieLogFile} here to make sure that the provided path
+    //       is prefixed with an appropriate scheme given that we're not propagating the FS
+    //       further
+    this.logFile = new HoodieLogFile(fs.getFileStatus(logFile.getPath()));

Review comment:
   👍 
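   To make the note above concrete, a sketch of why repackaging yields a scheme-prefixed path (the URI is illustrative):
   
   Path raw = new Path("/tbl/.hoodie/1.log");
   FileStatus status = fs.getFileStatus(raw);
   // status.getPath() comes back fully qualified, e.g. "hdfs://namenode:8020/tbl/.hoodie/1.log"
   HoodieLogFile qualified = new HoodieLogFile(status);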

##
File path: 
hudi-common/src/main/java/org/apache/hudi/parquet/io/ByteBufferBackedInputFile.java
##
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.parquet.io;
+
+import org.apache.hudi.common.util.io.ByteBufferBackedInputStream;
+import org.apache.parquet.io.DelegatingSeekableInputStream;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.io.SeekableInputStream;
+
+/**
+ * Implementation of {@link InputFile} backed by {@code byte[]} buffer
+ */
+public class ByteBufferBackedInputFile implements InputFile {
+  private final byte[] buffer;
+  private final int offset;
+  private final int length;
+
+  public ByteBufferBackedInputFile(byte[] buffer, int offset, int length) {
+this.buffer = buffer;
+this.offset = offset;
+this.length = length;
+  }
+
+  public ByteBufferBackedInputFile(byte[] buffer) {
+this(buffer, 0, buffer.length);
+  }
+
+  @Override
+  public long getLength() {
+return length;
+  }
+
+  @Override
+  public SeekableInputStream newStream() {

Review comment:
   These classes are simple wrappers (around `ByteBufferBackedInputStream`, which bears a UT) and are exercised by FTs (they're used in the DataBlocks). Do we still want to test them w/ standalone UTs?
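   If we do want a standalone UT, it would be tiny; a sketch (JUnit 5 assumed):
   
   @Test
   public void testByteBufferBackedInputFile() throws IOException {
     byte[] data = "hello".getBytes(StandardCharsets.UTF_8);
     ByteBufferBackedInputFile inputFile = new ByteBufferBackedInputFile(data);
     assertEquals(data.length, inputFile.getLength());
     try (SeekableInputStream stream = inputFile.newStream()) {
       stream.seek(1);
       assertEquals('e', stream.read());
     }
   }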

##
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
##
@@ -72,7 +76,7 @@
   private long reverseLogFilePosition;
   private long lastReverseLogFilePosition;
   private boolean reverseReader;
-  private boolean enableInlineReading;
+  private boolean enableRecordLookups;

Review comment:
   Not sure I understand your point fully -- this flag in particular is used to gate whether we allow point lookups for records when the `getRecords()` method of the block is used.
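   Conceptually the gate looks like this (a sketch; `fullScan` and `getRecordKey` are hypothetical helpers, not the PR's actual methods):
   
   public List<IndexedRecord> getRecords(List<String> keys) throws IOException {
     if (enableRecordLookups) {
       // point lookups, e.g. HFile seeks by key
       return lookupRecords(keys);
     }
     // otherwise materialize the whole block and filter
     return fullScan().stream()
         .filter(record -> keys.contains(getRecordKey(record)))
         .collect(Collectors.toList());
   }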

##
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
##
@@ -148,65 +165,40 @@ public HoodieLogBlockType getBlockType() {
   }
 
   @Override
-  protected void createRecordsFromContentBytes() throws IOException {
-if (enableInlineReading) {
-  getRecords(Collections.emptyList());
-} else {
-  super.createRecordsFromContentBytes();
-}
-  }
-
-  @Override
-  public List<IndexedRecord> getRecords(List<String> keys) throws IOException {
-readWithInlineFS(keys);
-return records;
-  }
-
-  private void readWithInlineFS(List<String> keys) throws IOException {
-boolean enableFullScan = keys.isEmpty();
-// Get schema from the header
-    Schema writerSchema = new Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
-// If readerSchema was not present, use writerSchema
-if (schema == null) {
-  schema = writerSchema;
-}
-Configuration conf = new Configuration();
-CacheConfig cacheConf = new CacheConfig(conf);
+  protected List<IndexedRecord> lookupRecords(List<String> keys) throws IOException {
 Configuration inlineConf = new Conf
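
For context, the inline-FS read path boils down to registering InLineFileSystem and opening a path that addresses the block's byte range within the log file; a sketch under assumed helper signatures:

Configuration inlineConf = new Configuration(conf);
inlineConf.set("fs." + InLineFileSystem.SCHEME + ".impl", InLineFileSystem.class.getName());
// hypothetical call shape: outer path, outer scheme, offset and length of the block's content
Path inlinePath = InLineFSUtils.getInlineFilePath(
    logFilePath, logFilePath.toUri().getScheme(), contentPosition, contentLength);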

[GitHub] [hudi] alexeykudinkin commented on a change in pull request #4333: [HUDI-431] Adding support for Parquet in MOR `LogBlock`s

2022-01-04 Thread GitBox


alexeykudinkin commented on a change in pull request #4333:
URL: https://github.com/apache/hudi/pull/4333#discussion_r778468264



##
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
##
@@ -110,59 +136,94 @@ public static HoodieLogBlock getBlock(HoodieLogBlockType logDataBlockFormat, Lis
   @Override
   public byte[] getContentBytes() throws IOException {
 // In case this method is called before realizing records from content
-    if (getContent().isPresent()) {
-      return getContent().get();
-    } else if (readBlockLazily && !getContent().isPresent() && records == null) {
-      // read block lazily
-      createRecordsFromContentBytes();
+    Option<byte[]> content = getContent();
+
+    checkState(content.isPresent() || records != null, "Block is in invalid state");
+
+if (content.isPresent()) {
+  return content.get();
 }
 
-return serializeRecords();

Review comment:
   Correct. This could should not be reachable (this method is only used on 
the write path) 

##
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
##
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.log.block;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.HoodieAvroWriteSupport;
+import org.apache.hudi.common.fs.inline.InLineFSUtils;
+import org.apache.hudi.common.fs.inline.InLineFileSystem;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ParquetReaderIterator;
+import org.apache.hudi.io.storage.HoodieAvroParquetConfig;
+import org.apache.hudi.io.storage.HoodieParquetStreamWriter;
+import org.apache.parquet.avro.AvroParquetReader;
+import org.apache.parquet.avro.AvroReadSupport;
+import org.apache.parquet.avro.AvroSchemaConverter;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+
+import javax.annotation.Nonnull;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * HoodieParquetDataBlock contains a list of records serialized using Parquet.
+ */
+public class HoodieParquetDataBlock extends HoodieDataBlock {
+
+  public HoodieParquetDataBlock(HoodieLogFile logFile,
+                                FSDataInputStream inputStream,
+                                Option<byte[]> content,
+                                boolean readBlockLazily,
+                                long position, long blockSize, long blockEndPos,
+                                Option<Schema> readerSchema,
+                                Map<HeaderMetadataType, String> header,
+                                Map<HeaderMetadataType, String> footer,
+                                String keyField) {
+    super(content,
+        inputStream,
+        readBlockLazily,
+        Option.of(new HoodieLogBlockContentLocation(logFile, position, blockSize, blockEndPos)),
+        readerSchema,
+        header,
+        footer,
+        keyField,
+        false);
+  }
+
+  public HoodieParquetDataBlock(
+      @Nonnull List<IndexedRecord> records,
+      @Nonnull Map<HeaderMetadataType, String> header,
+      @Nonnull String keyField
+  ) {
+    super(records, header, new HashMap<>(), keyField);
+  }
+
+  public HoodieParquetDataBlock(
+      @Nonnull List<IndexedRecord> records,
+      @Nonnull Map<HeaderMetadataType, String> header
+  ) {
+    super(records, header, new HashMap<>(), HoodieRecord.RECORD_KEY_METADATA_FIELD);
+  }
+
+  @Override
+  public HoodieLogBlockType getBlockType() {
+return HoodieLogBlockType.PARQUET_DATA_BLOCK;
+  }
+
+  @Override
+  protected byte[] serializeRecords(List<IndexedRecord> records) throws IOException {

[GitHub] [hudi] alexeykudinkin commented on a change in pull request #4333: [HUDI-431] Adding support for Parquet in MOR `LogBlock`s

2021-12-21 Thread GitBox


alexeykudinkin commented on a change in pull request #4333:
URL: https://github.com/apache/hudi/pull/4333#discussion_r773301245



##
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
##
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.log.block;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.HoodieAvroWriteSupport;
+import org.apache.hudi.common.fs.inline.InLineFSUtils;
+import org.apache.hudi.common.fs.inline.InLineFileSystem;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ParquetReaderIterator;
+import org.apache.hudi.io.storage.HoodieAvroParquetConfig;
+import org.apache.hudi.io.storage.HoodieParquetStreamWriter;
+import org.apache.parquet.avro.AvroParquetReader;
+import org.apache.parquet.avro.AvroReadSupport;
+import org.apache.parquet.avro.AvroSchemaConverter;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+
+import javax.annotation.Nonnull;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * HoodieParquetDataBlock contains a list of records serialized using Parquet.
+ */
+public class HoodieParquetDataBlock extends HoodieDataBlock {
+
+  public HoodieParquetDataBlock(HoodieLogFile logFile,
+                                FSDataInputStream inputStream,
+                                Option<byte[]> content,
+                                boolean readBlockLazily,
+                                long position, long blockSize, long blockEndPos,
+                                Option<Schema> readerSchema,
+                                Map<HeaderMetadataType, String> header,
+                                Map<HeaderMetadataType, String> footer,
+                                String keyField) {
+    super(content,
+        inputStream,
+        readBlockLazily,
+        Option.of(new HoodieLogBlockContentLocation(logFile, position, blockSize, blockEndPos)),
+        readerSchema,
+        header,
+        footer,
+        keyField,
+        false);
+  }
+
+  public HoodieParquetDataBlock(
+      @Nonnull List<IndexedRecord> records,
+      @Nonnull Map<HeaderMetadataType, String> header,
+      @Nonnull String keyField
+  ) {
+    super(records, header, new HashMap<>(), keyField);
+  }
+
+  public HoodieParquetDataBlock(
+      @Nonnull List<IndexedRecord> records,
+      @Nonnull Map<HeaderMetadataType, String> header
+  ) {
+    super(records, header, new HashMap<>(), HoodieRecord.RECORD_KEY_METADATA_FIELD);
+  }
+
+  @Override
+  public HoodieLogBlockType getBlockType() {
+return HoodieLogBlockType.PARQUET_DATA_BLOCK;
+  }
+
+  @Override
+  protected byte[] serializeRecords(List<IndexedRecord> records) throws IOException {
+if (records.size() == 0) {
+  return new byte[0];
+}
+
+    Schema writerSchema = new Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
+
+    HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(
+        new AvroSchemaConverter().convert(writerSchema), writerSchema, Option.empty());
+
+HoodieAvroParquetConfig avroParquetConfig =
+new HoodieAvroParquetConfig(
+writeSupport,
+// TODO fetch compression codec from the config
+CompressionCodecName.GZIP,
+ParquetWriter.DEFAULT_BLOCK_SIZE,
+ParquetWriter.DEFAULT_PAGE_SIZE,
+1024 * 1024 * 1024,
+new Configuration(),
+        Double.parseDouble(String.valueOf(0.1))); //HoodieStorageConfig.PARQUET_COMPRESSION_RATIO.defaultValue()));
+
+ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+try (FSDat
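
Roughly, the rest of the write path streams the records through a Parquet writer into the in-memory buffer (a sketch; the exact writer API in the PR may differ):

try (FSDataOutputStream outputStream = new FSDataOutputStream(baos, null);
     HoodieParquetStreamWriter<IndexedRecord> parquetWriter =
         new HoodieParquetStreamWriter<>(outputStream, avroParquetConfig)) {
  for (IndexedRecord record : records) {
    String recordKey = String.valueOf(record.get(keyFieldPos)); // keyFieldPos: assumed position of the key field
    parquetWriter.writeAvro(recordKey, record);
  }
}
return baos.toByteArray();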

[GitHub] [hudi] alexeykudinkin commented on a change in pull request #4333: [HUDI-431] Adding support for Parquet in MOR `LogBlock`s

2021-12-21 Thread GitBox


alexeykudinkin commented on a change in pull request #4333:
URL: https://github.com/apache/hudi/pull/4333#discussion_r773291825



##
File path: 
hudi-common/src/main/java/org/apache/hudi/parquet/io/ByteBufferBackedInputFile.java
##
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.parquet.io;
+
+import org.apache.hudi.common.util.io.ByteBufferBackedInputStream;
+import org.apache.parquet.io.DelegatingSeekableInputStream;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.io.SeekableInputStream;
+
+/**
+ * Implementation of {@link InputFile} backed by {@code byte[]} buffer
+ */
+public class ByteBufferBackedInputFile implements InputFile {
+  private final byte[] buffer;
+  private final int offset;
+  private final int length;
+
+  public ByteBufferBackedInputFile(byte[] buffer, int offset, int length) {
+this.buffer = buffer;
+this.offset = offset;
+this.length = length;
+  }
+
+  public ByteBufferBackedInputFile(byte[] buffer) {
+this(buffer, 0, buffer.length);
+  }
+
+  @Override
+  public long getLength() {
+return length;
+  }
+
+  @Override
+  public SeekableInputStream newStream() {

Review comment:
   These are covered by existing tests.
   
   There was previously a coverage report added to PRs, wasn't there? What happened to it?








[GitHub] [hudi] alexeykudinkin commented on a change in pull request #4333: [HUDI-431] Adding support for Parquet in MOR `LogBlock`s

2021-12-20 Thread GitBox


alexeykudinkin commented on a change in pull request #4333:
URL: https://github.com/apache/hudi/pull/4333#discussion_r772709134



##
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
##
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.log.block;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.HoodieAvroWriteSupport;
+import org.apache.hudi.common.fs.inline.InLineFSUtils;
+import org.apache.hudi.common.fs.inline.InLineFileSystem;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ParquetReaderIterator;
+import org.apache.hudi.io.storage.HoodieAvroParquetConfig;
+import org.apache.hudi.io.storage.HoodieParquetStreamWriter;
+import org.apache.parquet.avro.AvroParquetReader;
+import org.apache.parquet.avro.AvroReadSupport;
+import org.apache.parquet.avro.AvroSchemaConverter;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+
+import javax.annotation.Nonnull;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * HoodieParquetDataBlock contains a list of records serialized using Parquet.
+ */
+public class HoodieParquetDataBlock extends HoodieDataBlock {
+
+  public HoodieParquetDataBlock(HoodieLogFile logFile,
+                                FSDataInputStream inputStream,
+                                Option<byte[]> content,
+                                boolean readBlockLazily,
+                                long position, long blockSize, long blockEndPos,
+                                Option<Schema> readerSchema,
+                                Map<HeaderMetadataType, String> header,
+                                Map<HeaderMetadataType, String> footer,
+                                String keyField) {
+    super(content,
+        inputStream,
+        readBlockLazily,
+        Option.of(new HoodieLogBlockContentLocation(logFile, position, blockSize, blockEndPos)),
+        readerSchema,
+        header,
+        footer,
+        keyField,
+        false);
+  }
+
+  public HoodieParquetDataBlock(
+      @Nonnull List<IndexedRecord> records,
+      @Nonnull Map<HeaderMetadataType, String> header,
+      @Nonnull String keyField
+  ) {
+    super(records, header, new HashMap<>(), keyField);
+  }
+
+  public HoodieParquetDataBlock(
+      @Nonnull List<IndexedRecord> records,
+      @Nonnull Map<HeaderMetadataType, String> header
+  ) {
+    super(records, header, new HashMap<>(), HoodieRecord.RECORD_KEY_METADATA_FIELD);
+  }
+
+  @Override
+  public HoodieLogBlockType getBlockType() {
+return HoodieLogBlockType.PARQUET_DATA_BLOCK;
+  }
+
+  @Override
+  protected byte[] serializeRecords(List<IndexedRecord> records) throws IOException {
+if (records.size() == 0) {
+  return new byte[0];
+}
+
+    Schema writerSchema = new Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
+
+    HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(
+        new AvroSchemaConverter().convert(writerSchema), writerSchema, Option.empty());
+
+HoodieAvroParquetConfig avroParquetConfig =
+new HoodieAvroParquetConfig(
+writeSupport,
+// TODO fetch compression codec from the config
+CompressionCodecName.GZIP,
+ParquetWriter.DEFAULT_BLOCK_SIZE,
+ParquetWriter.DEFAULT_PAGE_SIZE,
+1024 * 1024 * 1024,
+new Configuration(),
+        Double.parseDouble(String.valueOf(0.1))); //HoodieStorageConfig.PARQUET_COMPRESSION_RATIO.defaultValue()));
+
+ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+try (FSDat