steveloughran commented on code in PR #5176:
URL: https://github.com/apache/hadoop/pull/5176#discussion_r1037390997


##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java:
##########
@@ -82,6 +84,16 @@ public class TestAbfsInputStream extends
       REDUCED_READ_BUFFER_AGE_THRESHOLD * 10; // 30 sec
   private static final int ALWAYS_READ_BUFFER_SIZE_TEST_FILE_SIZE = 16 * ONE_MB;
 
+  @After
+  public void afterTest() throws InterruptedException {

Review Comment:
   Override `teardown()` and call the superclass method; that way you know the order in which things happen.
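A minimal sketch of the ordering point. It assumes a base class that defines its own `teardown()`. The class names `BaseTestSketch` and `InputStreamTestSketch` are hypothetical, and the JUnit annotations are left out so the sketch runs standalone. With a single overriding method, the subclass cleanup and the superclass cleanup run in the order they are written. A reader then does not need to know how the runner orders separate `@After` methods across a class hierarchy.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a base test class that has its own teardown(). */
class BaseTestSketch {
  /** Records the order in which cleanup steps run. */
  final List<String> events = new ArrayList<>();

  public void teardown() {
    events.add("base teardown");
  }
}

/** Hypothetical subclass: one overriding teardown() makes the order explicit. */
class InputStreamTestSketch extends BaseTestSketch {
  @Override
  public void teardown() {
    // Test-specific cleanup first (e.g. resetting shared read-buffer state)...
    events.add("reset read buffers");
    // ...then the superclass cleanup, exactly where the reader can see it.
    super.teardown();
  }
}
```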



##########
hadoop-common-project/hadoop-common/src/main/resources/core-default.xml:
##########
@@ -2166,13 +2166,6 @@ The switch to turn S3A auditing on or off.
   <description>The AbstractFileSystem for gs: uris.</description>
 </property>
 
-<property>
-  <name>fs.azure.enable.readahead</name>
-  <value>false</value>

Review Comment:
   Retain it, but set it to true. Why so? storediag will log it, and so show that someone has explicitly said "readahead is safe here".



##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java:
##########
@@ -82,6 +84,16 @@ public class TestAbfsInputStream extends
       REDUCED_READ_BUFFER_AGE_THRESHOLD * 10; // 30 sec
   private static final int ALWAYS_READ_BUFFER_SIZE_TEST_FILE_SIZE = 16 * ONE_MB;
 
+  @After
+  public void afterTest() throws InterruptedException {
+    //thread wait so that previous test's inProgress buffers are processed and removed.
+    Thread.sleep(10000l);

Review Comment:
   I don't like this, as it potentially adds 10s to a test run, and one which could still be a bit flaky.
   
   what about using `testResetReadBufferManager()`?
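A rough sketch of why a test-only reset beats a fixed sleep. It uses a hypothetical, heavily simplified `BufferManagerSketch` in place of the real `ReadBufferManager`; the internals of `testResetReadBufferManager()` are not reproduced here. The reset puts the shared singleton into a known state immediately, instead of hoping that 10s is long enough. The sketch assumes no worker thread is still mutating the lists when the reset runs. A real reset would have to account for reads that are still in flight.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/**
 * Hypothetical, simplified stand-in for a shared read-buffer manager.
 * It is NOT the real ReadBufferManager; it only shows why a test-only
 * reset gives a deterministic starting state where a sleep does not.
 */
final class BufferManagerSketch {
  static final int NUM_BUFFERS = 16;
  private static final BufferManagerSketch INSTANCE = new BufferManagerSketch();

  final Deque<Integer> freeList = new ArrayDeque<>();
  final List<String> inProgressList = new ArrayList<>();
  final List<String> completedList = new ArrayList<>();

  private BufferManagerSketch() {
    testReset();
  }

  static BufferManagerSketch get() {
    return INSTANCE;
  }

  /** Test-only: drop in-progress and completed state, refill the free list. */
  synchronized void testReset() {
    inProgressList.clear();
    completedList.clear();
    freeList.clear();
    for (int i = 0; i < NUM_BUFFERS; i++) {
      freeList.add(i);
    }
  }
}
```

In a test class, this call would go in the overridden `teardown()`, before `super.teardown()`, in place of `Thread.sleep(10000l)`.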
   



##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java:
##########
@@ -495,6 +505,105 @@ public void testSuccessfulReadAhead() throws Exception {
     checkEvictedStatus(inputStream, 0, true);
   }
 
+  /**
+   * This test expects InProgressList is not purged by the inputStream close.
+   * The readBuffer will move to completedList and then finally should get evicted.
+   */
+  @Test
+  public void testStreamPurgeDuringReadAheadCallExecuting() throws Exception {
+    AbfsClient client = getMockAbfsClient();
+    AbfsRestOperation successOp = getMockRestOp();
+
+    Mockito.doAnswer(invocationOnMock -> {
+          //sleeping thread to mock the network latency from client to backend.
+          Thread.sleep(3000l);
+          return successOp;
+        })
+        .when(client)
+        .read(any(String.class), any(Long.class), any(byte[].class),
+            any(Integer.class), any(Integer.class), any(String.class),
+            any(String.class), any(TracingContext.class));
+
+    AbfsInputStream inputStream = getAbfsInputStream(client,
+        "testSuccessfulReadAhead.txt");
+    queueReadAheads(inputStream);
+
+    final ReadBufferManager readBufferManager
+        = ReadBufferManager.getBufferManager();
+
+    //Sleeping to give ReadBufferWorker to pick the readBuffers for processing.
+    Thread.sleep(1000l);
+
+    Assertions.assertThat(
+            getStreamRelatedBufferCount(readBufferManager.getInProgressCopiedList(),
+                inputStream))
+        .describedAs("InProgressList should have 3 elements")
+        .isEqualTo(3);
+    Assertions.assertThat(readBufferManager.getFreeListCopy().size())
+        .describedAs("FreeList should have 13 elements")
+        .isEqualTo(13);
+    Assertions.assertThat(readBufferManager.getCompletedReadListCopy().size())
+        .describedAs("CompletedList should have 0 elements")
+        .isEqualTo(0);
+
+    inputStream.close();
+
+    Assertions.assertThat(
+            getStreamRelatedBufferCount(readBufferManager.getInProgressCopiedList(),
+                inputStream))
+        .describedAs("InProgressList should have 3 elements")
+        .isEqualTo(3);
+    Assertions.assertThat(getStreamRelatedBufferCount(
+            readBufferManager.getCompletedReadListCopy(), inputStream))
+        .describedAs("CompletedList should have 0 elements")
+        .isEqualTo(0);
+    Assertions.assertThat(readBufferManager.getFreeListCopy().size())

Review Comment:
   Use `.hasSize(13)`.



##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java:
##########
@@ -495,6 +505,105 @@ public void testSuccessfulReadAhead() throws Exception {
     checkEvictedStatus(inputStream, 0, true);
   }
 
+  /**
+   * This test expects InProgressList is not purged by the inputStream close.
+   * The readBuffer will move to completedList and then finally should get evicted.
+   */
+  @Test
+  public void testStreamPurgeDuringReadAheadCallExecuting() throws Exception {
+    AbfsClient client = getMockAbfsClient();
+    AbfsRestOperation successOp = getMockRestOp();
+
+    Mockito.doAnswer(invocationOnMock -> {
+          //sleeping thread to mock the network latency from client to backend.
+          Thread.sleep(3000l);
+          return successOp;
+        })
+        .when(client)
+        .read(any(String.class), any(Long.class), any(byte[].class),
+            any(Integer.class), any(Integer.class), any(String.class),
+            any(String.class), any(TracingContext.class));
+
+    AbfsInputStream inputStream = getAbfsInputStream(client,
+        "testSuccessfulReadAhead.txt");
+    queueReadAheads(inputStream);
+
+    final ReadBufferManager readBufferManager
+        = ReadBufferManager.getBufferManager();
+
+    //Sleeping to give ReadBufferWorker to pick the readBuffers for processing.
+    Thread.sleep(1000l);
+
+    Assertions.assertThat(
+            getStreamRelatedBufferCount(readBufferManager.getInProgressCopiedList(),
+                inputStream))
+        .describedAs("InProgressList should have 3 elements")
+        .isEqualTo(3);
+    Assertions.assertThat(readBufferManager.getFreeListCopy().size())
+        .describedAs("FreeList should have 13 elements")
+        .isEqualTo(13);
+    Assertions.assertThat(readBufferManager.getCompletedReadListCopy().size())
+        .describedAs("CompletedList should have 0 elements")
+        .isEqualTo(0);
+
+    inputStream.close();
+
+    Assertions.assertThat(
+            getStreamRelatedBufferCount(readBufferManager.getInProgressCopiedList(),
+                inputStream))
+        .describedAs("InProgressList should have 3 elements")
+        .isEqualTo(3);
+    Assertions.assertThat(getStreamRelatedBufferCount(
+            readBufferManager.getCompletedReadListCopy(), inputStream))
+        .describedAs("CompletedList should have 0 elements")
+        .isEqualTo(0);
+    Assertions.assertThat(readBufferManager.getFreeListCopy().size())
+        .describedAs("FreeList should have 13 elements")
+        .isEqualTo(13);
+
+    //Sleep so that response from mockedClient gets back to ReadBufferWorker and
+    // can populate into completedList.
+    Thread.sleep(3000l);
+
+    Assertions.assertThat(getStreamRelatedBufferCount(
+            readBufferManager.getCompletedReadListCopy(), inputStream))
+        .describedAs("CompletedList should have 3 elements")
+        .isEqualTo(3);
+    Assertions.assertThat(readBufferManager.getFreeListCopy().size())
+        .describedAs("FreeList should have 13 elements")
+        .isEqualTo(13);
+    Assertions.assertThat(
+            getStreamRelatedBufferCount(readBufferManager.getInProgressCopiedList(),
+                inputStream))
+        .describedAs("InProgressList should have 0 elements")
+        .isEqualTo(0);
+
+    Thread.sleep(readBufferManager.getThresholdAgeMilliseconds());
+
+    readBufferManager.callTryEvict();
+    readBufferManager.callTryEvict();
+    readBufferManager.callTryEvict();
+
+    Assertions.assertThat(getStreamRelatedBufferCount(
+            readBufferManager.getCompletedReadListCopy(), inputStream))
+        .describedAs("CompletedList should have 0 elements")
+        .isEqualTo(0);
+    Assertions.assertThat(readBufferManager.getFreeListCopy().size())

Review Comment:
   Use `.hasSize()`.



##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java:
##########
@@ -495,6 +509,199 @@ public void testSuccessfulReadAhead() throws Exception {
     checkEvictedStatus(inputStream, 0, true);
   }
 
+  /**
+   * This test expects InProgressList is not purged by the inputStream close.
+   * The readBuffer will move to completedList and then finally should get evicted.
+   */
+  @Test
+  public void testStreamPurgeDuringReadAheadCallExecuting() throws Exception {
+    AbfsClient client = getMockAbfsClient();
+    AbfsRestOperation successOp = getMockRestOp();
+
+    final AtomicInteger movedToInProgressList = new AtomicInteger(0);
+    final AtomicInteger movedToCompletedList = new AtomicInteger(0);
+    final AtomicBoolean preClosedAssertion = new AtomicBoolean(false);
+
+    Mockito.doAnswer(invocationOnMock -> {
+          movedToInProgressList.incrementAndGet();
+          while (movedToInProgressList.get() < 3 || !preClosedAssertion.get()) {
+
+          }
+          movedToCompletedList.incrementAndGet();
+          return successOp;
+        })
+        .when(client)

Review Comment:
   This is very brittle, being timing-based. Normally I'd say "no" here, but I know I have a forthcoming PR which uses `Object.wait()`/`notify()` to synchronize:
   
https://github.com/apache/hadoop/pull/5117/files#diff-e829dbaa29faf05ae0b331439e9aec3cd02248464a097c86a0227783337b9b76R370
   
   If this test causes problems, it should do the same.
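A sketch of the `Object.wait()`/`notifyAll()` approach, replacing the busy-wait loop in the mocked `read()`. `ReadGate` and its methods are hypothetical names and are not taken from the linked PR. The mocked read records its entry and then blocks until the test releases it. The test blocks until three reads are in flight, makes its pre-close assertions, then releases the gate. The timeout in `awaitEntered` is only a safety net so a broken test fails instead of hanging; it plays no part in the synchronization.

```java
/**
 * Hypothetical gate a mocked read() can block on instead of spinning.
 * All state is guarded by this object's monitor.
 */
final class ReadGate {
  private int entered;
  private boolean released;

  /** Called from the mocked read(): record entry, then block until released. */
  synchronized void enterAndAwaitRelease() throws InterruptedException {
    entered++;
    notifyAll();            // wake a test thread waiting in awaitEntered()
    while (!released) {     // loop guards against spurious wakeups
      wait();
    }
  }

  /** Called from the test: block until at least n reads are in flight. */
  synchronized void awaitEntered(int n, long timeoutMillis) throws InterruptedException {
    long deadline = System.currentTimeMillis() + timeoutMillis;
    while (entered < n) {
      long remaining = deadline - System.currentTimeMillis();
      if (remaining <= 0) {
        throw new IllegalStateException("only " + entered + " of " + n + " reads started");
      }
      wait(remaining);
    }
  }

  /** Called from the test once its pre-close assertions are done. */
  synchronized void release() {
    released = true;
    notifyAll();
  }
}
```

With Mockito, the answer would become `invocationOnMock -> { gate.enterAndAwaitRelease(); return successOp; }`. The test would call `gate.awaitEntered(3, ...)` before its assertions and `gate.release()` after `inputStream.close()`.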



##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java:
##########
@@ -495,6 +505,105 @@ public void testSuccessfulReadAhead() throws Exception {
     checkEvictedStatus(inputStream, 0, true);
   }
 
+  /**
+   * This test expects InProgressList is not purged by the inputStream close.
+   * The readBuffer will move to completedList and then finally should get evicted.
+   */
+  @Test
+  public void testStreamPurgeDuringReadAheadCallExecuting() throws Exception {
+    AbfsClient client = getMockAbfsClient();
+    AbfsRestOperation successOp = getMockRestOp();
+
+    Mockito.doAnswer(invocationOnMock -> {
+          //sleeping thread to mock the network latency from client to backend.
+          Thread.sleep(3000l);
+          return successOp;
+        })
+        .when(client)
+        .read(any(String.class), any(Long.class), any(byte[].class),
+            any(Integer.class), any(Integer.class), any(String.class),
+            any(String.class), any(TracingContext.class));
+
+    AbfsInputStream inputStream = getAbfsInputStream(client,
+        "testSuccessfulReadAhead.txt");
+    queueReadAheads(inputStream);
+
+    final ReadBufferManager readBufferManager
+        = ReadBufferManager.getBufferManager();
+
+    //Sleeping to give ReadBufferWorker to pick the readBuffers for processing.
+    Thread.sleep(1000l);
+
+    Assertions.assertThat(
+            getStreamRelatedBufferCount(readBufferManager.getInProgressCopiedList(),
+                inputStream))
+        .describedAs("InProgressList should have 3 elements")
+        .isEqualTo(3);
+    Assertions.assertThat(readBufferManager.getFreeListCopy().size())
+        .describedAs("FreeList should have 13 elements")
+        .isEqualTo(13);
+    Assertions.assertThat(readBufferManager.getCompletedReadListCopy().size())
+        .describedAs("CompletedList should have 0 elements")
+        .isEqualTo(0);
+
+    inputStream.close();
+
+    Assertions.assertThat(
+            getStreamRelatedBufferCount(readBufferManager.getInProgressCopiedList(),
+                inputStream))
+        .describedAs("InProgressList should have 3 elements")
+        .isEqualTo(3);
+    Assertions.assertThat(getStreamRelatedBufferCount(
+            readBufferManager.getCompletedReadListCopy(), inputStream))
+        .describedAs("CompletedList should have 0 elements")
+        .isEqualTo(0);
+    Assertions.assertThat(readBufferManager.getFreeListCopy().size())
+        .describedAs("FreeList should have 13 elements")
+        .isEqualTo(13);
+
+    //Sleep so that response from mockedClient gets back to ReadBufferWorker and
+    // can populate into completedList.
+    Thread.sleep(3000l);
+
+    Assertions.assertThat(getStreamRelatedBufferCount(
+            readBufferManager.getCompletedReadListCopy(), inputStream))
+        .describedAs("CompletedList should have 3 elements")
+        .isEqualTo(3);
+    Assertions.assertThat(readBufferManager.getFreeListCopy().size())
+        .describedAs("FreeList should have 13 elements")
+        .isEqualTo(13);
+    Assertions.assertThat(
+            getStreamRelatedBufferCount(readBufferManager.getInProgressCopiedList(),
+                inputStream))
+        .describedAs("InProgressList should have 0 elements")
+        .isEqualTo(0);
+
+    Thread.sleep(readBufferManager.getThresholdAgeMilliseconds());
+
+    readBufferManager.callTryEvict();
+    readBufferManager.callTryEvict();
+    readBufferManager.callTryEvict();
+
+    Assertions.assertThat(getStreamRelatedBufferCount(
+            readBufferManager.getCompletedReadListCopy(), inputStream))
+        .describedAs("CompletedList should have 0 elements")
+        .isEqualTo(0);
+    Assertions.assertThat(readBufferManager.getFreeListCopy().size())
+        .describedAs("FreeList should have 16 elements")
+        .isEqualTo(16);
+  }
+
+  private int getStreamRelatedBufferCount(final List<ReadBuffer> bufferList,
+      final AbfsInputStream inputStream) {
+    int count = 0;

Review Comment:
   Prefer Java 8 streaming:
   ```
   return (int) bufferList.stream()
       .filter(buffer -> buffer.getStream() == inputStream)
       .count();
   ```
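A self-contained sketch of the streaming version. `BufferSketch` is a hypothetical stand-in for `ReadBuffer`; only its `getStream()` accessor matters here. `Stream.count()` returns a `long`, so the method needs a cast to keep its `int` return type. That cast is safe because a `List` cannot hold more than `Integer.MAX_VALUE` elements. Buffers are matched by identity (`==`), as in the suggestion.

```java
import java.util.List;

/** Hypothetical buffer type: only the owning-stream reference matters here. */
final class BufferSketch {
  private final Object stream;

  BufferSketch(Object stream) {
    this.stream = stream;
  }

  Object getStream() {
    return stream;
  }
}

final class BufferCounting {
  private BufferCounting() {
  }

  /** Count buffers owned by one stream, compared by identity (==). */
  static int getStreamRelatedBufferCount(List<BufferSketch> bufferList, Object inputStream) {
    // Stream.count() returns long; the cast cannot overflow for a List.
    return (int) bufferList.stream()
        .filter(buffer -> buffer.getStream() == inputStream)
        .count();
  }
}
```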
   



##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java:
##########
@@ -495,6 +505,105 @@ public void testSuccessfulReadAhead() throws Exception {
     checkEvictedStatus(inputStream, 0, true);
   }
 
+  /**
+   * This test expects InProgressList is not purged by the inputStream close.
+   * The readBuffer will move to completedList and then finally should get evicted.
+   */
+  @Test
+  public void testStreamPurgeDuringReadAheadCallExecuting() throws Exception {
+    AbfsClient client = getMockAbfsClient();
+    AbfsRestOperation successOp = getMockRestOp();
+
+    Mockito.doAnswer(invocationOnMock -> {
+          //sleeping thread to mock the network latency from client to backend.
+          Thread.sleep(3000l);
+          return successOp;
+        })
+        .when(client)
+        .read(any(String.class), any(Long.class), any(byte[].class),
+            any(Integer.class), any(Integer.class), any(String.class),
+            any(String.class), any(TracingContext.class));
+
+    AbfsInputStream inputStream = getAbfsInputStream(client,
+        "testSuccessfulReadAhead.txt");
+    queueReadAheads(inputStream);
+
+    final ReadBufferManager readBufferManager
+        = ReadBufferManager.getBufferManager();
+
+    //Sleeping to give ReadBufferWorker to pick the readBuffers for processing.
+    Thread.sleep(1000l);

Review Comment:
   1_000L
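A small illustration of the literal style. A lower-case `l` suffix is easy to misread as the digit `1`, and underscores (allowed in numeric literals since Java 7) make the magnitude obvious at a glance. The `TimeUnit` form is shown only as an optional alternative that states the unit explicitly. The class name is hypothetical.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical constants showing readable long literals. */
final class LiteralSketch {
  // 1000l can be misread as 10001; 1_000L cannot.
  static final long ONE_SECOND_MS = 1_000L;
  static final long TEN_SECONDS_MS = 10_000L;
  // Alternative: let TimeUnit carry the unit instead of a bare number.
  static final long THREE_SECONDS_MS = TimeUnit.SECONDS.toMillis(3);

  private LiteralSketch() {
  }
}
```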



##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java:
##########
@@ -495,6 +505,105 @@ public void testSuccessfulReadAhead() throws Exception {
     checkEvictedStatus(inputStream, 0, true);
   }
 
+  /**
+   * This test expects InProgressList is not purged by the inputStream close.
+   * The readBuffer will move to completedList and then finally should get evicted.
+   */
+  @Test
+  public void testStreamPurgeDuringReadAheadCallExecuting() throws Exception {
+    AbfsClient client = getMockAbfsClient();
+    AbfsRestOperation successOp = getMockRestOp();
+
+    Mockito.doAnswer(invocationOnMock -> {
+          //sleeping thread to mock the network latency from client to backend.
+          Thread.sleep(3000l);
+          return successOp;
+        })
+        .when(client)
+        .read(any(String.class), any(Long.class), any(byte[].class),
+            any(Integer.class), any(Integer.class), any(String.class),
+            any(String.class), any(TracingContext.class));
+
+    AbfsInputStream inputStream = getAbfsInputStream(client,
+        "testSuccessfulReadAhead.txt");
+    queueReadAheads(inputStream);
+
+    final ReadBufferManager readBufferManager
+        = ReadBufferManager.getBufferManager();
+
+    //Sleeping to give ReadBufferWorker to pick the readBuffers for processing.
+    Thread.sleep(1000l);
+
+    Assertions.assertThat(
+            getStreamRelatedBufferCount(readBufferManager.getInProgressCopiedList(),
+                inputStream))
+        .describedAs("InProgressList should have 3 elements")
+        .isEqualTo(3);
+    Assertions.assertThat(readBufferManager.getFreeListCopy().size())

Review Comment:
   Use `.hasSize(13)` in the assert, so AssertJ will provide info about the list if there's a mismatch.
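To show why asserting on the list beats asserting on `list.size()`, here is a stdlib-only sketch of the diagnostics involved. `SizeAssertSketch.assertHasSize` is a hypothetical helper, not AssertJ. In AssertJ itself the call would be `Assertions.assertThat(list).describedAs("...").hasSize(13)`. `assertThat(list.size()).isEqualTo(13)` can only report two numbers. A size check on the list itself can also print the elements, which shows which buffers were (or were not) present.

```java
import java.util.List;

/** Hypothetical helper mimicking the kind of failure message hasSize() gives. */
final class SizeAssertSketch {
  private SizeAssertSketch() {
  }

  /** Fails with a message that includes the description and the actual elements. */
  static void assertHasSize(List<?> actual, int expected, String description) {
    if (actual.size() != expected) {
      throw new AssertionError(String.format(
          "[%s] expected size %d but was %d: %s",
          description, expected, actual.size(), actual));
    }
  }
}
```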



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

