HDFS-12479. Some misuses of lock in DFSStripedOutputStream. Contributed by Huafeng Wang
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/dba7a7dd
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/dba7a7dd
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/dba7a7dd

Branch: refs/heads/YARN-6592
Commit: dba7a7dd9d70adfab36a78eb55059c54e553a5cb
Parents: 2018538
Author: Kai Zheng <kai.zh...@intel.com>
Authored: Tue Sep 19 17:45:41 2017 +0800
Committer: Kai Zheng <kai.zh...@intel.com>
Committed: Tue Sep 19 17:45:41 2017 +0800

----------------------------------------------------------------------
 .../java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dba7a7dd/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
index 44db3a6..66eec7a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
@@ -63,6 +63,7 @@ import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
@@ -85,11 +86,10 @@ public class DFSStripedOutputStream extends DFSOutputStream
     private final List<BlockingQueue<T>> queues;
 
     MultipleBlockingQueue(int numQueue, int queueSize) {
-      List<BlockingQueue<T>> list = new ArrayList<>(numQueue);
+      queues = new ArrayList<>(numQueue);
       for (int i = 0; i < numQueue; i++) {
-        list.add(new LinkedBlockingQueue<T>(queueSize));
+        queues.add(new LinkedBlockingQueue<T>(queueSize));
       }
-      queues = Collections.synchronizedList(list);
     }
 
     void offer(int i, T object) {
@@ -156,8 +156,7 @@ public class DFSStripedOutputStream extends DFSOutputStream
       followingBlocks = new MultipleBlockingQueue<>(numAllBlocks, 1);
       endBlocks = new MultipleBlockingQueue<>(numAllBlocks, 1);
       newBlocks = new MultipleBlockingQueue<>(numAllBlocks, 1);
-      updateStreamerMap = Collections.synchronizedMap(
-          new HashMap<StripedDataStreamer, Boolean>(numAllBlocks));
+      updateStreamerMap = new ConcurrentHashMap<>(numAllBlocks);
       streamerUpdateResult = new MultipleBlockingQueue<>(numAllBlocks, 1);
     }
 

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org