pranavsaxena-microsoft commented on code in PR #5117:
URL: https://github.com/apache/hadoop/pull/5117#discussion_r1017458506
########## hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java: ##########
```
@@ -534,17 +676,31 @@ void callTryEvict() {
     tryEvict();
   }
-
   /**
    * Purging the buffers associated with an {@link AbfsInputStream}
    * from {@link ReadBufferManager} when stream is closed.
+   * Before HADOOP-18521 this would purge in progress reads, which
+   * would return the active buffer to the free pool while it was
+   * still in use.
    * @param stream input stream.
    */
   public synchronized void purgeBuffersForStream(AbfsInputStream stream) {
     LOGGER.debug("Purging stale buffers for AbfsInputStream {} ", stream);
+
+    // remove from the queue
+    int before = readAheadQueue.size();
     readAheadQueue.removeIf(readBuffer -> readBuffer.getStream() == stream);
-    purgeList(stream, completedReadList);
-    purgeList(stream, inProgressList);
+    int readaheadPurged = readAheadQueue.size() - before;
```

Review Comment:
By the time the thread reaches this line, more blocks may have been added to readAheadQueue, which may inflate the metric. Also, after the removeIf, `before` should be >= `readAheadQueue.size()` (when no additional blocks have been queued), so `readAheadQueue.size() - before` adds a negative number; the operands appear swapped and should presumably be `before - readAheadQueue.size()`.

########## hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java: ##########
```
@@ -454,31 +588,39 @@ ReadBuffer getNextBlockToRead() throws InterruptedException {
    */
   void doneReading(final ReadBuffer buffer, final ReadBufferStatus result,
       final int bytesActuallyRead) {
     if (LOGGER.isTraceEnabled()) {
-      LOGGER.trace("ReadBufferWorker completed read file {} for offset {} outcome {} bytes {}",
-          buffer.getStream().getPath(), buffer.getOffset(), result, bytesActuallyRead);
-    }
-    synchronized (this) {
-      // If this buffer has already been purged during
-      // close of InputStream then we don't update the lists.
-      if (inProgressList.contains(buffer)) {
-        inProgressList.remove(buffer);
+      LOGGER.trace("ReadBufferWorker completed file {} for offset {} bytes {}",
+          buffer.getStream().getPath(), buffer.getOffset(), bytesActuallyRead);
+    }
+    try {
+      synchronized (this) {
+        checkState(inProgressList.remove(buffer),
+            "Read completed from an operation not declared as in progress %s", buffer);
+        // If this buffer has already been purged during
+        // close of InputStream then we don't update the lists.
         if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) {
           buffer.setStatus(ReadBufferStatus.AVAILABLE);
           buffer.setLength(bytesActuallyRead);
         } else {
-          freeList.push(buffer.getBufferindex());
-          // buffer will be deleted as per the eviction policy.
+          // there is no data, so it is immediately returned to the free list.
+          placeBufferOnFreeList("failed read", buffer);
```

Review Comment:
This may result in an IllegalStateException propagating to AbfsInputStream. This line puts the buffer's index on freeList, from which the index can be taken by another ReadBuffer, b1. Some time later, when this buffer on completedReadList needs to be evicted, control reaches https://github.com/steveloughran/hadoop/blob/azure/HADOOP-18521-buffer-manager/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java#L408 and one of two things can happen:
1. freeList still holds the index: an IllegalStateException is thrown there.
2. freeList no longer holds it: an IllegalStateException is thrown from https://github.com/steveloughran/hadoop/blob/azure/HADOOP-18521-buffer-manager/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java#L411.
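Purely as an illustration of the fix direction, here is a minimal, self-contained sketch of making the release idempotent, so a failed read cannot hand the same index to the free list twice. All names here (FreeListSketch, Buffer, releaseBuffer, FREE_INDEX) are invented for the example; this is not the PR's placeBufferOnFreeList code or the linked commit:

```java
import java.util.Stack;

public class FreeListSketch {
  /** Sentinel meaning "this buffer no longer owns an index". */
  private static final int FREE_INDEX = -1;

  private final Stack<Integer> freeList = new Stack<>();

  static final class Buffer {
    private int bufferIndex;
    Buffer(int index) { this.bufferIndex = index; }
  }

  /**
   * Return the buffer's index to the free pool exactly once;
   * a second call is a no-op rather than a double free.
   */
  synchronized void releaseBuffer(Buffer buffer) {
    if (buffer.bufferIndex == FREE_INDEX) {
      return;  // already released; the index may now belong to another buffer
    }
    freeList.push(buffer.bufferIndex);
    buffer.bufferIndex = FREE_INDEX;  // forget the index so it cannot be re-pushed
  }

  /** Number of indexes currently available in the free pool. */
  synchronized int freeListSize() {
    return freeList.size();
  }
}
```

With a guard like this, a later eviction of the already-released buffer would see the sentinel and skip the release, instead of tripping the ownership check described in the scenario above.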
########## hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java: ##########
(Same `doneReading` hunk as above.)

Review Comment:
Made a suggested change which prevents this: https://github.com/pranavsaxena-microsoft/hadoop/commit/0d09a0de501bdc928139263075f82feb064fd6bc

Review Comment:
Test for the same: https://github.com/pranavsaxena-microsoft/hadoop/commit/18da3752f3f72a953cecba0525a01bfab6be89ee
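The linked commits carry the actual fix and test; purely to illustrate the shape such a regression test could take, here is a hypothetical JUnit 4 test against the FreeListSketch sketch above, not the real ReadBufferManager:

```java
import static org.junit.Assert.assertEquals;

import org.junit.Test;

public class TestFreeListSketch {

  @Test
  public void testDoubleReleaseIsIdempotent() {
    FreeListSketch manager = new FreeListSketch();
    FreeListSketch.Buffer buffer = new FreeListSketch.Buffer(14);

    manager.releaseBuffer(buffer);   // failed read returns the index
    manager.releaseBuffer(buffer);   // a later purge/evict tries again

    // the index must appear on the free list exactly once
    assertEquals(1, manager.freeListSize());
  }
}
```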
In a separate run:
```
java.lang.IllegalStateException: Buffer 14 returned to free buffer list by non-owner ReadBuffer{status=AVAILABLE, offset=4194304, length=0, requestedLength=4194304, bufferindex=14, timeStamp=46807492, isFirstByteConsumed=false, isLastByteConsumed=false, isAnyByteConsumed=false, errException=org.apache.hadoop.fs.PathIOException: `/testfilef6b6f93ac245': Input/output error: Buffer index 14 found in buffer collection completedReadList, stream=org.apache.hadoop.fs.azurebfs.services.AbfsInputStream@652e2419{counters=((stream_read_bytes_backwards_on_seek=0) (stream_read_operations=1) (remote_read_op=2) (stream_read_seek_backward_operations=0) (action_http_get_request.failures=0) (action_http_get_request=0) (bytes_read_buffer=0) (stream_read_bytes=0) (seek_in_buffer=0) (remote_bytes_read=0) (stream_read_seek_bytes_skipped=0) (stream_read_seek_operations=2) (read_ahead_bytes_read=0) (stream_read_seek_forward_operations=2)); gauges=(); minimums=((action_http_get_request.failures.min=-1) (action_http_get_request.min=-1)); maximums=((action_http_get_request.max=-1) (action_http_get_request.failures.max=-1)); means=((action_http_get_request.failures.mean=(samples=0, sum=0, mean=0.0000)) (action_http_get_request.mean=(samples=0, sum=0, mean=0.0000))); }AbfsInputStream@(1697522713){StreamStatistics{counters=((remote_bytes_read=0) (stream_read_seek_backward_operations=0) (remote_read_op=2) (stream_read_seek_forward_operations=2) (bytes_read_buffer=0) (seek_in_buffer=0) (stream_read_bytes=0) (stream_read_operations=1) (read_ahead_bytes_read=0) (stream_read_bytes_backwards_on_seek=0) (stream_read_seek_operations=2) (action_http_get_request.failures=0) (stream_read_seek_bytes_skipped=0) (action_http_get_request=0)); gauges=(); minimums=((action_http_get_request.min=-1) (action_http_get_request.failures.min=-1)); maximums=((action_http_get_request.failures.max=-1) (action_http_get_request.max=-1)); means=((action_http_get_request.mean=(samples=0, sum=0, mean=0.0000)) (action_http_get_request.failures.mean=(samples=0, sum=0, mean=0.0000))); }}}
	at org.apache.hadoop.util.Preconditions.checkState(Preconditions.java:298)
	at org.apache.hadoop.fs.azurebfs.services.ReadBufferManager.verifyReadOwnsBufferAtIndex(ReadBufferManager.java:430)
	at org.apache.hadoop.fs.azurebfs.services.ReadBufferManager.placeBufferOnFreeList(ReadBufferManager.java:411)
```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.