nsivabalan commented on a change in pull request #3128: URL: https://github.com/apache/hudi/pull/3128#discussion_r659451616
########## File path: hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java ########## @@ -80,22 +84,38 @@ private final String baseFilePath; public ExternalSpillableMap(Long maxInMemorySizeInBytes, String baseFilePath, SizeEstimator<T> keySizeEstimator, - SizeEstimator<R> valueSizeEstimator) throws IOException { + SizeEstimator<R> valueSizeEstimator) throws IOException { + this(maxInMemorySizeInBytes, baseFilePath, keySizeEstimator, + valueSizeEstimator, DiskMapType.DISK_MAP); + } + + public ExternalSpillableMap(Long maxInMemorySizeInBytes, String baseFilePath, SizeEstimator<T> keySizeEstimator, + SizeEstimator<R> valueSizeEstimator, DiskMapType diskMapType) throws IOException { this.inMemoryMap = new HashMap<>(); this.baseFilePath = baseFilePath; - this.diskBasedMap = new DiskBasedMap<>(baseFilePath); this.maxInMemorySizeInBytes = (long) Math.floor(maxInMemorySizeInBytes * sizingFactorForInMemoryMap); this.currentInMemoryMapSize = 0L; this.keySizeEstimator = keySizeEstimator; this.valueSizeEstimator = valueSizeEstimator; + this.diskMapType = diskMapType; } - private DiskBasedMap<T, R> getDiskBasedMap() { + private SpillableDiskMap<T, R> getDiskBasedMap() { if (null == diskBasedMap) { synchronized (this) { if (null == diskBasedMap) { try { - diskBasedMap = new DiskBasedMap<>(baseFilePath); + switch (diskMapType) { + case ROCK_DB: + diskBasedMap = new SpillableRocksDBBasedMap<>(baseFilePath); + break; + case COMPRESSED_DISK_MAP: Review comment: Compression is just a config, right? Is it necessary to introduce a new enum value for it?
########## File path: hudi-common/src/main/java/org/apache/hudi/common/util/collection/DiskBasedMap.java ########## @@ -395,4 +417,48 @@ public int compareTo(ValueMetadata o) { return Long.compare(this.offsetOfValue, o.offsetOfValue); } } + + private static class DiskCompressionInstance implements Serializable { + public static final int DISK_COMPRESSION_INITIAL_BUFFER_SIZE = 1048576; + + // Caching ByteArrayOutputStreams to avoid recreating it for every operation + private final ByteArrayOutputStream compressBaos; + private final ByteArrayOutputStream decompressBaos; + private final byte[] decompressBuffer; + + DiskCompressionInstance() { + compressBaos = new ByteArrayOutputStream(DISK_COMPRESSION_INITIAL_BUFFER_SIZE); + decompressBaos = new ByteArrayOutputStream(DISK_COMPRESSION_INITIAL_BUFFER_SIZE); + decompressBuffer = new byte[8192]; + } + + public byte[] compressBytes(final byte [] value) throws IOException { Review comment: I assume these are not required to be thread-safe? Can you confirm? ########## File path: hudi-common/src/main/java/org/apache/hudi/common/util/collection/DiskBasedMap.java ########## @@ -395,4 +417,48 @@ public int compareTo(ValueMetadata o) { return Long.compare(this.offsetOfValue, o.offsetOfValue); } } + + private static class DiskCompressionInstance implements Serializable { + public static final int DISK_COMPRESSION_INITIAL_BUFFER_SIZE = 1048576; + + // Caching ByteArrayOutputStreams to avoid recreating it for every operation + private final ByteArrayOutputStream compressBaos; + private final ByteArrayOutputStream decompressBaos; + private final byte[] decompressBuffer; + + DiskCompressionInstance() { + compressBaos = new ByteArrayOutputStream(DISK_COMPRESSION_INITIAL_BUFFER_SIZE); + decompressBaos = new ByteArrayOutputStream(DISK_COMPRESSION_INITIAL_BUFFER_SIZE); + decompressBuffer = new byte[8192]; + } + + public byte[] compressBytes(final byte [] value) throws IOException { Review comment: Are these methods required to be public?
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org