HDFS-11251. ConcurrentModificationException during DataNode#refreshVolumes. (Manoj Govindassamy via lei)
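The root cause: DataNode#refreshVolumes mutates Storage#storageDirs while other
threads may still be iterating it (e.g. while listing storage directories), and
the fail-fast iterator of the backing ArrayList then throws
ConcurrentModificationException. The patch below swaps the backing list to a
CopyOnWriteArrayList, whose iterators traverse an immutable snapshot. A minimal
standalone sketch of the race and the fix (illustrative class and directory
names only, not the DataNode code itself):

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class CmeSketch {
  public static void main(String[] args) throws InterruptedException {
    // With "new java.util.ArrayList<String>()" here instead, the reader
    // loop below can fail with ConcurrentModificationException.
    final List<String> storageDirs = new CopyOnWriteArrayList<String>();
    for (int i = 0; i < 4; i++) {
      storageDirs.add("/data/vol" + i);
    }

    // Writer: stands in for refreshVolumes() adding volumes concurrently.
    Thread writer = new Thread(new Runnable() {
      @Override
      public void run() {
        for (int i = 0; i < 10000; i++) {
          storageDirs.add("/data/new" + i);
        }
      }
    });
    writer.start();

    // Reader: stands in for listStorageDirectories(); each iteration
    // walks an immutable snapshot of the list.
    for (int pass = 0; pass < 1000; pass++) {
      StringBuilder sb = new StringBuilder();
      for (String dir : storageDirs) {
        sb.append(dir).append(';');
      }
    }
    writer.join();
    System.out.println("finished without ConcurrentModificationException");
  }
}

Volume listings vastly outnumber volume swaps, which is the read-heavy pattern
where copy-on-write semantics are a good trade.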
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e9f13968
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e9f13968
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e9f13968

Branch: refs/heads/YARN-3926
Commit: e9f1396834174646a8d7aa8fc6c4a4f724ca5b28
Parents: 603f3ef
Author: Lei Xu <l...@apache.org>
Authored: Thu Dec 29 15:10:36 2016 +0800
Committer: Lei Xu <l...@apache.org>
Committed: Thu Dec 29 15:11:25 2016 +0800

----------------------------------------------------------------------
 .../hadoop/hdfs/server/common/Storage.java      |   6 +-
 .../server/datanode/BlockPoolSliceStorage.java  |   2 +-
 .../hdfs/server/datanode/DataStorage.java       |   2 +-
 .../datanode/TestDataNodeHotSwapVolumes.java    | 154 +++++++++++++++++--
 4 files changed, 150 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/e9f13968/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java
index 1f03fc2..c172289 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java
@@ -32,6 +32,7 @@ import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Properties;
+import java.util.concurrent.CopyOnWriteArrayList;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.logging.Log;
@@ -122,8 +123,9 @@ public abstract class Storage extends StorageInfo {
     public StorageDirType getStorageDirType();
     public boolean isOfType(StorageDirType type);
   }
-
-  protected List<StorageDirectory> storageDirs = new ArrayList<StorageDirectory>();
+
+  protected List<StorageDirectory> storageDirs =
+      new CopyOnWriteArrayList<StorageDirectory>();
 
   private class DirIterator implements Iterator<StorageDirectory> {
     final StorageDirType dirType;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e9f13968/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
index 9bd221e..dd82a74 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
@@ -299,7 +299,7 @@ public class BlockPoolSliceStorage extends Storage {
          it.hasNext(); ) {
       StorageDirectory sd = it.next();
       if (sd.getRoot().getAbsoluteFile().equals(absPathToRemove)) {
-        it.remove();
+        this.storageDirs.remove(sd);
         break;
       }
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e9f13968/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
index 5163e6b..12d9322 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
@@ -508,7 +508,7 @@ public class DataStorage extends Storage {
           bpsStorage.remove(bpRoot.getAbsoluteFile());
         }
 
-        it.remove();
+        this.storageDirs.remove(sd);
         try {
           sd.unlock();
         } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e9f13968/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
index 96d1a28..0401a81 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
@@ -47,7 +47,9 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.Time;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Test;
 
 import java.io.File;
@@ -61,9 +63,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -91,6 +95,7 @@ public class TestDataNodeHotSwapVolumes {
   private static final Log LOG = LogFactory.getLog(
     TestDataNodeHotSwapVolumes.class);
   private static final int BLOCK_SIZE = 512;
+  private static final int DEFAULT_STORAGES_PER_DATANODE = 2;
   private MiniDFSCluster cluster;
 
   @After
@@ -100,6 +105,11 @@ public class TestDataNodeHotSwapVolumes {
 
   private void startDFSCluster(int numNameNodes, int numDataNodes)
       throws IOException {
+    startDFSCluster(numNameNodes, numDataNodes, DEFAULT_STORAGES_PER_DATANODE);
+  }
+
+  private void startDFSCluster(int numNameNodes, int numDataNodes,
+      int storagePerDataNode) throws IOException {
     shutdown();
     Configuration conf = new Configuration();
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
@@ -123,6 +133,7 @@ public class TestDataNodeHotSwapVolumes {
     cluster = new MiniDFSCluster.Builder(conf)
         .nnTopology(nnTopology)
         .numDataNodes(numDataNodes)
+        .storagesPerDatanode(storagePerDataNode)
         .build();
     cluster.waitActive();
   }
@@ -281,7 +292,12 @@ public class TestDataNodeHotSwapVolumes {
 
   /** Add volumes to the first DataNode. */
   private void addVolumes(int numNewVolumes)
-      throws ReconfigurationException, IOException {
+      throws InterruptedException, IOException, ReconfigurationException {
+    addVolumes(numNewVolumes, new CountDownLatch(0));
+  }
+
+  private void addVolumes(int numNewVolumes, CountDownLatch waitLatch)
+      throws ReconfigurationException, IOException, InterruptedException {
     File dataDir = new File(cluster.getDataDirectory());
     DataNode dn = cluster.getDataNodes().get(0);  // First DataNode.
     Configuration conf = dn.getConf();
@@ -313,6 +329,9 @@
         dn.reconfigurePropertyImpl(DFS_DATANODE_DATA_DIR_KEY, newDataDir),
         is(conf.get(DFS_DATANODE_DATA_DIR_KEY)));
 
+    // Await on the latch for needed operations to complete
+    waitLatch.await();
+
     // Verify the configuration value is appropriately set.
     String[] effectiveDataDirs = conf.get(DFS_DATANODE_DATA_DIR_KEY).split(",");
     String[] expectDataDirs = newDataDir.split(",");
@@ -400,23 +419,34 @@
       throws IOException, InterruptedException, TimeoutException,
       ReconfigurationException {
     startDFSCluster(1, 1);
+    int numVolumes = cluster.getStoragesPerDatanode();
     String bpid = cluster.getNamesystem().getBlockPoolId();
     Path testFile = new Path("/test");
-    createFile(testFile, 4);  // Each volume has 2 blocks.
-    addVolumes(2);
+    // Each volume has 2 blocks
+    int initialBlockCount = numVolumes * 2;
+    createFile(testFile, initialBlockCount);
+
+    int newVolumeCount = 5;
+    addVolumes(newVolumeCount);
+    numVolumes += newVolumeCount;
+
+    int additionalBlockCount = 9;
+    int totalBlockCount = initialBlockCount + additionalBlockCount;
 
     // Continue to write the same file, thus the new volumes will have blocks.
-    DFSTestUtil.appendFile(cluster.getFileSystem(), testFile, BLOCK_SIZE * 8);
-    verifyFileLength(cluster.getFileSystem(), testFile, 8 + 4);
-    // After appending data, there should be [2, 2, 4, 4] blocks in each volume
-    // respectively.
-    List<Integer> expectedNumBlocks = Arrays.asList(2, 2, 4, 4);
+    DFSTestUtil.appendFile(cluster.getFileSystem(), testFile,
+        BLOCK_SIZE * additionalBlockCount);
+    verifyFileLength(cluster.getFileSystem(), testFile, totalBlockCount);
+
+    // After appending data, each new volume added should
+    // have 1 block each.
+    List<Integer> expectedNumBlocks = Arrays.asList(1, 1, 1, 1, 1, 4, 4);
 
     List<Map<DatanodeStorage, BlockListAsLongs>> blockReports =
         cluster.getAllBlockReports(bpid);
     assertEquals(1, blockReports.size());  // 1 DataNode
-    assertEquals(4, blockReports.get(0).size());  // 4 volumes
+    assertEquals(numVolumes, blockReports.get(0).size());  // 7 volumes
 
     Map<DatanodeStorage, BlockListAsLongs> dnReport =
         blockReports.get(0);
     List<Integer> actualNumBlocks = new ArrayList<Integer>();
@@ -427,6 +457,110 @@
     assertEquals(expectedNumBlocks, actualNumBlocks);
   }
 
+  @Test(timeout=180000)
+  public void testAddVolumesConcurrently()
+      throws IOException, InterruptedException, TimeoutException,
+      ReconfigurationException {
+    startDFSCluster(1, 1, 10);
+    int numVolumes = cluster.getStoragesPerDatanode();
+    String blockPoolId = cluster.getNamesystem().getBlockPoolId();
+    Path testFile = new Path("/test");
+
+    // Each volume has 2 blocks
+    int initialBlockCount = numVolumes * 2;
+    createFile(testFile, initialBlockCount);
+
+    DataNode dn = cluster.getDataNodes().get(0);
+    final FsDatasetSpi<? extends FsVolumeSpi> data = dn.data;
+    dn.data = Mockito.spy(data);
+
+    final int newVolumeCount = 40;
+    List<Thread> addVolumeDelayedThreads = new ArrayList<>();
+    AtomicBoolean addVolumeError = new AtomicBoolean(false);
+    AtomicBoolean listStorageError = new AtomicBoolean(false);
+    CountDownLatch addVolumeCompletionLatch =
+        new CountDownLatch(newVolumeCount);
+
+    // Thread to list all storage available at DataNode,
+    // when the volumes are being added in parallel.
+    final Thread listStorageThread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        while (addVolumeCompletionLatch.getCount() != newVolumeCount) {
+          int i = 0;
+          while(i++ < 1000) {
+            try {
+              dn.getStorage().listStorageDirectories();
+            } catch (Exception e) {
+              listStorageError.set(true);
+              LOG.error("Error listing storage: " + e);
+            }
+          }
+        }
+      }
+    });
+    listStorageThread.start();
+
+    // FsDatasetImpl addVolume mocked to perform the operation asynchronously
+    doAnswer(new Answer<Object>() {
+      @Override
+      public Object answer(InvocationOnMock invocationOnMock)
+          throws Throwable {
+        final Random r = new Random();
+        Thread addVolThread =
+            new Thread(new Runnable() {
+              @Override
+              public void run() {
+                try {
+                  r.setSeed(Time.now());
+                  // Let 50% of add volume operations
+                  // start after an initial delay.
+                  if (r.nextInt(10) > 4) {
+                    int s = r.nextInt(10) + 1;
+                    Thread.sleep(s * 100);
+                  }
+                  invocationOnMock.callRealMethod();
+                } catch (Throwable throwable) {
+                  addVolumeError.set(true);
+                  LOG.error("Error adding volume: " + throwable);
+                } finally {
+                  addVolumeCompletionLatch.countDown();
+                }
+              }
+            });
+        addVolumeDelayedThreads.add(addVolThread);
+        addVolThread.start();
+        return null;
+      }
+    }).when(dn.data).addVolume(any(StorageLocation.class), any(List.class));
+
+    addVolumes(newVolumeCount, addVolumeCompletionLatch);
+    numVolumes += newVolumeCount;
+
+    // Wait for all addVolume and listStorage Threads to complete
+    for (Thread t : addVolumeDelayedThreads) {
+      t.join();
+    }
+    listStorageThread.join();
+
+    // Verify errors while adding volumes and listing storage directories
+    Assert.assertEquals("Error adding volumes!", false, addVolumeError.get());
+    Assert.assertEquals("Error listing storage!",
+        false, listStorageError.get());
+
+    int additionalBlockCount = 9;
+    int totalBlockCount = initialBlockCount + additionalBlockCount;
+
+    // Continue to write the same file, thus the new volumes will have blocks.
+    DFSTestUtil.appendFile(cluster.getFileSystem(), testFile,
+        BLOCK_SIZE * additionalBlockCount);
+    verifyFileLength(cluster.getFileSystem(), testFile, totalBlockCount);
+
+    List<Map<DatanodeStorage, BlockListAsLongs>> blockReports =
+        cluster.getAllBlockReports(blockPoolId);
+    assertEquals(1, blockReports.size());
+    assertEquals(numVolumes, blockReports.get(0).size());
+  }
+
   @Test(timeout=60000)
   public void testAddVolumesToFederationNN()
       throws IOException, TimeoutException, InterruptedException,
@@ -780,7 +914,7 @@ public class TestDataNodeHotSwapVolumes {
   }
 
   /**
-   * Verify that {@link DataNode#checkDiskErrors()} removes all metadata in
+   * Verify that {@link DataNode#checkDiskError()} removes all metadata in
    * DataNode upon a volume failure. Thus we can run reconfig on the same
    * configuration to reload the new volume on the same directory as the failed one.
    */
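One consequence of the CopyOnWriteArrayList switch, visible in the
BlockPoolSliceStorage and DataStorage hunks above: its iterators are read-only
snapshots, and Iterator.remove() throws UnsupportedOperationException, so
removal has to go through the list itself (this.storageDirs.remove(sd)). A
small standalone sketch of that behavior (illustrative names, not Hadoop code):

import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class CowRemoveSketch {
  public static void main(String[] args) {
    List<String> storageDirs = new CopyOnWriteArrayList<String>();
    storageDirs.add("/data/vol0");
    storageDirs.add("/data/vol1");

    for (Iterator<String> it = storageDirs.iterator(); it.hasNext(); ) {
      String sd = it.next();
      if (sd.equals("/data/vol1")) {
        // it.remove() would throw UnsupportedOperationException on a
        // CopyOnWriteArrayList iterator; remove via the list instead.
        // The iterator keeps walking its old snapshot unaffected.
        storageDirs.remove(sd);
        break;
      }
    }
    System.out.println(storageDirs);  // prints [/data/vol0]
  }
}

Each such remove copies the backing array, which is acceptable on the rare
volume-removal path.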