Yang Jie created SPARK-36598:
--------------------------------

             Summary: The Cache construct with weight eviction has risk of memory leakage
                 Key: SPARK-36598
                 URL: https://issues.apache.org/jira/browse/SPARK-36598
             Project: Spark
          Issue Type: Bug
          Components: Spark Core, SQL
    Affects Versions: 3.3.0
            Reporter: Yang Jie
In Spark, we define a Guava Cache using the weight-based eviction mechanism in 3 places:
# ExternalShuffleBlockResolver#shuffleIndexCache
# RemoteBlockPushResolver#indexCache
# SharedInMemoryCache#cache

These 3 Guava Caches risk leaking memory if `maximumWeight` is configured to be >= 8589934592 (8g), because LocalCache weight eviction does not work when maxSegmentWeight is >= Int.MAX_VALUE ([https://github.com/google/guava/issues/1761]).

A UT that reproduces this issue is as follows:
{code:java}
@Test
public void testShuffleIndexCacheEvictionBehavior()
    throws IOException, ExecutionException {
  Map<String, String> config = new HashMap<>();
  String indexCacheSize = "8192m";
  config.put("spark.shuffle.service.index.cache.size", indexCacheSize);
  TransportConf transportConf =
    new TransportConf("shuffle", new MapConfigProvider(config));
  ExternalShuffleBlockResolver resolver =
    new ExternalShuffleBlockResolver(transportConf, null);
  resolver.registerExecutor("app0", "exec0",
    dataContext.createExecutorInfo(SORT_MANAGER));
  LoadingCache<File, ShuffleIndexInformation> shuffleIndexCache =
    resolver.shuffleIndexCache;
  // 8g -> 8589934592 bytes
  long maximumWeight = JavaUtils.byteStringAsBytes(indexCacheSize);
  int unitSize = 1048575;
  // CacheBuilder.DEFAULT_CONCURRENCY_LEVEL
  int concurrencyLevel = 4;
  int totalGetCount = 16384;
  // maxCacheCount is 8192
  long maxCacheCount =
    maximumWeight / concurrencyLevel / unitSize * concurrencyLevel;
  for (int i = 0; i < totalGetCount; i++) {
    File indexFile = new File("shuffle_" + 0 + "_" + i + "_0.index");
    ShuffleIndexInformation indexInfo =
      Mockito.mock(ShuffleIndexInformation.class);
    Mockito.when(indexInfo.getSize()).thenReturn(unitSize);
    shuffleIndexCache.get(indexFile, () -> indexInfo);
  }
  long totalWeight = shuffleIndexCache.asMap().values().stream()
    .mapToLong(ShuffleIndexInformation::getSize).sum();
  long size = shuffleIndexCache.size();
  try {
    Assert.assertTrue(size <= maxCacheCount);
    Assert.assertTrue(totalWeight < maximumWeight);
    fail("The test code should not reach this line.");
  } catch (AssertionError error) {
    // The code enters this branch because LocalCache weight eviction does not
    // work when maxSegmentWeight is >= Int.MAX_VALUE.
    Assert.assertTrue(size > maxCacheCount && size <= totalGetCount);
    Assert.assertTrue(totalWeight > maximumWeight);
  }
}
{code}
From the debug view, we found that 2 segments have a negative segment.totalWeight:

!image-2021-08-26-15-51-40-532.png!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
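As a minimal arithmetic sketch of why 8g is the threshold (assuming, as in the UT above, that the per-segment weight budget is roughly maximumWeight divided by CacheBuilder's default concurrency level of 4): 8589934592 / 4 is exactly 2^31, one more than Int.MAX_VALUE, so any configured weight of 8g or more pushes maxSegmentWeight into the range where the linked Guava issue reports eviction stops working.

{code:java}
public class SegmentWeightThreshold {
  public static void main(String[] args) {
    // 8g, as produced by JavaUtils.byteStringAsBytes("8192m")
    long maximumWeight = 8589934592L;
    // CacheBuilder.DEFAULT_CONCURRENCY_LEVEL
    int concurrencyLevel = 4;

    // Approximate per-segment weight budget
    long maxSegmentWeight = maximumWeight / concurrencyLevel;

    System.out.println(maxSegmentWeight);                      // 2147483648
    System.out.println(Integer.MAX_VALUE);                     // 2147483647
    System.out.println(maxSegmentWeight > Integer.MAX_VALUE);  // true
  }
}
{code}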