[ https://issues.apache.org/jira/browse/HADOOP-18546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17641865#comment-17641865 ]
ASF GitHub Bot commented on HADOOP-18546:
-----------------------------------------

snvijaya commented on code in PR #5176:
URL: https://github.com/apache/hadoop/pull/5176#discussion_r1037025160

##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java:
##########
@@ -495,6 +509,199 @@ public void testSuccessfulReadAhead() throws Exception {
     checkEvictedStatus(inputStream, 0, true);
   }
 
+  /**
+   * This test expects InProgressList is not purged by the inputStream close.
+   * The readBuffer will move to completedList and then finally should get evicted.
+   */
+  @Test
+  public void testStreamPurgeDuringReadAheadCallExecuting() throws Exception {
+    AbfsClient client = getMockAbfsClient();
+    AbfsRestOperation successOp = getMockRestOp();
+
+    final AtomicInteger movedToInProgressList = new AtomicInteger(0);
+    final AtomicInteger movedToCompletedList = new AtomicInteger(0);
+    final AtomicBoolean preClosedAssertion = new AtomicBoolean(false);
+
+    Mockito.doAnswer(invocationOnMock -> {
+          movedToInProgressList.incrementAndGet();
+          while (movedToInProgressList.get() < 3 || !preClosedAssertion.get()) {
+
+          }
+          movedToCompletedList.incrementAndGet();
+          return successOp;
+        })
+        .when(client)

Review Comment:
The test is trying to unit-test a bigger scope: an existing in-progress buffer moving to the completed list. It would be nice to scope the test to the inProgressList and freeList counts, before and after close. At this client.read() mock, I would suggest mocks that invoke a large sleep for each read. That way, after the queueReadAheads call and a 1 sec sleep, 3 buffers will be stuck in inProgressList and the freeList should show 13 free. The asserts should continue to hold the same numbers post close as well.
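For illustration, the shape of the narrower test suggested above can be sketched without Mockito or the real ABFS classes. This is a minimal sketch under stated assumptions, not the actual ReadBufferManager: the class `InProgressPurgeSketch` and its methods are hypothetical, the pool size of 16 is inferred from the "3 in progress, 13 free" numbers in the comment, and a `CountDownLatch` stands in for the long sleep in the mocked read so the example is deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/** Toy stand-in for a read-ahead buffer pool; NOT the real ReadBufferManager. */
public class InProgressPurgeSketch {
    // Assumed pool size, inferred from "3 in progress + 13 free" in the review.
    static final int TOTAL_BUFFERS = 16;

    private final List<Integer> freeList = new ArrayList<>();
    private final List<Integer> inProgressList = new ArrayList<>();
    private final List<Integer> completedList = new ArrayList<>();

    InProgressPurgeSketch() {
        for (int i = 0; i < TOTAL_BUFFERS; i++) {
            freeList.add(i);
        }
    }

    /** Moves one free buffer to in-progress and starts a "read" that blocks until released. */
    synchronized Thread queueReadAhead(CountDownLatch release) {
        final int buffer = freeList.remove(freeList.size() - 1);
        inProgressList.add(buffer);
        Thread worker = new Thread(() -> {
            try {
                release.await(); // stands in for the slow mocked client.read()
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            synchronized (this) { // the read finished: in-progress -> completed
                inProgressList.remove(Integer.valueOf(buffer));
                completedList.add(buffer);
            }
        });
        worker.setDaemon(true);
        worker.start();
        return worker;
    }

    /** Models a stream close that purges completed buffers only, never in-progress ones. */
    synchronized void purgeOnClose() {
        freeList.addAll(completedList);
        completedList.clear();
    }

    synchronized int inProgressCount() { return inProgressList.size(); }
    synchronized int freeCount() { return freeList.size(); }
    synchronized int completedCount() { return completedList.size(); }

    /** Returns {inProgress, free} before close, the same pair after close, then {inProgress, completed} after release. */
    public static int[] run() {
        InProgressPurgeSketch pool = new InProgressPurgeSketch();
        CountDownLatch release = new CountDownLatch(1);
        List<Thread> workers = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            workers.add(pool.queueReadAhead(release));
        }
        int[] counts = new int[6];
        counts[0] = pool.inProgressCount();
        counts[1] = pool.freeCount();
        pool.purgeOnClose(); // the "inputStream.close()" step
        counts[2] = pool.inProgressCount();
        counts[3] = pool.freeCount();
        release.countDown(); // let the blocked reads finish
        try {
            for (Thread t : workers) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        counts[4] = pool.inProgressCount();
        counts[5] = pool.completedCount();
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(run()));
    }
}
```

The point of the design is that the asserted counts are taken while the reads are still blocked, so the numbers before and after close can be compared directly; a real test would make the same comparison against the actual ReadBufferManager lists.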
##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java:
##########
@@ -495,6 +509,199 @@ public void testSuccessfulReadAhead() throws Exception {
     checkEvictedStatus(inputStream, 0, true);
   }
 
+  /**
+   * This test expects InProgressList is not purged by the inputStream close.
+   * The readBuffer will move to completedList and then finally should get evicted.
+   */
+  @Test
+  public void testStreamPurgeDuringReadAheadCallExecuting() throws Exception {
+    AbfsClient client = getMockAbfsClient();
+    AbfsRestOperation successOp = getMockRestOp();
+
+    final AtomicInteger movedToInProgressList = new AtomicInteger(0);
+    final AtomicInteger movedToCompletedList = new AtomicInteger(0);
+    final AtomicBoolean preClosedAssertion = new AtomicBoolean(false);
+
+    Mockito.doAnswer(invocationOnMock -> {
+          movedToInProgressList.incrementAndGet();
+          while (movedToInProgressList.get() < 3 || !preClosedAssertion.get()) {
+
+          }
+          movedToCompletedList.incrementAndGet();
+          return successOp;
+        })
+        .when(client)
+        .read(any(String.class), any(Long.class), any(byte[].class),
+            any(Integer.class), any(Integer.class), any(String.class),
+            any(String.class), any(TracingContext.class));
+
+    AbfsInputStream inputStream = getAbfsInputStream(client,
+        "testSuccessfulReadAhead.txt");
+    queueReadAheads(inputStream);
+
+    final ReadBufferManager readBufferManager
+        = ReadBufferManager.getBufferManager();
+    while (movedToInProgressList.get() < 3) {
+
+    }
+    Assertions.assertThat(
+            getStreamRelatedBufferCount(readBufferManager.getInProgressCopiedList(),
+                inputStream))
+        .describedAs("InProgressList should have 3 elements")
+        .isEqualTo(3);
+    Assertions.assertThat(getStreamRelatedBufferCount(
+            readBufferManager.getCompletedReadListCopy(), inputStream))
+        .describedAs("CompletedList should have 3 elements")
+        .isEqualTo(0);
+
+    inputStream.close();
+
+    Assertions.assertThat(
+            getStreamRelatedBufferCount(readBufferManager.getInProgressCopiedList(),
+                inputStream))
+        .describedAs("InProgressList should have 3 elements")
+        .isEqualTo(3);
+    Assertions.assertThat(getStreamRelatedBufferCount(
+            readBufferManager.getCompletedReadListCopy(), inputStream))
+        .describedAs("CompletedList should have 3 elements")
+        .isEqualTo(0);
+    preClosedAssertion.set(true);
+
+    while (movedToCompletedList.get() < 3) {
+
+    }
+
+    //Sleep so that response from mockedClient gets back to ReadBufferWorker and
+    // can populate into completedList.
+    Thread.sleep(10000l);
+
+    Assertions.assertThat(getStreamRelatedBufferCount(
+            readBufferManager.getCompletedReadListCopy(), inputStream))
+        .describedAs("CompletedList should have 3 elements")
+        .isEqualTo(3);
+
+    Thread.sleep(readBufferManager.getThresholdAgeMilliseconds());
+
+    readBufferManager.callTryEvict();
+    readBufferManager.callTryEvict();
+    readBufferManager.callTryEvict();
+
+    Assertions.assertThat(getStreamRelatedBufferCount(
+            readBufferManager.getCompletedReadListCopy(), inputStream))
+        .describedAs("CompletedList should have 0 elements")
+        .isEqualTo(0);
+  }
+
+
+  /**
+   * This test expects InProgressList is not purged by the inputStream close.
+   * The already readBuffer present in the completedList shall be purged by the
+   * inputStream close.
+   * The readBuffer from inProgressList will move to completedList and then
+   * finally should get evicted.
+   */
+  @Test
+  public void testStreamPurgeDuringReadAheadCallExecutingWithSomeCompletedBuffers()

Review Comment:
This test seems to be validating the effect of the purge on completedList. Does it cover any scenario that is not already covered by the test cases in the HADOOP-17156 commit? Also, do all the test asserts added by HADOOP-17156 still hold after this PR's change, which prevents purging of the in-progress list?

> disable purging list of in progress reads in abfs stream closed
> ---------------------------------------------------------------
>
>                 Key: HADOOP-18546
>                 URL: https://issues.apache.org/jira/browse/HADOOP-18546
>             Project: Hadoop Common
>          Issue Type: Sub-task
>          Components: fs/azure
>    Affects Versions: 3.3.4
>            Reporter: Steve Loughran
>            Assignee: Steve Loughran
>            Priority: Major
>              Labels: pull-request-available
>
> Turn off the pruning of in-progress reads in
> ReadBufferManager::purgeBuffersForStream.
> This will ensure active prefetches for a closed stream complete. They will
> then get to the completed list and hang around until evicted by timeout, but
> at least prefetching will be safe.
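
The "hang around until evicted by timeout" behaviour described in the issue can be illustrated with a small model. This is a sketch under stated assumptions, not the ReadBufferManager implementation: `CompletedEvictionSketch`, `onPrefetchCompleted`, `tryEvict` and the 3000 ms threshold are all hypothetical, and the clock is passed in explicitly so the example does not need to sleep. As in the test under review, each `tryEvict` call frees at most one buffer.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy model of timeout-based eviction of completed prefetches; NOT the real ReadBufferManager. */
public class CompletedEvictionSketch {
    // Assumed threshold for illustration only.
    static final long THRESHOLD_AGE_MS = 3000;

    /** A finished prefetch: which buffer it holds and when it completed. */
    static final class CompletedRead {
        final int bufferIndex;
        final long completedAtMs;

        CompletedRead(int bufferIndex, long completedAtMs) {
            this.bufferIndex = bufferIndex;
            this.completedAtMs = completedAtMs;
        }
    }

    private final Deque<CompletedRead> completedList = new ArrayDeque<>();
    private final Deque<Integer> freeList = new ArrayDeque<>();

    /** Called when a prefetch finishes, even if its stream was closed in the meantime. */
    public void onPrefetchCompleted(int bufferIndex, long nowMs) {
        completedList.addLast(new CompletedRead(bufferIndex, nowMs));
    }

    /** Frees the oldest completed buffer if it has aged past the threshold; at most one per call. */
    public boolean tryEvict(long nowMs) {
        CompletedRead oldest = completedList.peekFirst();
        if (oldest == null || nowMs - oldest.completedAtMs < THRESHOLD_AGE_MS) {
            return false;
        }
        completedList.removeFirst();
        freeList.addLast(oldest.bufferIndex);
        return true;
    }

    public int completedCount() { return completedList.size(); }
    public int freeCount() { return freeList.size(); }

    public static void main(String[] args) {
        CompletedEvictionSketch pool = new CompletedEvictionSketch();
        for (int i = 0; i < 3; i++) {
            pool.onPrefetchCompleted(i, 1000L);
        }
        // Too young to evict: the buffers stay on the completed list.
        System.out.println("evicted early? " + pool.tryEvict(2000L));
        // Once the threshold has passed, repeated calls drain the list one buffer at a time.
        while (pool.tryEvict(1000L + THRESHOLD_AGE_MS)) {
            // keep evicting
        }
        System.out.println(pool.completedCount() + " completed, " + pool.freeCount() + " free");
    }
}
```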