Author: kihwal
Date: Tue Feb 25 19:24:15 2014
New Revision: 1571792

URL: http://svn.apache.org/r1571792
Log:
HDFS-5924. Utilize OOB upgrade message processing for writes. Contributed by 
Kihwal Lee.

Modified:
    
hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-5535.txt
    
hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
    
hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
    
hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
    
hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
    
hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
    
hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
    
hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
    
hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java

Modified: 
hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-5535.txt
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-5535.txt?rev=1571792&r1=1571791&r2=1571792&view=diff
==============================================================================
--- 
hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-5535.txt
 (original)
+++ 
hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-5535.txt
 Tue Feb 25 19:24:15 2014
@@ -93,3 +93,5 @@ HDFS-5535 subtasks:
 
     HDFS-6015. Fix TestBlockRecovery
     #testRaceBetweenReplicaRecoveryAndFinalizeBlock. (kihwal)
+
+    HDFS-5924. Utilize OOB upgrade message processing for writes. (kihwal)

Modified: 
hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1571792&r1=1571791&r2=1571792&view=diff
==============================================================================
--- 
hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
 (original)
+++ 
hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
 Tue Feb 25 19:24:15 2014
@@ -30,6 +30,8 @@ import static org.apache.hadoop.hdfs.DFS
 import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_READS;
 import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_WRITES;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_READAHEAD;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_DATANODE_RESTART_TIMEOUT_DEFAULT;
 import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT;
 import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY;
 import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT;
@@ -268,6 +270,7 @@ public class DFSClient implements java.i
     final int getFileBlockStorageLocationsTimeout;
     final int retryTimesForGetLastBlockLength;
     final int retryIntervalForGetLastBlockLength;
+    final long datanodeRestartTimeout;
 
     final boolean useLegacyBlockReader;
     final boolean useLegacyBlockReaderLocal;
@@ -411,6 +414,10 @@ public class DFSClient implements java.i
       shortCircuitCacheStaleThresholdMs = conf.getLong(
           DFSConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_REPLICA_STALE_THRESHOLD_MS,
           
DFSConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_REPLICA_STALE_THRESHOLD_MS_DEFAULT);
+
+      datanodeRestartTimeout = conf.getLong(
+          DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY,
+          DFS_CLIENT_DATANODE_RESTART_TIMEOUT_DEFAULT) * 1000;
     }
 
     private DataChecksum.Type getChecksumType(Configuration conf) {
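
For illustration only (not part of this change): the hunk above reads
dfs.client.datanode-restart.timeout in seconds and converts it to
milliseconds. Below is a minimal sketch of overriding it per client,
assuming the key constant added by this patch; the 45-second value and
the class name are hypothetical.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    public class RestartTimeoutExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new HdfsConfiguration();
        // Value is in seconds; DFSClient stores it internally as 45 * 1000 ms.
        conf.setLong(DFSConfigKeys.DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY, 45);
        // Writes through this FileSystem wait up to 45s for a restarting datanode.
        FileSystem fs = FileSystem.get(conf);
        fs.close();
      }
    }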

Modified: 
hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1571792&r1=1571791&r2=1571792&view=diff
==============================================================================
--- 
hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
 (original)
+++ 
hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
 Tue Feb 25 19:24:15 2014
@@ -94,6 +94,10 @@ public class DFSConfigKeys extends Commo
   public static final long    DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT = 
3000;
   public static final String  
DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL = 
"dfs.client.write.exclude.nodes.cache.expiry.interval.millis";
   public static final long    
DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT = 10 * 60 * 1000; 
// 10 minutes, in ms
+  public static final String  DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY = "dfs.client.datanode-restart.timeout";
+  public static final long    DFS_CLIENT_DATANODE_RESTART_TIMEOUT_DEFAULT = 30;
+  public static final String  DFS_DATANODE_RESTART_REPLICA_EXPIRY_KEY = "dfs.datanode.restart.replica.expiration";
+  public static final long    DFS_DATANODE_RESTART_REPLICA_EXPIRY_DEFAULT = 50;
   public static final String  DFS_NAMENODE_BACKUP_ADDRESS_KEY = 
"dfs.namenode.backup.address";
   public static final String  DFS_NAMENODE_BACKUP_ADDRESS_DEFAULT = 
"localhost:50100";
   public static final String  DFS_NAMENODE_BACKUP_HTTP_ADDRESS_KEY = 
"dfs.namenode.backup.http-address";

Modified: 
hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java?rev=1571792&r1=1571791&r2=1571792&view=diff
==============================================================================
--- 
hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
 (original)
+++ 
hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
 Tue Feb 25 19:24:15 2014
@@ -27,6 +27,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.InterruptedIOException;
 import java.io.OutputStream;
+import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.nio.BufferOverflowException;
@@ -335,6 +336,8 @@ public class DFSOutputStream extends FSO
     private String[] favoredNodes;
     volatile boolean hasError = false;
     volatile int errorIndex = -1;
+    volatile int restartingNodeIndex = -1; // Restarting node index
+    private long restartDeadline = 0; // Deadline of DN restart
     private BlockConstructionStage stage;  // block construction stage
     private long bytesSent = 0; // number of bytes that've been sent
 
@@ -471,7 +474,7 @@ public class DFSOutputStream extends FSO
         try {
           // process datanode IO errors if any
           boolean doSleep = false;
-          if (hasError && errorIndex>=0) {
+          if (hasError && (errorIndex >= 0 || restartingNodeIndex >= 0)) {
             doSleep = processDatanodeError();
           }
 
@@ -571,8 +574,12 @@ public class DFSOutputStream extends FSO
             blockStream.flush();   
           } catch (IOException e) {
             // HDFS-3398 treat primary DN is down since client is unable to 
-            // write to primary DN
-            errorIndex = 0;
+            // write to primary DN. If a failed or restarting node has already
+            // been recorded by the responder, the following call will have no 
+            // effect. Pipeline recovery can handle only one node error at a
+            // time. If the primary node fails again during the recovery, it
+            // will be taken out then.
+            tryMarkPrimaryDatanodeFailed();
             throw e;
           }
           lastPacket = Time.now();
@@ -609,12 +616,16 @@ public class DFSOutputStream extends FSO
             Thread.sleep(artificialSlowdown); 
           }
         } catch (Throwable e) {
-          DFSClient.LOG.warn("DataStreamer Exception", e);
+          // Log warning if there was a real error.
+          if (restartingNodeIndex == -1) {
+            DFSClient.LOG.warn("DataStreamer Exception", e);
+          }
           if (e instanceof IOException) {
             setLastException((IOException)e);
           }
           hasError = true;
-          if (errorIndex == -1) { // not a datanode error
+          if (errorIndex == -1 && restartingNodeIndex == -1) {
+            // Not a datanode issue
             streamerClosed = true;
           }
         }
@@ -694,6 +705,65 @@ public class DFSOutputStream extends FSO
       }
     }
 
+    // The following synchronized methods are used whenever 
+    // errorIndex or restartingNodeIndex is set. This is because
+    // check & set needs to be atomic. Simply reading the variables
+    // does not require synchronization. When the responder is
+    // not running (e.g. during pipeline recovery), there is no
+    // need to use these methods.
+
+    /** Set the error node index. Called by responder */
+    synchronized void setErrorIndex(int idx) {
+      errorIndex = idx;
+    }
+
+    /** Set the restarting node index. Called by responder */
+    synchronized void setRestartingNodeIndex(int idx) {
+      restartingNodeIndex = idx;
+      // If the data streamer has already set the primary node
+      // bad, clear it. It is likely that the write failed due to
+      // the DN shutdown. Even if it was a real failure, the pipeline
+      // recovery will take care of it.
+      errorIndex = -1;      
+    }
+
+    /**
+     * This method is used when no explicit error report was received,
+     * but something failed. When the primary node is a suspect or
+     * the cause is unclear, the primary node is marked as failed.
+     */
+    synchronized void tryMarkPrimaryDatanodeFailed() {
+      // There should be no existing error and no ongoing restart.
+      if ((errorIndex == -1) && (restartingNodeIndex == -1)) {
+        errorIndex = 0;
+      }
+    }
+
+    /**
+     * Examine whether it is worth waiting for a node to restart.
+     * @param index the node index
+     */
+    boolean shouldWaitForRestart(int index) {
+      // Only one node in the pipeline.
+      if (nodes.length == 1) {
+        return true;
+      }
+
+      // Is it a local node?
+      InetAddress addr = null;
+      try {
+        addr = InetAddress.getByName(nodes[index].getIpAddr());
+      } catch (java.net.UnknownHostException e) {
+        // we are passing an ip address. this should not happen.
+        assert false;
+      }
+
+      if (addr != null && NetUtils.isLocalAddress(addr)) {
+        return true;
+      }
+      return false;
+    }
+
     //
     // Processes responses from the datanodes.  A packet is removed
     // from the ackQueue when its response arrives.
@@ -727,8 +797,20 @@ public class DFSOutputStream extends FSO
             // processes response status from datanodes.
             for (int i = ack.getNumOfReplies()-1; i >=0  && dfsClient.clientRunning; i--) {
               final Status reply = ack.getReply(i);
+              // Restart will not be treated differently unless it is
+              // the local node or the only one in the pipeline.
+              if (PipelineAck.isRestartOOBStatus(reply) &&
+                  shouldWaitForRestart(i)) {
+                restartDeadline = dfsClient.getConf().datanodeRestartTimeout +
+                    Time.now();
+                setRestartingNodeIndex(i);
+                String message = "A datanode is restarting: " + targets[i];
+                DFSClient.LOG.info(message);
+                throw new IOException(message);
+              }
+              // node error
               if (reply != SUCCESS) {
-                errorIndex = i; // first bad datanode
+                setErrorIndex(i); // first bad datanode
                 throw new IOException("Bad response " + reply +
                     " for block " + block +
                     " from datanode " + 
@@ -777,12 +859,16 @@ public class DFSOutputStream extends FSO
                 setLastException((IOException)e);
               }
               hasError = true;
-              errorIndex = errorIndex==-1 ? 0 : errorIndex;
+              // If no explicit error report was received, mark the primary
+              // node as failed.
+              tryMarkPrimaryDatanodeFailed();
               synchronized (dataQueue) {
                 dataQueue.notifyAll();
               }
-              DFSClient.LOG.warn("DFSOutputStream ResponseProcessor exception "
-                  + " for block " + block, e);
+              if (restartingNodeIndex == -1) {
+                DFSClient.LOG.warn("DFSOutputStream ResponseProcessor 
exception "
+                     + " for block " + block, e);
+              }
               responderClosed = true;
             }
           }
@@ -1001,6 +1087,24 @@ public class DFSOutputStream extends FSO
       boolean success = false;
       long newGS = 0L;
       while (!success && !streamerClosed && dfsClient.clientRunning) {
+        // Sleep before reconnect if a dn is restarting.
+        // This process will be repeated until the deadline passes or the
+        // datanode starts back up.
+        if (restartingNodeIndex >= 0) {
+          // 4 seconds or the configured deadline period, whichever is shorter.
+          // This is the retry interval and recovery will be retried in this
+          // interval until timeout or success.
+          long delay = Math.min(dfsClient.getConf().datanodeRestartTimeout,
+              4000L);
+          try {
+            Thread.sleep(delay);
+          } catch (InterruptedException ie) {
+            lastException.set(new IOException("Interrupted while waiting for " 
+
+                "datanode to restart. " + nodes[restartingNodeIndex]));
+            streamerClosed = true;
+            return false;
+          }
+        }
         boolean isRecovery = hasError;
         // remove bad datanode from list of datanodes.
         // If errorIndex was not set (i.e. appends), then do not remove 
@@ -1037,7 +1141,24 @@ public class DFSOutputStream extends FSO
           
           setPipeline(newnodes, newStorageIDs);
 
-          hasError = false;
+          // Just took care of a node error while waiting for a node restart
+          if (restartingNodeIndex >= 0) {
+            // If the error came from a node further away than the restarting
+            // node, the restart must have been complete.
+            if (errorIndex > restartingNodeIndex) {
+              restartingNodeIndex = -1;
+            } else if (errorIndex < restartingNodeIndex) {
+              // the node index has shifted.
+              restartingNodeIndex--;
+            } else {
+              // this shouldn't happen...
+              assert false;
+            }
+          }
+
+          if (restartingNodeIndex == -1) {
+            hasError = false;
+          }
           lastException.set(null);
           errorIndex = -1;
         }
@@ -1066,7 +1187,34 @@ public class DFSOutputStream extends FSO
         } else {
           success = createBlockOutputStream(nodes, newGS, isRecovery);
         }
-      }
+
+        if (restartingNodeIndex >= 0) {
+          assert hasError == true;
+          // check errorIndex set above
+          if (errorIndex == restartingNodeIndex) {
+            // ignore, if came from the restarting node
+            errorIndex = -1;
+          }
+          // still within the deadline
+          if (Time.now() < restartDeadline) {
+            continue; // within the deadline
+          }
+          // expired. declare the restarting node dead
+          restartDeadline = 0;
+          int expiredNodeIndex = restartingNodeIndex;
+          restartingNodeIndex = -1;
+          DFSClient.LOG.warn("Datanode did not restart in time: " +
+              nodes[expiredNodeIndex]);
+          // Mark the restarting node as failed. If there is any other failed
+          // node during the last pipeline construction attempt, it will not be
+          // overwritten/dropped. In this case, the restarting node will get
+          // excluded in the following attempt, if it still does not come up.
+          if (errorIndex == -1) {
+            errorIndex = expiredNodeIndex;
+          }
+          // From this point on, normal pipeline recovery applies.
+        }
+      } // while
 
       if (success) {
         // update pipeline at the namenode
@@ -1144,6 +1292,7 @@ public class DFSOutputStream extends FSO
       }
       Status pipelineStatus = SUCCESS;
       String firstBadLink = "";
+      boolean checkRestart = false;
       if (DFSClient.LOG.isDebugEnabled()) {
         for (int i = 0; i < nodes.length; i++) {
           DFSClient.LOG.debug("pipeline = " + nodes[i]);
@@ -1192,6 +1341,16 @@ public class DFSOutputStream extends FSO
           pipelineStatus = resp.getStatus();
           firstBadLink = resp.getFirstBadLink();
           
+          // Got a restart OOB ack.
+          // If a node is already restarting, this status is not likely from
+          // the same node. If it is from a different node, it is not
+          // from the local datanode. Thus it is safe to treat this as a
+          // regular node error.
+          if (PipelineAck.isRestartOOBStatus(pipelineStatus) &&
+            restartingNodeIndex == -1) {
+            checkRestart = true;
+            throw new IOException("A datanode is restarting.");
+          }
           if (pipelineStatus != SUCCESS) {
             if (pipelineStatus == Status.ERROR_ACCESS_TOKEN) {
               throw new InvalidBlockTokenException(
@@ -1205,9 +1364,12 @@ public class DFSOutputStream extends FSO
           assert null == blockStream : "Previous blockStream unclosed";
           blockStream = out;
           result =  true; // success
-  
+          restartingNodeIndex = -1;
+          hasError = false;
         } catch (IOException ie) {
-          DFSClient.LOG.info("Exception in createBlockOutputStream", ie);
+          if (restartingNodeIndex == -1) {
+            DFSClient.LOG.info("Exception in createBlockOutputStream", ie);
+          }
           if (ie instanceof InvalidEncryptionKeyException && 
refetchEncryptionKey > 0) {
             DFSClient.LOG.info("Will fetch a new encryption key and retry, " 
                 + "encryption key was invalid when connecting to "
@@ -1230,8 +1392,18 @@ public class DFSOutputStream extends FSO
               }
             }
           } else {
+            assert checkRestart == false;
             errorIndex = 0;
           }
+          // Check whether there is a restart worth waiting for.
+          if (checkRestart && shouldWaitForRestart(errorIndex)) {
+            restartDeadline = dfsClient.getConf().datanodeRestartTimeout +
+                Time.now();
+            restartingNodeIndex = errorIndex;
+            errorIndex = -1;
+            DFSClient.LOG.info("Waiting for the datanode to be restarted: " +
+                nodes[restartingNodeIndex]);
+          }
           hasError = true;
           setLastException(ie);
           result =  false;  // error
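
A minimal standalone sketch of the wait-for-restart logic added above (the
method and helper names here are illustrative, not the actual DataStreamer
code): the streamer retries pipeline setup every min(timeout, 4 seconds)
until the restart deadline passes, after which the restarting node is
treated as an ordinary failure.

    public class RestartWaitSketch {
      // timeoutMs corresponds to dfsClient.getConf().datanodeRestartTimeout.
      static boolean waitForRestart(long timeoutMs) throws InterruptedException {
        final long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
          // Retry interval: 4 seconds or the configured timeout, whichever is shorter.
          Thread.sleep(Math.min(timeoutMs, 4000L));
          if (tryRebuildPipeline()) { // stands in for createBlockOutputStream(...)
            return true;              // datanode came back; resume the pipeline
          }
        }
        return false;                 // deadline expired; normal recovery takes over
      }

      // Placeholder for the real pipeline setup attempt.
      static boolean tryRebuildPipeline() { return false; }
    }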

Modified: 
hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1571792&r1=1571791&r2=1571792&view=diff
==============================================================================
--- 
hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
 (original)
+++ 
hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
 Tue Feb 25 19:24:15 2014
@@ -23,8 +23,10 @@ import java.io.BufferedOutputStream;
 import java.io.Closeable;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
+import java.io.File;
 import java.io.FileDescriptor;
 import java.io.FileOutputStream;
+import java.io.FileWriter;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
@@ -45,6 +47,7 @@ import org.apache.hadoop.hdfs.protocol.d
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.io.IOUtils;
@@ -52,6 +55,7 @@ import org.apache.hadoop.io.nativeio.Nat
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Time;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -116,6 +120,7 @@ class BlockReceiver implements Closeable
   private final boolean isTransfer;
 
   private boolean syncOnClose;
+  private long restartBudget;
 
   BlockReceiver(final ExtendedBlock block, final DataInputStream in,
       final String inAddr, final String myAddr,
@@ -135,6 +140,7 @@ class BlockReceiver implements Closeable
       this.clientname = clientname;
       this.isDatanode = clientname.length() == 0;
       this.isClient = !this.isDatanode;
+      this.restartBudget = datanode.getDnConf().restartReplicaExpiry;
 
       //for datanode, we have
       //1: clientName.length() == 0, and
@@ -742,16 +748,35 @@ class BlockReceiver implements Closeable
         if (responder != null) {
           // In case this datanode is shutting down for quick restart,
           // send a special ack upstream.
-          if (datanode.isRestarting()) {
+          if (datanode.isRestarting() && isClient && !isTransfer) {
+            File blockFile = ((ReplicaInPipeline)replicaInfo).getBlockFile();
+            File restartMeta = new File(blockFile.getParent()  + 
+                File.pathSeparator + "." + blockFile.getName() + ".restart");
+            if (restartMeta.exists()) {
+              restartMeta.delete();
+            }
+            try {
+              FileWriter out = new FileWriter(restartMeta);
+              // write out the current time.
+              out.write(Long.toString(Time.now() + restartBudget));
+              out.flush();
+              out.close();
+            } catch (IOException ioe) {
+              // The worst case is not recovering this RBW replica. 
+              // Client will fall back to regular pipeline recovery.
+            }
             try {
               ((PacketResponder) responder.getRunnable()).
                   sendOOBResponse(PipelineAck.getRestartOOBStatus());
+              // Even if the connection is closed after the ack packet is
+              // flushed, the client can react to the connection closure 
+              // first. Insert a delay to lower the chance of client 
+              // missing the OOB ack.
+              Thread.sleep(1000);
             } catch (InterruptedException ie) {
               // It is already going down. Ignore this.
             } catch (IOException ioe) {
               LOG.info("Error sending OOB Ack.", ioe);
-              // The OOB ack could not be sent. Since the datanode is going
-              // down, this is ignored.
             }
           }
           responder.interrupt();
@@ -1279,7 +1304,6 @@ class BlockReceiver implements Closeable
           && offsetInBlock > replicaInfo.getBytesAcked()) {
         replicaInfo.setBytesAcked(offsetInBlock);
       }
-
       // send my ack back to upstream datanode
       replyAck.write(upstreamOut);
       upstreamOut.flush();

Modified: 
hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java?rev=1571792&r1=1571791&r2=1571792&view=diff
==============================================================================
--- 
hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
 (original)
+++ 
hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
 Tue Feb 25 19:24:15 2014
@@ -46,6 +46,8 @@ import static org.apache.hadoop.hdfs.DFS
 import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY;
 import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_DEFAULT;
 import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_RESTART_REPLICA_EXPIRY_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_RESTART_REPLICA_EXPIRY_DEFAULT;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -82,6 +84,7 @@ public class DNConf {
   final String encryptionAlgorithm;
   
   final long xceiverStopTimeout;
+  final long restartReplicaExpiry;
 
   final long maxLockedMemory;
 
@@ -157,6 +160,10 @@ public class DNConf {
     this.maxLockedMemory = conf.getLong(
         DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
         DFS_DATANODE_MAX_LOCKED_MEMORY_DEFAULT);
+
+    this.restartReplicaExpiry = conf.getLong(
+        DFS_DATANODE_RESTART_REPLICA_EXPIRY_KEY,
+        DFS_DATANODE_RESTART_REPLICA_EXPIRY_DEFAULT) * 1000L;
   }
   
   // We get minimumNameNodeVersion via a method so it can be mocked out in 
tests.

Modified: 
hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java?rev=1571792&r1=1571791&r2=1571792&view=diff
==============================================================================
--- 
hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
 (original)
+++ 
hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
 Tue Feb 25 19:24:15 2014
@@ -21,9 +21,11 @@ import java.io.BufferedInputStream;
 import java.io.DataInputStream;
 import java.io.File;
 import java.io.FileInputStream;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.RandomAccessFile;
+import java.util.Scanner;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.DU;
@@ -36,11 +38,13 @@ import org.apache.hadoop.hdfs.server.dat
 import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
 import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaWaitingToBeRecovered;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.DiskChecker;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
+import org.apache.hadoop.util.Time;
 
 /**
  * A block pool slice represents a portion of a block pool stored on a volume. 
 
@@ -191,9 +195,35 @@ class BlockPoolSlice {
         newReplica = new FinalizedReplica(blockId, 
             blockFile.length(), genStamp, volume, blockFile.getParentFile());
       } else {
-        newReplica = new ReplicaWaitingToBeRecovered(blockId,
-            validateIntegrityAndSetLength(blockFile, genStamp), 
-            genStamp, volume, blockFile.getParentFile());
+
+        boolean loadRwr = true;
+        File restartMeta = new File(blockFile.getParent()  +
+            File.pathSeparator + "." + blockFile.getName() + ".restart");
+        Scanner sc = null;
+        try {
+          sc = new Scanner(restartMeta);
+          // The restart meta file exists
+          if (sc.hasNextLong() && (sc.nextLong() > Time.now())) {
+            // It didn't expire. Load the replica as a RBW.
+            newReplica = new ReplicaBeingWritten(blockId,
+                validateIntegrityAndSetLength(blockFile, genStamp), 
+                genStamp, volume, blockFile.getParentFile(), null);
+            loadRwr = false;
+          }
+          restartMeta.delete();
+        } catch (FileNotFoundException fnfe) {
+          // nothing to do here
+        } finally {
+          if (sc != null) {
+            sc.close();
+          }
+        }
+        // Restart meta doesn't exist or has expired.
+        if (loadRwr) {
+          newReplica = new ReplicaWaitingToBeRecovered(blockId,
+              validateIntegrityAndSetLength(blockFile, genStamp), 
+              genStamp, volume, blockFile.getParentFile());
+        }
       }
 
       ReplicaInfo oldReplica = volumeMap.add(bpid, newReplica);
@@ -298,4 +328,4 @@ class BlockPoolSlice {
   void shutdown() {
     dfsUsage.shutdown();
   }
-}
\ No newline at end of file
+}
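
The restart marker written by BlockReceiver and read back here is simply a
single decimal expiry timestamp (Time.now() plus the restart budget, in
milliseconds). A self-contained sketch of that format, with hypothetical
helper names:

    import java.io.File;
    import java.io.FileNotFoundException;
    import java.io.FileWriter;
    import java.io.IOException;
    import java.util.Scanner;

    class RestartMetaSketch {
      // Write side (what BlockReceiver does on a quick-restart shutdown).
      static void writeExpiry(File restartMeta, long budgetMs) throws IOException {
        FileWriter out = new FileWriter(restartMeta);
        try {
          out.write(Long.toString(System.currentTimeMillis() + budgetMs));
        } finally {
          out.close();
        }
      }

      // Read side (what BlockPoolSlice does on startup): true if the replica
      // should still be loaded as RBW rather than waiting-to-be-recovered.
      static boolean isStillValid(File restartMeta) {
        try {
          Scanner sc = new Scanner(restartMeta);
          try {
            return sc.hasNextLong() && sc.nextLong() > System.currentTimeMillis();
          } finally {
            sc.close();
          }
        } catch (FileNotFoundException e) {
          return false; // no marker: fall back to ReplicaWaitingToBeRecovered
        }
      }
    }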

Modified: 
hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml?rev=1571792&r1=1571791&r2=1571792&view=diff
==============================================================================
--- 
hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
 (original)
+++ 
hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
 Tue Feb 25 19:24:15 2014
@@ -1031,6 +1031,18 @@
 </property>
 
 <property>
+  <name>dfs.client.datanode-restart.timeout</name>
+  <value>30</value>
+  <description>
+    Expert only. The time to wait, in seconds, from reception of a
+    datanode shutdown notification for quick restart, until declaring
+    the datanode dead and invoking the normal recovery mechanisms.
+    The notification is sent by a datanode when it is being shut down
+    using the shutdownDatanode admin command with the upgrade option.
+  </description>
+</property>
+
+<property>
   <name>dfs.nameservices</name>
   <value></value>
   <description>
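
The shutdown notification mentioned in the description above is issued from
the admin side; the new test below does the same thing programmatically. A
hedged sketch of that usage (the datanode IPC address comes from the command
line here; the class name is hypothetical):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.HdfsConfiguration;
    import org.apache.hadoop.hdfs.tools.DFSAdmin;

    public class QuickRestartNotify {
      public static void main(String[] args) throws Exception {
        Configuration conf = new HdfsConfiguration();
        DFSAdmin dfsadmin = new DFSAdmin(conf);
        // Same arguments the test below passes: tell the datanode at the given
        // IPC address (host:ipc_port) to shut down for a quick restart/upgrade.
        String dnIpcAddr = args[0];
        int ret = dfsadmin.run(new String[] {"-shutdownDatanode", dnIpcAddr, "upgrade"});
        System.exit(ret);
      }
    }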

Modified: 
hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java?rev=1571792&r1=1571791&r2=1571792&view=diff
==============================================================================
--- 
hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
 (original)
+++ 
hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
 Tue Feb 25 19:24:15 2014
@@ -162,19 +162,25 @@ public class TestClientProtocolForPipeli
     }
   }
 
-  /** Test recovery on restart OOB message */
+  /**
+   * Test recovery on restart OOB message. It also tests the delivery of
+   * the OOB ack originating from the primary datanode. Since there is only
+   * one node in the cluster, failure of restart-recovery will fail the
+   * test.
+   */
   @Test
   public void testPipelineRecoveryOnOOB() throws Exception {
     Configuration conf = new HdfsConfiguration();
+    conf.set(DFSConfigKeys.DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY, "15");
     MiniDFSCluster cluster = null;
     try {
-      int numDataNodes = 3;
+      int numDataNodes = 1;
       cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build();
       cluster.waitActive();
       FileSystem fileSys = cluster.getFileSystem();
 
       Path file = new Path("dataprotocol2.dat");
-      DFSTestUtil.createFile(fileSys, file, 10240L, (short)2, 0L);
+      DFSTestUtil.createFile(fileSys, file, 10240L, (short)1, 0L);
       DFSOutputStream out = (DFSOutputStream)(fileSys.append(file).
           getWrappedStream());
       out.write(1);
@@ -186,10 +192,66 @@ public class TestClientProtocolForPipeli
       // issue shutdown to the datanode.
       final String[] args1 = {"-shutdownDatanode", dnAddr, "upgrade" };
       Assert.assertEquals(0, dfsadmin.run(args1));
+      // Wait long enough to receive an OOB ack before closing the file.
+      Thread.sleep(4000);
+      // Restart the datanode
+      cluster.restartDataNode(0, true);
+      // The following forces a data packet and an end-of-block packet to be sent.
       out.close();
-      Thread.sleep(3000);
-      final String[] args2 = {"-getDatanodeInfo", dnAddr };
-      Assert.assertEquals(-1, dfsadmin.run(args2));
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  /** Test restart timeout */
+  @Test
+  public void testPipelineRecoveryOnRestartFailure() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    conf.set(DFSConfigKeys.DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY, "5");
+    MiniDFSCluster cluster = null;
+    try {
+      int numDataNodes = 2;
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build();
+      cluster.waitActive();
+      FileSystem fileSys = cluster.getFileSystem();
+
+      Path file = new Path("dataprotocol3.dat");
+      DFSTestUtil.createFile(fileSys, file, 10240L, (short)2, 0L);
+      DFSOutputStream out = (DFSOutputStream)(fileSys.append(file).
+          getWrappedStream());
+      out.write(1);
+      out.hflush();
+
+      DFSAdmin dfsadmin = new DFSAdmin(conf);
+      DataNode dn = cluster.getDataNodes().get(0);
+      final String dnAddr1 = dn.getDatanodeId().getIpcAddr(false);
+      // issue shutdown to the datanode.
+      final String[] args1 = {"-shutdownDatanode", dnAddr1, "upgrade" };
+      Assert.assertEquals(0, dfsadmin.run(args1));
+      Thread.sleep(4000);
+      // This should succeed without restarting the node. The restart wait will
+      // expire and regular pipeline recovery will kick in.
+      out.close();
+
+      // At this point there is only one node in the cluster. 
+      out = (DFSOutputStream)(fileSys.append(file).
+          getWrappedStream());
+      out.write(1);
+      out.hflush();
+
+      dn = cluster.getDataNodes().get(1);
+      final String dnAddr2 = dn.getDatanodeId().getIpcAddr(false);
+      // issue shutdown to the datanode.
+      final String[] args2 = {"-shutdownDatanode", dnAddr2, "upgrade" };
+      Assert.assertEquals(0, dfsadmin.run(args2));
+      Thread.sleep(4000);
+      try {
+        // close should fail
+        out.close();
+        assert false;
+      } catch (IOException ioe) { }
     } finally {
       if (cluster != null) {
         cluster.shutdown();

