Author: jitendra Date: Wed Sep 14 21:40:32 2011 New Revision: 1170859 URL: http://svn.apache.org/viewvc?rev=1170859&view=rev Log: HDFS-1779. After NameNode restart , Clients can not read partial files even after client invokes Sync. Contributed by Uma Maheswara Rao G.
Added: hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/server/namenode/TestBBWBlockReport.java Modified: hadoop/common/branches/branch-0.20-append/CHANGES.txt hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/INodeFile.java hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java Modified: hadoop/common/branches/branch-0.20-append/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/CHANGES.txt?rev=1170859&r1=1170858&r2=1170859&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-append/CHANGES.txt (original) +++ hadoop/common/branches/branch-0.20-append/CHANGES.txt Wed Sep 14 21:40:32 2011 @@ -96,6 +96,9 @@ Release 0.20-append - Unreleased HDFS-1554. New semantics for recoverLease. (hairong) + HDFS-1779. After NameNode restart , Clients can not read partial files even + after client invokes Sync. (Uma Maheswara Rao G via jitendra) + Release 0.20.3 - Unreleased NEW FEATURES Modified: hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1170859&r1=1170858&r2=1170859&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original) +++ hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java Wed Sep 14 21:40:32 2011 @@ -186,6 +186,7 @@ public class DataNode extends Configured int socketWriteTimeout = 0; boolean transferToAllowed = true; int writePacketSize = 0; + private boolean supportAppends; public DataBlockScanner blockScanner = null; public Daemon blockScannerThread = null; @@ -211,7 +212,7 @@ public class DataNode extends Configured AbstractList<File> dataDirs) throws IOException { super(conf); datanodeObject = this; - + supportAppends = conf.getBoolean("dfs.support.append", false); try { startDataNode(conf, dataDirs); } catch (IOException ie) { @@ -553,6 +554,12 @@ public class DataNode extends Configured + ". Expecting " + storage.getStorageID()); } + if (supportAppends) { + Block[] bbwReport = data.getBlocksBeingWrittenReport(); + long[] blocksBeingWritten = BlockListAsLongs + .convertToArrayLongs(bbwReport); + namenode.blocksBeingWrittenReport(dnRegistration, blocksBeingWritten); + } // random short delay - helps scatter the BR from all DNs scheduleBlockReport(initialBlockReportDelay); } Modified: hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=1170859&r1=1170858&r2=1170859&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original) +++ hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Wed Sep 14 21:40:32 2011 @@ -58,6 +58,33 @@ import org.mortbay.log.Log; * ***************************************************/ public class FSDataset implements FSConstants, FSDatasetInterface { + + + /** Find the metadata file for the specified block file. + * Return the generation stamp from the name of the metafile. + */ + private static long getGenerationStampFromFile(File[] listdir, File blockFile) { + String blockName = blockFile.getName(); + for (int j = 0; j < listdir.length; j++) { + String path = listdir[j].getName(); + if (!path.startsWith(blockName)) { + continue; + } + String[] vals = path.split("_"); + if (vals.length != 3) { // blk, blkid, genstamp.meta + continue; + } + String[] str = vals[2].split("\\."); + if (str.length != 2) { + continue; + } + return Long.parseLong(str[0]); + } + DataNode.LOG.warn("Block " + blockFile + + " does not have a metafile!"); + return Block.GRANDFATHER_GENERATION_STAMP; + } + /** * A data structure than encapsulates a Block along with the full pathname @@ -182,31 +209,6 @@ public class FSDataset implements FSCons return children[ lastChildIdx ].addBlock(b, src, true, false); } - /** Find the metadata file for the specified block file. - * Return the generation stamp from the name of the metafile. - */ - long getGenerationStampFromFile(File[] listdir, File blockFile) { - String blockName = blockFile.getName(); - for (int j = 0; j < listdir.length; j++) { - String path = listdir[j].getName(); - if (!path.startsWith(blockName)) { - continue; - } - String[] vals = path.split("_"); - if (vals.length != 3) { // blk, blkid, genstamp.meta - continue; - } - String[] str = vals[2].split("\\."); - if (str.length != 2) { - continue; - } - return Long.parseLong(str[0]); - } - DataNode.LOG.warn("Block " + blockFile + - " does not have a metafile!"); - return Block.GRANDFATHER_GENERATION_STAMP; - } - /** * Populate the given blockSet with any child blocks * found at this node. @@ -221,7 +223,7 @@ public class FSDataset implements FSCons File blockFiles[] = dir.listFiles(); for (int i = 0; i < blockFiles.length; i++) { if (Block.isBlockFilename(blockFiles[i])) { - long genStamp = getGenerationStampFromFile(blockFiles, blockFiles[i]); + long genStamp = FSDataset.getGenerationStampFromFile(blockFiles, blockFiles[i]); blockSet.add(new Block(blockFiles[i], blockFiles[i].length(), genStamp)); } } @@ -242,7 +244,7 @@ public class FSDataset implements FSCons File blockFiles[] = dir.listFiles(); for (int i = 0; i < blockFiles.length; i++) { if (Block.isBlockFilename(blockFiles[i])) { - long genStamp = getGenerationStampFromFile(blockFiles, blockFiles[i]); + long genStamp = FSDataset.getGenerationStampFromFile(blockFiles, blockFiles[i]); Block block = new Block(blockFiles[i], blockFiles[i].length(), genStamp); blockSet.add(new BlockAndFile(blockFiles[i].getAbsoluteFile(), block)); } @@ -259,7 +261,7 @@ public class FSDataset implements FSCons File blockFiles[] = dir.listFiles(); for (int i = 0; i < blockFiles.length; i++) { if (Block.isBlockFilename(blockFiles[i])) { - long genStamp = getGenerationStampFromFile(blockFiles, blockFiles[i]); + long genStamp = FSDataset.getGenerationStampFromFile(blockFiles, blockFiles[i]); volumeMap.put(new Block(blockFiles[i], blockFiles[i].length(), genStamp), new DatanodeBlockInfo(volume, blockFiles[i])); } @@ -375,7 +377,7 @@ public class FSDataset implements FSCons // should not be deleted. blocksBeingWritten = new File(parent, "blocksBeingWritten"); if (blocksBeingWritten.exists()) { - if (supportAppends) { + if (supportAppends) { recoverBlocksBeingWritten(blocksBeingWritten); } else { FileUtil.fullyDelete(blocksBeingWritten); @@ -503,6 +505,35 @@ public class FSDataset implements FSCons void getBlockInfo(TreeSet<Block> blockSet) { dataDir.getBlockInfo(blockSet); } + + void getBlocksBeingWrittenInfo(TreeSet<Block> blockSet) { + if (blocksBeingWritten == null) { + return; + } + + File[] blockFiles = blocksBeingWritten.listFiles(); + if (blockFiles == null) { + return; + } + + for (int i = 0; i < blockFiles.length; i++) { + if (!blockFiles[i].isDirectory()) { + // get each block in the blocksBeingWritten direcotry + if (Block.isBlockFilename(blockFiles[i])) { + long genStamp = + FSDataset.getGenerationStampFromFile(blockFiles, blockFiles[i]); + Block block = + new Block(blockFiles[i], blockFiles[i].length(), genStamp); + + // add this block to block set + blockSet.add(block); + if (DataNode.LOG.isDebugEnabled()) { + DataNode.LOG.debug("recoverBlocksBeingWritten for block " + block); + } + } + } + } + } void getVolumeMap(HashMap<Block, DatanodeBlockInfo> volumeMap) { dataDir.getVolumeMap(volumeMap, this); @@ -534,8 +565,6 @@ public class FSDataset implements FSCons if (DataNode.LOG.isDebugEnabled()) { DataNode.LOG.debug("recoverBlocksBeingWritten for block " + b.block); } - DataNode.getDataNode().notifyNamenodeReceivedBlock(b.block, - DataNode.EMPTY_DEL_HINT); } } @@ -647,6 +676,18 @@ public class FSDataset implements FSCons volumes[idx].getVolumeMap(volumeMap); } } + + synchronized void getBlocksBeingWrittenInfo(TreeSet<Block> blockSet) { + long startTime = System.currentTimeMillis(); + + for (int idx = 0; idx < volumes.length; idx++) { + volumes[idx].getBlocksBeingWrittenInfo(blockSet); + } + + long scanTime = (System.currentTimeMillis() - startTime)/1000; + DataNode.LOG.info("Finished generating blocks being written report for " + + volumes.length + " volumes in " + scanTime + " seconds"); + } /** * goes over all the volumes and checkDir eachone of them @@ -1470,6 +1511,20 @@ public class FSDataset implements FSCons } /** + * Return a table of blocks being written data + */ + public Block[] getBlocksBeingWrittenReport() { + TreeSet<Block> blockSet = new TreeSet<Block>(); + volumes.getBlocksBeingWrittenInfo(blockSet); + Block blockTable[] = new Block[blockSet.size()]; + int i = 0; + for (Iterator<Block> it = blockSet.iterator(); it.hasNext(); i++) { + blockTable[i] = it.next(); + } + return blockTable; + } + + /** * Return a table of block data */ public Block[] getBlockReport() { Modified: hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java?rev=1170859&r1=1170858&r2=1170859&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java (original) +++ hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java Wed Sep 14 21:40:32 2011 @@ -233,6 +233,12 @@ public interface FSDatasetInterface exte * @return - the block report - the full list of blocks stored */ public Block[] getBlockReport(); + + /** + * Returns the blocks being written report + * @return - the blocks being written report + */ + public Block[] getBlocksBeingWrittenReport(); /** * Is the block valid? Modified: hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1170859&r1=1170858&r2=1170859&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original) +++ hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Wed Sep 14 21:40:32 2011 @@ -3018,7 +3018,89 @@ public class FSNamesystem implements FSC allAlive = !foundDead; } } - + + /** + * Log a rejection of an addStoredBlock RPC, invalidate the reported block, + * and return it. + */ + private Block rejectAddStoredBlock(Block block, DatanodeDescriptor node, + String msg) { + NameNode.stateChangeLog.info("BLOCK* NameSystem.addStoredBlock: " + + "addStoredBlock request received for " + block + " on " + + node.getName() + " size " + block.getNumBytes() + + " but was rejected: " + msg); + addToInvalidates(block, node); + return block; + } + + /** + * It will update the targets for INodeFileUnderConstruction + * + * @param nodeID + * - DataNode ID + * @param blocksBeingWritten + * - list of blocks which are still inprogress. + * @throws IOException + */ + public synchronized void processBlocksBeingWrittenReport(DatanodeID nodeID, + BlockListAsLongs blocksBeingWritten) throws IOException { + DatanodeDescriptor dataNode = getDatanode(nodeID); + if (dataNode == null) { + throw new IOException("ProcessReport from unregistered node: " + + nodeID.getName()); + } + + // Check if this datanode should actually be shutdown instead. + if (shouldNodeShutdown(dataNode)) { + setDatanodeDead(dataNode); + throw new DisallowedDatanodeException(dataNode); + } + + Block block = new Block(); + + for (int i = 0; i < blocksBeingWritten.getNumberOfBlocks(); i++) { + block.set(blocksBeingWritten.getBlockId(i), blocksBeingWritten + .getBlockLen(i), blocksBeingWritten.getBlockGenStamp(i)); + + BlockInfo storedBlock = blocksMap.getStoredBlockWithoutMatchingGS(block); + + if (storedBlock == null) { + rejectAddStoredBlock(new Block(block), dataNode, + "Block not in blockMap with any generation stamp"); + continue; + } + + INodeFile inode = storedBlock.getINode(); + if (inode == null) { + rejectAddStoredBlock(new Block(block), dataNode, + "Block does not correspond to any file"); + continue; + } + + boolean underConstruction = inode.isUnderConstruction(); + boolean isLastBlock = inode.getLastBlock() != null + && inode.getLastBlock().getBlockId() == block.getBlockId(); + + // Must be the last block of a file under construction, + if (!underConstruction) { + rejectAddStoredBlock(new Block(block), dataNode, + "Reported as block being written but is a block of closed file."); + continue; + } + + if (!isLastBlock) { + rejectAddStoredBlock(new Block(block), dataNode, + "Reported as block being written but not the last block of " + + "an under-construction file."); + continue; + } + + INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction) inode; + pendingFile.addTarget(dataNode); + incrementSafeBlockCount(pendingFile.getTargets().length); + } + } + /** * The given node is reporting all its blocks. Use this info to * update the (machine-->blocklist) and (block-->machinelist) tables. Modified: hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/INodeFile.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/INodeFile.java?rev=1170859&r1=1170858&r2=1170859&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/INodeFile.java (original) +++ hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/INodeFile.java Wed Sep 14 21:40:32 2011 @@ -191,4 +191,13 @@ class INodeFile extends INode { blocks, getPermissionStatus(), clientName, clientMachine, clientNode); } + + /** + * Return the last block in this file, or null if there are no blocks. + */ + Block getLastBlock() { + if (this.blocks == null || this.blocks.length == 0) + return null; + return this.blocks[this.blocks.length - 1]; + } } Modified: hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=1170859&r1=1170858&r2=1170859&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original) +++ hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java Wed Sep 14 21:40:32 2011 @@ -745,6 +745,21 @@ public class NameNode implements ClientP return DatanodeCommand.FINALIZE; return null; } + + /** + * add new replica blocks to the Inode to target mapping + * also add the Inode file to DataNodeDesc + */ + public void blocksBeingWrittenReport(DatanodeRegistration nodeReg, + long[] blocks) throws IOException { + verifyRequest(nodeReg); + BlockListAsLongs blist = new BlockListAsLongs(blocks); + namesystem.processBlocksBeingWrittenReport(nodeReg, blist); + + stateChangeLog.info("*BLOCK* NameNode.blocksBeingWrittenReport: " + +"from "+nodeReg.getName()+" "+blocks.length +" blocks"); + + } public void blockReceived(DatanodeRegistration nodeReg, Block blocks[], Modified: hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java?rev=1170859&r1=1170858&r2=1170859&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java (original) +++ hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java Wed Sep 14 21:40:32 2011 @@ -100,6 +100,17 @@ public interface DatanodeProtocol extend */ public DatanodeCommand blockReport(DatanodeRegistration registration, long[] blocks) throws IOException; + + /** + * blocksBeingWrittenReport() tells the NameNode about the blocks-being- + * written information + * + * @param registration + * @param blocks + * @throws IOException + */ + public void blocksBeingWrittenReport(DatanodeRegistration registration, + long[] blocks) throws IOException; /** * blockReceived() allows the DataNode to tell the NameNode about Modified: hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java?rev=1170859&r1=1170858&r2=1170859&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java (original) +++ hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java Wed Sep 14 21:40:32 2011 @@ -43,6 +43,7 @@ import org.apache.hadoop.hdfs.tools.DFSA import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.security.*; +import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.ToolRunner; /** @@ -617,6 +618,48 @@ public class MiniDFSCluster { numDataNodes--; return dnprop; } + + /** + * Shutdown namenode. + */ + public synchronized void shutdownNameNode() { + if (nameNode != null) { + System.out.println("Shutting down the namenode"); + nameNode.stop(); + nameNode.join(); + nameNode = null; + } + } + + /** + * Restart namenode. + */ + public synchronized void restartNameNode(boolean waitActive) + throws IOException { + shutdownNameNode(); + nameNode = NameNode.createNameNode(new String[] {}, conf); + System.out.println("Restarted the namenode"); + if (waitActive) { + waitClusterUp(); + int failedCount = 0; + while (true) { + try { + waitActive(); + break; + } catch (IOException e) { + failedCount++; + // Cached RPC connection to namenode, if any, is expected to fail once + if (failedCount > 1) { + System.out.println("Tried waitActive() " + failedCount + + " time(s) and failed, giving up. " + + StringUtils.stringifyException(e)); + throw e; + } + } + } + System.out.println("Cluster is active"); + } + } /** * Restart a datanode Modified: hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java?rev=1170859&r1=1170858&r2=1170859&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java (original) +++ hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java Wed Sep 14 21:40:32 2011 @@ -17,7 +17,6 @@ */ package org.apache.hadoop.hdfs.server.datanode; -import java.io.File; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -683,4 +682,9 @@ public class SimulatedFSDataset impleme public boolean hasEnoughResource() { return true; } + + @Override + public Block[] getBlocksBeingWrittenReport() { + return null; + } } Added: hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/server/namenode/TestBBWBlockReport.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/server/namenode/TestBBWBlockReport.java?rev=1170859&view=auto ============================================================================== --- hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/server/namenode/TestBBWBlockReport.java (added) +++ hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/server/namenode/TestBBWBlockReport.java Wed Sep 14 21:40:32 2011 @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode; + +import static org.junit.Assert.assertEquals; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction; +import org.apache.hadoop.io.IOUtils; +import org.junit.Before; +import org.junit.Test; + +public class TestBBWBlockReport { + + private final Path src = new Path(System.getProperty("test.build.data", + "/tmp"), "testfile"); + + private Configuration conf = null; + + private final String fileContent = "PartialBlockReadTest"; + + @Before + public void setUp() { + conf = new Configuration(); + conf.setInt("ipc.client.connection.maxidletime", 1000); + } + + @Test(timeout = 60000) + // timeout is mainly for safe mode + public void testDNShouldSendBBWReportIfAppendOn() throws Exception { + FileSystem fileSystem = null; + FSDataOutputStream outStream = null; + MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null); + try { + fileSystem = cluster.getFileSystem(); + // Keep open stream + outStream = writeFileAndSync(fileSystem, src, fileContent); + // Parameter true will ensure that NN came out of safemode + cluster.restartNameNode(true); + assertEquals( + "Not able to read the synced block content after NameNode restart (with append support)", + fileContent, getFileContentFromDFS(fileSystem)); + } finally { + if (null != fileSystem) + fileSystem.close(); + if (null != outStream) + outStream.close(); + cluster.shutdown(); + } + } + + @Test + public void testDNShouldNotSendBBWReportIfAppendOff() throws Exception { + FileSystem fileSystem = null; + FSDataOutputStream outStream = null; + // disable the append support + conf.setBoolean("dfs.support.append", false); + MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null); + try { + fileSystem = cluster.getFileSystem(); + // Keep open stream + outStream = writeFileAndSync(fileSystem, src, fileContent); + cluster.restartNameNode(false); + Thread.sleep(2000); + assertEquals( + "Able to read the synced block content after NameNode restart (without append support", + 0, getFileContentFromDFS(fileSystem).length()); + } finally { + // NN will not come out of safe mode. So exited the safemode forcibly to + // clean the resources. + cluster.getNameNode().setSafeMode(SafeModeAction.SAFEMODE_LEAVE); + if (null != fileSystem) + fileSystem.close(); + if (null != outStream) + outStream.close(); + cluster.shutdown(); + } + } + + private String getFileContentFromDFS(FileSystem fs) throws IOException { + ByteArrayOutputStream bio = new ByteArrayOutputStream(); + IOUtils.copyBytes(fs.open(src), bio, conf, true); + return new String(bio.toByteArray()); + } + + private FSDataOutputStream writeFileAndSync(FileSystem fs, Path src, + String fileContent) throws IOException { + FSDataOutputStream fo = fs.create(src); + fo.writeBytes(fileContent); + fo.sync(); + return fo; + } +}