[GitHub] [hadoop] steveloughran commented on a diff in pull request #5176: HADOOP-18546. ABFS:disable purging list of in progress reads in abfs stream closed

2022-12-06 Thread GitBox


steveloughran commented on code in PR #5176:
URL: https://github.com/apache/hadoop/pull/5176#discussion_r1041097514


##
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java:
##
@@ -495,6 +499,63 @@ public void testSuccessfulReadAhead() throws Exception {
     checkEvictedStatus(inputStream, 0, true);
   }
 
+  /**
+   * This test expects InProgressList is not purged by the inputStream close.
+   */
+  @Test
+  public void testStreamPurgeDuringReadAheadCallExecuting() throws Exception {
+    AbfsClient client = getMockAbfsClient();
+    AbfsRestOperation successOp = getMockRestOp();
+    final Long serverCommunicationMockLatency = 3_000L;
+    final Long readBufferTransferToInProgressProbableTime = 1_000L;
+    final Integer readBufferQueuedCount = 3;
+
+    Mockito.doAnswer(invocationOnMock -> {
+      //sleeping thread to mock the network latency from client to backend.
+      Thread.sleep(serverCommunicationMockLatency);
+      return successOp;
+    })
+        .when(client)
+        .read(any(String.class), any(Long.class), any(byte[].class),
+            any(Integer.class), any(Integer.class), any(String.class),
+            any(String.class), any(TracingContext.class));
+
+    AbfsInputStream inputStream = getAbfsInputStream(client,
+        "testSuccessfulReadAhead.txt");
+    queueReadAheads(inputStream);
+
+    final ReadBufferManager readBufferManager
+        = ReadBufferManager.getBufferManager();
+
+    final int readBufferTotal = readBufferManager.getNumBuffers();
+
+    //Sleeping to give ReadBufferWorker to pick the readBuffers for processing.
+    Thread.sleep(readBufferTransferToInProgressProbableTime);
+
+    Assertions.assertThat(readBufferManager.getInProgressCopiedList())
+        .describedAs("InProgressList should have " + readBufferQueuedCount + " elements")
+        .hasSize(readBufferQueuedCount);
+    final int freeListBufferCount = readBufferTotal - readBufferQueuedCount;
+    Assertions.assertThat(readBufferManager.getFreeListCopy())
+        .describedAs("FreeList should have " + freeListBufferCount + "elements")
+        .hasSize(freeListBufferCount);
+    Assertions.assertThat(readBufferManager.getCompletedReadListCopy())
+        .describedAs("CompletedList should have 0 elements")
+        .hasSize(0);
+
+    inputStream.close();

Review Comment:
   The problem with the close() here is that it will only be reached if the
   assertions hold. If anything goes wrong, an exception is raised and the
   stream is left open, with whatever resources it consumes.
   
   It should be closed in a finally block *or* the stream opened in a
   try-with-resources clause. Thanks.
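   
   For illustration, a minimal sketch of the try-with-resources variant (not the
   PR's code); it assumes close() on the stream is idempotent, as Closeable
   recommends, so the implicit second close at the end of the block is harmless:
   
      // Sketch: the stream is always closed, even when an assertion throws;
      // the explicit close() stays where the purge scenario needs it.
      try (AbfsInputStream inputStream = getAbfsInputStream(client,
          "testSuccessfulReadAhead.txt")) {
        queueReadAheads(inputStream);
        // ... sleep, then the pre-close list-size assertions ...
        inputStream.close();
        // ... post-close assertions ...
      }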




[GitHub] [hadoop] steveloughran commented on a diff in pull request #5176: HADOOP-18546. ABFS:disable purging list of in progress reads in abfs stream closed

2022-12-06 Thread GitBox


steveloughran commented on code in PR #5176:
URL: https://github.com/apache/hadoop/pull/5176#discussion_r1040769983


##
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java:
##
@@ -524,30 +527,33 @@ public void testStreamPurgeDuringReadAheadCallExecuting() throws Exception {
     final ReadBufferManager readBufferManager
         = ReadBufferManager.getBufferManager();
 
+    final int readBufferTotal = readBufferManager.getNumBuffers();
+
     //Sleeping to give ReadBufferWorker to pick the readBuffers for processing.
-    Thread.sleep(1_000L);
+    Thread.sleep(readBufferTransferToInProgressProbableTime);
 
     Assertions.assertThat(readBufferManager.getInProgressCopiedList())
-        .describedAs("InProgressList should have 3 elements")
-        .hasSize(3);
+        .describedAs("InProgressList should have " + readBufferQueuedCount + " elements")
+        .hasSize(readBufferQueuedCount);
+    final int freeListBufferCount = readBufferTotal - readBufferQueuedCount;
     Assertions.assertThat(readBufferManager.getFreeListCopy())
-        .describedAs("FreeList should have 13 elements")
-        .hasSize(13);
+        .describedAs("FreeList should have " + freeListBufferCount + "elements")

Review Comment:
   You can actually use String.format patterns here; they are most relevant
   for on-demand toString() calls, which are more expensive. I'm not worrying
   about it here though.
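   
   For illustration only (not code in the PR), AssertJ's
   describedAs(String, Object...) overload accepts format arguments; the
   description is only rendered when it is actually needed, typically on
   failure, so expensive toString() work is deferred:
   
      // Sketch: format-style description. The %d placeholder (and any
      // argument toString()) is only filled in when the description is used,
      // typically when the assertion fails.
      Assertions.assertThat(readBufferManager.getFreeListCopy())
          .describedAs("FreeList should have %d elements", freeListBufferCount)
          .hasSize(freeListBufferCount);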




[GitHub] [hadoop] steveloughran commented on a diff in pull request #5176: HADOOP-18546. ABFS:disable purging list of in progress reads in abfs stream closed

2022-12-02 Thread GitBox


steveloughran commented on code in PR #5176:
URL: https://github.com/apache/hadoop/pull/5176#discussion_r1038058686


##
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java:
##
@@ -495,6 +509,199 @@ public void testSuccessfulReadAhead() throws Exception {
     checkEvictedStatus(inputStream, 0, true);
   }
 
+  /**
+   * This test expects InProgressList is not purged by the inputStream close.
+   * The readBuffer will move to completedList and then finally should get evicted.
+   */
+  @Test
+  public void testStreamPurgeDuringReadAheadCallExecuting() throws Exception {
+    AbfsClient client = getMockAbfsClient();
+    AbfsRestOperation successOp = getMockRestOp();
+
+    final AtomicInteger movedToInProgressList = new AtomicInteger(0);
+    final AtomicInteger movedToCompletedList = new AtomicInteger(0);
+    final AtomicBoolean preClosedAssertion = new AtomicBoolean(false);
+
+    Mockito.doAnswer(invocationOnMock -> {
+      movedToInProgressList.incrementAndGet();
+      while (movedToInProgressList.get() < 3 || !preClosedAssertion.get()) {
+
+      }
+      movedToCompletedList.incrementAndGet();
+      return successOp;
+    })
+        .when(client)

Review Comment:
   It may be slow, but at least there's no assertion that something finishes
   *before* a specific timeout. Those are the tests which really have problems
   on slow networks/overloaded systems.




[GitHub] [hadoop] steveloughran commented on a diff in pull request #5176: HADOOP-18546. ABFS:disable purging list of in progress reads in abfs stream closed

2022-12-01 Thread GitBox


steveloughran commented on code in PR #5176:
URL: https://github.com/apache/hadoop/pull/5176#discussion_r1037390997


##
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java:
##
@@ -82,6 +84,16 @@ public class TestAbfsInputStream extends
       REDUCED_READ_BUFFER_AGE_THRESHOLD * 10; // 30 sec
   private static final int ALWAYS_READ_BUFFER_SIZE_TEST_FILE_SIZE = 16 * ONE_MB;
 
+  @After
+  public void afterTest() throws InterruptedException {

Review Comment:
   Override `teardown()` and call the superclass; that way you know the order
   in which things happen.
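   
   For illustration, a minimal sketch of that pattern (not code from the PR; it
   assumes the test's base class exposes an overridable teardown(), as the
   comment implies):
   
      // Sketch: override the base class teardown() instead of adding a
      // separate @After method, so the cleanup order is explicit.
      @Override
      public void teardown() throws Exception {
        try {
          // test-specific cleanup of the shared read-ahead state goes here
        } finally {
          super.teardown();   // superclass cleanup always runs, and runs last
        }
      }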



##
hadoop-common-project/hadoop-common/src/main/resources/core-default.xml:
##
@@ -2166,13 +2166,6 @@ The switch to turn S3A auditing on or off.
   The AbstractFileSystem for gs: uris.
 
 
-<property>
-  <name>fs.azure.enable.readahead</name>
-  <value>false</value>

Review Comment:
   Retain it but set it to true. Why so? storediag will log it and so show
   that someone has explicitly said "readahead is safe here".



##
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java:
##
@@ -82,6 +84,16 @@ public class TestAbfsInputStream extends
       REDUCED_READ_BUFFER_AGE_THRESHOLD * 10; // 30 sec
   private static final int ALWAYS_READ_BUFFER_SIZE_TEST_FILE_SIZE = 16 * ONE_MB;
 
+  @After
+  public void afterTest() throws InterruptedException {
+    //thread wait so that previous test's inProgress buffers are processed and removed.
+    Thread.sleep(1l);

Review Comment:
   Don't like this, as it potentially adds 10s to a test run, and one which
   could still be a bit flaky.
   
   What about using `testResetReadBufferManager()`?
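   
   For illustration, a rough sketch of that alternative inside the teardown()
   override suggested above; the no-arg testResetReadBufferManager() call and
   its signature are assumptions here, not code from the PR:
   
      // Sketch only: reset the shared ReadBufferManager rather than sleeping,
      // so the next test starts with empty free/in-progress/completed lists.
      // Adjust to whatever reset hook ReadBufferManager actually exposes.
      @Override
      public void teardown() throws Exception {
        try {
          ReadBufferManager.getBufferManager().testResetReadBufferManager();
        } finally {
          super.teardown();
        }
      }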
   



##
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java:
##
@@ -495,6 +505,105 @@ public void testSuccessfulReadAhead() throws Exception {
     checkEvictedStatus(inputStream, 0, true);
   }
 
+  /**
+   * This test expects InProgressList is not purged by the inputStream close.
+   * The readBuffer will move to completedList and then finally should get evicted.
+   */
+  @Test
+  public void testStreamPurgeDuringReadAheadCallExecuting() throws Exception {
+    AbfsClient client = getMockAbfsClient();
+    AbfsRestOperation successOp = getMockRestOp();
+
+    Mockito.doAnswer(invocationOnMock -> {
+      //sleeping thread to mock the network latency from client to backend.
+      Thread.sleep(3000l);
+      return successOp;
+    })
+        .when(client)
+        .read(any(String.class), any(Long.class), any(byte[].class),
+            any(Integer.class), any(Integer.class), any(String.class),
+            any(String.class), any(TracingContext.class));
+
+    AbfsInputStream inputStream = getAbfsInputStream(client,
+        "testSuccessfulReadAhead.txt");
+    queueReadAheads(inputStream);
+
+    final ReadBufferManager readBufferManager
+        = ReadBufferManager.getBufferManager();
+
+    //Sleeping to give ReadBufferWorker to pick the readBuffers for processing.
+    Thread.sleep(1000l);
+
+    Assertions.assertThat(
+        getStreamRelatedBufferCount(readBufferManager.getInProgressCopiedList(),
+            inputStream))
+        .describedAs("InProgressList should have 3 elements")
+        .isEqualTo(3);
+    Assertions.assertThat(readBufferManager.getFreeListCopy().size())
+        .describedAs("FreeList should have 13 elements")
+        .isEqualTo(13);
+    Assertions.assertThat(readBufferManager.getCompletedReadListCopy().size())
+        .describedAs("CompletedList should have 0 elements")
+        .isEqualTo(0);
+
+    inputStream.close();
+
+    Assertions.assertThat(
+        getStreamRelatedBufferCount(readBufferManager.getInProgressCopiedList(),
+            inputStream))
+        .describedAs("InProgressList should have 3 elements")
+        .isEqualTo(3);
+    Assertions.assertThat(getStreamRelatedBufferCount(
+        readBufferManager.getCompletedReadListCopy(), inputStream))
+        .describedAs("CompletedList should have 0 elements")
+        .isEqualTo(0);
+    Assertions.assertThat(readBufferManager.getFreeListCopy().size())

Review Comment:
   use .hasSize(13) 
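   
   For reference, the suggested form (a sketch, not the PR's code) asserts the
   collection size directly instead of comparing size() with isEqualTo():
   
      // hasSize() asserts on the collection itself and reports its contents
      // on failure, unlike isEqualTo() on a pre-computed size().
      Assertions.assertThat(readBufferManager.getFreeListCopy())
          .describedAs("FreeList should have 13 elements")
          .hasSize(13);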



##
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java:
##
@@ -495,6 +505,105 @@ public void testSuccessfulReadAhead() throws Exception {
     checkEvictedStatus(inputStream, 0, true);
   }
 
+  /**
+   * This test expects InProgressList is not purged by the inputStream close.
+   * The readBuffer will move to completedList and then finally should get evicted.
+   */
+  @Test
+  public void testStreamPurgeDuringReadAheadCallExecuting() throws Exception {
+    AbfsClient client = getMockAbfsClient();
+    AbfsRestOperation successOp = getMockRestOp();
+
+    Mockito.doAnswer(invocationOnMock -> {
+      //sleeping thread to mock the network latency from client