[GitHub] [hadoop] anmolanmol1234 commented on a diff in pull request #5117: HADOOP-18521. ABFS ReadBufferManager must not reuse in-progress buffers

2022-11-28 Thread GitBox


anmolanmol1234 commented on code in PR #5117:
URL: https://github.com/apache/hadoop/pull/5117#discussion_r1033433849


##
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java:
##
@@ -302,33 +325,33 @@ private synchronized boolean tryEvict() {
   }
 
   private boolean evict(final ReadBuffer buf) {
-// As failed ReadBuffers (bufferIndx = -1) are saved in completedReadList,
-// avoid adding it to freeList.
-if (buf.getBufferindex() != -1) {
-  freeList.push(buf.getBufferindex());
-}
-
-completedReadList.remove(buf);
 buf.setTracingContext(null);
 if (LOGGER.isTraceEnabled()) {
   LOGGER.trace("Evicting buffer idx {}; was used for file {} offset {} length {}",
   buf.getBufferindex(), buf.getStream().getPath(), buf.getOffset(), buf.getLength());
 }
+completedReadList.remove(buf);

Review Comment:
   Can you please explain again why we should not remove the buffer from the completed list after it has been added to the free list?
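For reference, here is a minimal sketch of the ordering that the original evict() used. The buffer index goes back to the free list first, and only then is the buffer removed from the completed list, with both steps inside one synchronized method. `EvictOrderSketch`, `FakeBuffer` and the plain collections are hypothetical stand-ins, not the real ReadBufferManager types.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical, simplified model of the eviction bookkeeping; not the real ReadBufferManager.
final class EvictOrderSketch {

  // Stand-in for ReadBuffer; only the buffer index matters here.
  static final class FakeBuffer {
    final int bufferIndex; // -1 marks a failed read that holds no buffer slot

    FakeBuffer(int bufferIndex) {
      this.bufferIndex = bufferIndex;
    }
  }

  final Deque<Integer> freeList = new ArrayDeque<>();
  final List<FakeBuffer> completedReadList = new ArrayList<>();

  // Both steps run under this object's monitor, so another thread that also
  // synchronizes on it sees either the state before eviction or the state after.
  synchronized boolean evict(FakeBuffer buf) {
    // 1. Return the slot to the free list (failed reads own no slot).
    if (buf.bufferIndex != -1) {
      freeList.push(buf.bufferIndex);
    }
    // 2. Then drop the buffer from the completed list.
    return completedReadList.remove(buf);
  }
}
```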



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-issues-h...@hadoop.apache.org



[GitHub] [hadoop] anmolanmol1234 commented on a diff in pull request #5117: HADOOP-18521. ABFS ReadBufferManager must not reuse in-progress buffers

2022-11-28 Thread GitBox


anmolanmol1234 commented on code in PR #5117:
URL: https://github.com/apache/hadoop/pull/5117#discussion_r1033420242


##
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java:
##
@@ -555,7 +579,7 @@ int readRemote(long position, byte[] b, int offset, int length, TracingContext t
   throw new FileNotFoundException(ere.getMessage());
 }
   }
-  throw new IOException(ex);
+  throw ex;

Review Comment:
   Any specific reason for changing the exception type from IOException to AzureBlobFileSystemException?
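For context on the question above: AzureBlobFileSystemException extends IOException, so `throw ex;` still satisfies the `throws IOException` clause of readRemote(). The visible change is for callers, who now receive the subtype itself instead of a plain IOException that wraps it as its cause. A minimal sketch, using a hypothetical `StoreException` in place of the real exception class:

```java
import java.io.IOException;

// Hypothetical stand-in for an IOException subclass such as AzureBlobFileSystemException.
class StoreException extends IOException {
  StoreException(String message) {
    super(message);
  }
}

final class RethrowSketch {
  // Old behaviour: wrap, so callers see a plain IOException whose cause is the original.
  static void wrapping(StoreException ex) throws IOException {
    throw new IOException(ex);
  }

  // New behaviour: rethrow, so callers can catch the subtype directly.
  static void rethrowing(StoreException ex) throws IOException {
    throw ex;
  }
}
```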



##
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java:
##
@@ -851,4 +880,67 @@ private void resetReadBufferManager(int bufferSize, int threshold) {
 // by successive tests can lead to OOM based on the dev VM/machine capacity.
 System.gc();
   }
+
+  /**
+   * The first readahead closes the stream.
+   */
+  @Test
+  public void testStreamCloseInFirstReadAhead() throws Exception {
+describe("close a stream during prefetch, verify outcome is good");
+
+AbfsClient client = getMockAbfsClient();
+AbfsRestOperation successOp = getMockRestOp();
+
+AbfsInputStream inputStream = getAbfsInputStream(client, getMethodName());
+ReadBufferManager bufferManager = ReadBufferManager.getBufferManager();
+
+final long initialInProgressBlocksDiscarded = bufferManager.getInProgressBlocksDiscarded();
+
+// on first read, the op succeeds but the stream is closed, which
+// means that the request should be considered a failure
+doAnswer(invocation -> {
+  LOG.info("in read call with {}", inputStream);
+  inputStream.close();
+  return successOp;
+}).doReturn(successOp)
+.when(client)
+.read(any(String.class), any(Long.class), any(byte[].class),
+any(Integer.class), any(Integer.class), any(String.class),
+any(String.class), any(TracingContext.class));
+
+// kick these off before the read() to avoid various race conditions.
+queueReadAheads(inputStream);
+
+// AbfsInputStream Read would have waited for the read-ahead for the requested offset
+// as we are testing from ReadAheadManager directly, sleep for a sec to
+// get the read ahead threads to complete
+waitForPrefetchCompletion();
+
+// this triggers prefetching, which closes the stream while the read
+// is queued. which causes the prefetch to not return.
+// which triggers a blocking read, which will then fail.
+intercept(PathIOException.class, FSExceptionMessages.STREAM_IS_CLOSED, () -> {
+  // should fail
+  int bytesRead = inputStream.read(new byte[ONE_KB]);
+  // diagnostics info if failure wasn't raised
+  return "read " + bytesRead + " bytes from " + inputStream;
+});
+
+Assertions.assertThat(bufferManager.getCompletedReadListCopy())
+.filteredOn(rb -> rb.getStream() == inputStream)
+.describedAs("list of completed reads")
+.isEmpty();
+IOStatisticsStore ios = inputStream.getIOStatistics();
+assertThatStatisticCounter(ios, STREAM_READ_PREFETCH_BLOCKS_DISCARDED)
+.describedAs("blocks discarded by %s", inputStream)
+.isGreaterThan(0);
+
+// at least one of the blocks was discarded in progress.
+// this is guaranteed because the mockito callback must have been invoked
+// by the prefetcher
+Assertions.assertThat(bufferManager.getInProgressBlocksDiscarded())
+.describedAs("in progress blocks discarded")
+.isGreaterThan(initialInProgressBlocksDiscarded);
+  }
+
 }

Review Comment:
   nit: Add a newline at the end of the file.



##
hadoop-tools/hadoop-azure/src/test/resources/log4j.properties:
##
@@ -26,6 +26,8 @@ log4j.logger.org.apache.hadoop.fs.azure.AzureFileSystemThreadPoolExecutor=DEBUG
 log4j.logger.org.apache.hadoop.fs.azure.BlockBlobAppendStream=DEBUG
 
log4j.logger.org.apache.hadoop.fs.azurebfs.contracts.services.TracingService=TRACE
 log4j.logger.org.apache.hadoop.fs.azurebfs.services.AbfsClient=DEBUG
+log4j.logger.org.apache.hadoop.fs.azurebfs.services.ReadBufferManager=TRACE

Review Comment:
   Was this added for testing? It might add a lot of logging.



##
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java:
##
@@ -162,4 +182,120 @@ public void setAnyByteConsumed(boolean isAnyByteConsumed) {
 this.isAnyByteConsumed = isAnyByteConsumed;
   }
 
+  @Override
+  public String toString() {
+return super.toString() +
+"{ status=" + status +
+", offset=" + offset +
+", length=" + length +
+", requestedLength=" + requestedLength +
+", bufferindex=" + bufferindex +
+", timeStamp=" + timeStamp +
+", isFirstByteConsumed=" + isFirstByteConsumed +
+", isLastByteConsumed=" + isLastByteConsumed +
+", isAnyByteCons

[GitHub] [hadoop] anmolanmol1234 commented on a diff in pull request #5117: HADOOP-18521. ABFS ReadBufferManager must not reuse in-progress buffers

2022-11-28 Thread GitBox


anmolanmol1234 commented on code in PR #5117:
URL: https://github.com/apache/hadoop/pull/5117#discussion_r1033415579


##
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferStreamOperations.java:
##
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
+
+/**
+ * Interface which is required for read buffer stream
+ * calls.
+ * Extracted from {@code AbfsInputStream} to make testing
+ * easier and to isolate what operations the read buffer
+ * makes of the streams using it.
+ */
+interface ReadBufferStreamOperations {
+
+  /**
+   * Read a block from the store.
+   * @param position position in file
+   * @param b destination buffer.
+   * @param offset offset in buffer
+   * @param length length of read
+   * @param tracingContext trace context
+   * @return count of bytes read.
+   * @throws IOException failure.
+   */
+  int readRemote(long position,
+  byte[] b,
+  int offset,
+  int length,
+  TracingContext tracingContext) throws IOException;
+
+  /**
+   * Is the stream closed?
+   * This must be thread safe as prefetch operations in
+   * different threads probe this before closure.
+   * @return true if the stream has been closed.
+   */
+  boolean isClosed();
+
+  String getStreamID();
+
+  IOStatisticsStore getIOStatistics();
+
+  /**
+   * Get the stream path as a string.
+   * @return path string.
+   */
+  String getPath();
+

Review Comment:
   nit: Extra blank line; it can be removed.
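The `isClosed()` javadoc in this interface requires thread safety because prefetch threads probe it. One common way to meet that requirement, sketched here with hypothetical names and not taken from AbfsInputStream, is an AtomicBoolean flag:

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of a close flag that prefetch threads can probe safely.
final class ClosableStreamSketch {
  private final AtomicBoolean closed = new AtomicBoolean(false);

  // Returns true only for the first caller, so close-time cleanup can run once.
  boolean close() {
    return closed.compareAndSet(false, true);
  }

  // Safe from any thread: AtomicBoolean.get() has volatile read semantics.
  boolean isClosed() {
    return closed.get();
  }
}
```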






[GitHub] [hadoop] anmolanmol1234 commented on a diff in pull request #5117: HADOOP-18521. ABFS ReadBufferManager must not reuse in-progress buffers

2022-11-11 Thread GitBox


anmolanmol1234 commented on code in PR #5117:
URL: https://github.com/apache/hadoop/pull/5117#discussion_r1020173444


##
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java:
##
@@ -454,31 +588,65 @@ ReadBuffer getNextBlockToRead() throws InterruptedException {
*/
   void doneReading(final ReadBuffer buffer, final ReadBufferStatus result, final int bytesActuallyRead) {
 if (LOGGER.isTraceEnabled()) {
-  LOGGER.trace("ReadBufferWorker completed read file {} for offset {} outcome {} bytes {}",
-  buffer.getStream().getPath(),  buffer.getOffset(), result, bytesActuallyRead);
-}
-synchronized (this) {
-  // If this buffer has already been purged during
-  // close of InputStream then we don't update the lists.
-  if (inProgressList.contains(buffer)) {
-inProgressList.remove(buffer);
+  LOGGER.trace("ReadBufferWorker completed file {} for offset {} bytes {}; {}",
+  buffer.getStream().getPath(),  buffer.getOffset(), bytesActuallyRead, buffer);
+}
+// decrement counter.
+buffer.prefetchFinished();
+
+try {
+  synchronized (this) {
+// remove from the list
+if (!inProgressList.remove(buffer)) {
+  // this is a sign of inconsistent state, so a major problem
+  String message =
+  String.format("Read completed from an operation not declared as in progress %s",
+  buffer);
+  LOGGER.warn(message);
+  // release the buffer (which may raise an exception)
+  placeBufferOnFreeList("read not in progress", buffer);
+  // report the failure
+  throw new IllegalStateException(message);
+}
+
+boolean shouldFreeBuffer = false;
+String freeBufferReason = "";
 if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) {
   buffer.setStatus(ReadBufferStatus.AVAILABLE);
   buffer.setLength(bytesActuallyRead);
 } else {
-  freeList.push(buffer.getBufferindex());
-  // buffer will be deleted as per the eviction policy.
+  // read failed or there was no data, -the buffer can be returned to the free list.
+  shouldFreeBuffer = true;
+  freeBufferReason = "failed read";
 }
 // completed list also contains FAILED read buffers
 // for sending exception message to clients.
 buffer.setStatus(result);
 buffer.setTimeStamp(currentTimeMillis());
-completedReadList.add(buffer);
+if (!buffer.isStreamClosed()) {
+  // completed reads are added to the list.
+  LOGGER.trace("Adding buffer to completed list {}", buffer);
+  completedReadList.add(buffer);

Review Comment:
   Agreed, because the current flow might lead to the same buffer being added to the free list twice, or to inconsistent state during that addition.
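One defensive option against double addition, sketched here as a hypothetical and not what this PR does, is to track which indices are already free and fail fast on a second release. This is similar in spirit to the IllegalStateException that the new doneReading() raises for a read not declared in progress. `FreeListSketch` and its methods are illustrative names only.

```java
import java.util.ArrayDeque;
import java.util.BitSet;
import java.util.Deque;

// Hypothetical pool of buffer indices that refuses a second release of the same index.
final class FreeListSketch {
  private final Deque<Integer> freeList = new ArrayDeque<>();
  private final BitSet isFree; // bit i set => index i is currently on freeList

  FreeListSketch(int capacity) {
    isFree = new BitSet(capacity);
    for (int i = 0; i < capacity; i++) {
      release(i);
    }
  }

  // Take a free index, or return -1 if none is available.
  synchronized int acquire() {
    Integer index = freeList.poll();
    if (index == null) {
      return -1;
    }
    isFree.clear(index);
    return index;
  }

  // Return an index to the pool; fail fast instead of silently adding it twice.
  synchronized void release(int index) {
    if (isFree.get(index)) {
      throw new IllegalStateException("buffer index " + index + " is already on the free list");
    }
    isFree.set(index);
    freeList.push(index);
  }

  synchronized int freeCount() {
    return freeList.size();
  }
}
```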






[GitHub] [hadoop] anmolanmol1234 commented on a diff in pull request #5117: HADOOP-18521. ABFS ReadBufferManager must not reuse in-progress buffers

2022-11-11 Thread GitBox


anmolanmol1234 commented on code in PR #5117:
URL: https://github.com/apache/hadoop/pull/5117#discussion_r1020171703


##
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java:
##
@@ -302,33 +325,33 @@ private synchronized boolean tryEvict() {
   }
 
   private boolean evict(final ReadBuffer buf) {
-// As failed ReadBuffers (bufferIndx = -1) are saved in completedReadList,
-// avoid adding it to freeList.
-if (buf.getBufferindex() != -1) {
-  freeList.push(buf.getBufferindex());
-}
-
-completedReadList.remove(buf);
 buf.setTracingContext(null);
 if (LOGGER.isTraceEnabled()) {
   LOGGER.trace("Evicting buffer idx {}; was used for file {} offset {} length {}",
   buf.getBufferindex(), buf.getStream().getPath(), buf.getOffset(), buf.getLength());
 }
+completedReadList.remove(buf);

Review Comment:
   The buffer should be removed from the completed list after it has been added to the free list.


