This is an automated email from the ASF dual-hosted git repository.
snemeth pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 1c15987ee38 MAPREDUCE-7441. Fix race condition in closing
FadvisedFileRegion. Contributed by Benjamin Teke
1c15987ee38 is described below
commit 1c15987ee38b96cf2ef4ff383f3f86e082c50fab
Author: Szilard Nemeth
AuthorDate: Fri Jun 23 14:40:03 2023 -0400
MAPREDUCE-7441. Fix race condition in closing FadvisedFileRegion.
Contributed by Benjamin Teke
---
.../apache/hadoop/mapred/FadvisedFileRegion.java | 102 +++++++++++++----------
1 file changed, 58 insertions(+), 44 deletions(-)
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedFileRegion.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedFileRegion.java
index 9290a282e39..184b58e6c76 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedFileRegion.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedFileRegion.java
@@ -41,6 +41,7 @@ public class FadvisedFileRegion extends DefaultFileRegion {
private static final Logger LOG =
LoggerFactory.getLogger(FadvisedFileRegion.class);
+ private final Object closeLock = new Object();
private final boolean manageOsCache;
private final int readaheadLength;
private final ReadaheadPool readaheadPool;
@@ -51,12 +52,12 @@ public class FadvisedFileRegion extends DefaultFileRegion {
private final int shuffleBufferSize;
private final boolean shuffleTransferToAllowed;
private final FileChannel fileChannel;
-
- private ReadaheadRequest readaheadRequest;
+
+ private volatile ReadaheadRequest readaheadRequest;
public FadvisedFileRegion(RandomAccessFile file, long position, long count,
boolean manageOsCache, int readaheadLength, ReadaheadPool readaheadPool,
- String identifier, int shuffleBufferSize,
+ String identifier, int shuffleBufferSize,
boolean shuffleTransferToAllowed) throws IOException {
super(file.getChannel(), position, count);
this.manageOsCache = manageOsCache;
@@ -73,97 +74,110 @@ public class FadvisedFileRegion extends DefaultFileRegion {
@Override
public long transferTo(WritableByteChannel target, long position)
- throws IOException {
-if (readaheadPool != null && readaheadLength > 0) {
- readaheadRequest = readaheadPool.readaheadStream(identifier, fd,
- position() + position, readaheadLength,
- position() + count(), readaheadRequest);
+ throws IOException {
+synchronized (closeLock) {
+ if (fd.valid()) {
+if (readaheadPool != null && readaheadLength > 0) {
+ readaheadRequest = readaheadPool.readaheadStream(identifier, fd,
+ position() + position, readaheadLength,
+ position() + count(), readaheadRequest);
+}
+
+if(this.shuffleTransferToAllowed) {
+ return super.transferTo(target, position);
+} else {
+ return customShuffleTransfer(target, position);
+}
+ } else {
+return 0L;
+ }
}
-
-if(this.shuffleTransferToAllowed) {
- return super.transferTo(target, position);
-} else {
- return customShuffleTransfer(target, position);
-}
+
}
/**
- * This method transfers data using local buffer. It transfers data from
- * a disk to a local buffer in memory, and then it transfers data from the
+ * This method transfers data using local buffer. It transfers data from
+ * a disk to a local buffer in memory, and then it transfers data from the
* buffer to the target. This is used only if transferTo is disallowed in
- * the configuration file. super.TransferTo does not perform well on Windows
- * due to a small IO request generated. customShuffleTransfer can control
- * the size of the IO requests by changing the size of the intermediate
+ * the configuration file. super.TransferTo does not perform well on Windows
+ * due to a small IO request generated. customShuffleTransfer can control
+ * the size of the IO requests by changing the size of the intermediate
* buffer.
*/
@VisibleForTesting
long customShuffleTransfer(WritableByteChannel target, long position)
- throws IOException {
+ throws IOException {
long actualCount = this.count - position;
if (actualCount < 0 || position < 0) {
throw new IllegalArgumentException(
- "position out of range: " + position +
- " (expected: 0 - " + (this.count - 1) + ')');
+ "position out of range: " +