This is an automated email from the ASF dual-hosted git repository.

mehakmeet pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 27cb5518216 HADOOP-18829. S3A prefetch LRU cache eviction metrics 
(#5893)
27cb5518216 is described below

commit 27cb55182168b00dcbc3613af3475a2327997d0a
Author: Viraj Jasani <vjas...@apache.org>
AuthorDate: Thu Sep 21 01:01:44 2023 -0800

    HADOOP-18829. S3A prefetch LRU cache eviction metrics (#5893)
    
    
    Contributed by: Viraj Jasani
---
 .../fs/impl/prefetch/CachingBlockManager.java      | 12 +++--
 .../impl/prefetch/EmptyPrefetchingStatistics.java  |  5 +++
 .../fs/impl/prefetch/PrefetchingStatistics.java    |  5 +++
 .../fs/impl/prefetch/SingleFilePerBlockCache.java  | 51 ++++++++++++++--------
 .../hadoop/fs/statistics/StreamStatisticNames.java | 12 +++++
 .../hadoop/fs/impl/prefetch/TestBlockCache.java    |  6 +--
 .../fs/statistics/IOStatisticAssertions.java       | 37 ++++++++++++++++
 .../apache/hadoop/fs/s3a/S3AInstrumentation.java   | 11 ++++-
 .../java/org/apache/hadoop/fs/s3a/Statistic.java   | 10 +++++
 .../fs/s3a/prefetch/S3ACachingBlockManager.java    |  3 +-
 .../statistics/impl/EmptyS3AStatisticsContext.java |  5 +++
 .../fs/s3a/ITestS3APrefetchingLruEviction.java     | 12 +++++
 .../hadoop/fs/s3a/prefetch/S3APrefetchFakes.java   |  5 ++-
 13 files changed, 145 insertions(+), 29 deletions(-)

diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/CachingBlockManager.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/CachingBlockManager.java
index 4461c118625..407cd630048 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/CachingBlockManager.java
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/CachingBlockManager.java
@@ -36,6 +36,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.statistics.DurationTracker;
+import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
 
 import static java.util.Objects.requireNonNull;
 
@@ -111,8 +112,10 @@ public abstract class CachingBlockManager extends 
BlockManager {
    * @param conf the configuration.
    * @param localDirAllocator the local dir allocator instance.
    * @param maxBlocksCount max blocks count to be kept in cache at any time.
+   * @param trackerFactory tracker with statistics to update.
    * @throws IllegalArgumentException if bufferPoolSize is zero or negative.
    */
+  @SuppressWarnings("checkstyle:parameternumber")
   public CachingBlockManager(
       ExecutorServiceFuturePool futurePool,
       BlockData blockData,
@@ -120,7 +123,8 @@ public abstract class CachingBlockManager extends 
BlockManager {
       PrefetchingStatistics prefetchingStatistics,
       Configuration conf,
       LocalDirAllocator localDirAllocator,
-      int maxBlocksCount) {
+      int maxBlocksCount,
+      DurationTrackerFactory trackerFactory) {
     super(blockData);
 
     Validate.checkPositiveInteger(bufferPoolSize, "bufferPoolSize");
@@ -136,7 +140,7 @@ public abstract class CachingBlockManager extends 
BlockManager {
     if (this.getBlockData().getFileSize() > 0) {
       this.bufferPool = new BufferPool(bufferPoolSize, 
this.getBlockData().getBlockSize(),
           this.prefetchingStatistics);
-      this.cache = this.createCache(maxBlocksCount);
+      this.cache = this.createCache(maxBlocksCount, trackerFactory);
     }
 
     this.ops = new BlockOperations();
@@ -559,8 +563,8 @@ public abstract class CachingBlockManager extends 
BlockManager {
     }
   }
 
-  protected BlockCache createCache(int maxBlocksCount) {
-    return new SingleFilePerBlockCache(prefetchingStatistics, maxBlocksCount);
+  protected BlockCache createCache(int maxBlocksCount, DurationTrackerFactory 
trackerFactory) {
+    return new SingleFilePerBlockCache(prefetchingStatistics, maxBlocksCount, 
trackerFactory);
   }
 
   protected void cachePut(int blockNumber, ByteBuffer buffer) throws 
IOException {
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/EmptyPrefetchingStatistics.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/EmptyPrefetchingStatistics.java
index 177ff7abab8..d20a3272868 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/EmptyPrefetchingStatistics.java
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/EmptyPrefetchingStatistics.java
@@ -57,6 +57,11 @@ public final class EmptyPrefetchingStatistics
 
   }
 
+  @Override
+  public void blockEvictedFromFileCache() {
+
+  }
+
   @Override
   public void prefetchOperationCompleted() {
 
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/PrefetchingStatistics.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/PrefetchingStatistics.java
index 9ce2dec5889..d1ea600be72 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/PrefetchingStatistics.java
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/PrefetchingStatistics.java
@@ -42,6 +42,11 @@ public interface PrefetchingStatistics extends 
IOStatisticsSource {
    */
   void blockRemovedFromFileCache();
 
+  /**
+   * A block has been evicted from the file cache.
+   */
+  void blockEvictedFromFileCache();
+
   /**
    * A prefetch operation has completed.
    */
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java
index 6aacb4d7c84..ecf7e38f4f7 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java
@@ -47,10 +47,14 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.statistics.DurationTracker;
+import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
 import org.apache.hadoop.util.Preconditions;
 
 import static java.util.Objects.requireNonNull;
 import static org.apache.hadoop.fs.impl.prefetch.Validate.checkNotNull;
+import static 
org.apache.hadoop.fs.statistics.IOStatisticsSupport.stubDurationTrackerFactory;
+import static 
org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_FILE_CACHE_EVICTION;
 
 /**
  * Provides functionality necessary for caching blocks of data read from 
FileSystem.
@@ -99,6 +103,11 @@ public class SingleFilePerBlockCache implements BlockCache {
 
   private final PrefetchingStatistics prefetchingStatistics;
 
+  /**
+   * Duration tracker factory required to track the duration of some 
operations.
+   */
+  private final DurationTrackerFactory trackerFactory;
+
   /**
    * File attributes attached to any intermediate temporary file created 
during index creation.
    */
@@ -209,14 +218,19 @@ public class SingleFilePerBlockCache implements 
BlockCache {
    *
    * @param prefetchingStatistics statistics for this stream.
    * @param maxBlocksCount max blocks count to be kept in cache at any time.
+   * @param trackerFactory tracker with statistics to update
    */
-  public SingleFilePerBlockCache(PrefetchingStatistics prefetchingStatistics, 
int maxBlocksCount) {
+  public SingleFilePerBlockCache(PrefetchingStatistics prefetchingStatistics,
+      int maxBlocksCount,
+      DurationTrackerFactory trackerFactory) {
     this.prefetchingStatistics = requireNonNull(prefetchingStatistics);
     this.closed = new AtomicBoolean(false);
     this.maxBlocksCount = maxBlocksCount;
     Preconditions.checkArgument(maxBlocksCount > 0, "maxBlocksCount should be 
more than 0");
     blocks = new ConcurrentHashMap<>();
     blocksLock = new ReentrantReadWriteLock();
+    this.trackerFactory = trackerFactory != null
+        ? trackerFactory : stubDurationTrackerFactory();
   }
 
   /**
@@ -430,25 +444,28 @@ public class SingleFilePerBlockCache implements 
BlockCache {
    * @param elementToPurge Block entry to evict.
    */
   private void deleteBlockFileAndEvictCache(Entry elementToPurge) {
-    boolean lockAcquired = elementToPurge.takeLock(Entry.LockType.WRITE,
-        PrefetchConstants.PREFETCH_WRITE_LOCK_TIMEOUT,
-        PrefetchConstants.PREFETCH_WRITE_LOCK_TIMEOUT_UNIT);
-    if (!lockAcquired) {
-      LOG.error("Cache file {} deletion would not be attempted as write lock 
could not"
-              + " be acquired within {} {}", elementToPurge.path,
+    try (DurationTracker ignored = 
trackerFactory.trackDuration(STREAM_FILE_CACHE_EVICTION)) {
+      boolean lockAcquired = elementToPurge.takeLock(Entry.LockType.WRITE,
           PrefetchConstants.PREFETCH_WRITE_LOCK_TIMEOUT,
           PrefetchConstants.PREFETCH_WRITE_LOCK_TIMEOUT_UNIT);
-    } else {
-      try {
-        if (Files.deleteIfExists(elementToPurge.path)) {
-          entryListSize--;
-          prefetchingStatistics.blockRemovedFromFileCache();
-          blocks.remove(elementToPurge.blockNumber);
+      if (!lockAcquired) {
+        LOG.error("Cache file {} deletion would not be attempted as write lock 
could not"
+                + " be acquired within {} {}", elementToPurge.path,
+            PrefetchConstants.PREFETCH_WRITE_LOCK_TIMEOUT,
+            PrefetchConstants.PREFETCH_WRITE_LOCK_TIMEOUT_UNIT);
+      } else {
+        try {
+          if (Files.deleteIfExists(elementToPurge.path)) {
+            entryListSize--;
+            prefetchingStatistics.blockRemovedFromFileCache();
+            blocks.remove(elementToPurge.blockNumber);
+            prefetchingStatistics.blockEvictedFromFileCache();
+          }
+        } catch (IOException e) {
+          LOG.warn("Failed to delete cache file {}", elementToPurge.path, e);
+        } finally {
+          elementToPurge.releaseLock(Entry.LockType.WRITE);
         }
-      } catch (IOException e) {
-        LOG.warn("Failed to delete cache file {}", elementToPurge.path, e);
-      } finally {
-        elementToPurge.releaseLock(Entry.LockType.WRITE);
       }
     }
   }
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java
index 50bbf45505c..85b82287e37 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java
@@ -455,6 +455,18 @@ public final class StreamStatisticNames {
   public static final String STREAM_READ_BLOCK_ACQUIRE_AND_READ
       = "stream_read_block_acquire_read";
 
+  /**
+   * Total number of blocks evicted from the disk cache.
+   */
+  public static final String STREAM_EVICT_BLOCKS_FROM_FILE_CACHE
+      = "stream_evict_blocks_from_cache";
+
+  /**
+   * Track duration of LRU cache eviction for disk cache.
+   */
+  public static final String STREAM_FILE_CACHE_EVICTION
+      = "stream_file_cache_eviction";
+
   private StreamStatisticNames() {
   }
 
diff --git 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/prefetch/TestBlockCache.java
 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/prefetch/TestBlockCache.java
index b32ce20a373..a0c83a63c22 100644
--- 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/prefetch/TestBlockCache.java
+++ 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/prefetch/TestBlockCache.java
@@ -45,7 +45,7 @@ public class TestBlockCache extends AbstractHadoopTestBase {
   public void testArgChecks() throws Exception {
     // Should not throw.
     BlockCache cache =
-        new SingleFilePerBlockCache(EmptyPrefetchingStatistics.getInstance(), 
2);
+        new SingleFilePerBlockCache(EmptyPrefetchingStatistics.getInstance(), 
2, null);
 
     ByteBuffer buffer = ByteBuffer.allocate(16);
 
@@ -55,7 +55,7 @@ public class TestBlockCache extends AbstractHadoopTestBase {
 
 
     intercept(NullPointerException.class, null,
-        () -> new SingleFilePerBlockCache(null, 2));
+        () -> new SingleFilePerBlockCache(null, 2, null));
 
   }
 
@@ -63,7 +63,7 @@ public class TestBlockCache extends AbstractHadoopTestBase {
   @Test
   public void testPutAndGet() throws Exception {
     BlockCache cache =
-        new SingleFilePerBlockCache(EmptyPrefetchingStatistics.getInstance(), 
2);
+        new SingleFilePerBlockCache(EmptyPrefetchingStatistics.getInstance(), 
2, null);
 
     ByteBuffer buffer1 = ByteBuffer.allocate(BUFFER_SIZE);
     for (byte i = 0; i < BUFFER_SIZE; i++) {
diff --git 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/statistics/IOStatisticAssertions.java
 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/statistics/IOStatisticAssertions.java
index 755599f0c39..8396cbd258d 100644
--- 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/statistics/IOStatisticAssertions.java
+++ 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/statistics/IOStatisticAssertions.java
@@ -176,6 +176,23 @@ public final class IOStatisticAssertions {
         verifyStatisticsNotNull(stats).counters(), value);
   }
 
+  /**
+   * Assert that two counters have similar values.
+   *
+   * @param stats statistics source.
+   * @param key1 statistic first key.
+   * @param key2 statistic second key.
+   */
+  public static void verifyStatisticCounterValues(
+      final IOStatistics stats,
+      final String key1,
+      final String key2) {
+    verifyStatisticValues(COUNTER,
+        key1,
+        key2,
+        verifyStatisticsNotNull(stats).counters());
+  }
+
   /**
    * Assert that a gauge has an expected value.
    * @param stats statistics source
@@ -258,6 +275,26 @@ public final class IOStatisticAssertions {
     return statistic;
   }
 
+  /**
+   * Assert that the given two statistics have same values.
+   *
+   * @param type type of the statistics.
+   * @param key1 statistic first key.
+   * @param key2 statistic second key.
+   * @param map map to look up.
+   * @param <E> type of map element.
+   */
+  private static <E> void verifyStatisticValues(
+      final String type,
+      final String key1,
+      final String key2,
+      final Map<String, E> map) {
+    final E statistic1 = lookupStatistic(type, key1, map);
+    final E statistic2 = lookupStatistic(type, key2, map);
+    assertThat(statistic1)
+        .describedAs("%s named %s and %s named %s", type, key1, type, key2)
+        .isEqualTo(statistic2);
+  }
 
   /**
    * Assert that a given statistic has an expected value.
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
index 48977b51057..9d34457ab94 100644
--- 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
@@ -886,7 +886,8 @@ public class S3AInstrumentation implements Closeable, 
MetricsSource,
               StreamStatisticNames.STREAM_READ_VECTORED_INCOMING_RANGES,
               StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS,
               StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED,
-              StreamStatisticNames.STREAM_READ_VERSION_MISMATCHES)
+              StreamStatisticNames.STREAM_READ_VERSION_MISMATCHES,
+              StreamStatisticNames.STREAM_EVICT_BLOCKS_FROM_FILE_CACHE)
           .withGauges(STREAM_READ_GAUGE_INPUT_POLICY,
               STREAM_READ_BLOCKS_IN_FILE_CACHE.getSymbol(),
               STREAM_READ_ACTIVE_PREFETCH_OPERATIONS.getSymbol(),
@@ -899,7 +900,8 @@ public class S3AInstrumentation implements Closeable, 
MetricsSource,
               StreamStatisticNames.STREAM_READ_REMOTE_STREAM_DRAINED,
               StreamStatisticNames.STREAM_READ_PREFETCH_OPERATIONS,
               StreamStatisticNames.STREAM_READ_REMOTE_BLOCK_READ,
-              StreamStatisticNames.STREAM_READ_BLOCK_ACQUIRE_AND_READ)
+              StreamStatisticNames.STREAM_READ_BLOCK_ACQUIRE_AND_READ,
+              StreamStatisticNames.STREAM_FILE_CACHE_EVICTION)
           .build();
       setIOStatistics(st);
       aborted = st.getCounterReference(
@@ -1395,6 +1397,11 @@ public class S3AInstrumentation implements Closeable, 
MetricsSource,
       incAllGauges(STREAM_READ_BLOCKS_IN_FILE_CACHE, -1);
     }
 
+    @Override
+    public void blockEvictedFromFileCache() {
+      increment(StreamStatisticNames.STREAM_EVICT_BLOCKS_FROM_FILE_CACHE);
+    }
+
     @Override
     public void prefetchOperationCompleted() {
       incAllGauges(STREAM_READ_ACTIVE_PREFETCH_OPERATIONS, -1);
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
index ae761fe270f..b5f7fcbcd01 100644
--- 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
@@ -447,6 +447,16 @@ public enum Statistic {
       "Total queue duration of all block uploads",
       TYPE_DURATION),
 
+  /* Stream prefetch file cache eviction */
+  STREAM_EVICT_BLOCKS_FROM_FILE_CACHE(
+      StreamStatisticNames.STREAM_EVICT_BLOCKS_FROM_FILE_CACHE,
+      "Count of blocks evicted from the disk cache",
+      TYPE_COUNTER),
+  STREAM_FILE_CACHE_EVICTION(
+      StreamStatisticNames.STREAM_FILE_CACHE_EVICTION,
+      "Duration of the eviction of an element from LRU cache that holds disk 
cache blocks",
+      TYPE_DURATION),
+
   /* committer stats */
   COMMITTER_COMMITS_CREATED(
       "committer_commits_created",
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ACachingBlockManager.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ACachingBlockManager.java
index a02922053aa..e008de3a79d 100644
--- 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ACachingBlockManager.java
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ACachingBlockManager.java
@@ -76,7 +76,8 @@ public class S3ACachingBlockManager extends 
CachingBlockManager {
         streamStatistics,
         conf,
         localDirAllocator,
-        conf.getInt(PREFETCH_MAX_BLOCKS_COUNT, 
DEFAULT_PREFETCH_MAX_BLOCKS_COUNT));
+        conf.getInt(PREFETCH_MAX_BLOCKS_COUNT, 
DEFAULT_PREFETCH_MAX_BLOCKS_COUNT),
+        streamStatistics);
 
     Validate.checkNotNull(reader, "reader");
 
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java
index 6454065b240..e47656efd18 100644
--- 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java
@@ -241,6 +241,11 @@ public final class EmptyS3AStatisticsContext implements 
S3AStatisticsContext {
 
     }
 
+    @Override
+    public void blockEvictedFromFileCache() {
+
+    }
+
     @Override
     public void executorAcquired(Duration timeInQueue) {
 
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingLruEviction.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingLruEviction.java
index a8211e24ce7..0fa08f37cf9 100644
--- 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingLruEviction.java
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingLruEviction.java
@@ -45,7 +45,11 @@ import org.apache.hadoop.test.LambdaTestUtils;
 import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY;
 import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
 import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_MAX_BLOCKS_COUNT;
+import static 
org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter;
+import static 
org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValues;
 import static 
org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticGaugeValue;
+import static 
org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_EVICT_BLOCKS_FROM_FILE_CACHE;
+import static 
org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_FILE_CACHE_EVICTION;
 import static 
org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BLOCKS_IN_FILE_CACHE;
 
 /**
@@ -172,6 +176,14 @@ public class ITestS3APrefetchingLruEviction extends 
AbstractS3ACostTest {
     LambdaTestUtils.eventually(TIMEOUT_MILLIS, INTERVAL_MILLIS, () -> {
       LOG.info("IO stats: {}", ioStats);
       verifyStatisticGaugeValue(ioStats, STREAM_READ_BLOCKS_IN_FILE_CACHE, 0);
+      // stream_evict_blocks_from_cache is expected to be higher than 4, 
however we might face
+      // transient failures due to async prefetch get cancel issues. While 
TIMEOUT_MILLIS is
+      // sufficient wait time, consider re-running the test if 
stream_evict_blocks_from_cache
+      // value stays lower than 4.
+      assertThatStatisticCounter(ioStats,
+          STREAM_EVICT_BLOCKS_FROM_FILE_CACHE).isGreaterThanOrEqualTo(4);
+      verifyStatisticCounterValues(ioStats, 
STREAM_EVICT_BLOCKS_FROM_FILE_CACHE,
+          STREAM_FILE_CACHE_EVICTION);
     });
   }
 
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/S3APrefetchFakes.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/S3APrefetchFakes.java
index 3bf9965861f..89477a90719 100644
--- 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/S3APrefetchFakes.java
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/S3APrefetchFakes.java
@@ -59,6 +59,7 @@ import 
org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
 import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext;
 import org.apache.hadoop.fs.s3a.statistics.impl.CountingChangeTracker;
 import org.apache.hadoop.fs.s3a.statistics.impl.EmptyS3AStatisticsContext;
+import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.util.functional.CallableRaisingIOE;
@@ -308,7 +309,7 @@ public final class S3APrefetchFakes {
 
     public FakeS3FilePerBlockCache(int readDelay, int writeDelay) {
       super(new EmptyS3AStatisticsContext().newInputStreamStatistics(),
-          Constants.DEFAULT_PREFETCH_MAX_BLOCKS_COUNT);
+          Constants.DEFAULT_PREFETCH_MAX_BLOCKS_COUNT, null);
       this.files = new ConcurrentHashMap<>();
       this.readDelay = readDelay;
       this.writeDelay = writeDelay;
@@ -381,7 +382,7 @@ public final class S3APrefetchFakes {
     }
 
     @Override
-    protected BlockCache createCache(int maxBlocksCount) {
+    protected BlockCache createCache(int maxBlocksCount, 
DurationTrackerFactory trackerFactory) {
       final int readDelayMs = 50;
       final int writeDelayMs = 200;
       return new FakeS3FilePerBlockCache(readDelayMs, writeDelayMs);


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to