anmolanmol1234 commented on code in PR #5117: URL: https://github.com/apache/hadoop/pull/5117#discussion_r1020173444
########## hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java: ########## @@ -454,31 +588,65 @@ ReadBuffer getNextBlockToRead() throws InterruptedException { */ void doneReading(final ReadBuffer buffer, final ReadBufferStatus result, final int bytesActuallyRead) { if (LOGGER.isTraceEnabled()) { - LOGGER.trace("ReadBufferWorker completed read file {} for offset {} outcome {} bytes {}", - buffer.getStream().getPath(), buffer.getOffset(), result, bytesActuallyRead); - } - synchronized (this) { - // If this buffer has already been purged during - // close of InputStream then we don't update the lists. - if (inProgressList.contains(buffer)) { - inProgressList.remove(buffer); + LOGGER.trace("ReadBufferWorker completed file {} for offset {} bytes {}; {}", + buffer.getStream().getPath(), buffer.getOffset(), bytesActuallyRead, buffer); + } + // decrement counter. + buffer.prefetchFinished(); + + try { + synchronized (this) { + // remove from the list + if (!inProgressList.remove(buffer)) { + // this is a sign of inconsistent state, so a major problem + String message = + String.format("Read completed from an operation not declared as in progress %s", + buffer); + LOGGER.warn(message); + // release the buffer (which may raise an exception) + placeBufferOnFreeList("read not in progress", buffer); + // report the failure + throw new IllegalStateException(message); + } + + boolean shouldFreeBuffer = false; + String freeBufferReason = ""; if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) { buffer.setStatus(ReadBufferStatus.AVAILABLE); buffer.setLength(bytesActuallyRead); } else { - freeList.push(buffer.getBufferindex()); - // buffer will be deleted as per the eviction policy. + // read failed or there was no data, -the buffer can be returned to the free list. 
+ shouldFreeBuffer = true; + freeBufferReason = "failed read"; } // completed list also contains FAILED read buffers // for sending exception message to clients. buffer.setStatus(result); buffer.setTimeStamp(currentTimeMillis()); - completedReadList.add(buffer); + if (!buffer.isStreamClosed()) { + // completed reads are added to the list. + LOGGER.trace("Adding buffer to completed list {}", buffer); + completedReadList.add(buffer); Review Comment: I agree with this, because the current flow might add the same buffer to the free list twice or leave the lists in an inconsistent state during the addition. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-issues-h...@hadoop.apache.org