This is an automated email from the ASF dual-hosted git repository.
weichiu pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.4 by this push:
new 83cdbca1e66 HDFS-17080. fix ec connection leak. (#5807)
83cdbca1e66 is described below
commit 83cdbca1e660567965ca6b0f862e3130545720d6
Author: yangy <[email protected]>
AuthorDate: Tue Jan 7 05:49:55 2025 +0800
HDFS-17080. fix ec connection leak. (#5807)
(cherry picked from commit 815ca41c69bd4be4f20d6c3f5331de927420f3d2)
---
.../java/org/apache/hadoop/hdfs/StripeReader.java | 40 ++++++++++++++--------
.../org/apache/hadoop/mapred/LineRecordReader.java | 7 +++-
2 files changed, 32 insertions(+), 15 deletions(-)
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java
index f2d6732a459..bc39bace795 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java
@@ -253,6 +253,9 @@ abstract class StripeReader {
strategy.getReadBuffer().clear();
// we want to remember which block replicas we have tried
corruptedBlocks.addCorruptedBlock(currentBlock, currentNode);
+ if (blockReader != null) {
+ blockReader.close();
+ }
throw ce;
} catch (IOException e) {
DFSClient.LOG.warn("Exception while reading from "
@@ -260,6 +263,9 @@ abstract class StripeReader {
+ currentNode, e);
//Clear buffer to make next decode success
strategy.getReadBuffer().clear();
+ if (blockReader != null) {
+ blockReader.close();
+ }
throw e;
}
}
@@ -329,21 +335,26 @@ abstract class StripeReader {
* read the whole stripe. do decoding if necessary
*/
void readStripe() throws IOException {
- for (int i = 0; i < dataBlkNum; i++) {
- if (alignedStripe.chunks[i] != null &&
- alignedStripe.chunks[i].state != StripingChunk.ALLZERO) {
- if (!readChunk(targetBlocks[i], i)) {
- alignedStripe.missingChunksNum++;
+ try {
+ for (int i = 0; i < dataBlkNum; i++) {
+ if (alignedStripe.chunks[i] != null &&
+ alignedStripe.chunks[i].state != StripingChunk.ALLZERO) {
+ if (!readChunk(targetBlocks[i], i)) {
+ alignedStripe.missingChunksNum++;
+ }
}
}
- }
- // There are missing block locations at this stage. Thus we need to read
- // the full stripe and one more parity block.
- if (alignedStripe.missingChunksNum > 0) {
- checkMissingBlocks();
- readDataForDecoding();
- // read parity chunks
- readParityChunks(alignedStripe.missingChunksNum);
+ // There are missing block locations at this stage. Thus we need to read
+ // the full stripe and one more parity block.
+ if (alignedStripe.missingChunksNum > 0) {
+ checkMissingBlocks();
+ readDataForDecoding();
+ // read parity chunks
+ readParityChunks(alignedStripe.missingChunksNum);
+ }
+ } catch (IOException e) {
+ dfsStripedInputStream.close();
+ throw e;
}
// TODO: for a full stripe we can start reading (dataBlkNum + 1) chunks
@@ -385,7 +396,8 @@ abstract class StripeReader {
}
} catch (InterruptedException ie) {
String err = "Read request interrupted";
- DFSClient.LOG.error(err);
+ DFSClient.LOG.error(err, ie);
+ dfsStripedInputStream.close();
clearFutures();
// Don't decode if read interrupted
throw new InterruptedIOException(err);
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java
index ab63c199f2f..4fd9a5fda0a 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java
@@ -153,7 +153,12 @@ public class LineRecordReader implements RecordReader<LongWritable, Text> {
// because we always (except the last split) read one extra line in
// next() method.
if (start != 0) {
- start += in.readLine(new Text(), 0, maxBytesToConsume(start));
+ try {
+ start += in.readLine(new Text(), 0, maxBytesToConsume(start));
+ } catch (Exception e) {
+ close();
+ throw e;
+ }
}
this.pos = start;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]