Author: suresh
Date: Tue Jul 30 07:51:38 2013
New Revision: 1508332

URL: http://svn.apache.org/r1508332
Log:
HDFS-5025. Record ClientId and CallId in EditLog to enable rebuilding retry 
cache in case of HA failover. Contributed by Jing Zhao.

Added:
    
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ClientId.java
Modified:
    
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
    
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
    
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RetryCache.java
    
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java
    
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRetryCache.java
    
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestProtoUtil.java

Modified: 
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java?rev=1508332&r1=1508331&r2=1508332&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
 (original)
+++ 
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java
 Tue Jul 30 07:51:38 2013
@@ -27,6 +27,7 @@ import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
 import org.apache.hadoop.ipc.Client;
 import org.apache.hadoop.ipc.Client.ConnectionId;
@@ -38,7 +39,12 @@ import org.apache.hadoop.util.ThreadUtil
 
 import com.google.common.annotations.VisibleForTesting;
 
-class RetryInvocationHandler implements RpcInvocationHandler {
+/**
+ * This class implements RpcInvocationHandler and supports retry on the client 
+ * side.
+ */
+@InterfaceAudience.Private
+public class RetryInvocationHandler implements RpcInvocationHandler {
   public static final Log LOG = LogFactory.getLog(RetryInvocationHandler.class);
   private final FailoverProxyProvider proxyProvider;
 

Modified: 
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java?rev=1508332&r1=1508331&r2=1508332&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
 (original)
+++ 
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
 Tue Jul 30 07:51:38 2013
@@ -1161,7 +1161,7 @@ public class Client {
         CommonConfigurationKeys.IPC_CLIENT_CONNECT_TIMEOUT_DEFAULT);
     this.fallbackAllowed = conf.getBoolean(CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
         CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);
-    this.clientId = StringUtils.getUuidBytes();
+    this.clientId = ClientId.getClientId();
     this.sendParamsExecutor = clientExcecutorFactory.refAndGetInstance();
   }
 

Added: 
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ClientId.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ClientId.java?rev=1508332&view=auto
==============================================================================
--- 
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ClientId.java
 (added)
+++ 
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ClientId.java
 Tue Jul 30 07:51:38 2013
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ipc;
+
+import java.nio.ByteBuffer;
+import java.util.UUID;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A class defining a set of static helper methods to provide conversion between
+ * bytes and string for UUID-based client Id.
+ */
+@InterfaceAudience.Private
+public class ClientId {
+  
+  /** The length, in bytes, of a UUID-based clientId byte array (16). */
+  public static final int BYTE_LENGTH = 16;
+  
+  /**
+   * Return clientId as byte[]
+   */
+  public static byte[] getClientId() {
+    UUID uuid = UUID.randomUUID();
+    ByteBuffer buf = ByteBuffer.wrap(new byte[BYTE_LENGTH]);
+    buf.putLong(uuid.getMostSignificantBits());
+    buf.putLong(uuid.getLeastSignificantBits());
+    return buf.array();
+  }
+  
+  /** Convert a clientId byte[] to string */
+  public static String toString(byte[] clientId) {
+    // clientId can be null or an empty array
+    if (clientId == null || clientId.length == 0) {
+      return "";
+    }
+    // otherwise should be 16 bytes
+    Preconditions.checkArgument(clientId.length == BYTE_LENGTH);
+    long msb = 0;
+    long lsb = 0;
+    for (int i = 0; i < 8; i++) {
+      msb = (msb << 8) | (clientId[i] & 0xff);
+    }
+    for (int i = 8; i < 16; i++) {
+      lsb = (lsb << 8) | (clientId[i] & 0xff);
+    }
+    return (new UUID(msb, lsb)).toString();
+  }
+  
+  /** Convert a clientId string to its byte[] representation */
+  public static byte[] toBytes(String id) {
+    if (id == null || "".equals(id)) {
+      return new byte[0];
+    }
+    UUID uuid = UUID.fromString(id);
+    ByteBuffer buf = ByteBuffer.wrap(new byte[BYTE_LENGTH]);
+    buf.putLong(uuid.getMostSignificantBits());
+    buf.putLong(uuid.getLeastSignificantBits());
+    return buf.array();
+  }
+
+}

Modified: 
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RetryCache.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RetryCache.java?rev=1508332&r1=1508331&r2=1508332&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RetryCache.java
 (original)
+++ 
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RetryCache.java
 Tue Jul 30 07:51:38 2013
@@ -19,6 +19,7 @@ package org.apache.hadoop.ipc;
 
 
 import java.util.Arrays;
+import java.util.UUID;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -27,6 +28,7 @@ import org.apache.hadoop.util.LightWeigh
 import org.apache.hadoop.util.LightWeightGSet;
 import org.apache.hadoop.util.LightWeightGSet.LinkedElement;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
 /**
@@ -64,8 +66,9 @@ public class RetryCache {
 
     CacheEntry(byte[] clientId, int callId, long expirationTime) {
       // ClientId must be a UUID - that is 16 octets.
-      Preconditions.checkArgument(clientId.length == 16,
-          "Invalid clientId - must be UUID of size 16 octets");
+      Preconditions.checkArgument(clientId.length == ClientId.BYTE_LENGTH,
+          "Invalid clientId - length is " + clientId.length
+              + " expected length " + ClientId.BYTE_LENGTH);
       // Convert UUID bytes to two longs
       long tmp = 0;
       for (int i=0; i<8; i++) {
@@ -131,6 +134,12 @@ public class RetryCache {
     public long getExpirationTime() {
       return expirationTime;
     }
+    
+    @Override
+    public String toString() {
+      return (new UUID(this.clientIdMsb, this.clientIdLsb)).toString() + ":"
+          + this.callId + ":" + this.state;
+    }
   }
 
   /**
@@ -186,6 +195,11 @@ public class RetryCache {
     return !Server.isRpcInvocation() || Server.getCallId() < 0
         || Arrays.equals(Server.getClientId(), RpcConstants.DUMMY_CLIENT_ID);
   }
+  
+  @VisibleForTesting
+  public LightWeightGSet<CacheEntry, CacheEntry> getCacheSet() {
+    return set;
+  }
 
   /**
    * This method handles the following conditions:
@@ -240,6 +254,26 @@ public class RetryCache {
     }
     return mapEntry;
   }
+  
+  /** 
+   * Add a new cache entry into the retry cache. The cache entry consists of 
+   * clientId and callId extracted from editlog.
+   */
+  public void addCacheEntry(byte[] clientId, int callId) {
+    CacheEntry newEntry = new CacheEntry(clientId, callId, System.nanoTime()
+        + expirationTime);
+    newEntry.completed(true);
+    set.put(newEntry);
+  }
+  
+  public void addCacheEntryWithPayload(byte[] clientId, int callId,
+      Object payload) {
+    CacheEntry newEntry = new CacheEntryWithPayload(clientId, callId, payload,
+        System.nanoTime() + expirationTime);
+    // since the entry is loaded from editlog, we can assume it succeeded.    
+    newEntry.completed(true);
+    set.put(newEntry);
+  }
 
   private static CacheEntry newEntry(long expirationTime) {
     return new CacheEntry(Server.getClientId(), Server.getCallId(),

Modified: 
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java?rev=1508332&r1=1508331&r2=1508332&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java
 (original)
+++ 
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java
 Tue Jul 30 07:51:38 2013
@@ -22,7 +22,6 @@ import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.nio.ByteBuffer;
 import java.text.DateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -33,7 +32,6 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.StringTokenizer;
-import java.util.UUID;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -42,7 +40,6 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.util.Shell;
 
 import com.google.common.net.InetAddresses;
 
@@ -898,17 +895,6 @@ public class StringUtils {
   }
   
   /**
-   * Return a new UUID as byte[]
-   */
-  public static byte[] getUuidBytes() {
-    UUID uuid = UUID.randomUUID();
-    ByteBuffer buf = ByteBuffer.wrap(new byte[16]);
-    buf.putLong(uuid.getMostSignificantBits());
-    buf.putLong(uuid.getLeastSignificantBits());
-    return buf.array();
-  }
-  
-  /**
    * Get stack trace for a given thread.
    */
   public static String getStackTrace(Thread t) {

Modified: 
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRetryCache.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRetryCache.java?rev=1508332&r1=1508331&r2=1508332&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRetryCache.java
 (original)
+++ 
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRetryCache.java
 Tue Jul 30 07:51:38 2013
@@ -29,8 +29,6 @@ import java.util.concurrent.atomic.Atomi
 
 import org.apache.hadoop.ipc.RPC.RpcKind;
 import org.apache.hadoop.ipc.RetryCache.CacheEntryWithPayload;
-import org.apache.hadoop.ipc.Server;
-import org.apache.hadoop.util.StringUtils;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -39,7 +37,7 @@ import org.junit.Test;
  * Tests for {@link RetryCache}
  */
 public class TestRetryCache {
-  private static final byte[] CLIENT_ID = StringUtils.getUuidBytes();
+  private static final byte[] CLIENT_ID = ClientId.getClientId();
   private static int callId = 100;
   private static final Random r = new Random();
   private static final TestServer testServer = new TestServer();

Modified: 
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestProtoUtil.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestProtoUtil.java?rev=1508332&r1=1508331&r2=1508332&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestProtoUtil.java
 (original)
+++ 
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestProtoUtil.java
 Tue Jul 30 07:51:38 2013
@@ -17,8 +17,8 @@
  */
 package org.apache.hadoop.util;
 
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -26,6 +26,7 @@ import java.io.DataInputStream;
 import java.io.IOException;
 import java.util.Arrays;
 
+import org.apache.hadoop.ipc.ClientId;
 import org.apache.hadoop.ipc.RPC.RpcKind;
 import org.apache.hadoop.ipc.RpcConstants;
 import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
@@ -78,7 +79,7 @@ public class TestProtoUtil {
   
   @Test
   public void testRpcClientId() {
-    byte[] uuid = StringUtils.getUuidBytes();
+    byte[] uuid = ClientId.getClientId();
     RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader(
         RpcKind.RPC_PROTOCOL_BUFFER, OperationProto.RPC_FINAL_PACKET, 0,
         RpcConstants.INVALID_RETRY_COUNT, uuid);


Reply via email to