Author: mc Date: Fri Sep 9 11:24:00 2005 New Revision: 279840 URL: http://svn.apache.org/viewcvs?rev=279840&view=rev Log:
The NDFS client now reports the blocks it writes to the NameNode itself; previously, the receiving DataNodes reported them. This led to all sorts of bad heartbeat race conditions among DataNodes, and the NameNode had difficulty deciding when to replicate partially-reported blocks. No más! Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/ClientProtocol.java lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/DataNode.java lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSNamesystem.java lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NDFSClient.java lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NameNode.java Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/ClientProtocol.java URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/ClientProtocol.java?rev=279840&r1=279839&r2=279840&view=diff ============================================================================== --- lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/ClientProtocol.java (original) +++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/ClientProtocol.java Fri Sep 9 11:24:00 2005 @@ -45,6 +45,13 @@ public LocatedBlock addBlock(String src) throws IOException; /** + * The client wants to report a block it has just successfully + * written to one or more datanodes. Client-written blocks are + * always reported by the client, not by the datanode. + */ + public void reportWrittenBlock(LocatedBlock b) throws IOException; + + /** * The client wants to abandon writing to the indicated block, * part of the indicated (currently-open) filename.
*/ Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/DataNode.java URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/DataNode.java?rev=279840&r1=279839&r2=279840&view=diff ============================================================================== --- lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/DataNode.java (original) +++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/DataNode.java Fri Sep 9 11:24:00 2005 @@ -285,6 +285,7 @@ // DataOutputStream reply = new DataOutputStream(new BufferedOutputStream(s.getOutputStream())); try { + boolean shouldReportBlock = in.readBoolean(); Block b = new Block(); b.readFields(in); int numTargets = in.readInt(); @@ -302,11 +303,15 @@ // // Make sure curTarget is equal to this machine - // REMIND - mjc // DatanodeInfo curTarget = targets[0]; // + // Track all the places we've successfully written the block + // + Vector mirrors = new Vector(); + + // // Open local disk out // DataOutputStream out = new DataOutputStream(new BufferedOutputStream(data.writeToBlock(b))); @@ -329,6 +334,7 @@ // Write connection header out2.write(OP_WRITE_BLOCK); + out2.writeBoolean(shouldReportBlock); b.write(out2); out2.writeInt(targets.length - 1); for (int i = 1; i < targets.length; i++) { @@ -412,6 +418,12 @@ if (complete != WRITE_COMPLETE) { LOG.info("Conflicting value for WRITE_COMPLETE: " + complete); } + LocatedBlock newLB = new LocatedBlock(); + newLB.readFields(in2); + DatanodeInfo mirrorsSoFar[] = newLB.getLocations(); + for (int k = 0; k < mirrorsSoFar.length; k++) { + mirrors.add(mirrorsSoFar[k]); + } LOG.info("Received block " + b + " from " + s.getInetAddress() + " and mirrored to " + mirrorTarget); } } finally { @@ -432,17 +444,25 @@ // // Tell the namenode that we've received this block - // in full. + // in full, if we've been asked to. This is done + // during NameNode-directed block transfers, but not + // client writes. 
// - synchronized (receivedBlockList) { - receivedBlockList.add(b); - receivedBlockList.notifyAll(); + if (shouldReportBlock) { + synchronized (receivedBlockList) { + receivedBlockList.add(b); + receivedBlockList.notifyAll(); + } } // - // Tell client job is done + // Tell client job is done, and reply with + // the new LocatedBlock. // reply.writeLong(WRITE_COMPLETE); + mirrors.add(curTarget); + LocatedBlock newLB = new LocatedBlock(b, (DatanodeInfo[]) mirrors.toArray(new DatanodeInfo[mirrors.size()])); + newLB.write(reply); } finally { reply.close(); } @@ -582,6 +602,7 @@ // Header info // out.write(OP_WRITE_BLOCK); + out.writeBoolean(true); b.write(out); out.writeInt(targets.length); for (int i = 0; i < targets.length; i++) { Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSNamesystem.java URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSNamesystem.java?rev=279840&r1=279839&r2=279840&view=diff ============================================================================== --- lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSNamesystem.java (original) +++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSNamesystem.java Fri Sep 9 11:24:00 2005 @@ -293,16 +293,6 @@ // Create next block results[0] = allocateBlock(src); results[1] = targets; - } else { - LOG.info("File progress failure for " + src); - Vector v = (Vector) pendingCreates.get(src); - for (Iterator it = v.iterator(); it.hasNext(); ) { - Block b = (Block) it.next(); - TreeSet containingNodes = (TreeSet) blocksMap.get(b); - if (containingNodes == null || containingNodes.size() < MIN_REPLICATION) { - LOG.info("Problem with block " + b + ", with " + (containingNodes == null ? 
"0" : "" + containingNodes.size()) + " nodes reporting in."); - } - } } } return results; Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NDFSClient.java URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NDFSClient.java?rev=279840&r1=279839&r2=279840&view=diff ============================================================================== --- lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NDFSClient.java (original) +++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NDFSClient.java Fri Sep 9 11:24:00 2005 @@ -623,6 +623,7 @@ // DataOutputStream out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream())); out.write(OP_WRITE_BLOCK); + out.writeBoolean(false); block.write(out); out.writeInt(nodes.length); for (int i = 0; i < nodes.length; i++) { @@ -745,6 +746,7 @@ } /** + * We're done writing to the current block. */ private synchronized void endBlock() throws IOException { boolean mustRecover = ! blockStreamWorking; @@ -754,16 +756,7 @@ // if (blockStreamWorking) { try { - blockStream.writeLong(0); - blockStream.flush(); - - long complete = blockReplyStream.readLong(); - if (complete != WRITE_COMPLETE) { - LOG.info("Did not receive WRITE_COMPLETE flag: " + complete); - throw new IOException("Did not receive WRITE_COMPLETE_FLAG: " + complete); - } - blockStream.close(); - blockReplyStream.close(); + internalClose(); } catch (IOException ie) { try { blockStream.close(); @@ -799,8 +792,7 @@ blockStream.write(buf, 0, bytesRead); bytesRead = in.read(buf); } - blockStream.writeLong(0); - blockStream.close(); + internalClose(); LOG.info("Recovered from failed datanode connection"); mustRecover = false; } catch (IOException ie) { @@ -823,6 +815,28 @@ backupFile.delete(); backupFile = File.createTempFile("ndfsout", "bak"); backupStream = new BufferedOutputStream(new FileOutputStream(backupFile)); + } + + /** + * Close down stream to remote datanode. 
Called from two places + * in endBlock(); + */ + private synchronized void internalClose() throws IOException { + blockStream.writeLong(0); + blockStream.flush(); + + long complete = blockReplyStream.readLong(); + if (complete != WRITE_COMPLETE) { + LOG.info("Did not receive WRITE_COMPLETE flag: " + complete); + throw new IOException("Did not receive WRITE_COMPLETE_FLAG: " + complete); + } + + LocatedBlock lb = new LocatedBlock(); + lb.readFields(blockReplyStream); + namenode.reportWrittenBlock(lb); + + blockStream.close(); + blockReplyStream.close(); } /** Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NameNode.java URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NameNode.java?rev=279840&r1=279839&r2=279840&view=diff ============================================================================== --- lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NameNode.java (original) +++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NameNode.java Fri Sep 9 11:24:00 2005 @@ -129,6 +129,20 @@ } /** + * The client can report in a set written blocks that it wrote. + * These blocks are reported via the client instead of the datanode + * to prevent weird heartbeat race conditions. + */ + public void reportWrittenBlock(LocatedBlock lb) throws IOException { + Block b = lb.getBlock(); + DatanodeInfo targets[] = lb.getLocations(); + for (int i = 0; i < targets.length; i++) { + namesystem.blockReceived(b, targets[i].getName()); + } + } + + /** + * The client needs to give up on the block. */ public void abandonBlock(Block b, String src) throws IOException { if (! namesystem.abandonBlock(b, new UTF8(src))) {