Author: mc Date: Sun Jul 31 12:54:15 2005 New Revision: 226687 URL: http://svn.apache.org/viewcvs?rev=226687&view=rev Log:
Fix two problems: 1) Revamp the NDFSClientInputStream to be less complicated, which makes it easy to add a read(byte b[]) method. 2) Add a NameNode.abandonFileInProgress() method. The client calls this if there's a problem during file creation. This eliminates the "pendingCreates non-null" exception that is sometimes seen. (The problem occurs when the client cannot connect to a datanode for the first block of the file. In that case we should abandon the entire file, not just the block as we did previously.) Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/ClientProtocol.java lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSNamesystem.java lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NDFSClient.java lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NameNode.java Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/ClientProtocol.java URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/ClientProtocol.java?rev=226687&r1=226686&r2=226687&view=diff ============================================================================== --- lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/ClientProtocol.java (original) +++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/ClientProtocol.java Sun Jul 31 12:54:15 2005 @@ -51,6 +51,12 @@ public void abandonBlock(Block b, String src) throws IOException; /** + * The client wants to abandon writing to the current file, and + * let anyone else grab it. + */ + public void abandonFileInProgress(String src) throws IOException; + + /** * The client is done writing data to the given filename, and would * like to complete it. Returns whether the file has been closed * correctly (true) or whether caller should try again (false). 
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSNamesystem.java URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSNamesystem.java?rev=226687&r1=226686&r2=226687&view=diff ============================================================================== --- lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSNamesystem.java (original) +++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSNamesystem.java Sun Jul 31 12:54:15 2005 @@ -317,6 +317,13 @@ } /** + * Abandon the entire file in progress + */ + public synchronized void abandonFileInProgress(UTF8 src) throws IOException { + internalReleaseCreate(src); + } + + /** * Finalize the created file and make it world-accessible. The * FSNamesystem will already know the blocks that make up the file. * Before we return, we make sure that all the file's blocks have @@ -578,7 +585,10 @@ internalReleaseLock(src, holder); } locks.clear(); - internalReleaseCreates(creates); + for (Iterator it = creates.iterator(); it.hasNext(); ) { + UTF8 src = (UTF8) it.next(); + internalReleaseCreate(src); + } creates.clear(); } @@ -682,14 +692,11 @@ private int internalReleaseLock(UTF8 src, UTF8 holder) { return dir.releaseLock(src, holder); } - private void internalReleaseCreates(TreeSet creates) { - for (Iterator it = creates.iterator(); it.hasNext(); ) { - UTF8 src = (UTF8) it.next(); - Vector v = (Vector) pendingCreates.remove(src); - for (Iterator it2 = v.iterator(); it2.hasNext(); ) { - Block b = (Block) it2.next(); - pendingCreateBlocks.remove(b); - } + private void internalReleaseCreate(UTF8 src) { + Vector v = (Vector) pendingCreates.remove(src); + for (Iterator it2 = v.iterator(); it2.hasNext(); ) { + Block b = (Block) it2.next(); + pendingCreateBlocks.remove(b); } } Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NDFSClient.java URL: 
http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NDFSClient.java?rev=226687&r1=226686&r2=226687&view=diff ============================================================================== --- lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NDFSClient.java (original) +++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NDFSClient.java Sun Jul 31 12:54:15 2005 @@ -236,16 +236,10 @@ private DataInputStream blockStream; private DataOutputStream partnerStream; private Block blocks[]; - private int curBlock = 0; private DatanodeInfo nodes[][]; private long pos = 0; - private long bytesRemainingInBlock = 0, curBlockSize = 0; - - private int memoryBuf[] = new int[32 * 1024]; - private long memoryStartPos = 0; - private long openPoint = 0; - private int memoryBytes = 0; - private int memoryBytesStart = 0; + private long filelen = 0; + private long blockEnd = -1; /** */ @@ -254,23 +248,19 @@ this.nodes = nodes; this.blockStream = null; this.partnerStream = null; + for (int i = 0; i < blocks.length; i++) { + this.filelen += blocks[i].getNumBytes(); + } } /** - * Open a DataInputStream to a DataNode so that it can be written to. - * This happens when a file is created and each time a new block is allocated. - * Must get block ID and the IDs of the destinations from the namenode. + * Open a DataInputStream to a DataNode so that it can be read from. + * We get block ID and the IDs of the destinations at startup, from the namenode. 
*/ - private synchronized void nextBlockInputStream() throws IOException { - nextBlockInputStream(0); - } - private synchronized void nextBlockInputStream(long preSkip) throws IOException { - if (curBlock >= blocks.length) { + private synchronized void blockSeekTo(long target) throws IOException { + if (target >= filelen) { throw new IOException("Attempted to read past end of file"); } - if (bytesRemainingInBlock > 0) { - throw new IOException("Trying to skip to next block without reading all data"); - } if (blockStream != null) { blockStream.close(); @@ -278,17 +268,39 @@ } // - // Connect to best DataNode for current Block + // Compute desired block + // + int targetBlock = -1; + long targetBlockStart = 0; + long targetBlockEnd = 0; + for (int i = 0; i < blocks.length; i++) { + long blocklen = blocks[i].getNumBytes(); + targetBlockEnd = targetBlockStart + blocklen - 1; + + if (target >= targetBlockStart && target <= targetBlockEnd) { + targetBlock = i; + break; + } else { + targetBlockStart = targetBlockEnd + 1; + } + } + if (targetBlock < 0) { + throw new IOException("Impossible situation: could not find target position " + target); + } + long offsetIntoBlock = target - targetBlockStart; + + // + // Connect to best DataNode for desired Block, with potential offset // - InetSocketAddress target = null; + InetSocketAddress targetAddr = null; Socket s = null; TreeSet deadNodes = new TreeSet(); while (s == null) { DatanodeInfo chosenNode; try { - chosenNode = bestNode(nodes[curBlock], deadNodes); - target = DataNode.createSocketAddr(chosenNode.getName().toString()); + chosenNode = bestNode(nodes[targetBlock], deadNodes); + targetAddr = DataNode.createSocketAddr(chosenNode.getName().toString()); } catch (IOException ie) { LOG.info("Could not obtain block from any node. 
Retrying..."); try { @@ -299,37 +311,34 @@ continue; } try { - s = new Socket(target.getAddress(), target.getPort()); - //LOG.info("Now downloading from " + target + ", block " + blocks[curBlock] + ", skipahead " + preSkip); + s = new Socket(targetAddr.getAddress(), targetAddr.getPort()); // // Xmit header info to datanode // DataOutputStream out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream())); out.write(OP_READSKIP_BLOCK); - blocks[curBlock].write(out); - out.writeLong(preSkip); + blocks[targetBlock].write(out); + out.writeLong(offsetIntoBlock); out.flush(); // // Get bytes in block, set streams // DataInputStream in = new DataInputStream(new BufferedInputStream(s.getInputStream())); - curBlockSize = in.readLong(); + long curBlockSize = in.readLong(); long amtSkipped = in.readLong(); - - pos += amtSkipped; - bytesRemainingInBlock = curBlockSize - amtSkipped; - - if (amtSkipped > 0) { - memoryStartPos = pos; - memoryBytes = 0; - memoryBytesStart = 0; + if (curBlockSize != blocks[targetBlock].len) { + throw new IOException("Recorded block size is " + blocks[targetBlock].len + ", but datanode reports size of " + curBlockSize); + } + if (amtSkipped != offsetIntoBlock) { + throw new IOException("Asked for offset of " + offsetIntoBlock + ", but only received offset of " + amtSkipped); } - blockStream = in; - partnerStream = out; - curBlock++; - openPoint = pos; + + this.pos = target; + this.blockEnd = targetBlockEnd; + this.blockStream = in; + this.partnerStream = out; } catch (IOException ex) { // Put chosen node into dead list, continue LOG.info("Could not connect to " + target); @@ -340,242 +349,110 @@ } /** + * Close it down! 
*/ - public synchronized void seek(long pos) throws IOException { - if (pos < 0) { - throw new IOException("Cannot seek to negative position " + pos); + public synchronized void close() throws IOException { + if (closed) { + throw new IOException("Stream closed"); } - if (pos == this.pos) { - return; + + if (blockStream != null) { + blockStream.close(); + blockStream = null; + partnerStream.close(); } + super.close(); + closed = true; + } - // - // If we have remembered enough bytes to seek backwards to the - // desired pos, we can do so easily - // - if ((pos >= memoryStartPos) && (memoryStartPos + memoryBytes > pos)) { - this.pos = pos; - } else { - // - // If we are seeking backwards (and *don't* have enough memory bytes) - // we need to reset the NDFS streams. They will be reopened upon the - // next call to nextBlockInputStream(). After this operation, all - // seeks will be "forwardSeeks". - // - if (pos < memoryStartPos && blockStream != null) { - blockStream.close(); - blockStream = null; - partnerStream.close(); - partnerStream = null; - this.curBlock = 0; - this.bytesRemainingInBlock = 0; - this.pos = 0; - this.memoryStartPos = 0; - this.memoryBytes = 0; - this.memoryBytesStart = 0; - // - // REMIND - this could be made more efficient, to just - // skip back block-by-block - // + /** + * Basic read() + */ + public synchronized int read() throws IOException { + if (closed) { + throw new IOException("Stream closed"); + } + int result = -1; + if (pos < filelen) { + if (pos > blockEnd) { + blockSeekTo(pos); } - - // - // Now read ahead to the desired position. - // - long diff = pos - this.pos; - while (diff > 0) { - long skipped = skip(diff); - if (skipped > 0) { - diff -= skipped; - } + result = blockStream.read(); + if (result >= 0) { + pos++; } - // Pos will be incremented by skip() } + return result; } /** - * Skip ahead some number of bytes + * Read the entire buffer. 
*/ - public synchronized long skip(long skip) throws IOException { - long toSkip = 0; - long toFastSkip = 0; - if (skip > memoryBuf.length) { - toSkip = memoryBuf.length; - toFastSkip = skip - toSkip; - } else { - toSkip = skip; - } - long totalSkipped = 0; - - // - // If there's a lot of fast-skipping to do within the current block, - // close it and reopen, so we can fast-skip to the target - // - /** - while (toFastSkip > 0) { - long amtSkipped = super.skip(toFastSkip); - toFastSkip -= amtSkipped; - totalSkipped += amtSkipped; - } - **/ - long realBytesRemaining = bytesRemainingInBlock + (memoryBytes - (pos - memoryStartPos)); - if (toFastSkip > 0 && realBytesRemaining > 0 && - toFastSkip < realBytesRemaining) { - - blockStream.close(); - blockStream = null; - partnerStream.close(); - partnerStream = null; - - long backwardsDistance = curBlockSize - realBytesRemaining; - pos -= backwardsDistance; - totalSkipped -= backwardsDistance; - toFastSkip += backwardsDistance; - bytesRemainingInBlock = 0; - curBlock--; - - memoryStartPos = pos; - memoryBytes = 0; - memoryBytesStart = 0; + public synchronized int read(byte buf[], int off, int len) throws IOException { + if (closed) { + throw new IOException("Stream closed"); } - - // - // If there's any fast-skipping to do, we do it by opening a - // new block and telling the datanode how many bytes to skip. 
- // - while (toFastSkip > 0 && curBlock < blocks.length) { - - if (bytesRemainingInBlock > 0) { - blockStream.close(); - blockStream = null; - partnerStream.close(); - partnerStream = null; - - pos += bytesRemainingInBlock; - totalSkipped += bytesRemainingInBlock; - toFastSkip -= bytesRemainingInBlock; - bytesRemainingInBlock = 0; + if (pos < filelen) { + if (pos > blockEnd) { + blockSeekTo(pos); } - - long oldPos = pos; - nextBlockInputStream(toFastSkip); - long forwardDistance = (pos - oldPos); - totalSkipped += forwardDistance; - toFastSkip -= (pos - oldPos); - - memoryStartPos = pos; - memoryBytes = 0; - memoryBytesStart = 0; - } - - // - // If there's any remaining toFastSkip, well, there's - // not much we can do about it. We're at the end of - // the stream! - // - if (toFastSkip > 0) { - System.err.println("Trying to skip past end of file...."); - toFastSkip = 0; + int result = blockStream.read(buf, off, len); + if (result >= 0) { + pos += result; + } + return result; } - - // - // Do a slow skip as we approach, so we can fill the client - // history buffer - // - totalSkipped += super.skip(toSkip); - toSkip = 0; - return totalSkipped; + return -1; } /** + * Seek to a new arbitrary location */ - public synchronized long getPos() throws IOException { - return pos; + public synchronized void seek(long targetPos) throws IOException { + if (targetPos >= filelen) { + throw new IOException("Cannot seek after EOF"); + } + if (targetPos >= pos && targetPos <= blockEnd) { + skip(targetPos - pos); + } else { + pos = targetPos; + blockEnd = -1; + } } /** + * Skip ahead some number of bytes */ - public synchronized int available() throws IOException { - if (closed) { - throw new IOException("Stream closed"); + public synchronized long skip(long skip) throws IOException { + if (skip > 0) { + long targetPos = pos + skip; + targetPos = Math.min(targetPos, filelen); + + if (targetPos <= blockEnd) { + return blockStream.skip(skip); + } else { + pos = targetPos; + blockEnd 
= -1; + return skip; + } + } else { + return 0; } - return (int) Math.min((long) Integer.MAX_VALUE, bytesRemainingInBlock); } /** */ - public synchronized void close() throws IOException { - if (closed) { - throw new IOException("Stream closed"); - } - - if (blockStream != null) { - blockStream.close(); - blockStream = null; - partnerStream.close(); - } - super.close(); - closed = true; + public synchronized long getPos() throws IOException { + return pos; } /** - * Other read() functions are implemented in terms of - * this one. */ - public synchronized int read() throws IOException { + public synchronized int available() throws IOException { if (closed) { throw new IOException("Stream closed"); } - - int b = 0; - if (pos - memoryStartPos < memoryBytes) { - // - // Move the memoryStartPos up to current pos, if necessary. - // - int diff = (int) (pos - memoryStartPos); - - // - // Fetch the byte - // - b = memoryBuf[(memoryBytesStart + diff) % memoryBuf.length]; - - // - // Bump the pos - // - pos++; - } else { - if (bytesRemainingInBlock == 0) { - if (curBlock < blocks.length) { - nextBlockInputStream(); - } else { - return -1; - } - } - b = blockStream.read(); - if (b >= 0) { - // - // Remember byte so we can seek backwards at some later time - // - if (memoryBytes == memoryBuf.length) { - memoryStartPos++; - } - - if (memoryBuf.length > 0) { - int target; - if (memoryBytes == memoryBuf.length) { - target = memoryBytesStart; - memoryBytesStart = (memoryBytesStart + 1) % memoryBuf.length; - } else { - target = (memoryBytesStart + memoryBytes) % memoryBuf.length; - memoryBytes++; - } - memoryBuf[target] = b; - } - bytesRemainingInBlock--; - pos++; - } - } - return b; + return (int) (filelen - pos); } /** @@ -661,7 +538,6 @@ } } catch (InterruptedException ie) { } - } else { blockComplete = true; } @@ -676,7 +552,6 @@ InetSocketAddress target = DataNode.createSocketAddr(nodes[0].getName().toString()); Socket s = null; try { - //System.err.println("Trying to connect 
to " + target); s = new Socket(target.getAddress(), target.getPort()); } catch (IOException ie) { // Connection failed. Let's wait a little bit and retry @@ -687,7 +562,11 @@ Thread.sleep(6000); } catch (InterruptedException iex) { } - namenode.abandonBlock(block, src.toString()); + if (firstTime) { + namenode.abandonFileInProgress(src.toString()); + } else { + namenode.abandonBlock(block, src.toString()); + } retry = true; continue; } Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NameNode.java URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NameNode.java?rev=226687&r1=226686&r2=226687&view=diff ============================================================================== --- lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NameNode.java (original) +++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NameNode.java Sun Jul 31 12:54:15 2005 @@ -123,6 +123,9 @@ throw new IOException("Cannot abandon block during write to " + src); } } + public void abandonFileInProgress(String src) throws IOException { + namesystem.abandonFileInProgress(new UTF8(src)); + } public boolean complete(String src, String clientName) throws IOException { int returnCode = namesystem.completeFile(new UTF8(src), new UTF8(clientName)); if (returnCode == STILL_WAITING) {