[GitHub] [hudi] nsivabalan commented on a change in pull request #3128: [HUDI-2029] Implement compression for DiskBasedMap in Spillable Map
nsivabalan commented on a change in pull request #3128: URL: https://github.com/apache/hudi/pull/3128#discussion_r669909750 ## File path: hudi-common/src/main/java/org/apache/hudi/common/util/collection/BitCaskDiskMap.java ## @@ -188,21 +204,25 @@ public R get(Object key) { } private R get(ValueMetadata entry) { -return get(entry, getRandomAccessFile()); +return get(entry, getRandomAccessFile(), isCompressionEnabled); } - public static R get(ValueMetadata entry, RandomAccessFile file) { + public static R get(ValueMetadata entry, RandomAccessFile file, boolean isCompressionEnabled) { try { - return SerializationUtils - .deserialize(SpillableMapUtils.readBytesFromDisk(file, entry.getOffsetOfValue(), entry.getSizeOfValue())); + byte[] bytesFromDisk = SpillableMapUtils.readBytesFromDisk(file, entry.getOffsetOfValue(), entry.getSizeOfValue()); + if (isCompressionEnabled) { +return SerializationUtils.deserialize(DISK_COMPRESSION_REF.get().decompressBytes(bytesFromDisk)); + } Review comment: not required to fix in this patch, but something to keep in mind. would be good to have an explicit else block for line 216. this "if" block is just one line and so its fine. But if its a large "if" block, then reader/dev might might wonder that some code path may not return from within "if" block and hence we have a return outside of "if" block. So, whenever you have "if" "else"s, try to always explicitly add else block. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] nsivabalan commented on a change in pull request #3128: [HUDI-2029] Implement compression for DiskBasedMap in Spillable Map
nsivabalan commented on a change in pull request #3128: URL: https://github.com/apache/hudi/pull/3128#discussion_r669909750 ## File path: hudi-common/src/main/java/org/apache/hudi/common/util/collection/BitCaskDiskMap.java ## @@ -188,21 +204,25 @@ public R get(Object key) { } private R get(ValueMetadata entry) { -return get(entry, getRandomAccessFile()); +return get(entry, getRandomAccessFile(), isCompressionEnabled); } - public static R get(ValueMetadata entry, RandomAccessFile file) { + public static R get(ValueMetadata entry, RandomAccessFile file, boolean isCompressionEnabled) { try { - return SerializationUtils - .deserialize(SpillableMapUtils.readBytesFromDisk(file, entry.getOffsetOfValue(), entry.getSizeOfValue())); + byte[] bytesFromDisk = SpillableMapUtils.readBytesFromDisk(file, entry.getOffsetOfValue(), entry.getSizeOfValue()); + if (isCompressionEnabled) { +return SerializationUtils.deserialize(DISK_COMPRESSION_REF.get().decompressBytes(bytesFromDisk)); + } Review comment: not required to fix in this patch, but something to keep in mind. would be good to have an explicit else block for line 216. this "if" block is just one line and so its fine. But if its a large "if" block, then reader/dev might might wonder that some code path may not return from within "if" block and hence we have a return outside of "if" block. So, whenever you have if else, try to always explicitly add else block. ## File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java ## @@ -200,7 +200,7 @@ protected void initializeIncomingRecordsMap() { LOG.info("MaxMemoryPerPartitionMerge => " + memoryForMerge); this.keyToNewRecords = new ExternalSpillableMap<>(memoryForMerge, config.getSpillableMapBasePath(), new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(tableSchema), - config.getSpillableDiskMapType()); + config.getSpillableDiskMapType(), config.isBitCaskDiskMapCompressionEnabled()); Review comment: Do you wanna make change in HoodieMergedLogRecordScanner as well ? Or thats planned for a follow up PR -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] nsivabalan commented on a change in pull request #3128: [HUDI-2029] Implement compression for DiskBasedMap in Spillable Map
nsivabalan commented on a change in pull request #3128: URL: https://github.com/apache/hudi/pull/3128#discussion_r668901207 ## File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java ## @@ -290,6 +290,11 @@ .defaultValue(ExternalSpillableMap.DiskMapType.BITCASK) .withDocumentation("Enable usage of either BITCASK or ROCKS_DB as disk map for External Spillable Map"); + public static final ConfigProperty DISK_MAP_BITCASK_COMPRESSION_ENABLED = ConfigProperty + .key("hoodie.diskmap.bitcask.enabled") + .defaultValue(false) Review comment: sorry, may I know why the property name does not have "compression"? I was expecting something like "hoodie.diskmap.bitcask.compression.enabled" ## File path: hudi-common/src/main/java/org/apache/hudi/common/util/collection/BitCaskDiskMap.java ## @@ -399,4 +419,47 @@ public int compareTo(ValueMetadata o) { return Long.compare(this.offsetOfValue, o.offsetOfValue); } } + + private static class CompressionHandler implements Serializable { +private static final int DISK_COMPRESSION_INITIAL_BUFFER_SIZE = 1048576; +private static final int DECOMPRESS_INTERMEDIATE_BUFFER_SIZE = 8192; + +// Caching ByteArrayOutputStreams to avoid recreating it for every operation +private final ByteArrayOutputStream compressBaos; +private final ByteArrayOutputStream decompressBaos; +private final byte[] decompressIntermediateBuffer; + +CompressionHandler() { + compressBaos = new ByteArrayOutputStream(DISK_COMPRESSION_INITIAL_BUFFER_SIZE); + decompressBaos = new ByteArrayOutputStream(DISK_COMPRESSION_INITIAL_BUFFER_SIZE); + decompressIntermediateBuffer = new byte[DECOMPRESS_INTERMEDIATE_BUFFER_SIZE]; +} + +private byte[] compressBytes(final byte[] value) throws IOException { + compressBaos.reset(); + Deflater deflater = new Deflater(Deflater.BEST_COMPRESSION); + DeflaterOutputStream dos = new DeflaterOutputStream(compressBaos, deflater); + try { +dos.write(value); + } finally { +dos.close(); +deflater.end(); + } + return compressBaos.toByteArray(); +} + +private byte[] decompressBytes(final byte[] bytes) throws IOException { + decompressBaos.reset(); + InputStream in = new InflaterInputStream(new ByteArrayInputStream(bytes)); + try { +int len; +while ((len = in.read(decompressIntermediateBuffer)) > 0) { Review comment: is decompressIntermediateBuffer overwritten everytime is it? ## File path: hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java ## @@ -108,7 +116,7 @@ public ExternalSpillableMap(Long maxInMemorySizeInBytes, String baseFilePath, Si break; case BITCASK: default: -diskBasedMap = new BitCaskDiskMap<>(baseFilePath); +diskBasedMap = isCompressionEnabled ? (new BitCaskDiskMap<>(baseFilePath, true)) : (new BitCaskDiskMap<>(baseFilePath, false)); Review comment: new BitCaskDiskMap<>(baseFilePath, isCompressionEnabled) ## File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java ## @@ -290,6 +290,11 @@ .defaultValue(ExternalSpillableMap.DiskMapType.BITCASK) .withDocumentation("Enable usage of either BITCASK or ROCKS_DB as disk map for External Spillable Map"); + public static final ConfigProperty DISK_MAP_BITCASK_COMPRESSION_ENABLED = ConfigProperty + .key("hoodie.diskmap.bitcask.enabled") + .defaultValue(false) Review comment: also, why not enable by default -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] nsivabalan commented on a change in pull request #3128: [HUDI-2029] Implement compression for DiskBasedMap in Spillable Map
nsivabalan commented on a change in pull request #3128: URL: https://github.com/apache/hudi/pull/3128#discussion_r659451616 ## File path: hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java ## @@ -80,22 +84,38 @@ private final String baseFilePath; public ExternalSpillableMap(Long maxInMemorySizeInBytes, String baseFilePath, SizeEstimator keySizeEstimator, - SizeEstimator valueSizeEstimator) throws IOException { + SizeEstimator valueSizeEstimator) throws IOException { +this(maxInMemorySizeInBytes, baseFilePath, keySizeEstimator, +valueSizeEstimator, DiskMapType.DISK_MAP); + } + + public ExternalSpillableMap(Long maxInMemorySizeInBytes, String baseFilePath, SizeEstimator keySizeEstimator, + SizeEstimator valueSizeEstimator, DiskMapType diskMapType) throws IOException { this.inMemoryMap = new HashMap<>(); this.baseFilePath = baseFilePath; -this.diskBasedMap = new DiskBasedMap<>(baseFilePath); this.maxInMemorySizeInBytes = (long) Math.floor(maxInMemorySizeInBytes * sizingFactorForInMemoryMap); this.currentInMemoryMapSize = 0L; this.keySizeEstimator = keySizeEstimator; this.valueSizeEstimator = valueSizeEstimator; +this.diskMapType = diskMapType; } - private DiskBasedMap getDiskBasedMap() { + private SpillableDiskMap getDiskBasedMap() { if (null == diskBasedMap) { synchronized (this) { if (null == diskBasedMap) { try { -diskBasedMap = new DiskBasedMap<>(baseFilePath); +switch (diskMapType) { + case ROCK_DB: +diskBasedMap = new SpillableRocksDBBasedMap<>(baseFilePath); +break; + case COMPRESSED_DISK_MAP: Review comment: compression is just a config right. is there a necessity to introduce a new enum? ## File path: hudi-common/src/main/java/org/apache/hudi/common/util/collection/DiskBasedMap.java ## @@ -395,4 +417,48 @@ public int compareTo(ValueMetadata o) { return Long.compare(this.offsetOfValue, o.offsetOfValue); } } + + private static class DiskCompressionInstance implements Serializable { +public static final int DISK_COMPRESSION_INITIAL_BUFFER_SIZE = 1048576; + +// Caching ByteArrayOutputStreams to avoid recreating it for every operation +private final ByteArrayOutputStream compressBaos; +private final ByteArrayOutputStream decompressBaos; +private final byte[] decompressBuffer; + +DiskCompressionInstance() { + compressBaos = new ByteArrayOutputStream(DISK_COMPRESSION_INITIAL_BUFFER_SIZE); + decompressBaos = new ByteArrayOutputStream(DISK_COMPRESSION_INITIAL_BUFFER_SIZE); + decompressBuffer = new byte[8192]; +} + +public byte[] compressBytes(final byte [] value) throws IOException { Review comment: I assume these are not required to be thread safe ? can you confirm this. ## File path: hudi-common/src/main/java/org/apache/hudi/common/util/collection/DiskBasedMap.java ## @@ -395,4 +417,48 @@ public int compareTo(ValueMetadata o) { return Long.compare(this.offsetOfValue, o.offsetOfValue); } } + + private static class DiskCompressionInstance implements Serializable { +public static final int DISK_COMPRESSION_INITIAL_BUFFER_SIZE = 1048576; + +// Caching ByteArrayOutputStreams to avoid recreating it for every operation +private final ByteArrayOutputStream compressBaos; +private final ByteArrayOutputStream decompressBaos; +private final byte[] decompressBuffer; + +DiskCompressionInstance() { + compressBaos = new ByteArrayOutputStream(DISK_COMPRESSION_INITIAL_BUFFER_SIZE); + decompressBaos = new ByteArrayOutputStream(DISK_COMPRESSION_INITIAL_BUFFER_SIZE); + decompressBuffer = new byte[8192]; +} + +public byte[] compressBytes(final byte [] value) throws IOException { Review comment: are these required to be public ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org