[GitHub] [hudi] alexeykudinkin commented on a change in pull request #4333: [HUDI-431] Adding support for Parquet in MOR `LogBlock`s
alexeykudinkin commented on a change in pull request #4333: URL: https://github.com/apache/hudi/pull/4333#discussion_r797123944 ## File path: hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java ## @@ -481,4 +447,63 @@ public long moveToPrev() throws IOException { public void remove() { throw new UnsupportedOperationException("Remove not supported for HoodieLogFileReader"); } + + private static Path makeQualified(FileSystem fs, Path path) { +return path.makeQualified(fs.getUri(), fs.getWorkingDirectory()); + } + + /** + * Fetch the right {@link FSDataInputStream} to be used by wrapping with required input streams. + * @param fs instance of {@link FileSystem} in use. + * @param bufferSize buffer size to be used. + * @return the right {@link FSDataInputStream} as required. + */ + private static FSDataInputStream getFSDataInputStream(FileSystem fs, Review comment: This didn't change -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
alexeykudinkin commented on a change in pull request #4333: URL: https://github.com/apache/hudi/pull/4333#discussion_r797123287 ## File path: hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java ## @@ -18,151 +18,173 @@ package org.apache.hudi.common.table.log.block; -import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.util.Option; -import org.apache.hudi.exception.HoodieException; -import org.apache.hudi.exception.HoodieIOException; - import org.apache.avro.Schema; import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.fs.FSDataInputStream; - -import javax.annotation.Nonnull; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.exception.HoodieIOException; import java.io.IOException; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; + +import static org.apache.hudi.common.util.ValidationUtils.checkState; /** * DataBlock contains a list of records serialized using formats compatible with the base file format. * For each base file format there is a corresponding DataBlock format. - * + * * The Datablock contains: * 1. Data Block version * 2. Total number of records in the block * 3. 
Actual serialized content of the records */ public abstract class HoodieDataBlock extends HoodieLogBlock { - protected List records; - protected Schema schema; - protected String keyField; + // TODO rebase records/content to leverage Either to warrant + // that they are mutex (used by read/write flows respectively) + private Option> records; - public HoodieDataBlock(@Nonnull Map logBlockHeader, - @Nonnull Map logBlockFooter, - @Nonnull Option blockContentLocation, @Nonnull Option content, - FSDataInputStream inputStream, boolean readBlockLazily) { -super(logBlockHeader, logBlockFooter, blockContentLocation, content, inputStream, readBlockLazily); -this.keyField = HoodieRecord.RECORD_KEY_METADATA_FIELD; - } + /** + * Dot-path notation reference to the key field w/in the record's schema + */ + private final String keyFieldRef; Review comment: Double-checked: this is indeed expected to be just a field name (w/in the record's schema), not dot-path notation.
alexeykudinkin commented on a change in pull request #4333: URL: https://github.com/apache/hudi/pull/4333#discussion_r797120652 ## File path: hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java ## @@ -224,6 +217,33 @@ public static HoodieAvroDataBlock getBlock(byte[] content, Schema readerSchema) return new HoodieAvroDataBlock(records, readerSchema); } + private static byte[] compress(String text) { Review comment: This has just been moved down to make it easier to analyze the methods that are actually overridden. > can we move this to IOUtils or some common utils class? These are actually deprecated methods.
alexeykudinkin commented on a change in pull request #4333: URL: https://github.com/apache/hudi/pull/4333#discussion_r796909001 ## File path: hudi-common/src/main/java/org/apache/hudi/common/model/HoodieLogFile.java ## @@ -60,7 +59,7 @@ public HoodieLogFile(FileStatus fileStatus) { public HoodieLogFile(Path logPath) { this.fileStatus = null; this.pathStr = logPath.toString(); -this.fileLen = 0; +this.fileLen = -1; Review comment: A file length of 0 is very confusing because it could be a legitimate case: I had to go and check whether the file was indeed of length 0. Setting it to -1 makes it clear that the length is not initialized and can't be used as is.
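To illustrate the point about `fileLen`, here is a minimal sketch of how a `-1` sentinel keeps "length not resolved yet" distinct from a legitimately empty file. The `LogFileHandle` class, the `UNKNOWN_LENGTH` constant, and the `knownLength()` accessor are hypothetical names made up for this example; they are not part of `HoodieLogFile`.

```java
import java.util.OptionalLong;

/** Hypothetical stand-in for a log-file handle; NOT Hudi's HoodieLogFile. */
final class LogFileHandle {
    /** Sentinel (assumed name) meaning "length has not been resolved yet". */
    static final long UNKNOWN_LENGTH = -1L;

    private final String path;
    private final long fileLen;

    /** Path-only constructor: the length is unknown until someone stats the file. */
    LogFileHandle(String path) {
        this(path, UNKNOWN_LENGTH);
    }

    /** Constructor for callers that already know the length (0 is a valid value). */
    LogFileHandle(String path, long fileLen) {
        this.path = path;
        this.fileLen = fileLen;
    }

    String path() {
        return path;
    }

    /** Empty when the length was never initialized; present (possibly 0) otherwise. */
    OptionalLong knownLength() {
        return fileLen == UNKNOWN_LENGTH ? OptionalLong.empty() : OptionalLong.of(fileLen);
    }
}
```

With a default of 0 the two cases would be indistinguishable to a caller; with the sentinel, a caller that receives an empty result knows it still has to look the length up.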
alexeykudinkin commented on a change in pull request #4333: URL: https://github.com/apache/hudi/pull/4333#discussion_r784457956 ## File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java ## @@ -497,4 +489,32 @@ private void flushToDiskIfRequired(HoodieRecord record) { numberOfRecords = 0; } } + + protected void appendDataAndDeleteBlocks(Map header) { Review comment: Frankly, I can't understand why this method got moved. Nothing should have changed here. Let me revert this. ## File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java ## @@ -497,4 +489,32 @@ private void flushToDiskIfRequired(HoodieRecord record) { numberOfRecords = 0; } } + + protected void appendDataAndDeleteBlocks(Map header) { Review comment: Oh, I've found the root cause of those inexplicable changes: I guess that after upgrading IDEA I got the new "Rearrange code" box ticked automatically (I never ticked it myself), and I've confirmed it to be the root cause of those random re-orderings. Screenshot: https://user-images.githubusercontent.com/428277/149439483-0d67b659-a61c-4ffe-807e-70a07e9d7d0e.png
alexeykudinkin commented on a change in pull request #4333: URL: https://github.com/apache/hudi/pull/4333#discussion_r778577147 ## File path: hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java ## @@ -208,71 +155,85 @@ private HoodieLogBlock readBlock() throws IOException { HoodieLogFormat.LogFormatVersion nextBlockVersion = readVersion(); // 3. Read the block type for a log block -if (nextBlockVersion.getVersion() != HoodieLogFormatVersion.DEFAULT_VERSION) { - type = inputStream.readInt(); - - ValidationUtils.checkArgument(type < HoodieLogBlockType.values().length, "Invalid block byte type found " + type); - blockType = HoodieLogBlockType.values()[type]; -} +HoodieLogBlockType blockType = tryReadBlockType(nextBlockVersion); // 4. Read the header for a log block, if present -if (nextBlockVersion.hasHeader()) { - header = HoodieLogBlock.getLogMetadata(inputStream); -} -int contentLength = blocksize; +Map header = +nextBlockVersion.hasHeader() ? HoodieLogBlock.getLogMetadata(inputStream) : null; + // 5. Read the content length for the content -if (nextBlockVersion.getVersion() != HoodieLogFormatVersion.DEFAULT_VERSION) { - contentLength = (int) inputStream.readLong(); -} +// Fallback to full-block size if no content-length +// TODO replace w/ hasContentLength +int contentLength = +nextBlockVersion.getVersion() != HoodieLogFormatVersion.DEFAULT_VERSION ? (int) inputStream.readLong() : blockSize; // 6. 
Read the content or skip content based on IO vs Memory trade-off by client -// TODO - have a max block size and reuse this buffer in the ByteBuffer -// (hard to guess max block size for now) long contentPosition = inputStream.getPos(); -byte[] content = HoodieLogBlock.readOrSkipContent(inputStream, contentLength, readBlockLazily); +boolean shouldReadLazily = readBlockLazily && nextBlockVersion.getVersion() != HoodieLogFormatVersion.DEFAULT_VERSION; +Option content = HoodieLogBlock.tryReadContent(inputStream, contentLength, shouldReadLazily); // 7. Read footer if any -Map footer = null; -if (nextBlockVersion.hasFooter()) { - footer = HoodieLogBlock.getLogMetadata(inputStream); -} +Map footer = +nextBlockVersion.hasFooter() ? HoodieLogBlock.getLogMetadata(inputStream) : null; // 8. Read log block length, if present. This acts as a reverse pointer when traversing a // log file in reverse -@SuppressWarnings("unused") -long logBlockLength = 0; if (nextBlockVersion.hasLogBlockLength()) { - logBlockLength = inputStream.readLong(); + inputStream.readLong(); } // 9. 
Read the log block end position in the log file long blockEndPos = inputStream.getPos(); switch (Objects.requireNonNull(blockType)) { - // based on type read the block case AVRO_DATA_BLOCK: if (nextBlockVersion.getVersion() == HoodieLogFormatVersion.DEFAULT_VERSION) { - return HoodieAvroDataBlock.getBlock(content, readerSchema); + return HoodieAvroDataBlock.getBlock(content.get(), readerSchema); } else { - return new HoodieAvroDataBlock(logFile, inputStream, Option.ofNullable(content), readBlockLazily, - contentPosition, contentLength, blockEndPos, readerSchema, header, footer, keyField); + return new HoodieAvroDataBlock(logFile, inputStream, content, readBlockLazily, + contentPosition, contentLength, blockEndPos, Option.ofNullable(readerSchema), header, footer, keyField); } + case HFILE_DATA_BLOCK: -return new HoodieHFileDataBlock(logFile, inputStream, Option.ofNullable(content), readBlockLazily, -contentPosition, contentLength, blockEndPos, readerSchema, -header, footer, enableInlineReading, keyField); +checkState(nextBlockVersion.getVersion() != HoodieLogFormatVersion.DEFAULT_VERSION, +String.format("HFile block could not be of version (%d)", HoodieLogFormatVersion.DEFAULT_VERSION)); + +return new HoodieHFileDataBlock(logFile, inputStream, content, readBlockLazily, +contentPosition, contentLength, blockEndPos, Option.ofNullable(readerSchema), +header, footer, keyField, enableRecordLookups); + + case PARQUET_DATA_BLOCK: +checkState(nextBlockVersion.getVersion() != HoodieLogFormatVersion.DEFAULT_VERSION, +String.format("Parquet block could not be of version (%d)", HoodieLogFormatVersion.DEFAULT_VERSION)); + +return new HoodieParquetDataBlock(logFile, inputStream, content, readBlockLazily, +contentPosition, contentLength, blockEndPos, Option.ofNullable(readerSchema), header, footer, keyField); + case DELETE_BLOCK: -return HoodieDeleteBlock.getBlock(logFile, inputStream, Option.ofNullable(content
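The quoted hunk replaces the inline block-type parsing with a call to `tryReadBlockType(nextBlockVersion)`, whose body is not shown. As a rough sketch only, reconstructed from the removed lines, such a helper might look like the following. `BlockTypeReader`, the abbreviated `BlockType` enum, the `DEFAULT_VERSION` value, and the use of a `ByteBuffer` in place of the input stream are all assumptions made to keep the example self-contained; the extra negative-ordinal check is also an addition that the removed code did not have.

```java
import java.nio.ByteBuffer;

/** Hypothetical helper for illustration; names and constants are assumptions, not Hudi's API. */
final class BlockTypeReader {
    /** Illustrative subset of block types; the real enum may differ in members and order. */
    enum BlockType { AVRO_DATA_BLOCK, HFILE_DATA_BLOCK, PARQUET_DATA_BLOCK, DELETE_BLOCK }

    /** Assumed value of the legacy format version that carries no block-type field. */
    static final int DEFAULT_VERSION = 0;

    /**
     * Reads the block-type ordinal for non-legacy versions and validates it
     * before indexing into the enum; returns null for legacy blocks.
     */
    static BlockType tryReadBlockType(int version, ByteBuffer in) {
        if (version == DEFAULT_VERSION) {
            return null; // legacy blocks have no type field to read
        }
        int ordinal = in.getInt();
        BlockType[] types = BlockType.values();
        if (ordinal < 0 || ordinal >= types.length) {
            throw new IllegalArgumentException("Invalid block byte type found " + ordinal);
        }
        return types[ordinal];
    }
}
```

Returning `null` for the legacy version matches the later `Objects.requireNonNull(blockType)` in the quoted `switch`, which would fail fast if a legacy block ever reached a branch that needs a type.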
alexeykudinkin commented on a change in pull request #4333: URL: https://github.com/apache/hudi/pull/4333#discussion_r778523226 ## File path: hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java ## @@ -88,76 +92,24 @@ public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSc } public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize, - boolean readBlockLazily, boolean reverseReader, boolean enableInlineReading, + boolean readBlockLazily, boolean reverseReader, boolean enableRecordLookups, String keyField) throws IOException { -FSDataInputStream fsDataInputStream = fs.open(logFile.getPath(), bufferSize); -this.logFile = logFile; -this.inputStream = getFSDataInputStream(fsDataInputStream, fs, bufferSize); +// NOTE: We repackage {@code HoodieLogFile} here to make sure that the provided path +// is prefixed with an appropriate scheme given that we're not propagating the FS +// further +this.logFile = new HoodieLogFile(fs.getFileStatus(logFile.getPath())); Review comment: 👍 ## File path: hudi-common/src/main/java/org/apache/hudi/parquet/io/ByteBufferBackedInputFile.java ## @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.parquet.io; + +import org.apache.hudi.common.util.io.ByteBufferBackedInputStream; +import org.apache.parquet.io.DelegatingSeekableInputStream; +import org.apache.parquet.io.InputFile; +import org.apache.parquet.io.SeekableInputStream; + +/** + * Implementation of {@link InputFile} backed by {@code byte[]} buffer + */ +public class ByteBufferBackedInputFile implements InputFile { + private final byte[] buffer; + private final int offset; + private final int length; + + public ByteBufferBackedInputFile(byte[] buffer, int offset, int length) { +this.buffer = buffer; +this.offset = offset; +this.length = length; + } + + public ByteBufferBackedInputFile(byte[] buffer) { +this(buffer, 0, buffer.length); + } + + @Override + public long getLength() { +return length; + } + + @Override + public SeekableInputStream newStream() { Review comment: These classes are simple wrappers (around `ByteBufferBackedInputStream`, which has its own UT) and are tested by FTs (they're used in DataBlocks). Do we still want to test them w/ standalone UTs? 
## File path: hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java ## @@ -72,7 +76,7 @@ private long reverseLogFilePosition; private long lastReverseLogFilePosition; private boolean reverseReader; - private boolean enableInlineReading; + private boolean enableRecordLookups; Review comment: Not sure i understand your point fully -- this flag in particular is used to gate whether we allow point lookups for records when `getRecords()` method of the Block is used ## File path: hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java ## @@ -148,65 +165,40 @@ public HoodieLogBlockType getBlockType() { } @Override - protected void createRecordsFromContentBytes() throws IOException { -if (enableInlineReading) { - getRecords(Collections.emptyList()); -} else { - super.createRecordsFromContentBytes(); -} - } - - @Override - public List getRecords(List keys) throws IOException { -readWithInlineFS(keys); -return records; - } - - private void readWithInlineFS(List keys) throws IOException { -boolean enableFullScan = keys.isEmpty(); -// Get schema from the header -Schema writerSchema = new Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA)); -// If readerSchema was not present, use writerSchema -if (schema == null) { - schema = writerSchema; -} -Configuration conf = new Configuration(); -CacheConfig cacheConf = new CacheConfig(conf); + protected List lookupRecords(List keys) throws IOException { Configuration inlineConf = new Conf
alexeykudinkin commented on a change in pull request #4333: URL: https://github.com/apache/hudi/pull/4333#discussion_r778468264 ## File path: hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java ## @@ -110,59 +136,94 @@ public static HoodieLogBlock getBlock(HoodieLogBlockType logDataBlockFormat, Lis @Override public byte[] getContentBytes() throws IOException { // In case this method is called before realizing records from content -if (getContent().isPresent()) { - return getContent().get(); -} else if (readBlockLazily && !getContent().isPresent() && records == null) { - // read block lazily - createRecordsFromContentBytes(); +Option content = getContent(); + +checkState(content.isPresent() || records != null, "Block is in invalid state"); + +if (content.isPresent()) { + return content.get(); } -return serializeRecords(); Review comment: Correct. This code should not be reachable (this method is only used on the write path). ## File path: hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java ## @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.common.table.log.block; + +import org.apache.avro.Schema; +import org.apache.avro.generic.IndexedRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hudi.avro.HoodieAvroWriteSupport; +import org.apache.hudi.common.fs.inline.InLineFSUtils; +import org.apache.hudi.common.fs.inline.InLineFileSystem; +import org.apache.hudi.common.model.HoodieLogFile; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ParquetReaderIterator; +import org.apache.hudi.io.storage.HoodieAvroParquetConfig; +import org.apache.hudi.io.storage.HoodieParquetStreamWriter; +import org.apache.parquet.avro.AvroParquetReader; +import org.apache.parquet.avro.AvroReadSupport; +import org.apache.parquet.avro.AvroSchemaConverter; +import org.apache.parquet.hadoop.ParquetReader; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.hadoop.util.HadoopInputFile; +import org.apache.parquet.io.InputFile; + +import javax.annotation.Nonnull; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +/** + * HoodieParquetDataBlock contains a list of records serialized using Parquet. 
+ */ +public class HoodieParquetDataBlock extends HoodieDataBlock { + + public HoodieParquetDataBlock(HoodieLogFile logFile, +FSDataInputStream inputStream, +Option content, +boolean readBlockLazily, +long position, long blockSize, long blockEndPos, +Option readerSchema, +Map header, +Map footer, +String keyField) { +super(content, +inputStream, +readBlockLazily, +Option.of(new HoodieLogBlockContentLocation(logFile, position, blockSize, blockEndPos)), +readerSchema, +header, +footer, +keyField, +false); + } + + public HoodieParquetDataBlock( + @Nonnull List records, + @Nonnull Map header, + @Nonnull String keyField + ) { +super(records, header, new HashMap<>(), keyField); + } + + public HoodieParquetDataBlock( + @Nonnull List records, + @Nonnull Map header + ) { +super(records, header, new HashMap<>(), HoodieRecord.RECORD_KEY_METADATA_FIELD); + } + + @Override + public HoodieLogBlockType getBlockType() { +return HoodieLogBlockType.PARQUET_DATA_BLOCK; + } + + @Override + protected byte[] serializeRecords(List records) throws I
alexeykudinkin commented on a change in pull request #4333: URL: https://github.com/apache/hudi/pull/4333#discussion_r773301245 ## File path: hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java ## @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.common.table.log.block; + +import org.apache.avro.Schema; +import org.apache.avro.generic.IndexedRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hudi.avro.HoodieAvroWriteSupport; +import org.apache.hudi.common.fs.inline.InLineFSUtils; +import org.apache.hudi.common.fs.inline.InLineFileSystem; +import org.apache.hudi.common.model.HoodieLogFile; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ParquetReaderIterator; +import org.apache.hudi.io.storage.HoodieAvroParquetConfig; +import org.apache.hudi.io.storage.HoodieParquetStreamWriter; +import org.apache.parquet.avro.AvroParquetReader; +import org.apache.parquet.avro.AvroReadSupport; +import org.apache.parquet.avro.AvroSchemaConverter; +import org.apache.parquet.hadoop.ParquetReader; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.hadoop.util.HadoopInputFile; +import org.apache.parquet.io.InputFile; + +import javax.annotation.Nonnull; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +/** + * HoodieParquetDataBlock contains a list of records serialized using Parquet. 
+ */ +public class HoodieParquetDataBlock extends HoodieDataBlock { + + public HoodieParquetDataBlock(HoodieLogFile logFile, +FSDataInputStream inputStream, +Option content, +boolean readBlockLazily, +long position, long blockSize, long blockEndPos, +Option readerSchema, +Map header, +Map footer, +String keyField) { +super(content, +inputStream, +readBlockLazily, +Option.of(new HoodieLogBlockContentLocation(logFile, position, blockSize, blockEndPos)), +readerSchema, +header, +footer, +keyField, +false); + } + + public HoodieParquetDataBlock( + @Nonnull List records, + @Nonnull Map header, + @Nonnull String keyField + ) { +super(records, header, new HashMap<>(), keyField); + } + + public HoodieParquetDataBlock( + @Nonnull List records, + @Nonnull Map header + ) { +super(records, header, new HashMap<>(), HoodieRecord.RECORD_KEY_METADATA_FIELD); + } + + @Override + public HoodieLogBlockType getBlockType() { +return HoodieLogBlockType.PARQUET_DATA_BLOCK; + } + + @Override + protected byte[] serializeRecords(List records) throws IOException { +if (records.size() == 0) { + return new byte[0]; +} + +Schema writerSchema = new Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA)); + +HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport( +new AvroSchemaConverter().convert(writerSchema), writerSchema, Option.empty()); + +HoodieAvroParquetConfig avroParquetConfig = +new HoodieAvroParquetConfig( +writeSupport, +// TODO fetch compression codec from the config +CompressionCodecName.GZIP, +ParquetWriter.DEFAULT_BLOCK_SIZE, +ParquetWriter.DEFAULT_PAGE_SIZE, +1024 * 1024 * 1024, +new Configuration(), + Double.parseDouble(String.valueOf(0.1)));//HoodieStorageConfig.PARQUET_COMPRESSION_RATIO.defaultValue())); + +ByteArrayOutputStream baos = new ByteArrayOutputStream(); + +try (FSDat
alexeykudinkin commented on a change in pull request #4333: URL: https://github.com/apache/hudi/pull/4333#discussion_r773291825 ## File path: hudi-common/src/main/java/org/apache/hudi/parquet/io/ByteBufferBackedInputFile.java ## @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.parquet.io; + +import org.apache.hudi.common.util.io.ByteBufferBackedInputStream; +import org.apache.parquet.io.DelegatingSeekableInputStream; +import org.apache.parquet.io.InputFile; +import org.apache.parquet.io.SeekableInputStream; + +/** + * Implementation of {@link InputFile} backed by {@code byte[]} buffer + */ +public class ByteBufferBackedInputFile implements InputFile { + private final byte[] buffer; + private final int offset; + private final int length; + + public ByteBufferBackedInputFile(byte[] buffer, int offset, int length) { +this.buffer = buffer; +this.offset = offset; +this.length = length; + } + + public ByteBufferBackedInputFile(byte[] buffer) { +this(buffer, 0, buffer.length); + } + + @Override + public long getLength() { +return length; + } + + @Override + public SeekableInputStream newStream() { Review comment: These are covered by existing tests. 
There used to be a coverage report added to PRs, wasn't there? What happened to it?
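For readers unfamiliar with what such a wrapper has to do, here is a minimal, dependency-free sketch of a seekable stream over a `byte[]` slice. It is not the PR's `ByteBufferBackedInputStream` and does not implement Parquet's `SeekableInputStream`; `SeekableByteArrayStream` and its methods are hypothetical names, chosen only to show the offset and position bookkeeping that a standalone unit test would exercise.

```java
import java.io.InputStream;

/** Hypothetical seekable stream over a byte[] slice; for illustration only. */
final class SeekableByteArrayStream extends InputStream {
    private final byte[] buffer;
    private final int offset; // start of the slice within buffer
    private final int length; // number of readable bytes in the slice
    private int pos;          // current position, relative to offset

    SeekableByteArrayStream(byte[] buffer, int offset, int length) {
        if (offset < 0 || length < 0 || length > buffer.length - offset) {
            throw new IllegalArgumentException("Slice is out of bounds");
        }
        this.buffer = buffer;
        this.offset = offset;
        this.length = length;
    }

    /** Current position relative to the start of the slice. */
    long getPos() {
        return pos;
    }

    /** Moves to an absolute position within the slice; the end itself is allowed. */
    void seek(long newPos) {
        if (newPos < 0 || newPos > length) {
            throw new IllegalArgumentException("Seek out of bounds: " + newPos);
        }
        pos = (int) newPos;
    }

    @Override
    public int read() {
        // Mask to return an unsigned byte value, as the InputStream contract requires
        return pos < length ? (buffer[offset + pos++] & 0xFF) : -1;
    }

    @Override
    public int read(byte[] dst, int dstOff, int len) {
        if (len == 0) {
            return 0;
        }
        int remaining = length - pos;
        if (remaining <= 0) {
            return -1; // end of slice
        }
        int n = Math.min(len, remaining);
        System.arraycopy(buffer, offset + pos, dst, dstOff, n);
        pos += n;
        return n;
    }
}
```

The cases most likely to go wrong in a wrapper like this are a non-zero `offset`, seeking to the exact end of the slice, and reads that straddle the end, which is the kind of behavior the question about standalone UTs is about.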
alexeykudinkin commented on a change in pull request #4333: URL: https://github.com/apache/hudi/pull/4333#discussion_r772709134 ## File path: hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java ## @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.common.table.log.block; + +import org.apache.avro.Schema; +import org.apache.avro.generic.IndexedRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hudi.avro.HoodieAvroWriteSupport; +import org.apache.hudi.common.fs.inline.InLineFSUtils; +import org.apache.hudi.common.fs.inline.InLineFileSystem; +import org.apache.hudi.common.model.HoodieLogFile; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ParquetReaderIterator; +import org.apache.hudi.io.storage.HoodieAvroParquetConfig; +import org.apache.hudi.io.storage.HoodieParquetStreamWriter; +import org.apache.parquet.avro.AvroParquetReader; +import org.apache.parquet.avro.AvroReadSupport; +import org.apache.parquet.avro.AvroSchemaConverter; +import org.apache.parquet.hadoop.ParquetReader; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.hadoop.util.HadoopInputFile; +import org.apache.parquet.io.InputFile; + +import javax.annotation.Nonnull; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +/** + * HoodieParquetDataBlock contains a list of records serialized using Parquet. 
+ */ +public class HoodieParquetDataBlock extends HoodieDataBlock { + + public HoodieParquetDataBlock(HoodieLogFile logFile, +FSDataInputStream inputStream, +Option content, +boolean readBlockLazily, +long position, long blockSize, long blockEndPos, +Option readerSchema, +Map header, +Map footer, +String keyField) { +super(content, +inputStream, +readBlockLazily, +Option.of(new HoodieLogBlockContentLocation(logFile, position, blockSize, blockEndPos)), +readerSchema, +header, +footer, +keyField, +false); + } + + public HoodieParquetDataBlock( + @Nonnull List records, + @Nonnull Map header, + @Nonnull String keyField + ) { +super(records, header, new HashMap<>(), keyField); + } + + public HoodieParquetDataBlock( + @Nonnull List records, + @Nonnull Map header + ) { +super(records, header, new HashMap<>(), HoodieRecord.RECORD_KEY_METADATA_FIELD); + } + + @Override + public HoodieLogBlockType getBlockType() { +return HoodieLogBlockType.PARQUET_DATA_BLOCK; + } + + @Override + protected byte[] serializeRecords(List records) throws IOException { +if (records.size() == 0) { + return new byte[0]; +} + +Schema writerSchema = new Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA)); + +HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport( +new AvroSchemaConverter().convert(writerSchema), writerSchema, Option.empty()); + +HoodieAvroParquetConfig avroParquetConfig = +new HoodieAvroParquetConfig( +writeSupport, +// TODO fetch compression codec from the config +CompressionCodecName.GZIP, +ParquetWriter.DEFAULT_BLOCK_SIZE, +ParquetWriter.DEFAULT_PAGE_SIZE, +1024 * 1024 * 1024, +new Configuration(), + Double.parseDouble(String.valueOf(0.1)));//HoodieStorageConfig.PARQUET_COMPRESSION_RATIO.defaultValue())); + +ByteArrayOutputStream baos = new ByteArrayOutputStream(); + +try (FSDat