Author: mc
Date: Sun Jul 31 21:54:07 2005
New Revision: 226742

URL: http://svn.apache.org/viewcvs?rev=226742&view=rev
Log:

  Fix a tiny but critical bug in NDFSOutputStream.
flushData() had a ">=" test where it should have been ">".
In certain write() scenarios, this would cause the
receiving datanode to drop bytes that were written by
the client.  Pretty bad!

  I also fixed a few logging and error-handling problems:
removed overly chatty lease logging from FSNamesystem, and made
FSDataset rethrow an IOException that was previously swallowed.


Modified:
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSConstants.java
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSDataset.java
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSNamesystem.java
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NDFSClient.java
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NameNode.java

Modified: 
lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSConstants.java
URL: 
http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSConstants.java?rev=226742&r1=226741&r2=226742&view=diff
==============================================================================
--- 
lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSConstants.java 
(original)
+++ 
lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSConstants.java 
Sun Jul 31 21:54:07 2005
@@ -22,7 +22,7 @@
  ************************************/
 public interface FSConstants {
     public static int BLOCK_SIZE = 32 * 1000 * 1000;
-    //public static int BLOCK_SIZE = 2173;
+    //public static int BLOCK_SIZE = 19;
 
     public static final long WRITE_COMPLETE = 0xcafae11a;
 

Modified: 
lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSDataset.java
URL: 
http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSDataset.java?rev=226742&r1=226741&r2=226742&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSDataset.java 
(original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSDataset.java 
Sun Jul 31 21:54:07 2005
@@ -282,7 +282,6 @@
             ongoingCreates.add(b);
             reserved += BLOCK_SIZE;
             f = getTmpFile(b);
-
            try {
                if (f.exists()) {
                    throw new IOException("Unexpected problem in startBlock() 
for " + b + ".  File " + f + " should not be present, but is.");
@@ -295,8 +294,10 @@
                    throw new IOException("Unexpected problem in startBlock() 
for " + b + ".  File " + f + " should be creatable, but is already present.");
                }
            } catch (IOException ie) {
+                System.out.println("Exception!  " + ie);
                ongoingCreates.remove(b);               
                reserved -= BLOCK_SIZE;
+                throw ie;
            }
         }
 

Modified: 
lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSNamesystem.java
URL: 
http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSNamesystem.java?rev=226742&r1=226741&r2=226742&view=diff
==============================================================================
--- 
lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSNamesystem.java 
(original)
+++ 
lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/FSNamesystem.java 
Sun Jul 31 21:54:07 2005
@@ -550,7 +550,6 @@
         TreeSet creates = new TreeSet();
 
         public Lease(UTF8 holder) {
-            LOG.info("New lease, holder " + holder);
             this.holder = holder;
             renew();
         }
@@ -710,7 +709,6 @@
                 sortedLeases.remove(lease);
                 lease.renew();
                 sortedLeases.add(lease);
-                LOG.info("Renewed lease " + lease);
             }
         }
     }

Modified: 
lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NDFSClient.java
URL: 
http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NDFSClient.java?rev=226742&r1=226741&r2=226742&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NDFSClient.java 
(original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NDFSClient.java 
Sun Jul 31 21:54:07 2005
@@ -627,6 +627,7 @@
               pos += toWrite;
               off += toWrite;
               len -= toWrite;
+              filePos += toWrite;
 
               if ((bytesWrittenToBlock + pos >= BLOCK_SIZE) ||
                   (pos == BUFFER_SIZE)) {
@@ -660,7 +661,7 @@
         private synchronized void flushData(int maxPos) throws IOException {
             int workingPos = Math.min(pos, maxPos);
             
-            if (workingPos >= 0) {
+            if (workingPos > 0) {
                 //
                 // To the blockStream, write length, then bytes
                 //

Modified: 
lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NameNode.java
URL: 
http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NameNode.java?rev=226742&r1=226741&r2=226742&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NameNode.java 
(original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/ndfs/NameNode.java 
Sun Jul 31 21:54:07 2005
@@ -71,6 +71,8 @@
     /////////////////////////////////////////////////////
     // ClientProtocol
     /////////////////////////////////////////////////////
+    /**
+     */
     public LocatedBlock[] open(String src) throws IOException {
         Object openResults[] = namesystem.open(new UTF8(src));
         if (openResults == null) {
@@ -86,6 +88,8 @@
         }
     }
 
+    /**
+     */
     public LocatedBlock create(String src, String clientName, boolean 
overwrite) throws IOException {
         Object results[] = namesystem.startFile(new UTF8(src), new 
UTF8(clientName), overwrite);
         if (results == null) {
@@ -97,6 +101,8 @@
         }
     }
 
+    /**
+     */
     public LocatedBlock addBlock(String src) throws IOException {
         Object results[] = namesystem.getAdditionalBlock(new UTF8(src));
         if (results != null && results[0] == null) {
@@ -118,14 +124,20 @@
         }
     }
 
+    /**
+     */
     public void abandonBlock(Block b, String src) throws IOException {
         if (! namesystem.abandonBlock(b, new UTF8(src))) {
             throw new IOException("Cannot abandon block during write to " + 
src);
         }
     }
+    /**
+     */
     public void abandonFileInProgress(String src) throws IOException {
         namesystem.abandonFileInProgress(new UTF8(src));
     }
+    /**
+     */
     public boolean complete(String src, String clientName) throws IOException {
         int returnCode = namesystem.completeFile(new UTF8(src), new 
UTF8(clientName));
         if (returnCode == STILL_WAITING) {
@@ -136,6 +148,8 @@
             throw new IOException("Could not complete write to file " + src + 
" by " + clientName);
         }
     }
+    /**
+     */
     public String[] getHints(String src, long offset) throws IOException {
         UTF8 hosts[] = namesystem.getDatanodeHints(new UTF8(src), offset);
         if (hosts == null) {
@@ -148,27 +162,38 @@
             return results;
         }
     }
-
+    /**
+     */
     public boolean rename(String src, String dst) throws IOException {
         return namesystem.renameTo(new UTF8(src), new UTF8(dst));
     }
 
+    /**
+     */
     public boolean delete(String src) throws IOException {
         return namesystem.delete(new UTF8(src));
     }
 
+    /**
+     */
     public boolean exists(String src) throws IOException {
         return namesystem.exists(new UTF8(src));
     }
 
+    /**
+     */
     public boolean isDir(String src) throws IOException {
         return namesystem.isDir(new UTF8(src));
     }
 
+    /**
+     */
     public boolean mkdirs(String src) throws IOException {
         return namesystem.mkdirs(new UTF8(src));
     }
 
+    /**
+     */
     public boolean obtainLock(String src, String clientName, boolean 
exclusive) throws IOException {
         int returnCode = namesystem.obtainLock(new UTF8(src), new 
UTF8(clientName), exclusive);
         if (returnCode == COMPLETE_SUCCESS) {
@@ -180,6 +205,8 @@
         }
     }
 
+    /**
+     */
     public boolean releaseLock(String src, String clientName) throws 
IOException {
         int returnCode = namesystem.releaseLock(new UTF8(src), new 
UTF8(clientName));
         if (returnCode == COMPLETE_SUCCESS) {
@@ -191,14 +218,26 @@
         }
     }
 
+    /**
+     */
     public void renewLease(String clientName) throws IOException {
         namesystem.renewLease(new UTF8(clientName));        
     }
 
+    /**
+     */
     public NDFSFileInfo[] getListing(String src) throws IOException {
+        /**
+        System.out.println("hbCounts: " + hbCounts + ", avgTime: " + (hbTime / 
(1.0 * hbCounts)));
+        System.out.println("brCounts: " + brCounts + ", avgTime: " + (brTime / 
(1.0 * brCounts)));
+        System.out.println("brvCounts: " + brvCounts + ", avgTime: " + 
(brvTime / (1.0 * brvCounts)));
+        System.out.println("bwCounts: " + bwCounts + ", avgTime: " + (bwTime / 
(1.0 * bwCounts)));
+        **/
         return namesystem.getListing(new UTF8(src));
     }
 
+    /**
+     */
     public long[] getStats() throws IOException {
         long results[] = new long[2];
         results[0] = namesystem.totalCapacity();
@@ -206,6 +245,8 @@
         return results;
     }
 
+    /**
+     */
     public DatanodeInfo[] getDatanodeReport() throws IOException {
         DatanodeInfo results[] = namesystem.datanodeReport();
         if (results == null || results.length == 0) {
@@ -214,20 +255,37 @@
         return results;
     }
 
+    ////////////////////////////////////////////////////////////////
+    // DatanodeProtocol
+    ////////////////////////////////////////////////////////////////
+    long hbTime = 0, brTime = 0, brvTime = 0, bwTime = 0;
+    int hbCounts = 0, brCounts = 0, brvCounts = 0, bwCounts = 0;
     /**
      */
     public void sendHeartbeat(String sender, long capacity, long remaining) {
+        long start = System.currentTimeMillis();
         namesystem.gotHeartbeat(new UTF8(sender), capacity, remaining);
+        long end = System.currentTimeMillis();
+        hbCounts++;
+        hbTime += (end-start);
     }
 
     public void blockReport(String sender, Block blocks[]) {
+        long start = System.currentTimeMillis();
         namesystem.processReport(blocks, new UTF8(sender));
+        long end = System.currentTimeMillis();
+        brCounts++;
+        brTime += (end-start);
     }
 
     public void blockReceived(String sender, Block blocks[]) {
+        long start = System.currentTimeMillis();
         for (int i = 0; i < blocks.length; i++) {
             namesystem.blockReceived(blocks[i], new UTF8(sender));
         }
+        long end = System.currentTimeMillis();
+        brvCounts++;
+        brvTime += (end-start);
     }
 
     /**
@@ -244,23 +302,30 @@
         //
         // Ask to perform pending transfers, if any
         //
-        Object xferResults[] = namesystem.pendingTransfers(new 
DatanodeInfo(new UTF8(sender)), xmitsInProgress);
-        if (xferResults != null) {
-            return new BlockCommand((Block[]) xferResults[0], 
(DatanodeInfo[][]) xferResults[1]);
-        }
+        long start = System.currentTimeMillis();
+        try {
+            Object xferResults[] = namesystem.pendingTransfers(new 
DatanodeInfo(new UTF8(sender)), xmitsInProgress);
+            if (xferResults != null) {
+                return new BlockCommand((Block[]) xferResults[0], 
(DatanodeInfo[][]) xferResults[1]);
+            }
 
-        //
-        // If none, check to see if there are blocks to invalidate
-        //
-        Block blocks[] = namesystem.recentlyInvalidBlocks(new UTF8(sender));
-        if (blocks == null) {
-            blocks = namesystem.checkObsoleteBlocks(new UTF8(sender));
-        }
-        if (blocks != null) {
-            return new BlockCommand(blocks);
-        }
+            //
+            // If none, check to see if there are blocks to invalidate
+            //
+            Block blocks[] = namesystem.recentlyInvalidBlocks(new 
UTF8(sender));
+            if (blocks == null) {
+                blocks = namesystem.checkObsoleteBlocks(new UTF8(sender));
+            }
+            if (blocks != null) {
+                return new BlockCommand(blocks);
+            }
 
-        return new BlockCommand();
+            return new BlockCommand();
+        } finally {
+            long end = System.currentTimeMillis();
+            bwCounts++;
+            bwTime += (end-start);
+        }
     }
 
     /**


Reply via email to