[ 
https://issues.apache.org/jira/browse/HADOOP-18190?focusedWorklogId=783270&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-783270
 ]

ASF GitHub Bot logged work on HADOOP-18190:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 21/Jun/22 09:15
            Start Date: 21/Jun/22 09:15
    Worklog Time Spent: 10m 
      Work Description: monthonk commented on code in PR #4458:
URL: https://github.com/apache/hadoop/pull/4458#discussion_r902344956


##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/CachingBlockManager.java:
##########
@@ -92,9 +100,11 @@ public CachingBlockManager(
     this.numCachingErrors = new AtomicInteger();
     this.numReadErrors = new AtomicInteger();
     this.cachingDisabled = new AtomicBoolean();
+    this.prefetchingStatistics = requireNonNull(prefetchingStatistics);

Review Comment:
   We use `Validate.checkNotNull` for futurePool, so why use a different function 
for prefetchingStatistics? Also, please add the exception info to the comment.
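
   For reference, a minimal sketch of the difference (the class below is a 
stand-in for illustration, not the real `CachingBlockManager`; the claim that 
`Validate.checkNotNull` raises `IllegalArgumentException` is my assumption and 
should be confirmed against the `Validate` class itself):

```java
import static java.util.Objects.requireNonNull;

// Stand-in class for illustration only; not the real CachingBlockManager.
class StatisticsHolder {
  private final Object prefetchingStatistics;

  /**
   * @param prefetchingStatistics statistics for this stream, must not be null.
   * @throws NullPointerException if prefetchingStatistics is null; this is what
   *     {@code requireNonNull} raises, whereas {@code Validate.checkNotNull}
   *     would raise {@code IllegalArgumentException} instead (assumption).
   */
  StatisticsHolder(Object prefetchingStatistics) {
    this.prefetchingStatistics =
        requireNonNull(prefetchingStatistics, "prefetchingStatistics");
  }
}
```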



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/SingleFilePerBlockCache.java:
##########
@@ -81,7 +85,13 @@ public String toString() {
     }
   }
 
-  public SingleFilePerBlockCache() {
+  /**
+   * Constructs an instance of a {@code SingleFilePerBlockCache}.
+   *
+   * @param prefetchingStatistics statistics for this stream.
+   */
+  public SingleFilePerBlockCache(PrefetchingStatistics prefetchingStatistics) {
+    this.prefetchingStatistics = requireNonNull(prefetchingStatistics);

Review Comment:
   See if we want to change this to `Validate.checkNotNull` as well, and add the 
possible exception info.



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/BufferPool.java:
##########
@@ -56,26 +58,32 @@ public class BufferPool implements Closeable {
   // Allows associating metadata to each buffer in the pool.
   private Map<BufferData, ByteBuffer> allocated;
 
+  private PrefetchingStatistics prefetchingStatistics;
+
   /**
    * Initializes a new instance of the {@code BufferPool} class.
    *
    * @param size number of buffer in this pool.
    * @param bufferSize size in bytes of each buffer.
+   * @param prefetchingStatistics statistics for this stream.
    *
    * @throws IllegalArgumentException if size is zero or negative.
    * @throws IllegalArgumentException if bufferSize is zero or negative.
    */
-  public BufferPool(int size, int bufferSize) {
+  public BufferPool(int size, int bufferSize, PrefetchingStatistics 
prefetchingStatistics) {
     Validate.checkPositiveInteger(size, "size");
     Validate.checkPositiveInteger(bufferSize, "bufferSize");
 
     this.size = size;
     this.bufferSize = bufferSize;
     this.allocated = new IdentityHashMap<BufferData, ByteBuffer>();
+    this.prefetchingStatistics = requireNonNull(prefetchingStatistics);

Review Comment:
   What exception would be thrown if it is null? Please also add it to the comment.
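
   (A tiny runnable check of the current behaviour, using the message-bearing 
overload rather than the one-argument call in the diff:)

```java
import java.util.Objects;

// Demonstrates what requireNonNull raises when handed a null argument.
public class RequireNonNullDemo {
  public static void main(String[] args) {
    try {
      Objects.requireNonNull(null, "prefetchingStatistics");
    } catch (NullPointerException e) {
      // Prints: java.lang.NullPointerException: prefetchingStatistics
      System.out.println(e);
    }
  }
}
```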



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java:
##########
@@ -1281,6 +1313,38 @@ public DurationTracker initiateInnerStreamClose(final 
boolean abort) {
           ? StreamStatisticNames.STREAM_READ_REMOTE_STREAM_ABORTED
           : StreamStatisticNames.STREAM_READ_REMOTE_STREAM_DRAINED);
     }
+
+    @Override
+    public void prefetchOperationStarted() {
+      incAllGauges(STREAM_READ_ACTIVE_PREFETCH_OPERATIONS, 1);
+      prefetchReadOperations.incrementAndGet();
+    }
+
+    @Override
+    public void blockAddedToFileCache() {
+      incAllGauges(STREAM_READ_BLOCKS_IN_FILE_CACHE, 1);
+    }
+
+    @Override
+    public void blockRemovedFromFileCache() {
+      incAllGauges(STREAM_READ_BLOCKS_IN_FILE_CACHE, -1);
+    }
+
+    @Override
+    public void prefetchOperationCompleted() {
+      incAllGauges(STREAM_READ_ACTIVE_PREFETCH_OPERATIONS, -1);
+    }
+
+

Review Comment:
   nit: delete extra new line



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java:
##########
@@ -208,6 +208,43 @@ public void unbuffered() {
 
     }
 
+
+    @Override
+    public void prefetchOperationStarted() {
+
+    }
+
+

Review Comment:
   nit: delete extra new line



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingInputStream.java:
##########
@@ -117,8 +120,8 @@ public void seek(long pos) throws IOException {
 
   @Override
   public void close() throws IOException {
-    super.close();
     this.blockManager.close();
+    super.close();

Review Comment:
   Why was this moved here? Please add a comment if it is something others should 
be aware of.
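
   If the intent is to stop in-flight prefetch work before the parent stream 
releases the state it depends on, a comment along these lines would help (the 
rationale below is my assumption, not something stated in the PR):

```java
@Override
public void close() throws IOException {
  // Close the block manager first so that any in-flight prefetch/caching tasks
  // are stopped and their buffers released before the parent stream tears down
  // the shared state they use. (Assumed rationale; please confirm.)
  this.blockManager.close();
  super.close();
}
```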



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java:
##########
@@ -674,6 +674,46 @@ private Constants() {
   public static final String STREAM_READ_GAUGE_INPUT_POLICY =
       "stream_read_gauge_input_policy";
 
+  /**
+   * Total number of prefetching operations executed.
+   */
+  public static final String STREAM_READ_PREFETCH_OPERATIONS

Review Comment:
   I think these should go to the common package since `PrefetchingStatistics` is 
already there. Maybe create a new class for these constants.
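
   A hypothetical sketch of what that could look like; the class name, package 
placement, and the literal value are placeholders (the value just follows the 
naming convention of the existing stream gauge names), not the actual PR contents:

```java
// Hypothetical holder class; the name and the constant's value are placeholders.
package org.apache.hadoop.fs.common;

public final class PrefetchConstants {

  /** Total number of prefetching operations executed. */
  public static final String STREAM_READ_PREFETCH_OPERATIONS =
      "stream_read_prefetch_operations";

  private PrefetchConstants() {
  }
}
```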



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java:
##########
@@ -208,6 +208,43 @@ public void unbuffered() {
 
     }
 
+

Review Comment:
   nit: delete extra new line





Issue Time Tracking
-------------------

    Worklog Id:     (was: 783270)
    Time Spent: 20m  (was: 10m)

> s3a prefetching streams to collect iostats on prefetching operations
> --------------------------------------------------------------------
>
>                 Key: HADOOP-18190
>                 URL: https://issues.apache.org/jira/browse/HADOOP-18190
>             Project: Hadoop Common
>          Issue Type: Sub-task
>          Components: fs/s3
>    Affects Versions: 3.4.0
>            Reporter: Steve Loughran
>            Assignee: Ahmar Suhail
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> There is a lot more happening in reads, so there's a lot more data to collect 
> and publish in IO stats for us to view in a summary at the end of processes 
> as well as get from the stream while it is active.
> Some useful ones would seem to be:
> counters
>  * is in memory. Using 0 or 1 here lets aggregation reports count the total # of 
> memory-cached files.
>  * prefetching operations executed
>  * errors during prefetching
> gauges
>  * number of blocks in cache
>  * total size of blocks
>  * active prefetches
>  * active memory used
> duration tracking count/min/max/avg
>  * time to fetch a block
>  * time queued before the actual fetch begins
>  * time a reader is blocked waiting for a block fetch to complete
> and some info on cache use itself
>  * number of blocks discarded unread
>  * number of prefetched blocks later used
>  * number of backward seeks to a prefetched block
>  * number of forward seeks to a prefetched block
> The key ones I care about are:
>  # memory consumption
>  # can we determine if the cache is working (reads with cache hits) and when it 
> is not (misses, wasted prefetches)
>  # time blocked on executors
> The stats need to be accessible on a stream even when closed, and aggregated 
> into the FS. Once we get per-thread stats contexts, we can publish there too 
> and collect in worker threads for reporting in task commits.
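
A rough usage sketch of how these stats would be consumed, assuming the existing 
`IOStatisticsSupport`/`IOStatisticsLogging` helpers and an S3A stream that 
implements `IOStatisticsSource` (bucket and object names below are placeholders):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToString;
import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.retrieveIOStatistics;

public class PrefetchStatsProbe {
  public static void main(String[] args) throws Exception {
    // Placeholder path: any S3A object large enough to trigger prefetching.
    Path path = new Path("s3a://example-bucket/large-file.parquet");
    FileSystem fs = path.getFileSystem(new Configuration());

    FSDataInputStream in = fs.open(path);
    byte[] buffer = new byte[1024 * 1024];
    in.read(buffer);
    in.close();

    // The stream statistics must stay readable after close() ...
    System.out.println(ioStatisticsToString(retrieveIOStatistics(in)));

    // ... and should also be aggregated into the filesystem instance.
    System.out.println(ioStatisticsToString(retrieveIOStatistics(fs)));
  }
}
```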



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
