steveloughran commented on code in PR #5176: URL: https://github.com/apache/hadoop/pull/5176#discussion_r1037390997
########## hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java: ########## @@ -82,6 +84,16 @@ public class TestAbfsInputStream extends REDUCED_READ_BUFFER_AGE_THRESHOLD * 10; // 30 sec private static final int ALWAYS_READ_BUFFER_SIZE_TEST_FILE_SIZE = 16 * ONE_MB; + @After + public void afterTest() throws InterruptedException { Review Comment: override `teardown()`, call superclass. that way you know the order of things happening ########## hadoop-common-project/hadoop-common/src/main/resources/core-default.xml: ########## @@ -2166,13 +2166,6 @@ The switch to turn S3A auditing on or off. <description>The AbstractFileSystem for gs: uris.</description> </property> -<property> - <name>fs.azure.enable.readahead</name> - <value>false</value> Review Comment: retain but set to true. why so? storediag will log it and so show that someone has explicitly said "readahead is safe here" ########## hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java: ########## @@ -82,6 +84,16 @@ public class TestAbfsInputStream extends REDUCED_READ_BUFFER_AGE_THRESHOLD * 10; // 30 sec private static final int ALWAYS_READ_BUFFER_SIZE_TEST_FILE_SIZE = 16 * ONE_MB; + @After + public void afterTest() throws InterruptedException { + //thread wait so that previous test's inProgress buffers are processed and removed. + Thread.sleep(10000l); Review Comment: don't like this as it potentially adds 10s to a test run, one which could still be a bit flaky. what about using `testResetReadBufferManager()`? ########## hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java: ########## @@ -495,6 +505,105 @@ public void testSuccessfulReadAhead() throws Exception { checkEvictedStatus(inputStream, 0, true); } + /** + * This test expects InProgressList is not purged by the inputStream close. 
+ * The readBuffer will move to completedList and then finally should get evicted. + */ + @Test + public void testStreamPurgeDuringReadAheadCallExecuting() throws Exception { + AbfsClient client = getMockAbfsClient(); + AbfsRestOperation successOp = getMockRestOp(); + + Mockito.doAnswer(invocationOnMock -> { + //sleeping thread to mock the network latency from client to backend. + Thread.sleep(3000l); + return successOp; + }) + .when(client) + .read(any(String.class), any(Long.class), any(byte[].class), + any(Integer.class), any(Integer.class), any(String.class), + any(String.class), any(TracingContext.class)); + + AbfsInputStream inputStream = getAbfsInputStream(client, + "testSuccessfulReadAhead.txt"); + queueReadAheads(inputStream); + + final ReadBufferManager readBufferManager + = ReadBufferManager.getBufferManager(); + + //Sleeping to give ReadBufferWorker to pick the readBuffers for processing. + Thread.sleep(1000l); + + Assertions.assertThat( + getStreamRelatedBufferCount(readBufferManager.getInProgressCopiedList(), + inputStream)) + .describedAs("InProgressList should have 3 elements") + .isEqualTo(3); + Assertions.assertThat(readBufferManager.getFreeListCopy().size()) + .describedAs("FreeList should have 13 elements") + .isEqualTo(13); + Assertions.assertThat(readBufferManager.getCompletedReadListCopy().size()) + .describedAs("CompletedList should have 0 elements") + .isEqualTo(0); + + inputStream.close(); + + Assertions.assertThat( + getStreamRelatedBufferCount(readBufferManager.getInProgressCopiedList(), + inputStream)) + .describedAs("InProgressList should have 3 elements") + .isEqualTo(3); + Assertions.assertThat(getStreamRelatedBufferCount( + readBufferManager.getCompletedReadListCopy(), inputStream)) + .describedAs("CompletedList should have 0 elements") + .isEqualTo(0); + Assertions.assertThat(readBufferManager.getFreeListCopy().size()) Review Comment: use .hasSize(13) ########## 
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java: ########## @@ -495,6 +505,105 @@ public void testSuccessfulReadAhead() throws Exception { checkEvictedStatus(inputStream, 0, true); } + /** + * This test expects InProgressList is not purged by the inputStream close. + * The readBuffer will move to completedList and then finally should get evicted. + */ + @Test + public void testStreamPurgeDuringReadAheadCallExecuting() throws Exception { + AbfsClient client = getMockAbfsClient(); + AbfsRestOperation successOp = getMockRestOp(); + + Mockito.doAnswer(invocationOnMock -> { + //sleeping thread to mock the network latency from client to backend. + Thread.sleep(3000l); + return successOp; + }) + .when(client) + .read(any(String.class), any(Long.class), any(byte[].class), + any(Integer.class), any(Integer.class), any(String.class), + any(String.class), any(TracingContext.class)); + + AbfsInputStream inputStream = getAbfsInputStream(client, + "testSuccessfulReadAhead.txt"); + queueReadAheads(inputStream); + + final ReadBufferManager readBufferManager + = ReadBufferManager.getBufferManager(); + + //Sleeping to give ReadBufferWorker to pick the readBuffers for processing. 
+ Thread.sleep(1000l); + + Assertions.assertThat( + getStreamRelatedBufferCount(readBufferManager.getInProgressCopiedList(), + inputStream)) + .describedAs("InProgressList should have 3 elements") + .isEqualTo(3); + Assertions.assertThat(readBufferManager.getFreeListCopy().size()) + .describedAs("FreeList should have 13 elements") + .isEqualTo(13); + Assertions.assertThat(readBufferManager.getCompletedReadListCopy().size()) + .describedAs("CompletedList should have 0 elements") + .isEqualTo(0); + + inputStream.close(); + + Assertions.assertThat( + getStreamRelatedBufferCount(readBufferManager.getInProgressCopiedList(), + inputStream)) + .describedAs("InProgressList should have 3 elements") + .isEqualTo(3); + Assertions.assertThat(getStreamRelatedBufferCount( + readBufferManager.getCompletedReadListCopy(), inputStream)) + .describedAs("CompletedList should have 0 elements") + .isEqualTo(0); + Assertions.assertThat(readBufferManager.getFreeListCopy().size()) + .describedAs("FreeList should have 13 elements") + .isEqualTo(13); + + //Sleep so that response from mockedClient gets back to ReadBufferWorker and + // can populate into completedList. 
+ Thread.sleep(3000l); + + Assertions.assertThat(getStreamRelatedBufferCount( + readBufferManager.getCompletedReadListCopy(), inputStream)) + .describedAs("CompletedList should have 3 elements") + .isEqualTo(3); + Assertions.assertThat(readBufferManager.getFreeListCopy().size()) + .describedAs("FreeList should have 13 elements") + .isEqualTo(13); + Assertions.assertThat( + getStreamRelatedBufferCount(readBufferManager.getInProgressCopiedList(), + inputStream)) + .describedAs("InProgressList should have 0 elements") + .isEqualTo(0); + + Thread.sleep(readBufferManager.getThresholdAgeMilliseconds()); + + readBufferManager.callTryEvict(); + readBufferManager.callTryEvict(); + readBufferManager.callTryEvict(); + + Assertions.assertThat(getStreamRelatedBufferCount( + readBufferManager.getCompletedReadListCopy(), inputStream)) + .describedAs("CompletedList should have 0 elements") + .isEqualTo(0); + Assertions.assertThat(readBufferManager.getFreeListCopy().size()) Review Comment: use .hasSize() ########## hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java: ########## @@ -495,6 +509,199 @@ public void testSuccessfulReadAhead() throws Exception { checkEvictedStatus(inputStream, 0, true); } + /** + * This test expects InProgressList is not purged by the inputStream close. + * The readBuffer will move to completedList and then finally should get evicted. 
+ */ + @Test + public void testStreamPurgeDuringReadAheadCallExecuting() throws Exception { + AbfsClient client = getMockAbfsClient(); + AbfsRestOperation successOp = getMockRestOp(); + + final AtomicInteger movedToInProgressList = new AtomicInteger(0); + final AtomicInteger movedToCompletedList = new AtomicInteger(0); + final AtomicBoolean preClosedAssertion = new AtomicBoolean(false); + + Mockito.doAnswer(invocationOnMock -> { + movedToInProgressList.incrementAndGet(); + while (movedToInProgressList.get() < 3 || !preClosedAssertion.get()) { + + } + movedToCompletedList.incrementAndGet(); + return successOp; + }) + .when(client) Review Comment: this is very brittle being timing based. normally I'd say "no" here, but I know I have a forthcoming pr which uses object.wait/notify to synchronize https://github.com/apache/hadoop/pull/5117/files#diff-e829dbaa29faf05ae0b331439e9aec3cd02248464a097c86a0227783337b9b76R370 if this test causes problems it should do the same ########## hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java: ########## @@ -495,6 +505,105 @@ public void testSuccessfulReadAhead() throws Exception { checkEvictedStatus(inputStream, 0, true); } + /** + * This test expects InProgressList is not purged by the inputStream close. + * The readBuffer will move to completedList and then finally should get evicted. + */ + @Test + public void testStreamPurgeDuringReadAheadCallExecuting() throws Exception { + AbfsClient client = getMockAbfsClient(); + AbfsRestOperation successOp = getMockRestOp(); + + Mockito.doAnswer(invocationOnMock -> { + //sleeping thread to mock the network latency from client to backend. 
+ Thread.sleep(3000l); + return successOp; + }) + .when(client) + .read(any(String.class), any(Long.class), any(byte[].class), + any(Integer.class), any(Integer.class), any(String.class), + any(String.class), any(TracingContext.class)); + + AbfsInputStream inputStream = getAbfsInputStream(client, + "testSuccessfulReadAhead.txt"); + queueReadAheads(inputStream); + + final ReadBufferManager readBufferManager + = ReadBufferManager.getBufferManager(); + + //Sleeping to give ReadBufferWorker to pick the readBuffers for processing. + Thread.sleep(1000l); + + Assertions.assertThat( + getStreamRelatedBufferCount(readBufferManager.getInProgressCopiedList(), + inputStream)) + .describedAs("InProgressList should have 3 elements") + .isEqualTo(3); + Assertions.assertThat(readBufferManager.getFreeListCopy().size()) + .describedAs("FreeList should have 13 elements") + .isEqualTo(13); + Assertions.assertThat(readBufferManager.getCompletedReadListCopy().size()) + .describedAs("CompletedList should have 0 elements") + .isEqualTo(0); + + inputStream.close(); + + Assertions.assertThat( + getStreamRelatedBufferCount(readBufferManager.getInProgressCopiedList(), + inputStream)) + .describedAs("InProgressList should have 3 elements") + .isEqualTo(3); + Assertions.assertThat(getStreamRelatedBufferCount( + readBufferManager.getCompletedReadListCopy(), inputStream)) + .describedAs("CompletedList should have 0 elements") + .isEqualTo(0); + Assertions.assertThat(readBufferManager.getFreeListCopy().size()) + .describedAs("FreeList should have 13 elements") + .isEqualTo(13); + + //Sleep so that response from mockedClient gets back to ReadBufferWorker and + // can populate into completedList. 
+ Thread.sleep(3000l); + + Assertions.assertThat(getStreamRelatedBufferCount( + readBufferManager.getCompletedReadListCopy(), inputStream)) + .describedAs("CompletedList should have 3 elements") + .isEqualTo(3); + Assertions.assertThat(readBufferManager.getFreeListCopy().size()) + .describedAs("FreeList should have 13 elements") + .isEqualTo(13); + Assertions.assertThat( + getStreamRelatedBufferCount(readBufferManager.getInProgressCopiedList(), + inputStream)) + .describedAs("InProgressList should have 0 elements") + .isEqualTo(0); + + Thread.sleep(readBufferManager.getThresholdAgeMilliseconds()); + + readBufferManager.callTryEvict(); + readBufferManager.callTryEvict(); + readBufferManager.callTryEvict(); + + Assertions.assertThat(getStreamRelatedBufferCount( + readBufferManager.getCompletedReadListCopy(), inputStream)) + .describedAs("CompletedList should have 0 elements") + .isEqualTo(0); + Assertions.assertThat(readBufferManager.getFreeListCopy().size()) + .describedAs("FreeList should have 16 elements") + .isEqualTo(16); + } + + private int getStreamRelatedBufferCount(final List<ReadBuffer> bufferList, + final AbfsInputStream inputStream) { + int count = 0; Review Comment: prefer java8 streaming ``` bufferList.stream() .filter(buffer -> buffer.getStream() == inputStream) .count() ``` ########## hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java: ########## @@ -495,6 +505,105 @@ public void testSuccessfulReadAhead() throws Exception { checkEvictedStatus(inputStream, 0, true); } + /** + * This test expects InProgressList is not purged by the inputStream close. + * The readBuffer will move to completedList and then finally should get evicted. 
+ */ + @Test + public void testStreamPurgeDuringReadAheadCallExecuting() throws Exception { + AbfsClient client = getMockAbfsClient(); + AbfsRestOperation successOp = getMockRestOp(); + + Mockito.doAnswer(invocationOnMock -> { + //sleeping thread to mock the network latency from client to backend. + Thread.sleep(3000l); + return successOp; + }) + .when(client) + .read(any(String.class), any(Long.class), any(byte[].class), + any(Integer.class), any(Integer.class), any(String.class), + any(String.class), any(TracingContext.class)); + + AbfsInputStream inputStream = getAbfsInputStream(client, + "testSuccessfulReadAhead.txt"); + queueReadAheads(inputStream); + + final ReadBufferManager readBufferManager + = ReadBufferManager.getBufferManager(); + + //Sleeping to give ReadBufferWorker to pick the readBuffers for processing. + Thread.sleep(1000l); Review Comment: 1_000L ########## hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java: ########## @@ -495,6 +505,105 @@ public void testSuccessfulReadAhead() throws Exception { checkEvictedStatus(inputStream, 0, true); } + /** + * This test expects InProgressList is not purged by the inputStream close. + * The readBuffer will move to completedList and then finally should get evicted. + */ + @Test + public void testStreamPurgeDuringReadAheadCallExecuting() throws Exception { + AbfsClient client = getMockAbfsClient(); + AbfsRestOperation successOp = getMockRestOp(); + + Mockito.doAnswer(invocationOnMock -> { + //sleeping thread to mock the network latency from client to backend. 
+ Thread.sleep(3000l); + return successOp; + }) + .when(client) + .read(any(String.class), any(Long.class), any(byte[].class), + any(Integer.class), any(Integer.class), any(String.class), + any(String.class), any(TracingContext.class)); + + AbfsInputStream inputStream = getAbfsInputStream(client, + "testSuccessfulReadAhead.txt"); + queueReadAheads(inputStream); + + final ReadBufferManager readBufferManager + = ReadBufferManager.getBufferManager(); + + //Sleeping to give ReadBufferWorker to pick the readBuffers for processing. + Thread.sleep(1000l); + + Assertions.assertThat( + getStreamRelatedBufferCount(readBufferManager.getInProgressCopiedList(), + inputStream)) + .describedAs("InProgressList should have 3 elements") + .isEqualTo(3); + Assertions.assertThat(readBufferManager.getFreeListCopy().size()) Review Comment: use .hasSize(13) in the assert, so assertj will provide info about the list if there's a mismatch -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-issues-h...@hadoop.apache.org