pranavsaxena-microsoft commented on code in PR #5117:
URL: https://github.com/apache/hadoop/pull/5117#discussion_r1017458506


##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java:
##########
@@ -534,17 +676,31 @@ void callTryEvict() {
     tryEvict();
   }
 
-
   /**
    * Purging the buffers associated with an {@link AbfsInputStream}
    * from {@link ReadBufferManager} when stream is closed.
+   * Before HADOOP-18521 this would purge in progress reads, which
+   * would return the active buffer to the free pool while it was
+   * still in use.
    * @param stream input stream.
    */
   public synchronized void purgeBuffersForStream(AbfsInputStream stream) {
     LOGGER.debug("Purging stale buffers for AbfsInputStream {} ", stream);
+
+    // remove from the queue
+    int before = readAheadQueue.size();
     readAheadQueue.removeIf(readBuffer -> readBuffer.getStream() == stream);
-    purgeList(stream, completedReadList);
-    purgeList(stream, inProgressList);
+    int readaheadPurged = readAheadQueue.size() - before;

Review Comment:
   By the time the thread reaches this line, more blocks may have been added to readAheadQueue, which could inflate the metric. Also, after the removal `before` should be >= `readAheadQueue.size()` (they are equal only when no additional blocks were queued), so `readAheadQueue.size() - before` as written yields a negative (or zero) value.
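   A minimal sketch of one way to get an accurate count, counting removals directly while the lock is held instead of diffing queue sizes (variable names follow the PR; the iterator-based loop itself is illustrative):
   
   ```java
   // Count the removed entries directly. As written,
   // readAheadQueue.size() - before is <= 0 after removeIf, and a size
   // diff can also be skewed by blocks queued in the meantime.
   // (Assumes java.util.Iterator is imported and readAheadQueue
   // supports iterator removal.)
   int readaheadPurged = 0;
   for (Iterator<ReadBuffer> it = readAheadQueue.iterator(); it.hasNext();) {
     if (it.next().getStream() == stream) {
       it.remove();
       readaheadPurged++;
     }
   }
   ```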



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java:
##########
@@ -454,31 +588,39 @@ ReadBuffer getNextBlockToRead() throws InterruptedException {
    */
   void doneReading(final ReadBuffer buffer, final ReadBufferStatus result, final int bytesActuallyRead) {
     if (LOGGER.isTraceEnabled()) {
-      LOGGER.trace("ReadBufferWorker completed read file {} for offset {} outcome {} bytes {}",
-          buffer.getStream().getPath(),  buffer.getOffset(), result, bytesActuallyRead);
-    }
-    synchronized (this) {
-      // If this buffer has already been purged during
-      // close of InputStream then we don't update the lists.
-      if (inProgressList.contains(buffer)) {
-        inProgressList.remove(buffer);
+      LOGGER.trace("ReadBufferWorker completed file {} for offset {} bytes {}",
+          buffer.getStream().getPath(),  buffer.getOffset(), bytesActuallyRead);
+    }
+    try {
+      synchronized (this) {
+        checkState(inProgressList.remove(buffer),
+            "Read completed from an operation not declared as in progress %s", buffer);
+        // If this buffer has already been purged during
+        // close of InputStream then we don't update the lists.
        if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) {
          buffer.setStatus(ReadBufferStatus.AVAILABLE);
          buffer.setLength(bytesActuallyRead);
        } else {
-          freeList.push(buffer.getBufferindex());
-          // buffer will be deleted as per the eviction policy.
+          // there is no data, so it is immediately returned to the free list.
+          placeBufferOnFreeList("failed read", buffer);

Review Comment:
   This may result in an IllegalStateException propagating to AbfsInputStream.
   
   This line adds the buffer's index to freeList, from where the index can be taken by another readBuffer, say b1.
   Some time later, when this buffer needs to be evicted from completedList, execution reaches https://github.com/steveloughran/hadoop/blob/azure/HADOOP-18521-buffer-manager/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java#L408, and two things can happen:
   
   1. freeList still contains the index: it will throw IllegalStateException there.
   2. freeList no longer contains the index (b1 has taken it): it will throw IllegalStateException from https://github.com/steveloughran/hadoop/blob/azure/HADOOP-18521-buffer-manager/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java#L411.
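   For illustration only, a minimal sketch of the kind of idempotence guard that would prevent the second release (this assumes ReadBuffer exposes a setBufferindex setter; it is not the PR's actual method body, and my suggested change is linked in the next comment):
   
   ```java
   // Sketch: release a buffer slot at most once. If the index was already
   // pushed to freeList it may by now belong to another ReadBuffer, so a
   // second release must be a no-op rather than a second push.
   private synchronized void placeBufferOnFreeList(String reason, ReadBuffer buffer) {
     int index = buffer.getBufferindex();
     if (index < 0) {
       return; // already released; the slot may be owned by another ReadBuffer
     }
     freeList.push(index);
     buffer.setBufferindex(-1); // mark the buffer as no longer owning a slot
   }
   ```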



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java:
##########
@@ -454,31 +588,39 @@ ReadBuffer getNextBlockToRead() throws InterruptedException {
    */
   void doneReading(final ReadBuffer buffer, final ReadBufferStatus result, final int bytesActuallyRead) {
     if (LOGGER.isTraceEnabled()) {
-      LOGGER.trace("ReadBufferWorker completed read file {} for offset {} outcome {} bytes {}",
-          buffer.getStream().getPath(),  buffer.getOffset(), result, bytesActuallyRead);
-    }
-    synchronized (this) {
-      // If this buffer has already been purged during
-      // close of InputStream then we don't update the lists.
-      if (inProgressList.contains(buffer)) {
-        inProgressList.remove(buffer);
+      LOGGER.trace("ReadBufferWorker completed file {} for offset {} bytes {}",
+          buffer.getStream().getPath(),  buffer.getOffset(), bytesActuallyRead);
+    }
+    try {
+      synchronized (this) {
+        checkState(inProgressList.remove(buffer),
+            "Read completed from an operation not declared as in progress %s", buffer);
+        // If this buffer has already been purged during
+        // close of InputStream then we don't update the lists.
        if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) {
          buffer.setStatus(ReadBufferStatus.AVAILABLE);
          buffer.setLength(bytesActuallyRead);
        } else {
-          freeList.push(buffer.getBufferindex());
-          // buffer will be deleted as per the eviction policy.
+          // there is no data, so it is immediately returned to the free list.
+          placeBufferOnFreeList("failed read", buffer);

Review Comment:
   I have made a suggested change which prevents this:
   
   https://github.com/pranavsaxena-microsoft/hadoop/commit/0d09a0de501bdc928139263075f82feb064fd6bc



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java:
##########
@@ -454,31 +588,39 @@ ReadBuffer getNextBlockToRead() throws InterruptedException {
    */
   void doneReading(final ReadBuffer buffer, final ReadBufferStatus result, final int bytesActuallyRead) {
     if (LOGGER.isTraceEnabled()) {
-      LOGGER.trace("ReadBufferWorker completed read file {} for offset {} outcome {} bytes {}",
-          buffer.getStream().getPath(),  buffer.getOffset(), result, bytesActuallyRead);
-    }
-    synchronized (this) {
-      // If this buffer has already been purged during
-      // close of InputStream then we don't update the lists.
-      if (inProgressList.contains(buffer)) {
-        inProgressList.remove(buffer);
+      LOGGER.trace("ReadBufferWorker completed file {} for offset {} bytes {}",
+          buffer.getStream().getPath(),  buffer.getOffset(), bytesActuallyRead);
+    }
+    try {
+      synchronized (this) {
+        checkState(inProgressList.remove(buffer),
+            "Read completed from an operation not declared as in progress %s", buffer);
+        // If this buffer has already been purged during
+        // close of InputStream then we don't update the lists.
        if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) {
          buffer.setStatus(ReadBufferStatus.AVAILABLE);
          buffer.setLength(bytesActuallyRead);
        } else {
-          freeList.push(buffer.getBufferindex());
-          // buffer will be deleted as per the eviction policy.
+          // there is no data, so it is immediately returned to the free list.
+          placeBufferOnFreeList("failed read", buffer);

Review Comment:
   A test for this scenario: https://github.com/pranavsaxena-microsoft/hadoop/commit/18da3752f3f72a953cecba0525a01bfab6be89ee.
   
   Output from a separate run:
   ```
   java.lang.IllegalStateException: Buffer 14 returned to free buffer list by non-owner ReadBuffer{status=AVAILABLE, offset=4194304, length=0, requestedLength=4194304, bufferindex=14, timeStamp=46807492, isFirstByteConsumed=false, isLastByteConsumed=false, isAnyByteConsumed=false, errException=org.apache.hadoop.fs.PathIOException: `/testfilef6b6f93ac245': Input/output error: Buffer index 14 found in buffer collection completedReadList, stream=org.apache.hadoop.fs.azurebfs.services.AbfsInputStream@652e2419{counters=((stream_read_bytes_backwards_on_seek=0) (stream_read_operations=1) (remote_read_op=2) (stream_read_seek_backward_operations=0) (action_http_get_request.failures=0) (action_http_get_request=0) (bytes_read_buffer=0) (stream_read_bytes=0) (seek_in_buffer=0) (remote_bytes_read=0) (stream_read_seek_bytes_skipped=0) (stream_read_seek_operations=2) (read_ahead_bytes_read=0) (stream_read_seek_forward_operations=2));
   gauges=();
   minimums=((action_http_get_request.failures.min=-1) (action_http_get_request.min=-1));
   maximums=((action_http_get_request.max=-1) (action_http_get_request.failures.max=-1));
   means=((action_http_get_request.failures.mean=(samples=0, sum=0, mean=0.0000)) (action_http_get_request.mean=(samples=0, sum=0, mean=0.0000)));
   }AbfsInputStream@(1697522713){StreamStatistics{counters=((remote_bytes_read=0) (stream_read_seek_backward_operations=0) (remote_read_op=2) (stream_read_seek_forward_operations=2) (bytes_read_buffer=0) (seek_in_buffer=0) (stream_read_bytes=0) (stream_read_operations=1) (read_ahead_bytes_read=0) (stream_read_bytes_backwards_on_seek=0) (stream_read_seek_operations=2) (action_http_get_request.failures=0) (stream_read_seek_bytes_skipped=0) (action_http_get_request=0));
   gauges=();
   minimums=((action_http_get_request.min=-1) (action_http_get_request.failures.min=-1));
   maximums=((action_http_get_request.failures.max=-1) (action_http_get_request.max=-1));
   means=((action_http_get_request.mean=(samples=0, sum=0, mean=0.0000)) (action_http_get_request.failures.mean=(samples=0, sum=0, mean=0.0000)));
   }}}
   
        at org.apache.hadoop.util.Preconditions.checkState(Preconditions.java:298)
        at org.apache.hadoop.fs.azurebfs.services.ReadBufferManager.verifyReadOwnsBufferAtIndex(ReadBufferManager.java:430)
        at org.apache.hadoop.fs.azurebfs.services.ReadBufferManager.placeBufferOnFreeList(ReadBufferManager.java:411)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.