[GitHub] [hudi] nsivabalan commented on a change in pull request #3128: [HUDI-2029] Implement compression for DiskBasedMap in Spillable Map

2021-07-14 Thread GitBox


nsivabalan commented on a change in pull request #3128:
URL: https://github.com/apache/hudi/pull/3128#discussion_r669909750



##
File path: hudi-common/src/main/java/org/apache/hudi/common/util/collection/BitCaskDiskMap.java
##
@@ -188,21 +204,25 @@ public R get(Object key) {
   }
 
   private R get(ValueMetadata entry) {
-    return get(entry, getRandomAccessFile());
+    return get(entry, getRandomAccessFile(), isCompressionEnabled);
   }
 
-  public static <R> R get(ValueMetadata entry, RandomAccessFile file) {
+  public static <R> R get(ValueMetadata entry, RandomAccessFile file, boolean isCompressionEnabled) {
     try {
-      return SerializationUtils
-          .deserialize(SpillableMapUtils.readBytesFromDisk(file, entry.getOffsetOfValue(), entry.getSizeOfValue()));
+      byte[] bytesFromDisk = SpillableMapUtils.readBytesFromDisk(file, entry.getOffsetOfValue(), entry.getSizeOfValue());
+      if (isCompressionEnabled) {
+        return SerializationUtils.deserialize(DISK_COMPRESSION_REF.get().decompressBytes(bytesFromDisk));
+      }

Review comment:
   Not required to fix in this patch, but something to keep in mind: it would be good to have an explicit else block for line 216. This "if" block is just one line, so it's fine here. But if it were a large "if" block, a reader/dev might wonder whether some code path fails to return from within the "if" block, and whether that is why there is a return outside of it. So whenever you have an if/else, try to always add an explicit else block, as in the sketch below.
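
   A minimal sketch of the shape I mean, assuming the fall-through return on line 216 is the plain (uncompressed) deserialize path:

   if (isCompressionEnabled) {
     // compressed on disk: inflate before deserializing
     return SerializationUtils.deserialize(DISK_COMPRESSION_REF.get().decompressBytes(bytesFromDisk));
   } else {
     // uncompressed path made explicit, so the reader sees every branch return
     return SerializationUtils.deserialize(bytesFromDisk);
   }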

##
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
##
@@ -200,7 +200,7 @@ protected void initializeIncomingRecordsMap() {
     LOG.info("MaxMemoryPerPartitionMerge => " + memoryForMerge);
     this.keyToNewRecords = new ExternalSpillableMap<>(memoryForMerge, config.getSpillableMapBasePath(),
         new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(tableSchema),
-        config.getSpillableDiskMapType());
+        config.getSpillableDiskMapType(), config.isBitCaskDiskMapCompressionEnabled());

Review comment:
   Do you want to make the change in HoodieMergedLogRecordScanner as well? Or is that planned for a follow-up PR?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] nsivabalan commented on a change in pull request #3128: [HUDI-2029] Implement compression for DiskBasedMap in Spillable Map

2021-07-13 Thread GitBox


nsivabalan commented on a change in pull request #3128:
URL: https://github.com/apache/hudi/pull/3128#discussion_r668901207



##
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
##
@@ -290,6 +290,11 @@
       .defaultValue(ExternalSpillableMap.DiskMapType.BITCASK)
       .withDocumentation("Enable usage of either BITCASK or ROCKS_DB as disk map for External Spillable Map");
 
+  public static final ConfigProperty<Boolean> DISK_MAP_BITCASK_COMPRESSION_ENABLED = ConfigProperty
+      .key("hoodie.diskmap.bitcask.enabled")
+      .defaultValue(false)

Review comment:
   Sorry, may I know why the property name does not have "compression" in it? I was expecting something like "hoodie.diskmap.bitcask.compression.enabled".

##
File path: hudi-common/src/main/java/org/apache/hudi/common/util/collection/BitCaskDiskMap.java
##
@@ -399,4 +419,47 @@ public int compareTo(ValueMetadata o) {
       return Long.compare(this.offsetOfValue, o.offsetOfValue);
     }
   }
+
+  private static class CompressionHandler implements Serializable {
+    private static final int DISK_COMPRESSION_INITIAL_BUFFER_SIZE = 1048576;
+    private static final int DECOMPRESS_INTERMEDIATE_BUFFER_SIZE = 8192;
+
+    // Caching ByteArrayOutputStreams to avoid recreating it for every operation
+    private final ByteArrayOutputStream compressBaos;
+    private final ByteArrayOutputStream decompressBaos;
+    private final byte[] decompressIntermediateBuffer;
+
+    CompressionHandler() {
+      compressBaos = new ByteArrayOutputStream(DISK_COMPRESSION_INITIAL_BUFFER_SIZE);
+      decompressBaos = new ByteArrayOutputStream(DISK_COMPRESSION_INITIAL_BUFFER_SIZE);
+      decompressIntermediateBuffer = new byte[DECOMPRESS_INTERMEDIATE_BUFFER_SIZE];
+    }
+
+    private byte[] compressBytes(final byte[] value) throws IOException {
+      compressBaos.reset();
+      Deflater deflater = new Deflater(Deflater.BEST_COMPRESSION);
+      DeflaterOutputStream dos = new DeflaterOutputStream(compressBaos, deflater);
+      try {
+        dos.write(value);
+      } finally {
+        dos.close();
+        deflater.end();
+      }
+      return compressBaos.toByteArray();
+    }
+
+    private byte[] decompressBytes(final byte[] bytes) throws IOException {
+      decompressBaos.reset();
+      InputStream in = new InflaterInputStream(new ByteArrayInputStream(bytes));
+      try {
+        int len;
+        while ((len = in.read(decompressIntermediateBuffer)) > 0) {

Review comment:
   Is decompressIntermediateBuffer overwritten on every read, i.e. reused across iterations of this loop?
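
   For my own understanding, a sketch of how I'd expect the loop to continue (the decompressBaos.write line is my assumption about what follows the truncated diff):

   while ((len = in.read(decompressIntermediateBuffer)) > 0) {
     // each read overwrites the intermediate buffer, so the bytes just read
     // must be copied into decompressBaos before the next iteration
     decompressBaos.write(decompressIntermediateBuffer, 0, len);
   }
   return decompressBaos.toByteArray();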

##
File path: hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java
##
@@ -108,7 +116,7 @@ public ExternalSpillableMap(Long maxInMemorySizeInBytes, String baseFilePath, Si
         break;
       case BITCASK:
       default:
-        diskBasedMap = new BitCaskDiskMap<>(baseFilePath);
+        diskBasedMap = isCompressionEnabled ? (new BitCaskDiskMap<>(baseFilePath, true)) : (new BitCaskDiskMap<>(baseFilePath, false));

Review comment:
   This can simply be: new BitCaskDiskMap<>(baseFilePath, isCompressionEnabled)

##
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
##
@@ -290,6 +290,11 @@
       .defaultValue(ExternalSpillableMap.DiskMapType.BITCASK)
       .withDocumentation("Enable usage of either BITCASK or ROCKS_DB as disk map for External Spillable Map");
 
+  public static final ConfigProperty<Boolean> DISK_MAP_BITCASK_COMPRESSION_ENABLED = ConfigProperty
+      .key("hoodie.diskmap.bitcask.enabled")
+      .defaultValue(false)

Review comment:
   Also, why not enable it by default?
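
   Putting both of my comments together, something like this is the shape I'd expect (a sketch only: key renamed to include "compression", default flipped to true, and the documentation string is a placeholder):

   public static final ConfigProperty<Boolean> DISK_MAP_BITCASK_COMPRESSION_ENABLED = ConfigProperty
       .key("hoodie.diskmap.bitcask.compression.enabled")
       .defaultValue(true)
       .withDocumentation("Whether to compress values stored on disk by the BitCask-style spillable map");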




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] nsivabalan commented on a change in pull request #3128: [HUDI-2029] Implement compression for DiskBasedMap in Spillable Map

2021-06-27 Thread GitBox


nsivabalan commented on a change in pull request #3128:
URL: https://github.com/apache/hudi/pull/3128#discussion_r659451616



##
File path: hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java
##
@@ -80,22 +84,38 @@
   private final String baseFilePath;
 
   public ExternalSpillableMap(Long maxInMemorySizeInBytes, String baseFilePath, SizeEstimator<T> keySizeEstimator,
-      SizeEstimator<R> valueSizeEstimator) throws IOException {
+                              SizeEstimator<R> valueSizeEstimator) throws IOException {
+    this(maxInMemorySizeInBytes, baseFilePath, keySizeEstimator,
+        valueSizeEstimator, DiskMapType.DISK_MAP);
+  }
+
+  public ExternalSpillableMap(Long maxInMemorySizeInBytes, String baseFilePath, SizeEstimator<T> keySizeEstimator,
+                              SizeEstimator<R> valueSizeEstimator, DiskMapType diskMapType) throws IOException {
     this.inMemoryMap = new HashMap<>();
     this.baseFilePath = baseFilePath;
-    this.diskBasedMap = new DiskBasedMap<>(baseFilePath);
     this.maxInMemorySizeInBytes = (long) Math.floor(maxInMemorySizeInBytes * sizingFactorForInMemoryMap);
     this.currentInMemoryMapSize = 0L;
     this.keySizeEstimator = keySizeEstimator;
     this.valueSizeEstimator = valueSizeEstimator;
+    this.diskMapType = diskMapType;
   }
 
-  private DiskBasedMap<T, R> getDiskBasedMap() {
+  private SpillableDiskMap<T, R> getDiskBasedMap() {
     if (null == diskBasedMap) {
       synchronized (this) {
         if (null == diskBasedMap) {
           try {
-            diskBasedMap = new DiskBasedMap<>(baseFilePath);
+            switch (diskMapType) {
+              case ROCK_DB:
+                diskBasedMap = new SpillableRocksDBBasedMap<>(baseFilePath);
+                break;
+              case COMPRESSED_DISK_MAP:
Review comment:
   Compression is just a config, right? Is there a necessity to introduce a new enum value for it? A boolean flag threaded through would do; see the sketch below.
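
   Roughly what I have in mind (a sketch only, reusing the names from this diff; the boolean isCompressionEnabled parameter is the suggestion):

   switch (diskMapType) {
     case ROCK_DB:
       diskBasedMap = new SpillableRocksDBBasedMap<>(baseFilePath);
       break;
     case DISK_MAP:
     default:
       // compression stays a plain boolean knob instead of a COMPRESSED_DISK_MAP enum value
       diskBasedMap = new DiskBasedMap<>(baseFilePath, isCompressionEnabled);
   }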

##
File path: hudi-common/src/main/java/org/apache/hudi/common/util/collection/DiskBasedMap.java
##
@@ -395,4 +417,48 @@ public int compareTo(ValueMetadata o) {
       return Long.compare(this.offsetOfValue, o.offsetOfValue);
     }
   }
+
+  private static class DiskCompressionInstance implements Serializable {
+    public static final int DISK_COMPRESSION_INITIAL_BUFFER_SIZE = 1048576;
+
+    // Caching ByteArrayOutputStreams to avoid recreating it for every operation
+    private final ByteArrayOutputStream compressBaos;
+    private final ByteArrayOutputStream decompressBaos;
+    private final byte[] decompressBuffer;
+
+    DiskCompressionInstance() {
+      compressBaos = new ByteArrayOutputStream(DISK_COMPRESSION_INITIAL_BUFFER_SIZE);
+      decompressBaos = new ByteArrayOutputStream(DISK_COMPRESSION_INITIAL_BUFFER_SIZE);
+      decompressBuffer = new byte[8192];
+    }
+
+    public byte[] compressBytes(final byte[] value) throws IOException {
Review comment:
   I assume these are not required to be thread-safe? Can you confirm this? See the sketch below for one way to make the cached buffers safe if they do end up shared.
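
   For reference, the cached ByteArrayOutputStream buffers here are not safe for concurrent use. One common way to sidestep that, if multiple threads can hit the same map, is a per-thread instance (a sketch only; the field name mirrors the DISK_COMPRESSION_REF used in a later revision of this PR):

   // one handler per thread, so the cached buffers are never shared
   private static final ThreadLocal<DiskCompressionInstance> DISK_COMPRESSION_REF =
       ThreadLocal.withInitial(DiskCompressionInstance::new);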

##
File path: hudi-common/src/main/java/org/apache/hudi/common/util/collection/DiskBasedMap.java
##
@@ -395,4 +417,48 @@ public int compareTo(ValueMetadata o) {
       return Long.compare(this.offsetOfValue, o.offsetOfValue);
     }
   }
+
+  private static class DiskCompressionInstance implements Serializable {
+    public static final int DISK_COMPRESSION_INITIAL_BUFFER_SIZE = 1048576;
+
+    // Caching ByteArrayOutputStreams to avoid recreating it for every operation
+    private final ByteArrayOutputStream compressBaos;
+    private final ByteArrayOutputStream decompressBaos;
+    private final byte[] decompressBuffer;
+
+    DiskCompressionInstance() {
+      compressBaos = new ByteArrayOutputStream(DISK_COMPRESSION_INITIAL_BUFFER_SIZE);
+      decompressBaos = new ByteArrayOutputStream(DISK_COMPRESSION_INITIAL_BUFFER_SIZE);
+      decompressBuffer = new byte[8192];
+    }
+
+    public byte[] compressBytes(final byte[] value) throws IOException {
Review comment:
   Are these required to be public?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org