[GitHub] [hadoop] steveloughran commented on a diff in pull request #5176: HADOOP-18546. ABFS:disable purging list of in progress reads in abfs stream closed
steveloughran commented on code in PR #5176:
URL: https://github.com/apache/hadoop/pull/5176#discussion_r1041097514

## hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java:

```diff
@@ -495,6 +499,63 @@ public void testSuccessfulReadAhead() throws Exception {
     checkEvictedStatus(inputStream, 0, true);
   }
 
+  /**
+   * This test expects InProgressList is not purged by the inputStream close.
+   */
+  @Test
+  public void testStreamPurgeDuringReadAheadCallExecuting() throws Exception {
+    AbfsClient client = getMockAbfsClient();
+    AbfsRestOperation successOp = getMockRestOp();
+    final Long serverCommunicationMockLatency = 3_000L;
+    final Long readBufferTransferToInProgressProbableTime = 1_000L;
+    final Integer readBufferQueuedCount = 3;
+
+    Mockito.doAnswer(invocationOnMock -> {
+      //sleeping thread to mock the network latency from client to backend.
+      Thread.sleep(serverCommunicationMockLatency);
+      return successOp;
+    })
+        .when(client)
+        .read(any(String.class), any(Long.class), any(byte[].class),
+            any(Integer.class), any(Integer.class), any(String.class),
+            any(String.class), any(TracingContext.class));
+
+    AbfsInputStream inputStream = getAbfsInputStream(client,
+        "testSuccessfulReadAhead.txt");
+    queueReadAheads(inputStream);
+
+    final ReadBufferManager readBufferManager
+        = ReadBufferManager.getBufferManager();
+
+    final int readBufferTotal = readBufferManager.getNumBuffers();
+
+    //Sleeping to give ReadBufferWorker to pick the readBuffers for processing.
+    Thread.sleep(readBufferTransferToInProgressProbableTime);
+
+    Assertions.assertThat(readBufferManager.getInProgressCopiedList())
+        .describedAs("InProgressList should have " + readBufferQueuedCount + " elements")
+        .hasSize(readBufferQueuedCount);
+    final int freeListBufferCount = readBufferTotal - readBufferQueuedCount;
+    Assertions.assertThat(readBufferManager.getFreeListCopy())
+        .describedAs("FreeList should have " + freeListBufferCount + "elements")
+        .hasSize(freeListBufferCount);
+    Assertions.assertThat(readBufferManager.getCompletedReadListCopy())
+        .describedAs("CompletedList should have 0 elements")
+        .hasSize(0);
+
+    inputStream.close();
```

Review Comment:
the problem with the close() here is that it will only be reached if the assertions hold. if anything goes wrong, an exception is raised and the stream kept open, with whatever resources it consumes. it should be closed in a finally block *or* the stream opened in a try-with-resources clause. thanks

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
For additional commands, e-mail: common-issues-h...@hadoop.apache.org
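A minimal sketch of the try-with-resources pattern the reviewer is asking for. `FakeStream` is a hypothetical stand-in for `AbfsInputStream` (the real class needs a live client); the point is only that `close()` runs even when an assertion in the body throws.

```java
// Sketch: open the stream in try-with-resources so close() runs even if
// an assertion fails mid-test. FakeStream is a hypothetical stand-in for
// AbfsInputStream.
public class TryWithResourcesSketch {

    static class FakeStream implements AutoCloseable {
        boolean closed = false;

        @Override
        public void close() {
            closed = true;
        }
    }

    // Returns whether the stream ended up closed even though the body threw.
    static boolean streamClosedDespiteFailure() {
        FakeStream captured = null;
        try (FakeStream in = new FakeStream()) {
            captured = in;
            // A failing assertion mid-test, e.g. on the in-progress list size:
            throw new AssertionError("InProgressList should have 3 elements");
        } catch (AssertionError expected) {
            // With the original pattern -- close() placed after the
            // assertions -- the stream would still be open at this point.
        }
        return captured.closed;
    }

    public static void main(String[] args) {
        System.out.println(streamClosedDespiteFailure()); // prints "true"
    }
}
```

With the pattern in the diff above, a failed `hasSize` assertion skips `inputStream.close()` entirely; with try-with-resources the close is unconditional.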
[GitHub] [hadoop] steveloughran commented on a diff in pull request #5176: HADOOP-18546. ABFS:disable purging list of in progress reads in abfs stream closed
steveloughran commented on code in PR #5176:
URL: https://github.com/apache/hadoop/pull/5176#discussion_r1040769983

## hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java:

```diff
@@ -524,30 +527,33 @@ public void testStreamPurgeDuringReadAheadCallExecuting() throws Exception {
     final ReadBufferManager readBufferManager
         = ReadBufferManager.getBufferManager();
 
+    final int readBufferTotal = readBufferManager.getNumBuffers();
+
     //Sleeping to give ReadBufferWorker to pick the readBuffers for processing.
-    Thread.sleep(1_000L);
+    Thread.sleep(readBufferTransferToInProgressProbableTime);
 
     Assertions.assertThat(readBufferManager.getInProgressCopiedList())
-        .describedAs("InProgressList should have 3 elements")
-        .hasSize(3);
+        .describedAs("InProgressList should have " + readBufferQueuedCount + " elements")
+        .hasSize(readBufferQueuedCount);
+    final int freeListBufferCount = readBufferTotal - readBufferQueuedCount;
     Assertions.assertThat(readBufferManager.getFreeListCopy())
-        .describedAs("FreeList should have 13 elements")
-        .hasSize(13);
+        .describedAs("FreeList should have " + freeListBufferCount + "elements")
```

Review Comment:
you can actually use string.format patterns here; most relevant for on-demand toString calls, which are more expensive. I'm not worrying about it here though
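The point behind the reviewer's "string.format patterns" remark is lazy evaluation: AssertJ's `describedAs("FreeList should have %d elements", n)` overload only formats the message (and runs any expensive `toString()` calls) when the assertion fails. A stdlib-only sketch of that idea, with hypothetical helper names, mimicking the lazy path with a `Supplier`:

```java
import java.util.function.Supplier;

// Demonstrates lazy assertion descriptions: the failure message is only
// built on failure. This mirrors AssertJ's
// describedAs("FreeList should have %d elements", freeListBufferCount),
// which defers formatting until the assertion actually fails.
public class LazyDescriptionSketch {

    static int messagesBuilt = 0;

    // Stands in for an expensive description (string concatenation,
    // on-demand toString calls, etc.).
    static String expensiveDescription(int expected) {
        messagesBuilt++; // counts how often the message gets built
        return String.format("FreeList should have %d elements", expected);
    }

    // Hypothetical lazy check: the Supplier is invoked only on failure.
    static void checkLazy(boolean ok, Supplier<String> description) {
        if (!ok) {
            throw new AssertionError(description.get());
        }
    }

    public static void main(String[] args) {
        int freeListBufferCount = 13;
        int actualSize = 13;

        // Passing assertion: the description Supplier is never invoked.
        checkLazy(actualSize == freeListBufferCount,
            () -> expensiveDescription(freeListBufferCount));

        System.out.println(messagesBuilt); // prints "0"
    }
}
```

By contrast, the `"FreeList should have " + freeListBufferCount + "elements"` form in the diff concatenates eagerly on every run, pass or fail.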
[GitHub] [hadoop] steveloughran commented on a diff in pull request #5176: HADOOP-18546. ABFS:disable purging list of in progress reads in abfs stream closed
steveloughran commented on code in PR #5176:
URL: https://github.com/apache/hadoop/pull/5176#discussion_r1038058686

## hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java:

```diff
@@ -495,6 +509,199 @@ public void testSuccessfulReadAhead() throws Exception {
     checkEvictedStatus(inputStream, 0, true);
   }
 
+  /**
+   * This test expects InProgressList is not purged by the inputStream close.
+   * The readBuffer will move to completedList and then finally should get evicted.
+   */
+  @Test
+  public void testStreamPurgeDuringReadAheadCallExecuting() throws Exception {
+    AbfsClient client = getMockAbfsClient();
+    AbfsRestOperation successOp = getMockRestOp();
+
+    final AtomicInteger movedToInProgressList = new AtomicInteger(0);
+    final AtomicInteger movedToCompletedList = new AtomicInteger(0);
+    final AtomicBoolean preClosedAssertion = new AtomicBoolean(false);
+
+    Mockito.doAnswer(invocationOnMock -> {
+      movedToInProgressList.incrementAndGet();
+      while (movedToInProgressList.get() < 3 || !preClosedAssertion.get()) {
+
+      }
+      movedToCompletedList.incrementAndGet();
+      return successOp;
+    })
+        .when(client)
```

Review Comment:
it may be slow, but at least there's no assertion that something finishes *before* a specific timeout. those are the tests which really have problems on slow networks/overloaded systems
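The mocked read in the diff above coordinates threads with a busy-wait loop, which burns a CPU core per blocked read. A hedged alternative sketch (names are illustrative, not from the PR): `CountDownLatch` parks the mocked reads instead of spinning, and, in the spirit of the review comment, imposes no deadline that a slow machine could miss.

```java
import java.util.concurrent.CountDownLatch;

// Sketch: coordinate mocked read-ahead threads with latches rather than
// a spin loop. Latch names are hypothetical stand-ins for the
// movedToInProgressList / preClosedAssertion flags in the test.
public class LatchCoordinationSketch {

    public static void main(String[] args) throws InterruptedException {
        final int readCount = 3;
        CountDownLatch readsInProgress = new CountDownLatch(readCount);
        CountDownLatch preCloseAssertionsDone = new CountDownLatch(1);

        Runnable mockedRead = () -> {
            readsInProgress.countDown();        // "moved to in-progress list"
            try {
                preCloseAssertionsDone.await(); // park, don't spin
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };

        for (int i = 0; i < readCount; i++) {
            new Thread(mockedRead).start();
        }

        readsInProgress.await(); // all mocked reads are now "in progress"
        // ... the pre-close assertions on the in-progress list would run here ...
        preCloseAssertionsDone.countDown(); // release the mocked reads

        System.out.println("released " + readCount + " mocked reads");
    }
}
```

Like the spin-loop version, this waits as long as it takes rather than asserting completion before a fixed timeout, so it stays robust on overloaded systems.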
[GitHub] [hadoop] steveloughran commented on a diff in pull request #5176: HADOOP-18546. ABFS:disable purging list of in progress reads in abfs stream closed
steveloughran commented on code in PR #5176:
URL: https://github.com/apache/hadoop/pull/5176#discussion_r1037390997

## hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java:

```diff
@@ -82,6 +84,16 @@ public class TestAbfsInputStream extends
       REDUCED_READ_BUFFER_AGE_THRESHOLD * 10;  // 30 sec
   private static final int ALWAYS_READ_BUFFER_SIZE_TEST_FILE_SIZE = 16 * ONE_MB;
 
+  @After
+  public void afterTest() throws InterruptedException {
```

Review Comment:
override `teardown()`, call superclass. that way you know the order of things happening

## hadoop-common-project/hadoop-common/src/main/resources/core-default.xml:

```diff
@@ -2166,13 +2166,6 @@
     The switch to turn S3A auditing on or off.
 
     The AbstractFileSystem for gs: uris.
 
-  fs.azure.enable.readahead
-  false
```

Review Comment:
retain but set to true. why so? storediag will log it and so show that someone has explicitly said "readahead is safe here"

## hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java:

```diff
@@ -82,6 +84,16 @@ public class TestAbfsInputStream extends
       REDUCED_READ_BUFFER_AGE_THRESHOLD * 10;  // 30 sec
   private static final int ALWAYS_READ_BUFFER_SIZE_TEST_FILE_SIZE = 16 * ONE_MB;
 
+  @After
+  public void afterTest() throws InterruptedException {
+    //thread wait so that previous test's inProgress buffers are processed and removed.
+    Thread.sleep(1l);
```

Review Comment:
don't like this as it potentially adds 10s to a test run, one which could still be a bit flaky. what about using `testResetReadBufferManager()`?

## hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java:

```diff
@@ -495,6 +505,105 @@ public void testSuccessfulReadAhead() throws Exception {
     checkEvictedStatus(inputStream, 0, true);
   }
 
+  /**
+   * This test expects InProgressList is not purged by the inputStream close.
+   * The readBuffer will move to completedList and then finally should get evicted.
+   */
+  @Test
+  public void testStreamPurgeDuringReadAheadCallExecuting() throws Exception {
+    AbfsClient client = getMockAbfsClient();
+    AbfsRestOperation successOp = getMockRestOp();
+
+    Mockito.doAnswer(invocationOnMock -> {
+      //sleeping thread to mock the network latency from client to backend.
+      Thread.sleep(3000l);
+      return successOp;
+    })
+        .when(client)
+        .read(any(String.class), any(Long.class), any(byte[].class),
+            any(Integer.class), any(Integer.class), any(String.class),
+            any(String.class), any(TracingContext.class));
+
+    AbfsInputStream inputStream = getAbfsInputStream(client,
+        "testSuccessfulReadAhead.txt");
+    queueReadAheads(inputStream);
+
+    final ReadBufferManager readBufferManager
+        = ReadBufferManager.getBufferManager();
+
+    //Sleeping to give ReadBufferWorker to pick the readBuffers for processing.
+    Thread.sleep(1000l);
+
+    Assertions.assertThat(
+        getStreamRelatedBufferCount(readBufferManager.getInProgressCopiedList(),
+            inputStream))
+        .describedAs("InProgressList should have 3 elements")
+        .isEqualTo(3);
+    Assertions.assertThat(readBufferManager.getFreeListCopy().size())
+        .describedAs("FreeList should have 13 elements")
+        .isEqualTo(13);
+    Assertions.assertThat(readBufferManager.getCompletedReadListCopy().size())
+        .describedAs("CompletedList should have 0 elements")
+        .isEqualTo(0);
+
+    inputStream.close();
+
+    Assertions.assertThat(
+        getStreamRelatedBufferCount(readBufferManager.getInProgressCopiedList(),
+            inputStream))
+        .describedAs("InProgressList should have 3 elements")
+        .isEqualTo(3);
+    Assertions.assertThat(getStreamRelatedBufferCount(
+        readBufferManager.getCompletedReadListCopy(), inputStream))
+        .describedAs("CompletedList should have 0 elements")
+        .isEqualTo(0);
+    Assertions.assertThat(readBufferManager.getFreeListCopy().size())
```

Review Comment:
use `.hasSize(13)`

## hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java:

```diff
@@ -495,6 +505,105 @@ public void testSuccessfulReadAhead() throws Exception {
     checkEvictedStatus(inputStream, 0, true);
   }
 
+  /**
+   * This test expects InProgressList is not purged by the inputStream close.
+   * The readBuffer will move to completedList and then finally should get evicted.
+   */
+  @Test
+  public void testStreamPurgeDuringReadAheadCallExecuting() throws Exception {
+    AbfsClient client = getMockAbfsClient();
+    AbfsRestOperation successOp = getMockRestOp();
+
+    Mockito.doAnswer(invocationOnMock -> {
+      //sleeping thread to mock the network latency from client
```
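The first review comment in this message asks the author to override the base class's `teardown()` and call the superclass, instead of adding a separate `@After` method, so the cleanup order is explicit. A minimal sketch of that ordering; `BaseTest` is a hypothetical stand-in for the actual test superclass.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: override teardown() and delegate to super, so subclass cleanup
// (e.g. resetting the ReadBufferManager instead of sleeping) visibly runs
// before the superclass's. BaseTest is a hypothetical stand-in.
public class TeardownOrderSketch {

    static final List<String> EVENTS = new ArrayList<>();

    static class BaseTest {
        public void teardown() throws Exception {
            EVENTS.add("superclass teardown");
        }
    }

    static class StreamTest extends BaseTest {
        @Override
        public void teardown() throws Exception {
            // stream-specific cleanup first...
            EVENTS.add("reset read buffer manager");
            // ...then the superclass, in a known, deterministic order
            super.teardown();
        }
    }

    public static void main(String[] args) throws Exception {
        new StreamTest().teardown();
        System.out.println(EVENTS); // prints "[reset read buffer manager, superclass teardown]"
    }
}
```

With a separate `@After` method alongside an inherited one, JUnit gives no ordering guarantee between the two; the override makes the order part of the code.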