[ https://issues.apache.org/jira/browse/HADOOP-18521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17639972#comment-17639972 ]

ASF GitHub Bot commented on HADOOP-18521:
-----------------------------------------

anmolanmol1234 commented on code in PR #5117:
URL: https://github.com/apache/hadoop/pull/5117#discussion_r1033420242


##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java:
##########
@@ -555,7 +579,7 @@ int readRemote(long position, byte[] b, int offset, int length, TracingContext t
           throw new FileNotFoundException(ere.getMessage());
         }
       }
-      throw new IOException(ex);
+      throw ex;

Review Comment:
   Any specific reason for changing the exception type from IOException to AzureBlobFileSystemException?
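
A minimal sketch of the difference in question, with DemoAbfsException as a
hypothetical stand-in for AzureBlobFileSystemException (which extends
IOException, so the direct rethrow still satisfies a "throws IOException"
signature): rethrowing preserves the subtype for callers, while wrapping in a
new IOException erases it.

{code}
import java.io.IOException;

public class RethrowDemo {

  // hypothetical stand-in for AzureBlobFileSystemException, an IOException subclass
  static class DemoAbfsException extends IOException {
    DemoAbfsException(String msg) {
      super(msg);
    }
  }

  static void readRemote() throws IOException {
    try {
      throw new DemoAbfsException("simulated remote failure");
    } catch (DemoAbfsException ex) {
      // old behaviour: throw new IOException(ex);
      // callers then only ever saw a plain IOException.
      throw ex; // new behaviour: the specific exception type is preserved
    }
  }

  public static void main(String[] args) {
    try {
      readRemote();
    } catch (DemoAbfsException ex) {
      // only reachable because "throw ex" preserved the subtype
      System.out.println("caught specific type: " + ex.getMessage());
    } catch (IOException ex) {
      System.out.println("caught generic IOException: " + ex.getMessage());
    }
  }
}
{code}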



##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java:
##########
@@ -851,4 +880,67 @@ private void resetReadBufferManager(int bufferSize, int threshold) {
     // by successive tests can lead to OOM based on the dev VM/machine capacity.
     System.gc();
   }
+
+  /**
+   * The first readahead closes the stream.
+   */
+  @Test
+  public void testStreamCloseInFirstReadAhead() throws Exception {
+    describe("close a stream during prefetch, verify outcome is good");
+
+    AbfsClient client = getMockAbfsClient();
+    AbfsRestOperation successOp = getMockRestOp();
+
+    AbfsInputStream inputStream = getAbfsInputStream(client, getMethodName());
+    ReadBufferManager bufferManager = ReadBufferManager.getBufferManager();
+
+    final long initialInProgressBlocksDiscarded = bufferManager.getInProgressBlocksDiscarded();
+
+    // on first read, the op succeeds but the stream is closed, which
+    // means that the request should be considered a failure
+    doAnswer(invocation -> {
+      LOG.info("in read call with {}", inputStream);
+      inputStream.close();
+      return successOp;
+    }).doReturn(successOp)
+        .when(client)
+        .read(any(String.class), any(Long.class), any(byte[].class),
+            any(Integer.class), any(Integer.class), any(String.class),
+            any(String.class), any(TracingContext.class));
+
+    // kick these off before the read() to avoid various race conditions.
+    queueReadAheads(inputStream);
+
+    // AbfsInputStream Read would have waited for the read-ahead for the requested offset
+    // as we are testing from ReadAheadManager directly, sleep for a sec to
+    // get the read ahead threads to complete
+    waitForPrefetchCompletion();
+
+    // this triggers prefetching, which closes the stream while the read
+    // is queued; that causes the prefetch to not return, which in turn
+    // triggers a blocking read that will then fail.
+    intercept(PathIOException.class, FSExceptionMessages.STREAM_IS_CLOSED, () -> {
+      // should fail
+      int bytesRead = inputStream.read(new byte[ONE_KB]);
+      // diagnostics info if failure wasn't raised
+      return "read " + bytesRead + " bytes from " + inputStream;
+    });
+
+    Assertions.assertThat(bufferManager.getCompletedReadListCopy())
+        .filteredOn(rb -> rb.getStream() == inputStream)
+        .describedAs("list of completed reads")
+        .isEmpty();
+    IOStatisticsStore ios = inputStream.getIOStatistics();
+    assertThatStatisticCounter(ios, STREAM_READ_PREFETCH_BLOCKS_DISCARDED)
+        .describedAs("blocks discarded by %s", inputStream)
+        .isGreaterThan(0);
+
+    // at least one of the blocks was discarded in progress.
+    // this is guaranteed because the mockito callback must have been invoked
+    // by the prefetcher
+    Assertions.assertThat(bufferManager.getInProgressBlocksDiscarded())
+        .describedAs("in progress blocks discarded")
+        .isGreaterThan(initialInProgressBlocksDiscarded);
+  }
+
 }

Review Comment:
   nit: Add line at the end of the file.
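
An aside on the intercept() idiom in the test above: intercept() comes from
org.apache.hadoop.test.LambdaTestUtils. The lambda is expected to throw; its
return value is only used to build the failure message when no exception is
raised, which is why the test returns a diagnostics string. A simplified
sketch of that contract (not the actual LambdaTestUtils implementation):

{code}
import java.util.concurrent.Callable;

public final class InterceptSketch {

  // simplified sketch of LambdaTestUtils.intercept(); the real method also
  // has richer diagnostics and many overloads
  static <T, E extends Throwable> E intercept(
      Class<E> clazz, String contained, Callable<T> eval) throws Exception {
    T result;
    try {
      result = eval.call();
    } catch (Throwable t) {
      if (clazz.isInstance(t) && t.toString().contains(contained)) {
        // the expected failure: hand it back for further assertions
        return clazz.cast(t);
      }
      if (t instanceof Exception) {
        throw (Exception) t; // unexpected exception: a genuine test failure
      }
      throw new AssertionError("unexpected throwable", t);
    }
    // no exception at all: surface the lambda's diagnostic return value
    throw new AssertionError("Expected " + clazz.getName()
        + " to be thrown, but got result: " + result);
  }

  public static void main(String[] args) throws Exception {
    IllegalStateException ex = intercept(IllegalStateException.class,
        "closed", () -> {
          throw new IllegalStateException("stream is closed");
        });
    System.out.println("intercepted: " + ex.getMessage());
  }
}
{code}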



##########
hadoop-tools/hadoop-azure/src/test/resources/log4j.properties:
##########
@@ -26,6 +26,8 @@ log4j.logger.org.apache.hadoop.fs.azure.AzureFileSystemThreadPoolExecutor=DEBUG
 log4j.logger.org.apache.hadoop.fs.azure.BlockBlobAppendStream=DEBUG
 log4j.logger.org.apache.hadoop.fs.azurebfs.contracts.services.TracingService=TRACE
 log4j.logger.org.apache.hadoop.fs.azurebfs.services.AbfsClient=DEBUG
+log4j.logger.org.apache.hadoop.fs.azurebfs.services.ReadBufferManager=TRACE

Review Comment:
   Was this added just for testing? It might add a lot of logging.
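
If the TRACE output is only wanted while these tests run, one alternative is
to raise the level programmatically inside the test suite rather than in the
shared log4j.properties. A sketch, assuming the slf4j overload of
GenericTestUtils.setLogLevel() available in recent Hadoop branches:

{code}
import org.apache.hadoop.test.GenericTestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;

public class ReadBufferManagerTraceLogging {

  private static final Logger RBM_LOG = LoggerFactory.getLogger(
      "org.apache.hadoop.fs.azurebfs.services.ReadBufferManager");

  // invoke from a @BeforeClass/@Before hook so only this suite pays the
  // logging cost; other hadoop-azure test runs keep the default level
  static void enableReadBufferManagerTrace() {
    GenericTestUtils.setLogLevel(RBM_LOG, Level.TRACE);
  }
}
{code}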



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java:
##########
@@ -162,4 +182,120 @@ public void setAnyByteConsumed(boolean isAnyByteConsumed) {
     this.isAnyByteConsumed = isAnyByteConsumed;
   }
 
+  @Override
+  public String toString() {
+    return super.toString() +
+        "{ status=" + status +
+        ", offset=" + offset +
+        ", length=" + length +
+        ", requestedLength=" + requestedLength +
+        ", bufferindex=" + bufferindex +
+        ", timeStamp=" + timeStamp +
+        ", isFirstByteConsumed=" + isFirstByteConsumed +
+        ", isLastByteConsumed=" + isLastByteConsumed +
+        ", isAnyByteConsumed=" + isAnyByteConsumed +
+        ", errException=" + errException +
+        ", stream=" + (stream != null ? stream.getStreamID() : "none") +
+        ", stream closed=" + isStreamClosed() +
+        ", latch=" + latch +
+        '}';
+  }
+
+  /**
+   * Is the stream closed.
+   * @return stream closed status.
+   */
+  public boolean isStreamClosed() {
+    return stream != null && stream.isClosed();
+  }
+
+  /**
+   * IOStatistics of stream.
+   * @return the stream's IOStatisticsStore.
+   */
+  public IOStatisticsStore getStreamIOStatistics() {
+    return stream.getIOStatistics();
+  }
+
+  /**
+   * Start using the buffer.
+   * Sets the byte consumption flags as appropriate, then
+   * updates the stream statistics with the use of this buffer.
+   * @param offset offset in buffer where copy began
+   * @param bytesCopied bytes copied.
+   */
+  void dataConsumedByStream(int offset, int bytesCopied) {
+    boolean isFirstUse = !isAnyByteConsumed;
+    setAnyByteConsumed(true);
+    if (offset == 0) {
+      setFirstByteConsumed(true);
+    }
+    if (offset + bytesCopied == getLength()) {
+      setLastByteConsumed(true);
+    }
+    IOStatisticsStore iostats = getStreamIOStatistics();
+    if (isFirstUse) {
+      // first use, update the use
+      iostats.incrementCounter(STREAM_READ_PREFETCH_BLOCKS_USED, 1);
+    }
+    // every use, update the count of bytes read
+    iostats.incrementCounter(STREAM_READ_PREFETCH_BYTES_USED, bytesCopied);
+  }
+
+  /**
+   * The (completed) buffer was evicted; update stream statistics
+   * as appropriate.
+   */
+  void evicted() {
+    IOStatisticsStore iostats = getStreamIOStatistics();
+    iostats.incrementCounter(STREAM_READ_PREFETCH_BLOCKS_EVICTED, 1);
+    if (!isAnyByteConsumed()) {
+      // nothing was read, so consider it discarded.
+      iostats.incrementCounter(STREAM_READ_PREFETCH_BLOCKS_DISCARDED, 1);
+      iostats.incrementCounter(STREAM_READ_PREFETCH_BYTES_DISCARDED, getLength());
+    }
+  }
+
+  /**
+   * The (completed) buffer was discarded; no data was read.
+   */
+  void discarded() {
+    if (getBufferindex() >= 0) {
+      IOStatisticsStore iostats = getStreamIOStatistics();
+      iostats.incrementCounter(STREAM_READ_PREFETCH_BLOCKS_DISCARDED, 1);
+      iostats.incrementCounter(STREAM_READ_PREFETCH_BYTES_DISCARDED, getLength());
+    }
+  }
+
+  /**
+   * Release the buffer: update fields as appropriate.
+   */
+  void releaseBuffer() {
+    setBuffer(null);
+    setBufferindex(-1);
+  }
+
+

Review Comment:
   nit: Extra line
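
For context on how these accounting hooks are driven, a hypothetical caller
sketch (not the actual ReadBufferManager code, and assuming same-package
access to the ReadBuffer class from this diff): after copying bytes out of a
completed prefetch buffer, the copy path reports the consumption so the
stream statistics are updated.

{code}
// hypothetical helper in org.apache.hadoop.fs.azurebfs.services
void copyFromPrefetchBuffer(ReadBuffer buf, int offsetInBuffer,
    byte[] dest, int destOffset, int bytesToCopy) {
  System.arraycopy(buf.getBuffer(), offsetInBuffer, dest, destOffset,
      bytesToCopy);
  // sets the first/last/any-byte-consumed flags, then increments
  // STREAM_READ_PREFETCH_BLOCKS_USED (first use only) and
  // STREAM_READ_PREFETCH_BYTES_USED (every use)
  buf.dataConsumedByStream(offsetInBuffer, bytesToCopy);
}
{code}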





> ABFS ReadBufferManager buffer sharing across concurrent HTTP requests
> ---------------------------------------------------------------------
>
>                 Key: HADOOP-18521
>                 URL: https://issues.apache.org/jira/browse/HADOOP-18521
>             Project: Hadoop Common
>          Issue Type: Bug
>          Components: fs/azure
>    Affects Versions: 3.3.2, 3.3.3, 3.3.4
>            Reporter: Steve Loughran
>            Assignee: Steve Loughran
>            Priority: Critical
>              Labels: pull-request-available
>
> AbfsInputStream.close() can trigger the return of buffers used for active 
> prefetch GET requests into the ReadBufferManager free buffer pool.
> A subsequent prefetch by a different stream in the same process may acquire 
> this same buffer while the original GET is still in flight; when that GET 
> completes it can corrupt the second stream's prefetched data, which may then 
> be returned to that other thread.
> On releases without the fix (the affected versions are 3.3.2 through 3.3.4), 
> the bug can be avoided by disabling all prefetching:
> {code}
> fs.azure.readaheadqueue.depth = 0
> {code}
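> The same workaround can be applied programmatically before the filesystem 
> instance is created; a sketch with placeholder account and container names:
> {code}
> import java.net.URI;
>
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.FileSystem;
>
> public class DisableAbfsReadAhead {
>   public static void main(String[] args) throws Exception {
>     Configuration conf = new Configuration();
>     // a depth of 0 disables all read-ahead, avoiding the buffer sharing bug
>     conf.setInt("fs.azure.readaheadqueue.depth", 0);
>     // placeholder container/account; substitute real values
>     FileSystem fs = FileSystem.get(
>         URI.create("abfs://container@account.dfs.core.windows.net/"), conf);
>     System.out.println("created " + fs.getUri());
>   }
> }
> {code}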


