AlanConfluent commented on code in PR #21733:
URL: https://github.com/apache/flink/pull/21733#discussion_r1084601474


##########
flink-tests/src/test/java/org/apache/flink/test/state/TaskManagerWideRocksDbMemorySharingITCase.java:
##########
@@ -125,36 +129,45 @@ public void testBlockCache() throws Exception {
                                             0));
 
             // do some work and check the actual usage of memory
-            for (int i = 0; i < 10; i++) {
+            double[] deviations = new double[NUM_MEASUREMENTS];
+            for (int i = 0; i < NUM_MEASUREMENTS; i++) {
                 Thread.sleep(50L);
-                DoubleSummaryStatistics stats =
+                double[] blockCacheUsages =
                         collectGaugeValues(jobIDs, "rocksdb.block-cache-usage")
-                                
.collect(Collectors.summarizingDouble((Double::doubleValue)));
-                assertEquals(
-                        String.format(
-                                "Block cache usage reported by different tasks 
varies too much: %s\n"
-                                        + "That likely mean that they use 
different cache objects",
-                                stats),
-                        stats.getMax(),
-                        stats.getMin(),
-                        // some deviation is possible because:
-                        // 1. records are being processed in parallel with 
requesting metrics
-                        // 2. reporting metrics is not synchronized
-                        500_000d);
+                                .mapToDouble(value -> value)
+                                .toArray();
                 assertTrue(
                         String.format(
                                 "total block cache usage is too high: %s 
(limit: %s, effective limit: %s)",
-                                stats, EXPECTED_BLOCK_CACHE_SIZE, 
EFFECTIVE_LIMIT),
-                        stats.getMax() <= EFFECTIVE_LIMIT);
+                                Arrays.toString(blockCacheUsages),
+                                EXPECTED_BLOCK_CACHE_SIZE,
+                                EFFECTIVE_LIMIT),
+                        Arrays.stream(blockCacheUsages).max().getAsDouble() <= 
EFFECTIVE_LIMIT);
+                deviations[i] = new 
StandardDeviation().evaluate(blockCacheUsages);
             }
-
+            validateDeviations(deviations);
         } finally {
             for (JobID jobID : jobIDs) {
                 cluster.getRestClusterClient().cancel(jobID).get();
             }
         }
     }
 
+    private static void validateDeviations(double[] deviations) {
+        DescriptiveStatisticsHistogramStatistics percentile =
+                new DescriptiveStatisticsHistogramStatistics(deviations);
+        assertTrue(
+                String.format(
+                        "Block cache usage reported by different tasks varies 
too much: %s\n"
+                                + "That likely mean that they use different 
cache objects",
+                        Arrays.toString(deviations)),
+                // some deviation is possible because:
+                // 1. records are being processed in parallel with requesting 
metrics
+                // 2. reporting metrics is not synchronized
+                percentile.getQuantile(.50d) <= 10_000d

Review Comment:
   These empirical values are hard to know how they're calibrated.  It would be 
nice to know that this fails when the cache is not shared.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to