Yang Jie created SPARK-36598:
--------------------------------

             Summary: Caches constructed with weight eviction have a risk of memory leakage
                 Key: SPARK-36598
                 URL: https://issues.apache.org/jira/browse/SPARK-36598
             Project: Spark
          Issue Type: Bug
          Components: Spark Core, SQL
    Affects Versions: 3.3.0
            Reporter: Yang Jie


In Spark, we define a Guava Cache that uses the weight eviction mechanism in 3 places:
 # ExternalShuffleBlockResolver#shuffleIndexCache
 # RemoteBlockPushResolver#indexCache
 # SharedInMemoryCache#cache

These 3 Guava Caches have a risk of memory leakage if the configured `maximumWeight` is >= 8589934592 (8g), because LocalCache weight eviction does not work when maxSegmentWeight is >= Int.MAX_VALUE ([https://github.com/google/guava/issues/1761]).
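
The problematic configuration can also be sketched without any Spark classes. The snippet below (class name and constants are illustrative, not from the Spark code base) builds a plain Guava cache with the same parameters as the UT further down: maximumWeight of 8g, the default concurrencyLevel of 4, and a fixed per-entry weight of 1048575. If weight eviction worked, the cache would stay around 8192 entries; with an affected Guava version the size and total weight are expected to keep growing past the configured maximum.
{code:java}
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.Weigher;

public class GuavaWeightEvictionSketch {
  public static void main(String[] args) {
    // 8g -> 8589934592 bytes; divided over the default 4 segments this gives a
    // per-segment budget of about 2147483648, which is >= Int.MAX_VALUE.
    long maximumWeight = 8L * 1024 * 1024 * 1024;
    int unitWeight = 1048575;

    Cache<Integer, Integer> cache = CacheBuilder.newBuilder()
        .maximumWeight(maximumWeight)
        .weigher(new Weigher<Integer, Integer>() {
          @Override
          public int weigh(Integer key, Integer value) {
            return unitWeight;
          }
        })
        .build();

    // Insert roughly twice the weight the cache is configured to hold.
    int totalPuts = 16384;
    for (int i = 0; i < totalPuts; i++) {
      cache.put(i, i);
    }

    long totalWeight = cache.size() * (long) unitWeight;
    System.out.println("entries = " + cache.size()
        + ", totalWeight = " + totalWeight
        + ", maximumWeight = " + maximumWeight);
    // Expected if eviction worked: entries <= 8192 and totalWeight < maximumWeight.
    // Observed with an affected Guava version: entries stay well above 8192 and
    // totalWeight exceeds maximumWeight.
  }
}
{code}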

A UT that reproduces this issue in Spark is as follows:
{code:java}
@Test
public void testShuffleIndexCacheEvictionBehavior() throws IOException, ExecutionException {
  Map<String, String> config = new HashMap<>();
  String indexCacheSize = "8192m";
  config.put("spark.shuffle.service.index.cache.size", indexCacheSize);
  TransportConf transportConf = new TransportConf("shuffle", new MapConfigProvider(config));
  ExternalShuffleBlockResolver resolver = new ExternalShuffleBlockResolver(transportConf, null);
  resolver.registerExecutor("app0", "exec0", dataContext.createExecutorInfo(SORT_MANAGER));

  LoadingCache<File, ShuffleIndexInformation> shuffleIndexCache = resolver.shuffleIndexCache;

  // 8g -> 8589934592 bytes
  long maximumWeight = JavaUtils.byteStringAsBytes(indexCacheSize);
  int unitSize = 1048575;
  // CacheBuilder.DEFAULT_CONCURRENCY_LEVEL
  int concurrencyLevel = 4;
  int totalGetCount = 16384;
  // maxCacheCount is 8192
  long maxCacheCount = maximumWeight / concurrencyLevel / unitSize * concurrencyLevel;
  for (int i = 0; i < totalGetCount; i++) {
    File indexFile = new File("shuffle_" + 0 + "_" + i + "_0.index");
    ShuffleIndexInformation indexInfo = Mockito.mock(ShuffleIndexInformation.class);
    Mockito.when(indexInfo.getSize()).thenReturn(unitSize);
    shuffleIndexCache.get(indexFile, () -> indexInfo);
  }

  long totalWeight =
    shuffleIndexCache.asMap().values().stream().mapToLong(ShuffleIndexInformation::getSize).sum();
  long size = shuffleIndexCache.size();
  try {
    Assert.assertTrue(size <= maxCacheCount);
    Assert.assertTrue(totalWeight < maximumWeight);
    fail("The test code should not reach this line now.");
  } catch (AssertionError error) {
    // The code will enter this branch because LocalCache weight eviction does not work
    // when maxSegmentWeight is >= Int.MAX_VALUE.
    Assert.assertTrue(size > maxCacheCount && size <= totalGetCount);
    Assert.assertTrue(totalWeight > maximumWeight);
  }
}
{code}
 

From the debug view, we found that there are 2 segments whose segment.totalWeight is a negative value:

!image-2021-08-26-15-51-40-532.png!
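
One reading of this, consistent with the linked Guava issue, is that the per-segment weight total is accumulated in a 32-bit counter, so it can never exceed a maxSegmentWeight that is >= Int.MAX_VALUE and eventually wraps negative. The arithmetic below is purely illustrative (it assumes an int accumulator and is not Guava code):
{code:java}
public class SegmentWeightOverflowIllustration {
  public static void main(String[] args) {
    // Same parameters as the UT: 16384 entries of weight 1048575 spread over
    // 4 segments, i.e. about 4096 entries per segment.
    int unitSize = 1048575;
    int entriesPerSegment = 16384 / 4;

    long sumAsLong = (long) entriesPerSegment * unitSize;
    System.out.println(sumAsLong);                   // 4294963200, already > Int.MAX_VALUE

    int sumAsInt = 0;
    for (int i = 0; i < entriesPerSegment; i++) {
      sumAsInt += unitSize;                          // silently overflows
    }
    System.out.println(sumAsInt);                    // -4096, negative like the debug view

    // A negative (or merely int-sized) running total can never exceed a
    // per-segment limit of Int.MAX_VALUE or more, so a check of the form
    // "totalWeight > maxSegmentWeight" never triggers eviction.
    long maxSegmentWeight = 8589934592L / 4;         // 2147483648
    System.out.println(sumAsInt > maxSegmentWeight); // false
  }
}
{code}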