carp84 commented on a change in pull request #13688: URL: https://github.com/apache/flink/pull/13688#discussion_r511759563
########## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMemoryControllerUtils.java ########## @@ -95,4 +95,50 @@ static Cache createCache(long cacheCapacity, double highPriorityPoolRatio) { static WriteBufferManager createWriteBufferManager(long writeBufferManagerCapacity, Cache cache) { return new WriteBufferManager(writeBufferManagerCapacity, cache); } + + /** + * Calculate the default arena block size as RocksDB calculates it in + * <a href="https://github.com/dataArtisans/frocksdb/blob/49bc897d5d768026f1eb816d960c1f2383396ef4/db/column_family.cc#L196"> + * here</a>. + * + * @return the default arena block size + * @param writeBufferSize the write buffer size (bytes) + */ + static long calculateRocksDBDefaultArenaBlockSize(long writeBufferSize) { + long arenaBlockSize = writeBufferSize / 8; + + long align = 4 * 1024; + return ((arenaBlockSize + align - 1) / align) * align; Review comment: ```suggestion // Align up to 4k final long align = 4 * 1024; return ((arenaBlockSize + align - 1) / align) * align; ``` Marking the primitive variable as `final` would make it a constant variable and help the compiler to do optimization, please check [here](https://stackoverflow.com/questions/62603729/java-compiler-optimizations-with-final-local-variables) for more details. We could also make it as a class-level static final field, but keeping it here directly maps to the RocksDB code. ########## File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBOperationsUtilsTest.java ########## @@ -78,6 +79,24 @@ public void testPathExceptionOnWindows() throws Exception { } } + public void testSanityCheckArenaBlockSize() { Review comment: ```suggestion @Test public void testSanityCheckArenaBlockSize() { ``` ########## File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMemoryControllerUtilsTest.java ########## @@ -91,10 +93,45 @@ public void testCreateSharedResourcesWithExpectedCapacity() { long totalMemorySize = 2048L; double writeBufferRatio = 0.5; double highPriPoolRatio = 0.1; - RocksDBMemoryControllerUtils.allocateRocksDBSharedResources(totalMemorySize, writeBufferRatio, highPriPoolRatio); + RocksDBSharedResources rocksDBSharedResources = RocksDBMemoryControllerUtils.allocateRocksDBSharedResources(totalMemorySize, writeBufferRatio, highPriPoolRatio); long expectedCacheCapacity = RocksDBMemoryControllerUtils.calculateActualCacheCapacity(totalMemorySize, writeBufferRatio); long expectedWbmCapacity = RocksDBMemoryControllerUtils.calculateWriteBufferManagerCapacity(totalMemorySize, writeBufferRatio); + assertThat(actualCacheCapacity.get(), is(expectedCacheCapacity)); assertThat(actualWbmCapacity.get(), is(expectedWbmCapacity)); + assertThat(rocksDBSharedResources.getWriteBufferManagerCapacity(), is(expectedWbmCapacity)); + } + + @Test + public void testCalculateRocksDBDefaultArenaBlockSize() { + long align = 4 * 1024; + + long writeBufferSize = 64 * 1024 * 1024; + + long expectArenaBlockSize = writeBufferSize / 8; + long expectArenaBlockSize2 = expectArenaBlockSize + align; + long expectArenaBlockSize3 = expectArenaBlockSize + 2 * align; + + assertThat(RocksDBMemoryControllerUtils.calculateRocksDBDefaultArenaBlockSize(writeBufferSize), is(expectArenaBlockSize)); + + // Alignment tests + assertThat(RocksDBMemoryControllerUtils.calculateRocksDBDefaultArenaBlockSize(writeBufferSize + 7), is(expectArenaBlockSize)); + assertThat(RocksDBMemoryControllerUtils.calculateRocksDBDefaultArenaBlockSize(writeBufferSize + 8), is(expectArenaBlockSize2)); + assertThat(RocksDBMemoryControllerUtils.calculateRocksDBDefaultArenaBlockSize(writeBufferSize + 8 * align + 7), is(expectArenaBlockSize2)); + assertThat(RocksDBMemoryControllerUtils.calculateRocksDBDefaultArenaBlockSize(writeBufferSize + 8 * align + 8), is(expectArenaBlockSize3)); Review comment: ```suggestion final long align = 4 * 1024; final long writeBufferSize = 64 * 1024 * 1024; final long expectArenaBlockSize = writeBufferSize / 8; // Normal case test assertThat("Arena block size calculation error for normal case", RocksDBMemoryControllerUtils.calculateRocksDBDefaultArenaBlockSize(writeBufferSize), is(expectArenaBlockSize)); // Alignment tests assertThat("Arena block size calculation error for alignment case", RocksDBMemoryControllerUtils.calculateRocksDBDefaultArenaBlockSize(writeBufferSize - 1), is(expectArenaBlockSize)); assertThat("Arena block size calculation error for alignment case2", RocksDBMemoryControllerUtils.calculateRocksDBDefaultArenaBlockSize(writeBufferSize + 8), is(expectArenaBlockSize + align)); ``` ########## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMemoryControllerUtils.java ########## @@ -95,4 +95,50 @@ static Cache createCache(long cacheCapacity, double highPriorityPoolRatio) { static WriteBufferManager createWriteBufferManager(long writeBufferManagerCapacity, Cache cache) { return new WriteBufferManager(writeBufferManagerCapacity, cache); } + + /** + * Calculate the default arena block size as RocksDB calculates it in + * <a href="https://github.com/dataArtisans/frocksdb/blob/49bc897d5d768026f1eb816d960c1f2383396ef4/db/column_family.cc#L196"> Review comment: ```suggestion * <a href="https://github.com/ververica/frocksdb/blob/49bc897d5d768026f1eb816d960c1f2383396ef4/db/column_family.cc#L196-L201"> ``` It's better to also include the source of alignment logic ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org