carp84 commented on a change in pull request #13688:
URL: https://github.com/apache/flink/pull/13688#discussion_r511759563



##########
File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMemoryControllerUtils.java
##########
@@ -95,4 +95,50 @@ static Cache createCache(long cacheCapacity, double 
highPriorityPoolRatio) {
        static WriteBufferManager createWriteBufferManager(long 
writeBufferManagerCapacity, Cache cache) {
                return new WriteBufferManager(writeBufferManagerCapacity, 
cache);
        }
+
+       /**
+        * Calculate the default arena block size as RocksDB calculates it in
+        * <a 
href="https://github.com/dataArtisans/frocksdb/blob/49bc897d5d768026f1eb816d960c1f2383396ef4/db/column_family.cc#L196">
+        * here</a>.
+        *
+        * @return the default arena block size
+        * @param writeBufferSize the write buffer size (bytes)
+        */
+       static long calculateRocksDBDefaultArenaBlockSize(long writeBufferSize) 
{
+               long arenaBlockSize = writeBufferSize / 8;
+
+               long align = 4 * 1024;
+               return ((arenaBlockSize + align - 1) / align) * align;

Review comment:
       ```suggestion
                // Align up to 4k
                final long align = 4 * 1024;
                return ((arenaBlockSize + align - 1) / align) * align;
   ```
   Marking the primitive variable as `final` makes it a constant variable, 
which helps the compiler optimize the code; please see 
[here](https://stackoverflow.com/questions/62603729/java-compiler-optimizations-with-final-local-variables)
 for more details. We could also make it a class-level `static final` field, 
but keeping it here maps directly to the RocksDB code.

##########
File path: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBOperationsUtilsTest.java
##########
@@ -78,6 +79,24 @@ public void testPathExceptionOnWindows() throws Exception {
                }
        }
 
+       public void testSanityCheckArenaBlockSize() {

Review comment:
       ```suggestion
        @Test
        public void testSanityCheckArenaBlockSize() {
   ```

##########
File path: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMemoryControllerUtilsTest.java
##########
@@ -91,10 +93,45 @@ public void testCreateSharedResourcesWithExpectedCapacity() 
{
                long totalMemorySize = 2048L;
                double writeBufferRatio = 0.5;
                double highPriPoolRatio = 0.1;
-               
RocksDBMemoryControllerUtils.allocateRocksDBSharedResources(totalMemorySize, 
writeBufferRatio, highPriPoolRatio);
+               RocksDBSharedResources rocksDBSharedResources = 
RocksDBMemoryControllerUtils.allocateRocksDBSharedResources(totalMemorySize, 
writeBufferRatio, highPriPoolRatio);
                long expectedCacheCapacity = 
RocksDBMemoryControllerUtils.calculateActualCacheCapacity(totalMemorySize, 
writeBufferRatio);
                long expectedWbmCapacity = 
RocksDBMemoryControllerUtils.calculateWriteBufferManagerCapacity(totalMemorySize,
 writeBufferRatio);
+
                assertThat(actualCacheCapacity.get(), 
is(expectedCacheCapacity));
                assertThat(actualWbmCapacity.get(), is(expectedWbmCapacity));
+               
assertThat(rocksDBSharedResources.getWriteBufferManagerCapacity(), 
is(expectedWbmCapacity));
+       }
+
+       @Test
+       public void testCalculateRocksDBDefaultArenaBlockSize() {
+               long align = 4 * 1024;
+
+               long writeBufferSize = 64 * 1024 * 1024;
+
+               long expectArenaBlockSize = writeBufferSize / 8;
+               long expectArenaBlockSize2 = expectArenaBlockSize + align;
+               long expectArenaBlockSize3 = expectArenaBlockSize + 2 * align;
+
+               
assertThat(RocksDBMemoryControllerUtils.calculateRocksDBDefaultArenaBlockSize(writeBufferSize),
 is(expectArenaBlockSize));
+
+               // Alignment tests
+               
assertThat(RocksDBMemoryControllerUtils.calculateRocksDBDefaultArenaBlockSize(writeBufferSize
 + 7), is(expectArenaBlockSize));
+               
assertThat(RocksDBMemoryControllerUtils.calculateRocksDBDefaultArenaBlockSize(writeBufferSize
 + 8), is(expectArenaBlockSize2));
+               
assertThat(RocksDBMemoryControllerUtils.calculateRocksDBDefaultArenaBlockSize(writeBufferSize
 + 8 * align + 7), is(expectArenaBlockSize2));
+               
assertThat(RocksDBMemoryControllerUtils.calculateRocksDBDefaultArenaBlockSize(writeBufferSize
 + 8 * align + 8), is(expectArenaBlockSize3));

Review comment:
       ```suggestion
                final long align = 4 * 1024;
                final long writeBufferSize = 64 * 1024 * 1024;
                final long expectArenaBlockSize = writeBufferSize / 8;
   
                // Normal case test
                assertThat("Arena block size calculation error for normal case",
                        
RocksDBMemoryControllerUtils.calculateRocksDBDefaultArenaBlockSize(writeBufferSize),
 is(expectArenaBlockSize));
   
                // Alignment tests
                assertThat("Arena block size calculation error for alignment 
case",
                        
RocksDBMemoryControllerUtils.calculateRocksDBDefaultArenaBlockSize(writeBufferSize
 - 1), is(expectArenaBlockSize));
                assertThat("Arena block size calculation error for alignment 
case2",
                        
RocksDBMemoryControllerUtils.calculateRocksDBDefaultArenaBlockSize(writeBufferSize
 + 8), is(expectArenaBlockSize + align));
   ```

##########
File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMemoryControllerUtils.java
##########
@@ -95,4 +95,50 @@ static Cache createCache(long cacheCapacity, double 
highPriorityPoolRatio) {
        static WriteBufferManager createWriteBufferManager(long 
writeBufferManagerCapacity, Cache cache) {
                return new WriteBufferManager(writeBufferManagerCapacity, 
cache);
        }
+
+       /**
+        * Calculate the default arena block size as RocksDB calculates it in
+        * <a 
href="https://github.com/dataArtisans/frocksdb/blob/49bc897d5d768026f1eb816d960c1f2383396ef4/db/column_family.cc#L196">

Review comment:
       ```suggestion
         * <a 
href="https://github.com/ververica/frocksdb/blob/49bc897d5d768026f1eb816d960c1f2383396ef4/db/column_family.cc#L196-L201">
   ```
   It's better to also include the source of the alignment logic.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to