carp84 commented on a change in pull request #13688:
URL: https://github.com/apache/flink/pull/13688#discussion_r511004271



##########
File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java
##########
@@ -156,9 +160,49 @@ public static ColumnFamilyDescriptor 
createColumnFamilyDescriptor(
                
Preconditions.checkState(!Arrays.equals(RocksDB.DEFAULT_COLUMN_FAMILY, 
nameBytes),
                        "The chosen state name 'default' collides with the name 
of the default column family!");
 
+               if (writeBufferManagerCapacity != null) {
+                       // It'd be great to perform the check earlier, e.g. 
when creating write buffer manager.
+                       // Unfortunately the check needs write buffer size that 
was just calculated.
+                       sanityCheckArenaBlockSize(options.writeBufferSize(), 
options.arenaBlockSize(), writeBufferManagerCapacity);
+               }
+
                return new ColumnFamilyDescriptor(nameBytes, options);
        }
 
+       /**
+        * Logs a warning ff the arena block size is too high causing RocksDB 
to flush constantly.
+        * Essentially, the condition here
+        * <a 
href="https://github.com/dataArtisans/frocksdb/blob/49bc897d5d768026f1eb816d960c1f2383396ef4/include/rocksdb/write_buffer_manager.h#L47"/>
+        * will always be true.

Review comment:
       ```suggestion
         * Essentially, the condition
         * <a 
href="https://github.com/dataArtisans/frocksdb/blob/49bc897d5d768026f1eb816d960c1f2383396ef4/include/rocksdb/write_buffer_manager.h#L47";>
         * here</a> will always be true.
   ```

##########
File path: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMemoryControllerUtilsTest.java
##########
@@ -91,10 +93,33 @@ public void testCreateSharedResourcesWithExpectedCapacity() 
{
                long totalMemorySize = 2048L;
                double writeBufferRatio = 0.5;
                double highPriPoolRatio = 0.1;
-               
RocksDBMemoryControllerUtils.allocateRocksDBSharedResources(totalMemorySize, 
writeBufferRatio, highPriPoolRatio);
+               RocksDBSharedResources rocksDBSharedResources = 
RocksDBMemoryControllerUtils.allocateRocksDBSharedResources(totalMemorySize, 
writeBufferRatio, highPriPoolRatio);
                long expectedCacheCapacity = 
RocksDBMemoryControllerUtils.calculateActualCacheCapacity(totalMemorySize, 
writeBufferRatio);
                long expectedWbmCapacity = 
RocksDBMemoryControllerUtils.calculateWriteBufferManagerCapacity(totalMemorySize,
 writeBufferRatio);
+
                assertThat(actualCacheCapacity.get(), 
is(expectedCacheCapacity));
                assertThat(actualWbmCapacity.get(), is(expectedWbmCapacity));
+               
assertThat(rocksDBSharedResources.getWriteBufferManagerCapacity(), 
is(expectedWbmCapacity));
+       }
+
+       @Test
+       public void testCalculateRocksDBDefaultArenaBlockSize() {
+               long writeBufferSize = 64 * 1024 * 1024;
+               long expectArenaBlockSize = writeBufferSize / 8;
+               
assertThat(RocksDBMemoryControllerUtils.calculateRocksDBDefaultArenaBlockSize(writeBufferSize),
 is(expectArenaBlockSize));
+       }
+
+       @Test
+       public void testCalculateRocksDBMutableLimit() {
+               long bufferSize = 64 * 1024 * 1024;
+               long limit = bufferSize * 7 / 8;
+               
assertThat(RocksDBMemoryControllerUtils.calculateRocksDBMutableLimit(bufferSize),
 is(limit));
+       }
+
+       @Test
+       public void testValidateArenaBlockSize() {
+               long arenaBlockSize = 8 * 1024 * 1024;
+               
assertFalse(RocksDBMemoryControllerUtils.validateArenaBlockSize(arenaBlockSize, 
(long) (arenaBlockSize * 0.5)));
+               
assertTrue(RocksDBMemoryControllerUtils.validateArenaBlockSize(arenaBlockSize, 
(long) (arenaBlockSize * 1.5)));
        }

Review comment:
       I'm hesitating on adding these tests since they're testing against the 
implementation instead of any contract. Once the calculation formula changes, 
these tests will fail and need to be adjusted accordingly.

##########
File path: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBOperationsUtilsTest.java
##########
@@ -78,6 +81,60 @@ public void testPathExceptionOnWindows() throws Exception {
                }
        }
 
+       @Test
+       public void testSanityCheckArenaBlockSize() {
+               long testWriteBufferSize = 56 * 1024 * 1024L;
+               long testDefaultArenaSize = testWriteBufferSize / 8;
+               long testValidArenaSize = testWriteBufferSize / 7;
+               long testInvalidArenaSize = testWriteBufferSize / 7 - 8L;
+               List<TestData> tests = Arrays.asList(
+                               new TestData(testWriteBufferSize, 0, 
testInvalidArenaSize, false),
+                               new TestData(testWriteBufferSize, 
testDefaultArenaSize, testInvalidArenaSize, false),
+                               new TestData(testWriteBufferSize, 0, 
testValidArenaSize, true),
+                               new TestData(testWriteBufferSize, 
testDefaultArenaSize, testValidArenaSize, true)
+               );
+
+               for (TestData test : tests) {
+                       long writeBufferSize = test.getWriteBufferSize();
+                       long arenaBlockSizeConfigured = 
test.getArenaBlockSizeConfigured();
+                       long writeBufferManagerCapacity = 
test.getWriteBufferManagerCapacity();
+                       boolean expected = test.getExpectedResult();
+
+                       boolean sanityCheckResult = 
RocksDBOperationUtils.sanityCheckArenaBlockSize(writeBufferSize, 
arenaBlockSizeConfigured, writeBufferManagerCapacity);
+                       assertThat(sanityCheckResult, is(expected));
+               }
+       }
+
+       private static class TestData {
+               private final long writeBufferSize;
+               private final long arenaBlockSizeConfigured;
+               private final long writeBufferManagerCapacity;
+               private final boolean expectedResult;
+
+               public TestData(long writeBufferSize, long 
arenaBlockSizeConfigured, long writeBufferManagerCapacity, boolean 
expectedResult) {
+                       this.writeBufferSize = writeBufferSize;
+                       this.arenaBlockSizeConfigured = 
arenaBlockSizeConfigured;
+                       this.writeBufferManagerCapacity = 
writeBufferManagerCapacity;
+                       this.expectedResult = expectedResult;
+               }
+
+               public long getWriteBufferSize() {
+                       return writeBufferSize;
+               }
+
+               public long getArenaBlockSizeConfigured() {
+                       return arenaBlockSizeConfigured;
+               }
+
+               public long getWriteBufferManagerCapacity() {
+                       return writeBufferManagerCapacity;
+               }
+
+               public boolean getExpectedResult() {
+                       return expectedResult;
+               }
+       }

Review comment:
       ```suggestion
        public void testSanityCheckArenaBlockSize() {
                long testWriteBufferSize = 56 * 1024 * 1024L;
                long testDefaultArenaSize = 
RocksDBMemoryControllerUtils.calculateRocksDBDefaultArenaBlockSize(testWriteBufferSize);
                long testWriteBufferCapacityBoundary = testDefaultArenaSize * 8 
/ 7;
                assertThat("The sanity check result is incorrect with default 
arena block size",
                        
RocksDBOperationUtils.sanityCheckArenaBlockSize(testWriteBufferSize, 0, 
testWriteBufferCapacityBoundary),
                        is(true));
                assertThat("The sanity check should pass when the configured 
arena block size is small enough.",
                        
RocksDBOperationUtils.sanityCheckArenaBlockSize(testWriteBufferSize, 
testDefaultArenaSize - 1, testWriteBufferCapacityBoundary),
                        is(true));
                assertThat("The sanity check should fail when the configured 
arena block size is too big.",
                        
RocksDBOperationUtils.sanityCheckArenaBlockSize(testWriteBufferSize, 
testDefaultArenaSize + 1, testWriteBufferCapacityBoundary),
                        is(false));
        }
   ```
   Sorry but I'm still not satisfied with this test and suggest to further 
simplify it and adding some hints if any of the test fails.
   
   And although this test is also implementation bounded, it checks/guards the 
result of multiple calculations, so I think we should keep it.
   
   We will also need to remove useless imports if the suggestion is accepted.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to