Author: mc Date: Sun Jul 31 21:54:07 2005 New Revision: 226742 URL: http://svn.apache.org/viewcvs?rev=226742&view=rev Log:
Fix a tiny, but critical bug in NDFSOutputStream. We had a ">=" test where it should have been ">". In certain write() scenarios, this would cause the receiving datanode to drop bytes that were written by the client. Pretty bad! I also fixed a few logging problems. (Too much in some places, failed to throw exception in another.) Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSConstants.java lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSDataset.java lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSNamesystem.java lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NDFSClient.java lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NameNode.java Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSConstants.java URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSConstants.java?rev=226742&r1=226741&r2=226742&view=diff ============================================================================== --- lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSConstants.java (original) +++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSConstants.java Sun Jul 31 21:54:07 2005 @@ -22,7 +22,7 @@ ************************************/ public interface FSConstants { public static int BLOCK_SIZE = 32 * 1000 * 1000; - //public static int BLOCK_SIZE = 2173; + //public static int BLOCK_SIZE = 19; public static final long WRITE_COMPLETE = 0xcafae11a; Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSDataset.java URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSDataset.java?rev=226742&r1=226741&r2=226742&view=diff ============================================================================== --- lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSDataset.java (original) +++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSDataset.java Sun Jul 31 21:54:07 2005 @@ -282,7 +282,6 @@ ongoingCreates.add(b); reserved += BLOCK_SIZE; f = getTmpFile(b); - try { if (f.exists()) { throw new IOException("Unexpected problem in startBlock() for " + b + ". File " + f + " should not be present, but is."); @@ -295,8 +294,10 @@ throw new IOException("Unexpected problem in startBlock() for " + b + ". File " + f + " should be creatable, but is already present."); } } catch (IOException ie) { + System.out.println("Exception! " + ie); ongoingCreates.remove(b); reserved -= BLOCK_SIZE; + throw ie; } } Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSNamesystem.java URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSNamesystem.java?rev=226742&r1=226741&r2=226742&view=diff ============================================================================== --- lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSNamesystem.java (original) +++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSNamesystem.java Sun Jul 31 21:54:07 2005 @@ -550,7 +550,6 @@ TreeSet creates = new TreeSet(); public Lease(UTF8 holder) { - LOG.info("New lease, holder " + holder); this.holder = holder; renew(); } @@ -710,7 +709,6 @@ sortedLeases.remove(lease); lease.renew(); sortedLeases.add(lease); - LOG.info("Renewed lease " + lease); } } } Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NDFSClient.java URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NDFSClient.java?rev=226742&r1=226741&r2=226742&view=diff ============================================================================== --- lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NDFSClient.java (original) +++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NDFSClient.java Sun Jul 31 21:54:07 2005 @@ -627,6 +627,7 @@ pos += toWrite; off += toWrite; len -= toWrite; + filePos += toWrite; if ((bytesWrittenToBlock + pos >= BLOCK_SIZE) || (pos == BUFFER_SIZE)) { @@ -660,7 +661,7 @@ private synchronized void flushData(int maxPos) throws IOException { int workingPos = Math.min(pos, maxPos); - if (workingPos >= 0) { + if (workingPos > 0) { // // To the blockStream, write length, then bytes // Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NameNode.java URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NameNode.java?rev=226742&r1=226741&r2=226742&view=diff ============================================================================== --- lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NameNode.java (original) +++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NameNode.java Sun Jul 31 21:54:07 2005 @@ -71,6 +71,8 @@ ///////////////////////////////////////////////////// // ClientProtocol ///////////////////////////////////////////////////// + /** + */ public LocatedBlock[] open(String src) throws IOException { Object openResults[] = namesystem.open(new UTF8(src)); if (openResults == null) { @@ -86,6 +88,8 @@ } } + /** + */ public LocatedBlock create(String src, String clientName, boolean overwrite) throws IOException { Object results[] = namesystem.startFile(new UTF8(src), new UTF8(clientName), overwrite); if (results == null) { @@ -97,6 +101,8 @@ } } + /** + */ public LocatedBlock addBlock(String src) throws IOException { Object results[] = namesystem.getAdditionalBlock(new UTF8(src)); if (results != null && results[0] == null) { @@ -118,14 +124,20 @@ } } + /** + */ public void abandonBlock(Block b, String src) throws IOException { if (! namesystem.abandonBlock(b, new UTF8(src))) { throw new IOException("Cannot abandon block during write to " + src); } } + /** + */ public void abandonFileInProgress(String src) throws IOException { namesystem.abandonFileInProgress(new UTF8(src)); } + /** + */ public boolean complete(String src, String clientName) throws IOException { int returnCode = namesystem.completeFile(new UTF8(src), new UTF8(clientName)); if (returnCode == STILL_WAITING) { @@ -136,6 +148,8 @@ throw new IOException("Could not complete write to file " + src + " by " + clientName); } } + /** + */ public String[] getHints(String src, long offset) throws IOException { UTF8 hosts[] = namesystem.getDatanodeHints(new UTF8(src), offset); if (hosts == null) { @@ -148,27 +162,38 @@ return results; } } - + /** + */ public boolean rename(String src, String dst) throws IOException { return namesystem.renameTo(new UTF8(src), new UTF8(dst)); } + /** + */ public boolean delete(String src) throws IOException { return namesystem.delete(new UTF8(src)); } + /** + */ public boolean exists(String src) throws IOException { return namesystem.exists(new UTF8(src)); } + /** + */ public boolean isDir(String src) throws IOException { return namesystem.isDir(new UTF8(src)); } + /** + */ public boolean mkdirs(String src) throws IOException { return namesystem.mkdirs(new UTF8(src)); } + /** + */ public boolean obtainLock(String src, String clientName, boolean exclusive) throws IOException { int returnCode = namesystem.obtainLock(new UTF8(src), new UTF8(clientName), exclusive); if (returnCode == COMPLETE_SUCCESS) { @@ -180,6 +205,8 @@ } } + /** + */ public boolean releaseLock(String src, String clientName) throws IOException { int returnCode = namesystem.releaseLock(new UTF8(src), new UTF8(clientName)); if (returnCode == COMPLETE_SUCCESS) { @@ -191,14 +218,26 @@ } } + /** + */ public void renewLease(String clientName) throws IOException { namesystem.renewLease(new UTF8(clientName)); } + /** + */ public NDFSFileInfo[] getListing(String src) throws IOException { + /** + System.out.println("hbCounts: " + hbCounts + ", avgTime: " + (hbTime / (1.0 * hbCounts))); + System.out.println("brCounts: " + brCounts + ", avgTime: " + (brTime / (1.0 * brCounts))); + System.out.println("brvCounts: " + brvCounts + ", avgTime: " + (brvTime / (1.0 * brvCounts))); + System.out.println("bwCounts: " + bwCounts + ", avgTime: " + (bwTime / (1.0 * bwCounts))); + **/ return namesystem.getListing(new UTF8(src)); } + /** + */ public long[] getStats() throws IOException { long results[] = new long[2]; results[0] = namesystem.totalCapacity(); @@ -206,6 +245,8 @@ return results; } + /** + */ public DatanodeInfo[] getDatanodeReport() throws IOException { DatanodeInfo results[] = namesystem.datanodeReport(); if (results == null || results.length == 0) { @@ -214,20 +255,37 @@ return results; } + //////////////////////////////////////////////////////////////// + // DatanodeProtocol + //////////////////////////////////////////////////////////////// + long hbTime = 0, brTime = 0, brvTime = 0, bwTime = 0; + int hbCounts = 0, brCounts = 0, brvCounts = 0, bwCounts = 0; /** */ public void sendHeartbeat(String sender, long capacity, long remaining) { + long start = System.currentTimeMillis(); namesystem.gotHeartbeat(new UTF8(sender), capacity, remaining); + long end = System.currentTimeMillis(); + hbCounts++; + hbTime += (end-start); } public void blockReport(String sender, Block blocks[]) { + long start = System.currentTimeMillis(); namesystem.processReport(blocks, new UTF8(sender)); + long end = System.currentTimeMillis(); + brCounts++; + brTime += (end-start); } public void blockReceived(String sender, Block blocks[]) { + long start = System.currentTimeMillis(); for (int i = 0; i < blocks.length; i++) { namesystem.blockReceived(blocks[i], new UTF8(sender)); } + long end = System.currentTimeMillis(); + brvCounts++; + brvTime += (end-start); } /** @@ -244,23 +302,30 @@ // // Ask to perform pending transfers, if any // - Object xferResults[] = namesystem.pendingTransfers(new DatanodeInfo(new UTF8(sender)), xmitsInProgress); - if (xferResults != null) { - return new BlockCommand((Block[]) xferResults[0], (DatanodeInfo[][]) xferResults[1]); - } + long start = System.currentTimeMillis(); + try { + Object xferResults[] = namesystem.pendingTransfers(new DatanodeInfo(new UTF8(sender)), xmitsInProgress); + if (xferResults != null) { + return new BlockCommand((Block[]) xferResults[0], (DatanodeInfo[][]) xferResults[1]); + } - // - // If none, check to see if there are blocks to invalidate - // - Block blocks[] = namesystem.recentlyInvalidBlocks(new UTF8(sender)); - if (blocks == null) { - blocks = namesystem.checkObsoleteBlocks(new UTF8(sender)); - } - if (blocks != null) { - return new BlockCommand(blocks); - } + // + // If none, check to see if there are blocks to invalidate + // + Block blocks[] = namesystem.recentlyInvalidBlocks(new UTF8(sender)); + if (blocks == null) { + blocks = namesystem.checkObsoleteBlocks(new UTF8(sender)); + } + if (blocks != null) { + return new BlockCommand(blocks); + } - return new BlockCommand(); + return new BlockCommand(); + } finally { + long end = System.currentTimeMillis(); + bwCounts++; + bwTime += (end-start); + } } /**