HADOOP-12334. Change Mode Of Copy Operation of HBase WAL Archiving to bypass 
Azure Storage Throttling after retries. Contributed by Gaurav Kanade.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/47641fcb
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/47641fcb
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/47641fcb

Branch: refs/heads/HADOOP-11890
Commit: 47641fcbc9c41f4a338d8899501e4a310d2e81ad
Parents: aea26bf
Author: cnauroth <cnaur...@apache.org>
Authored: Thu Oct 22 12:21:32 2015 -0700
Committer: cnauroth <cnaur...@apache.org>
Committed: Thu Oct 22 12:21:32 2015 -0700

----------------------------------------------------------------------
 hadoop-common-project/hadoop-common/CHANGES.txt |  3 ++
 .../fs/azure/AzureNativeFileSystemStore.java    | 47 +++++++++++++++++---
 2 files changed, 43 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/47641fcb/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt 
b/hadoop-common-project/hadoop-common/CHANGES.txt
index ce2a2a7..74c62cb 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -1335,6 +1335,9 @@ Release 2.8.0 - UNRELEASED
     HADOOP-12418. TestRPC.testRPCInterruptedSimple fails intermittently.
     (kihwal)
 
+    HADOOP-12334. Change Mode Of Copy Operation of HBase WAL Archiving to 
bypass
+    Azure Storage Throttling after retries. (Gaurav Kanade via cnauroth)
+
 Release 2.7.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/47641fcb/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
index 679413a..8a33742 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
@@ -60,6 +60,7 @@ import org.apache.hadoop.fs.azure.metrics.ErrorMetricUpdater;
 import org.apache.hadoop.fs.azure.metrics.ResponseReceivedMetricUpdater;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.apache.hadoop.io.IOUtils;
 import org.mortbay.util.ajax.JSON;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -76,6 +77,7 @@ import com.microsoft.azure.storage.StorageException;
 import com.microsoft.azure.storage.blob.BlobListingDetails;
 import com.microsoft.azure.storage.blob.BlobProperties;
 import com.microsoft.azure.storage.blob.BlobRequestOptions;
+import com.microsoft.azure.storage.blob.BlobType;
 import com.microsoft.azure.storage.blob.CloudBlob;
 import com.microsoft.azure.storage.blob.CopyStatus;
 import com.microsoft.azure.storage.blob.DeleteSnapshotsOption;
@@ -2373,6 +2375,9 @@ public class AzureNativeFileSystemStore implements 
NativeFileSystemStore {
       throw new IOException("Cannot acquire new lease if one already exists.");
     }
 
+    CloudBlobWrapper srcBlob = null;
+    CloudBlobWrapper dstBlob = null;
+    SelfRenewingLease lease = null;
     try {
       // Attempts rename may occur before opening any streams so first,
       // check if a session exists, if not create a session with the Azure
@@ -2388,8 +2393,8 @@ public class AzureNativeFileSystemStore implements 
NativeFileSystemStore {
       // Get the source blob and assert its existence. If the source key
       // needs to be normalized then normalize it.
       //
-      CloudBlobWrapper srcBlob = getBlobReference(srcKey);
 
+      srcBlob = getBlobReference(srcKey);
       if (!srcBlob.exists(getInstrumentedContext())) {
         throw new AzureException ("Source blob " + srcKey +
             " does not exist.");
@@ -2406,7 +2411,6 @@ public class AzureNativeFileSystemStore implements 
NativeFileSystemStore {
        * when HBase runs on HDFS, where the region server recovers the lease
        * on a log file, to gain exclusive access to it, before it splits it.
        */
-      SelfRenewingLease lease = null;
       if (acquireLease) {
         lease = srcBlob.acquireLease();
       } else if (existingLease != null) {
@@ -2416,7 +2420,7 @@ public class AzureNativeFileSystemStore implements 
NativeFileSystemStore {
       // Get the destination blob. The destination key always needs to be
       // normalized.
       //
-      CloudBlobWrapper dstBlob = getBlobReference(dstKey);
+      dstBlob = getBlobReference(dstKey);
 
       // Rename the source blob to the destination blob by copying it to
       // the destination blob then deleting it.
@@ -2429,7 +2433,7 @@ public class AzureNativeFileSystemStore implements 
NativeFileSystemStore {
         dstBlob.startCopyFromBlob(srcBlob, null, getInstrumentedContext());
       } catch (StorageException se) {
         if (se.getErrorCode().equals(
-                 StorageErrorCode.SERVER_BUSY.toString())) {
+          StorageErrorCode.SERVER_BUSY.toString())) {
           int copyBlobMinBackoff = sessionConfiguration.getInt(
             KEY_COPYBLOB_MIN_BACKOFF_INTERVAL,
                        DEFAULT_COPYBLOB_MIN_BACKOFF_INTERVAL);
@@ -2449,7 +2453,7 @@ public class AzureNativeFileSystemStore implements 
NativeFileSystemStore {
           BlobRequestOptions options = new BlobRequestOptions();
           options.setRetryPolicyFactory(new RetryExponentialRetry(
             copyBlobMinBackoff, copyBlobDeltaBackoff, copyBlobMaxBackoff, 
-                       copyBlobMaxRetries));
+            copyBlobMaxRetries));
           dstBlob.startCopyFromBlob(srcBlob, options, 
getInstrumentedContext());
         } else {
           throw se;
@@ -2458,8 +2462,37 @@ public class AzureNativeFileSystemStore implements 
NativeFileSystemStore {
       waitForCopyToComplete(dstBlob, getInstrumentedContext());
       safeDelete(srcBlob, lease);
     } catch (StorageException e) {
-      // Re-throw exception as an Azure storage exception.
-      throw new AzureException(e);
+      if (e.getErrorCode().equals(
+        StorageErrorCode.SERVER_BUSY.toString())) {
+        LOG.warn("Rename: CopyBlob: StorageException: ServerBusy: Retry 
complete, will attempt client side copy for page blob");
+        InputStream ipStream = null;
+        OutputStream opStream = null;
+        try {
+          if(srcBlob.getProperties().getBlobType() == BlobType.PAGE_BLOB){
+            ipStream = openInputStream(srcBlob);
+            opStream = openOutputStream(dstBlob);
+            byte[] buffer = new byte[PageBlobFormatHelpers.PAGE_SIZE];
+            int len;
+            while ((len = ipStream.read(buffer)) != -1) {
+              opStream.write(buffer, 0, len);
+            }
+            opStream.flush();
+            opStream.close();
+            ipStream.close();
+          } else {
+            throw new AzureException(e);
+          }
+          safeDelete(srcBlob, lease);
+        } catch(StorageException se) {
+          LOG.warn("Rename: CopyBlob: StorageException: Failed");
+          throw new AzureException(se);
+        } finally {
+          IOUtils.closeStream(ipStream);
+          IOUtils.closeStream(opStream);
+        }
+      } else {
+        throw new AzureException(e);
+      }
     } catch (URISyntaxException e) {
       // Re-throw exception as an Azure storage exception.
       throw new AzureException(e);

Reply via email to