anmolanmol1234 commented on code in PR #5117:
URL: https://github.com/apache/hadoop/pull/5117#discussion_r1033420242


##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java:
##########
@@ -555,7 +579,7 @@ int readRemote(long position, byte[] b, int offset, int 
length, TracingContext t
           throw new FileNotFoundException(ere.getMessage());
         }
       }
-      throw new IOException(ex);
+      throw ex;

Review Comment:
   Any specific reason for changing the exception type from IOException to 
AzureBlobFileSystemException ? 



##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java:
##########
@@ -851,4 +880,67 @@ private void resetReadBufferManager(int bufferSize, int 
threshold) {
     // by successive tests can lead to OOM based on the dev VM/machine 
capacity.
     System.gc();
   }
+
+  /**
+   * The first readahead closes the stream.
+   */
+  @Test
+  public void testStreamCloseInFirstReadAhead() throws Exception {
+    describe("close a stream during prefetch, verify outcome is good");
+
+    AbfsClient client = getMockAbfsClient();
+    AbfsRestOperation successOp = getMockRestOp();
+
+    AbfsInputStream inputStream = getAbfsInputStream(client, getMethodName());
+    ReadBufferManager bufferManager = ReadBufferManager.getBufferManager();
+
+    final long initialInProgressBlocksDiscarded = 
bufferManager.getInProgressBlocksDiscarded();
+
+    // on first read, the op succeeds but the stream is closed, which
+    // means that the request should be considered a failure
+    doAnswer(invocation -> {
+      LOG.info("in read call with {}", inputStream);
+      inputStream.close();
+      return successOp;
+    }).doReturn(successOp)
+        .when(client)
+        .read(any(String.class), any(Long.class), any(byte[].class),
+            any(Integer.class), any(Integer.class), any(String.class),
+            any(String.class), any(TracingContext.class));
+
+    // kick these off before the read() to avoid various race conditions.
+    queueReadAheads(inputStream);
+
+    // AbfsInputStream Read would have waited for the read-ahead for the 
requested offset
+    // as we are testing from ReadAheadManager directly, sleep for a sec to
+    // get the read ahead threads to complete
+    waitForPrefetchCompletion();
+
+    // this triggers prefetching, which closes the stream while the read
+    // is queued. which causes the prefetch to not return.
+    // which triggers a blocking read, which will then fail.
+    intercept(PathIOException.class, FSExceptionMessages.STREAM_IS_CLOSED, () 
-> {
+      // should fail
+      int bytesRead = inputStream.read(new byte[ONE_KB]);
+      // diagnostics info if failure wasn't raised
+      return "read " + bytesRead + " bytes from " + inputStream;
+    });
+
+    Assertions.assertThat(bufferManager.getCompletedReadListCopy())
+        .filteredOn(rb -> rb.getStream() == inputStream)
+        .describedAs("list of completed reads")
+        .isEmpty();
+    IOStatisticsStore ios = inputStream.getIOStatistics();
+    assertThatStatisticCounter(ios, STREAM_READ_PREFETCH_BLOCKS_DISCARDED)
+        .describedAs("blocks discarded by %s", inputStream)
+        .isGreaterThan(0);
+
+    // at least one of the blocks was discarded in progress.
+    // this is guaranteed because the mockito callback must have been invoked
+    // by the prefetcher
+    Assertions.assertThat(bufferManager.getInProgressBlocksDiscarded())
+        .describedAs("in progress blocks discarded")
+        .isGreaterThan(initialInProgressBlocksDiscarded);
+  }
+
 }

Review Comment:
   nit: Add line at the end of the file.



##########
hadoop-tools/hadoop-azure/src/test/resources/log4j.properties:
##########
@@ -26,6 +26,8 @@ 
log4j.logger.org.apache.hadoop.fs.azure.AzureFileSystemThreadPoolExecutor=DEBUG
 log4j.logger.org.apache.hadoop.fs.azure.BlockBlobAppendStream=DEBUG
 
log4j.logger.org.apache.hadoop.fs.azurebfs.contracts.services.TracingService=TRACE
 log4j.logger.org.apache.hadoop.fs.azurebfs.services.AbfsClient=DEBUG
+log4j.logger.org.apache.hadoop.fs.azurebfs.services.ReadBufferManager=TRACE

Review Comment:
   Was this added for testing as this might add a lot of logging ?



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java:
##########
@@ -162,4 +182,120 @@ public void setAnyByteConsumed(boolean isAnyByteConsumed) 
{
     this.isAnyByteConsumed = isAnyByteConsumed;
   }
 
+  @Override
+  public String toString() {
+    return super.toString() +
+        "{ status=" + status +
+        ", offset=" + offset +
+        ", length=" + length +
+        ", requestedLength=" + requestedLength +
+        ", bufferindex=" + bufferindex +
+        ", timeStamp=" + timeStamp +
+        ", isFirstByteConsumed=" + isFirstByteConsumed +
+        ", isLastByteConsumed=" + isLastByteConsumed +
+        ", isAnyByteConsumed=" + isAnyByteConsumed +
+        ", errException=" + errException +
+        ", stream=" + (stream != null ? stream.getStreamID() : "none") +
+        ", stream closed=" + isStreamClosed() +
+        ", latch=" + latch +
+        '}';
+  }
+
+  /**
+   * Is the stream closed.
+   * @return stream closed status.
+   */
+  public boolean isStreamClosed() {
+    return stream != null && stream.isClosed();
+  }
+
+  /**
+   * IOStatistics of stream.
+   * @return the stream's IOStatisticsStore.
+   */
+  public IOStatisticsStore getStreamIOStatistics() {
+    return stream.getIOStatistics();
+  }
+
+  /**
+   * Start using the buffer.
+   * Sets the byte consumption flags as appriopriate, then
+   * updates the stream statistics with the use of this buffer.
+   * @param offset offset in buffer where copy began
+   * @param bytesCopied bytes copied.
+   */
+  void dataConsumedByStream(int offset, int bytesCopied) {
+    boolean isFirstUse = !isAnyByteConsumed;
+    setAnyByteConsumed(true);
+    if (offset == 0) {
+      setFirstByteConsumed(true);
+    }
+    if (offset + bytesCopied == getLength()) {
+      setLastByteConsumed(true);
+    }
+    IOStatisticsStore iostats = getStreamIOStatistics();
+    if (isFirstUse) {
+      // first use, update the use
+      iostats.incrementCounter(STREAM_READ_PREFETCH_BLOCKS_USED, 1);
+    }
+    // every use, update the count of bytes read
+    iostats.incrementCounter(STREAM_READ_PREFETCH_BYTES_USED, bytesCopied);
+  }
+
+  /**
+   * The (completed) buffer was evicted; update stream statistics
+   * as appropriate.
+   */
+  void evicted() {
+    IOStatisticsStore iostats = getStreamIOStatistics();
+    iostats.incrementCounter(STREAM_READ_PREFETCH_BLOCKS_EVICTED, 1);
+    if (!isAnyByteConsumed()) {
+      // nothing was read, so consider it discarded.
+      iostats.incrementCounter(STREAM_READ_PREFETCH_BLOCKS_DISCARDED, 1);
+      iostats.incrementCounter(STREAM_READ_PREFETCH_BYTES_DISCARDED, 
getLength());
+    }
+  }
+
+  /**
+   * The (completed) buffer was discarded; no data was read.
+   */
+  void discarded() {
+    if (getBufferindex() >= 0) {
+      IOStatisticsStore iostats = getStreamIOStatistics();
+      iostats.incrementCounter(STREAM_READ_PREFETCH_BLOCKS_DISCARDED, 1);
+      iostats.incrementCounter(STREAM_READ_PREFETCH_BYTES_DISCARDED, 
getLength());
+    }
+  }
+
+  /**
+   * Release the buffer: update fields as appropriate.
+   */
+  void releaseBuffer() {
+    setBuffer(null);
+    setBufferindex(-1);
+  }
+
+

Review Comment:
   nit: Extra line



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-issues-h...@hadoop.apache.org

Reply via email to