[
https://issues.apache.org/jira/browse/HADOOP-18190?focusedWorklogId=783270&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-783270
]
ASF GitHub Bot logged work on HADOOP-18190:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 21/Jun/22 09:15
Start Date: 21/Jun/22 09:15
Worklog Time Spent: 10m
Work Description: monthonk commented on code in PR #4458:
URL: https://github.com/apache/hadoop/pull/4458#discussion_r902344956
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/CachingBlockManager.java:
##########
@@ -92,9 +100,11 @@ public CachingBlockManager(
this.numCachingErrors = new AtomicInteger();
this.numReadErrors = new AtomicInteger();
this.cachingDisabled = new AtomicBoolean();
+ this.prefetchingStatistics = requireNonNull(prefetchingStatistics);
Review Comment:
We use `Validate.checkNotNull` for futurePool why use other function for
prefetchingStatistics? also add exception info the comment
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/SingleFilePerBlockCache.java:
##########
@@ -81,7 +85,13 @@ public String toString() {
}
}
- public SingleFilePerBlockCache() {
+ /**
+ * Constructs an instance of a {@code SingleFilePerBlockCache}.
+ *
+ * @param prefetchingStatistics statistics for this stream.
+ */
+ public SingleFilePerBlockCache(PrefetchingStatistics prefetchingStatistics) {
+ this.prefetchingStatistics = requireNonNull(prefetchingStatistics);
Review Comment:
see if we want to change to `Validate.checkNotNull` and add possible
exception info
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/BufferPool.java:
##########
@@ -56,26 +58,32 @@ public class BufferPool implements Closeable {
// Allows associating metadata to each buffer in the pool.
private Map<BufferData, ByteBuffer> allocated;
+ private PrefetchingStatistics prefetchingStatistics;
+
/**
* Initializes a new instance of the {@code BufferPool} class.
*
* @param size number of buffer in this pool.
* @param bufferSize size in bytes of each buffer.
+ * @param prefetchingStatistics statistics for this stream.
*
* @throws IllegalArgumentException if size is zero or negative.
* @throws IllegalArgumentException if bufferSize is zero or negative.
*/
- public BufferPool(int size, int bufferSize) {
+ public BufferPool(int size, int bufferSize, PrefetchingStatistics
prefetchingStatistics) {
Validate.checkPositiveInteger(size, "size");
Validate.checkPositiveInteger(bufferSize, "bufferSize");
this.size = size;
this.bufferSize = bufferSize;
this.allocated = new IdentityHashMap<BufferData, ByteBuffer>();
+ this.prefetchingStatistics = requireNonNull(prefetchingStatistics);
Review Comment:
what exception would be thrown if it is null? and please add it in the
comment
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java:
##########
@@ -1281,6 +1313,38 @@ public DurationTracker initiateInnerStreamClose(final
boolean abort) {
? StreamStatisticNames.STREAM_READ_REMOTE_STREAM_ABORTED
: StreamStatisticNames.STREAM_READ_REMOTE_STREAM_DRAINED);
}
+
+ @Override
+ public void prefetchOperationStarted() {
+ incAllGauges(STREAM_READ_ACTIVE_PREFETCH_OPERATIONS, 1);
+ prefetchReadOperations.incrementAndGet();
+ }
+
+ @Override
+ public void blockAddedToFileCache() {
+ incAllGauges(STREAM_READ_BLOCKS_IN_FILE_CACHE, 1);
+ }
+
+ @Override
+ public void blockRemovedFromFileCache() {
+ incAllGauges(STREAM_READ_BLOCKS_IN_FILE_CACHE, -1);
+ }
+
+ @Override
+ public void prefetchOperationCompleted() {
+ incAllGauges(STREAM_READ_ACTIVE_PREFETCH_OPERATIONS, -1);
+ }
+
+
Review Comment:
nit: delete extra new line
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java:
##########
@@ -208,6 +208,43 @@ public void unbuffered() {
}
+
+ @Override
+ public void prefetchOperationStarted() {
+
+ }
+
+
Review Comment:
nit: delete extra new line
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingInputStream.java:
##########
@@ -117,8 +120,8 @@ public void seek(long pos) throws IOException {
@Override
public void close() throws IOException {
- super.close();
this.blockManager.close();
+ super.close();
Review Comment:
why move it here? please add some comments if it is something others should
be aware of.
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java:
##########
@@ -674,6 +674,46 @@ private Constants() {
public static final String STREAM_READ_GAUGE_INPUT_POLICY =
"stream_read_gauge_input_policy";
+ /**
+ * Total number of prefetching operations executed.
+ */
+ public static final String STREAM_READ_PREFETCH_OPERATIONS
Review Comment:
I think they should go to common package since `PrefetchingStatistics` is
already there. maybe create a new class for these constants.
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java:
##########
@@ -208,6 +208,43 @@ public void unbuffered() {
}
+
Review Comment:
nit: delete extra new line
Issue Time Tracking
-------------------
Worklog Id: (was: 783270)
Time Spent: 20m (was: 10m)
> s3a prefetching streams to collect iostats on prefetching operations
> --------------------------------------------------------------------
>
> Key: HADOOP-18190
> URL: https://issues.apache.org/jira/browse/HADOOP-18190
> Project: Hadoop Common
> Issue Type: Sub-task
> Components: fs/s3
> Affects Versions: 3.4.0
> Reporter: Steve Loughran
> Assignee: Ahmar Suhail
> Priority: Major
> Labels: pull-request-available
> Time Spent: 20m
> Remaining Estimate: 0h
>
> There is a lot more happening in reads, so there's a lot more data to collect
> and publish in IO stats for us to view in a summary at the end of processes
> as well as get from the stream while it is active.
> Some useful ones would seem to be:
> counters
> * is in memory. using 0 or 1 here lets aggregation reports count total #of
> memory cached files.
> * prefetching operations executed
> * errors during prefetching
> gauges
> * number of blocks in cache
> * total size of blocks
> * active prefetches
> + active memory used
> duration tracking count/min/max/ave
> * time to fetch a block
> * time queued before the actual fetch begins
> * time a reader is blocked waiting for a block fetch to complete
> and some info on cache use itself
> * number of blocks discarded unread
> * number of prefetched blocks later used
> * number of backward seeks to a prefetched block
> * number of forward seeks to a prefetched block
> the key ones I care about are
> # memory consumption
> # can we determine if cache is working (reads with cache hit) and when it is
> not (misses, wasted prefetches)
> # time blocked on executors
> The stats need to be accessible on a stream even when closed, and aggregated
> into the FS. once we get per-thread stats contexts we can publish there too
> and collect in worker threads for reporting in task commits
--
This message was sent by Atlassian Jira
(v8.20.7#820007)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]