This is an automated email from the ASF dual-hosted git repository. weichiu pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/hadoop.git
commit 9acbbd885c2869e0e7cfd4cb7efc105f54e4cc13
Author: Ayush Saxena <ayushsax...@apache.org>
AuthorDate: Wed Dec 25 11:07:25 2019 +0530

    HDFS-12999. When reach the end of the block group, it may not need to flush all the data packets(flushAllInternals) twice. Contributed by lufei and Fei Hui.

    (cherry picked from commit df622cf4a32ee172ded6c4b3b97a1e49befc4f10)
---
 .../org/apache/hadoop/hdfs/DFSStripedOutputStream.java | 17 ++++++++++-------
 1 file changed, 10 insertions(+), 7 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
index 97310ee..c94c9da 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
@@ -556,7 +556,7 @@ public class DFSStripedOutputStream extends DFSOutputStream
       // if this is the end of the block group, end each internal block
       if (shouldEndBlockGroup()) {
         flushAllInternals();
-        checkStreamerFailures();
+        checkStreamerFailures(false);
         for (int i = 0; i < numAllBlocks; i++) {
           final StripedDataStreamer s = setCurrentStreamer(i);
           if (s.isHealthy()) {
@@ -567,7 +567,7 @@ public class DFSStripedOutputStream extends DFSOutputStream
         }
       } else {
         // check failure state for all the streamers. Bump GS if necessary
-        checkStreamerFailures();
+        checkStreamerFailures(true);
       }
     }
     setCurrentStreamer(next);
@@ -623,15 +623,18 @@ public class DFSStripedOutputStream extends DFSOutputStream
    * written a full stripe (i.e., enqueue all packets for a full stripe), or
    * when we're closing the outputstream.
    */
-  private void checkStreamerFailures() throws IOException {
+  private void checkStreamerFailures(boolean isNeedFlushAllPackets)
+      throws IOException {
     Set<StripedDataStreamer> newFailed = checkStreamers();
     if (newFailed.size() == 0) {
       return;
     }
 
-    // for healthy streamers, wait till all of them have fetched the new block
-    // and flushed out all the enqueued packets.
-    flushAllInternals();
+    if (isNeedFlushAllPackets) {
+      // for healthy streamers, wait till all of them have fetched the new block
+      // and flushed out all the enqueued packets.
+      flushAllInternals();
+    }
     // recheck failed streamers again after the flush
     newFailed = checkStreamers();
     while (newFailed.size() > 0) {
@@ -1188,7 +1191,7 @@ public class DFSStripedOutputStream extends DFSOutputStream
         // flush all the data packets
         flushAllInternals();
         // check failures
-        checkStreamerFailures();
+        checkStreamerFailures(false);
 
         for (int i = 0; i < numAllBlocks; i++) {
           final StripedDataStreamer s = setCurrentStreamer(i);

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org