This is an automated email from the ASF dual-hosted git repository. User kihwal pushed a commit to branch branch-2.10 in the repository https://gitbox.apache.org/repos/asf/hadoop.git.
The following commit(s) were added to refs/heads/branch-2.10 by this push: new 1710975 HDFS-12453. TestDataNodeHotSwapVolumes fails in trunk Jenkins runs. Contributed by Jim Brennan and Lei Xu. 1710975 is described below commit 17109758dd6f9a86b226c025ee20b8e2abc9d366 Author: Kihwal Lee <kih...@apache.org> AuthorDate: Fri Jun 12 16:19:36 2020 -0500 HDFS-12453. TestDataNodeHotSwapVolumes fails in trunk Jenkins runs. Contributed by Jim Brennan and Lei Xu. --- .../datanode/TestDataNodeHotSwapVolumes.java | 149 ++++++++++++++------- 1 file changed, 97 insertions(+), 52 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java index 93c1242..e98b90a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java @@ -29,6 +29,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.BlockMissingException; +import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DistributedFileSystem; @@ -36,6 +37,8 @@ import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSNNTopology; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; import org.apache.hadoop.hdfs.server.common.Storage; import 
org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; @@ -46,6 +49,7 @@ import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport; +import org.apache.hadoop.io.MultipleIOException; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.Time; import org.junit.After; @@ -83,6 +87,7 @@ import static org.hamcrest.CoreMatchers.not; import static org.hamcrest.core.Is.is; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -777,12 +782,11 @@ public class TestDataNodeHotSwapVolumes { private void testRemoveVolumeBeingWrittenForDatanode(int dataNodeIdx) throws IOException, ReconfigurationException, TimeoutException, InterruptedException, BrokenBarrierException { - // Starts DFS cluster with 3 DataNodes to form a pipeline. - startDFSCluster(1, 3); + startDFSCluster(1, 4); final short REPLICATION = 3; - final DataNode dn = cluster.getDataNodes().get(dataNodeIdx); - final FileSystem fs = cluster.getFileSystem(); + final DistributedFileSystem fs = cluster.getFileSystem(); + final DFSClient client = fs.getClient(); final Path testFile = new Path("/test"); FSDataOutputStream out = fs.create(testFile, REPLICATION); @@ -792,54 +796,102 @@ public class TestDataNodeHotSwapVolumes { out.write(writeBuf); out.hflush(); - // Make FsDatasetSpi#finalizeBlock a time-consuming operation. So if the - // BlockReceiver releases volume reference before finalizeBlock(), the blocks - // on the volume will be removed, and finalizeBlock() throws IOE. - final FsDatasetSpi<? 
extends FsVolumeSpi> data = dn.data; - dn.data = Mockito.spy(data); - doAnswer(new Answer<Object>() { - public Object answer(InvocationOnMock invocation) - throws IOException, InterruptedException { - Thread.sleep(1000); - // Bypass the argument to FsDatasetImpl#finalizeBlock to verify that - // the block is not removed, since the volume reference should not - // be released at this point. - data.finalizeBlock((ExtendedBlock) invocation.getArguments()[0], - (boolean) invocation.getArguments()[1]); - return null; - } - }).when(dn.data).finalizeBlock(any(ExtendedBlock.class), - Mockito.anyBoolean()); - - final CyclicBarrier barrier = new CyclicBarrier(2); + BlockLocation[] blocks = fs.getFileBlockLocations(testFile, 0, BLOCK_SIZE); + String[] dataNodeNames = blocks[0].getNames(); + String dataNodeName = dataNodeNames[dataNodeIdx]; + int xferPort = Integer.parseInt(dataNodeName.split(":")[1]); + DataNode dn = null; + for (DataNode dataNode : cluster.getDataNodes()) { + if (dataNode.getXferPort() == xferPort) { + dn = dataNode; + break; + } + } + assertNotNull(dn); - List<String> oldDirs = getDataDirs(dn); - final String newDirs = oldDirs.get(1); // Remove the first volume. 
- final List<Exception> exceptions = new ArrayList<>(); - Thread reconfigThread = new Thread() { - public void run() { + final CyclicBarrier barrier = new CyclicBarrier(4); + final AtomicBoolean done = new AtomicBoolean(false); + DataNodeFaultInjector newInjector = new DataNodeFaultInjector() { + public void logDelaySendingAckToUpstream( + final String upstreamAddr, final long delayMs) throws IOException { try { - barrier.await(); - assertThat( - "DN did not update its own config", - dn.reconfigurePropertyImpl(DFS_DATANODE_DATA_DIR_KEY, newDirs), - is(dn.getConf().get(DFS_DATANODE_DATA_DIR_KEY))); - } catch (ReconfigurationException | - InterruptedException | - BrokenBarrierException e) { - exceptions.add(e); + // Make all streams which hold the volume references to wait the + // reconfiguration thread to start. + // It should only block IO during the period of reconfiguration + // task running. + if (!done.get()) { + barrier.await(); + // Add delays to allow the reconfiguration thread starts before + // IO finish. + Thread.sleep(1000); + } + } catch (InterruptedException | BrokenBarrierException e) { + throw new IOException(e); } } }; - reconfigThread.start(); + DataNodeFaultInjector oldInjector = DataNodeFaultInjector.get(); - barrier.await(); - rb.nextBytes(writeBuf); - out.write(writeBuf); - out.hflush(); - out.close(); + try { + DataNodeFaultInjector.set(newInjector); + + List<String> oldDirs = getDataDirs(dn); + LocatedBlocks lbs = client.getLocatedBlocks("/test", 0); + LocatedBlock block = lbs.get(0); + FsVolumeImpl volume = + (FsVolumeImpl) dn.getFSDataset().getVolume(block.getBlock()); + StringBuffer newDirsBuf = new StringBuffer(); + String delim = ""; + for (String d : oldDirs) { + if (! 
d.contains(volume.getBasePath())) { + newDirsBuf.append(delim).append(d); + delim = ","; + } + } + final String newDirs = newDirsBuf.toString(); + final List<IOException> exceptions = new ArrayList<>(); + final DataNode dataNode = dn; + final CyclicBarrier reconfigBarrier = new CyclicBarrier(2); + + Thread reconfigThread = new Thread(new Runnable() { + @Override + public void run() { + try { + reconfigBarrier.await(); + + // Wake up writing threads on the pipeline to finish the block. + barrier.await(); + + assertThat( + "DN did not update its own config", + dataNode.reconfigurePropertyImpl( + DFS_DATANODE_DATA_DIR_KEY, newDirs), + is(dataNode.getConf().get(DFS_DATANODE_DATA_DIR_KEY))); + done.set(true); + } catch (ReconfigurationException | + InterruptedException | + BrokenBarrierException e) { + exceptions.add(new IOException(e)); + } + } + }); + reconfigThread.start(); - reconfigThread.join(); + // Write more data to make sure the stream threads wait on the barrier. + rb.nextBytes(writeBuf); + out.write(writeBuf); + reconfigBarrier.await(); + out.hflush(); + out.close(); + + reconfigThread.join(); + + if (!exceptions.isEmpty()) { + throw MultipleIOException.createIOException(exceptions); + } + } finally { + DataNodeFaultInjector.set(oldInjector); + } // Verify if the data directory reconfigure was successful FsDatasetSpi<? extends FsVolumeSpi> fsDatasetSpi = dn.getFSDataset(); @@ -852,19 +904,12 @@ public class TestDataNodeHotSwapVolumes { 1, fsVolumeReferences.size()); } - // Add a new DataNode to help with the pipeline recover. - cluster.startDataNodes(conf, 1, true, null, null, null); - // Verify the file has sufficient replications. 
DFSTestUtil.waitReplication(fs, testFile, REPLICATION); // Read the content back byte[] content = DFSTestUtil.readFileBuffer(fs, testFile); assertEquals(BLOCK_SIZE, content.length); - if (!exceptions.isEmpty()) { - throw new IOException(exceptions.get(0).getCause()); - } - // Write more files to make sure that the DataNode that has removed volume // is still alive to receive data. for (int i = 0; i < 10; i++) { --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org