AlanConfluent commented on code in PR #21733: URL: https://github.com/apache/flink/pull/21733#discussion_r1084601474
########## flink-tests/src/test/java/org/apache/flink/test/state/TaskManagerWideRocksDbMemorySharingITCase.java: ########## @@ -125,36 +129,45 @@ public void testBlockCache() throws Exception { 0)); // do some work and check the actual usage of memory - for (int i = 0; i < 10; i++) { + double[] deviations = new double[NUM_MEASUREMENTS]; + for (int i = 0; i < NUM_MEASUREMENTS; i++) { Thread.sleep(50L); - DoubleSummaryStatistics stats = + double[] blockCacheUsages = collectGaugeValues(jobIDs, "rocksdb.block-cache-usage") - .collect(Collectors.summarizingDouble((Double::doubleValue))); - assertEquals( - String.format( - "Block cache usage reported by different tasks varies too much: %s\n" - + "That likely mean that they use different cache objects", - stats), - stats.getMax(), - stats.getMin(), - // some deviation is possible because: - // 1. records are being processed in parallel with requesting metrics - // 2. reporting metrics is not synchronized - 500_000d); + .mapToDouble(value -> value) + .toArray(); assertTrue( String.format( "total block cache usage is too high: %s (limit: %s, effective limit: %s)", - stats, EXPECTED_BLOCK_CACHE_SIZE, EFFECTIVE_LIMIT), - stats.getMax() <= EFFECTIVE_LIMIT); + Arrays.toString(blockCacheUsages), + EXPECTED_BLOCK_CACHE_SIZE, + EFFECTIVE_LIMIT), + Arrays.stream(blockCacheUsages).max().getAsDouble() <= EFFECTIVE_LIMIT); + deviations[i] = new StandardDeviation().evaluate(blockCacheUsages); } - + validateDeviations(deviations); } finally { for (JobID jobID : jobIDs) { cluster.getRestClusterClient().cancel(jobID).get(); } } } + private static void validateDeviations(double[] deviations) { + DescriptiveStatisticsHistogramStatistics percentile = + new DescriptiveStatisticsHistogramStatistics(deviations); + assertTrue( + String.format( + "Block cache usage reported by different tasks varies too much: %s\n" + + "That likely mean that they use different cache objects", + Arrays.toString(deviations)), + // some deviation is possible 
because: + // 1. records are being processed in parallel with requesting metrics + // 2. reporting metrics is not synchronized + percentile.getQuantile(.50d) <= 10_000d Review Comment: It is hard to know how these empirical values were calibrated. It would be nice to know that this test fails when the cache is not shared. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org