HADOOP-10809. hadoop-azure: page blob support. Contributed by Dexter Bradshaw, Mostafa Elhemali, Eric Hanson, and Mike Liddell.
(cherry picked from commit 2217e2f8ff418b88eac6ad36cafe3a9795a11f40) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5a737026 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5a737026 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5a737026 Branch: refs/heads/branch-2 Commit: 5a737026cc365eb86c1d72c660fb13d8540a03b4 Parents: 9a2e4f4 Author: cnauroth <[email protected]> Authored: Wed Oct 8 14:20:23 2014 -0700 Committer: cnauroth <[email protected]> Committed: Wed Dec 17 14:57:13 2014 -0800 ---------------------------------------------------------------------- hadoop-common-project/hadoop-common/CHANGES.txt | 3 + hadoop-tools/hadoop-azure/README.txt | 48 + .../dev-support/findbugs-exclude.xml | 31 + .../hadoop-azure/src/config/checkstyle.xml | 7 +- .../hadoop/fs/azure/AzureLinkedStack.java | 217 +++ .../fs/azure/AzureNativeFileSystemStore.java | 740 +++++++--- .../hadoop/fs/azure/NativeAzureFileSystem.java | 1337 ++++++++++++++---- .../hadoop/fs/azure/NativeFileSystemStore.java | 22 +- .../hadoop/fs/azure/PageBlobFormatHelpers.java | 58 + .../hadoop/fs/azure/PageBlobInputStream.java | 455 ++++++ .../hadoop/fs/azure/PageBlobOutputStream.java | 497 +++++++ .../apache/hadoop/fs/azure/PartialListing.java | 2 +- .../hadoop/fs/azure/SelfRenewingLease.java | 202 +++ .../fs/azure/SelfThrottlingIntercept.java | 4 +- .../fs/azure/ShellDecryptionKeyProvider.java | 3 +- .../hadoop/fs/azure/SimpleKeyProvider.java | 3 +- .../hadoop/fs/azure/StorageInterface.java | 280 +++- .../hadoop/fs/azure/StorageInterfaceImpl.java | 184 ++- .../fs/azure/SyncableDataOutputStream.java | 56 + .../java/org/apache/hadoop/fs/azure/Wasb.java | 1 - .../metrics/AzureFileSystemInstrumentation.java | 5 +- .../metrics/ResponseReceivedMetricUpdater.java | 7 +- .../services/org.apache.hadoop.fs.FileSystem | 17 + .../fs/azure/AzureBlobStorageTestAccount.java | 155 +- .../hadoop/fs/azure/InMemoryBlockBlobStore.java | 70 
+- .../hadoop/fs/azure/MockStorageInterface.java | 233 ++- .../fs/azure/NativeAzureFileSystemBaseTest.java | 1016 ++++++++++++- .../hadoop/fs/azure/RunningLiveWasbTests.txt | 22 + .../azure/TestAzureConcurrentOutOfBandIo.java | 75 +- .../TestAzureFileSystemErrorConditions.java | 57 +- .../hadoop/fs/azure/TestBlobDataValidation.java | 12 +- .../hadoop/fs/azure/TestBlobMetadata.java | 35 +- .../fs/azure/TestBlobTypeSpeedDifference.java | 160 +++ .../fs/azure/TestNativeAzureFSPageBlobLive.java | 43 + .../TestNativeAzureFileSystemConcurrency.java | 5 +- .../TestNativeAzureFileSystemContractLive.java | 26 + ...TestNativeAzureFileSystemContractMocked.java | 25 + .../TestNativeAzureFileSystemFileNameCheck.java | 11 +- .../fs/azure/TestNativeAzureFileSystemLive.java | 75 + .../azure/TestNativeAzureFileSystemMocked.java | 35 + ...stNativeAzureFileSystemOperationsMocked.java | 37 +- .../TestNativeAzureFileSystemUploadLogic.java | 186 +++ .../azure/TestOutOfBandAzureBlobOperations.java | 17 +- .../TestOutOfBandAzureBlobOperationsLive.java | 21 + .../TestReadAndSeekPageBlobAfterWrite.java | 333 +++++ .../apache/hadoop/fs/azure/TestWasbFsck.java | 33 + .../fs/azure/TestWasbUriAndConfiguration.java | 9 +- .../fs/azure/metrics/AzureMetricsTestUtil.java | 1 + .../TestAzureFileSystemInstrumentation.java | 53 +- .../metrics/TestBandwidthGaugeUpdater.java | 47 - .../src/test/resources/azure-test.xml | 12 - 51 files changed, 6046 insertions(+), 937 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a737026/hadoop-common-project/hadoop-common/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 7450513..6509bcb 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -79,6 +79,9 @@ Release 2.7.0 - 
UNRELEASED HADOOP-11238. Update the NameNode's Group Cache in the background when possible (Chris Li via Colin P. McCabe) + HADOOP-10809. hadoop-azure: page blob support. (Dexter Bradshaw, + Mostafa Elhemali, Eric Hanson, and Mike Liddell via cnauroth) + BUG FIXES HADOOP-11236. NFS: Fix javadoc warning in RpcProgram.java (Abhiraj Butala via harsh) http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a737026/hadoop-tools/hadoop-azure/README.txt ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/README.txt b/hadoop-tools/hadoop-azure/README.txt index 73306d3..a1d1a65 100644 --- a/hadoop-tools/hadoop-azure/README.txt +++ b/hadoop-tools/hadoop-azure/README.txt @@ -77,6 +77,54 @@ src\test\resources\azure-test.xml. These settings augment the hadoop configurati For live tests, set the following in azure-test.xml: 1. "fs.azure.test.account.name -> {azureStorageAccountName} 2. "fs.azure.account.key.{AccountName} -> {fullStorageKey}" + +=================================== +Page Blob Support and Configuration +=================================== + +The Azure Blob Storage interface for Hadoop supports two kinds of blobs, block blobs +and page blobs. Block blobs are the default kind of blob and are good for most +big-data use cases, like input data for Hive, Pig, analytical map-reduce jobs etc. +Page blob handling in hadoop-azure was introduced to support HBase log files. +Page blobs can be written any number of times, whereas block blobs can only be +appended to 50,000 times before you run out of blocks and your writes will fail. +That won't work for HBase logs, so page blob support was introduced to overcome +this limitation. + +Page blobs can be used for other purposes beyond just HBase log files though. +They support the Hadoop FileSystem interface. Page blobs can be up to 1TB in +size, larger than the maximum 200GB size for block blobs. 
+ +In order to have the files you create be page blobs, you must set the configuration +variable fs.azure.page.blob.dir to a comma-separated list of folder names. +E.g. + + /hbase/WALs,/hbase/oldWALs,/data/mypageblobfiles + +You can set this to simply / to make all files page blobs. + +The configuration option fs.azure.page.blob.size is the default initial +size for a page blob. It must be 128MB or greater, and no more than 1TB, +specified as an integer number of bytes. + +==================== +Atomic Folder Rename +==================== + +Azure storage stores files as a flat key/value store without formal support +for folders. The hadoop-azure file system layer simulates folders on top +of Azure storage. By default, folder rename in the hadoop-azure file system +layer is not atomic. That means that a failure during a folder rename +could, for example, leave some folders in the original directory and +some in the new one. + +HBase depends on atomic folder rename. Hence, a configuration setting was +introduced called fs.azure.atomic.rename.dir that allows you to specify a +comma-separated list of directories to receive special treatment so that +folder rename is made atomic. The default value of this setting is just /hbase. +Redo will be applied to finish a folder rename that fails. A file +<folderName>-renamePending.json may appear temporarily and is the record of +the intention of the rename operation, to allow redo in event of a failure. 
============= Findbugs http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a737026/hadoop-tools/hadoop-azure/dev-support/findbugs-exclude.xml ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/dev-support/findbugs-exclude.xml b/hadoop-tools/hadoop-azure/dev-support/findbugs-exclude.xml index cc63141..cde1734 100644 --- a/hadoop-tools/hadoop-azure/dev-support/findbugs-exclude.xml +++ b/hadoop-tools/hadoop-azure/dev-support/findbugs-exclude.xml @@ -15,5 +15,36 @@ limitations under the License. --> <FindBugsFilter> + <!-- It is okay to skip up to end of file. No need to check return value. --> + <Match> + <Class name="org.apache.hadoop.fs.azure.AzureNativeFileSystemStore" /> + <Method name="retrieve" /> + <Bug pattern="SR_NOT_CHECKED" /> + <Priority value="2" /> + </Match> + + <!-- Returning fully loaded array to iterate through is a convenience + and helps performance. --> + <Match> + <Class name="org.apache.hadoop.fs.azure.NativeAzureFileSystem$FolderRenamePending" /> + <Method name="getFiles" /> + <Bug pattern="EI_EXPOSE_REP" /> + <Priority value="2" /> + </Match> + + <!-- Need to start keep-alive thread for SelfRenewingLease in constructor. --> + <Match> + <Class name="org.apache.hadoop.fs.azure.SelfRenewingLease" /> + <Bug pattern="SC_START_IN_CTOR" /> + <Priority value="2" /> + </Match> + <!-- Using a key set iterator is fine because this is not a performance-critical + method. 
--> + <Match> + <Class name="org.apache.hadoop.fs.azure.PageBlobOutputStream" /> + <Method name="logAllStackTraces" /> + <Bug pattern="WMI_WRONG_MAP_ITERATOR" /> + <Priority value="2" /> + </Match> </FindBugsFilter> http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a737026/hadoop-tools/hadoop-azure/src/config/checkstyle.xml ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/config/checkstyle.xml b/hadoop-tools/hadoop-azure/src/config/checkstyle.xml index 3bfc23d..9df4f78 100644 --- a/hadoop-tools/hadoop-azure/src/config/checkstyle.xml +++ b/hadoop-tools/hadoop-azure/src/config/checkstyle.xml @@ -108,7 +108,10 @@ <property name="max" value="3000"/> </module> - <module name="ParameterNumber"/> + <module name="ParameterNumber"> + <property name="max" value="8"/> + </module> + <!-- Checks for whitespace --> @@ -152,7 +155,7 @@ <module name="IllegalInstantiation"/> <module name="InnerAssignment"/> <module name="MagicNumber"> - <property name="ignoreNumbers" value="-1, 0, 1, 2, 3, 4, 5, 6, 7, 8, 10, 1000"/> + <property name="ignoreNumbers" value="-1, 0, 1, 2, 3, 4, 5, 6, 7, 8, 10, 255, 1000, 1024"/> </module> <module name="MissingSwitchDefault"/> <module name="RedundantThrows"/> http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a737026/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureLinkedStack.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureLinkedStack.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureLinkedStack.java new file mode 100644 index 0000000..4c52ef0 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureLinkedStack.java @@ -0,0 +1,217 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azure; + +/** + * A simple generic stack implementation using linked lists. The stack + * implementation has five main operations: + * <ul> + * <li>push -- adds an element to the top of the stack</li> + * <li>pop -- removes an element from the top of the stack and returns a + * reference to it</li> + * <li>peek -- peek returns an element from the top of the stack without + * removing it</li> + * <li>isEmpty -- tests whether the stack is empty</li> + * <li>size -- returns the size of the stack</li> + * <li>toString -- returns a string representation of the stack.</li> + * </ul> + */ + +public class AzureLinkedStack<E> { + /* + * Linked node for Azure stack collection. + */ + private static class AzureLinkedNode<E> { + private E element; // Linked element on the list. + private AzureLinkedNode<E> next;// Reference to the next linked element on + // list. + + /* + * The constructor builds the linked node with no successor + * + * @param element : The value of the element to be stored with this node. + */ + private AzureLinkedNode(E anElement) { + element = anElement; + next = null; + } + + /* + * Constructor builds a linked node with a specified successor. The + * successor may be null. + * + * @param anElement : new element to be created. 
+ * + * @param nextElement: successor to the new element. + */ + private AzureLinkedNode(E anElement, AzureLinkedNode<E> nextElement) { + element = anElement; + next = nextElement; + } + + /* + * Get the element stored in the linked node. + * + * @return E : element stored in linked node. + */ + private E getElement() { + return element; + } + + /* + * Get the successor node to the element. + * + * @return E : reference to the succeeding node on the list. + */ + private AzureLinkedNode<E> getNext() { + return next; + } + } + + private int count; // The number of elements stored on the stack. + private AzureLinkedNode<E> top; // Top of the stack. + + /* + * Constructor creating an empty stack. + */ + public AzureLinkedStack() { + // Simply initialize the member variables. + // + count = 0; + top = null; + } + + /* + * Adds an element to the top of the stack. + * + * @param element : element pushed to the top of the stack. + */ + public void push(E element) { + // Create a new node containing a reference to be placed on the stack. + // Set the next reference to the new node to point to the current top + // of the stack. Set the top reference to point to the new node. Finally + // increment the count of nodes on the stack. + // + AzureLinkedNode<E> newNode = new AzureLinkedNode<E>(element, top); + top = newNode; + count++; + } + + /* + * Removes the element at the top of the stack and returns a reference to it. + * + * @return E : element popped from the top of the stack. + * + * @throws Exception on pop from an empty stack. + */ + public E pop() throws Exception { + // Make sure the stack is not empty. If it is empty, throw a StackEmpty + // exception. + // + if (isEmpty()) { + throw new Exception("AzureStackEmpty"); + } + + // Set a temporary reference equal to the element at the top of the stack, + // decrement the count of elements and return reference to the temporary. 
+ // + E element = top.getElement(); + top = top.getNext(); + count--; + + // Return the reference to the element that was at the top of the stack. + // + return element; + } + + /* + * Return the top element of the stack without removing it. + * + * @return E + * + * @throws Exception on peek into an empty stack. + */ + public E peek() throws Exception { + // Make sure the stack is not empty. If it is empty, throw a StackEmpty + // exception. + // + if (isEmpty()) { + throw new Exception("AzureStackEmpty"); + } + + // Set a temporary reference equal to the element at the top of the stack + // and return the temporary. + // + E element = top.getElement(); + return element; + } + + /* + * Determines whether the stack is empty + * + * @return boolean true if the stack is empty and false otherwise. + */ + public boolean isEmpty() { + if (0 == size()) { + // Zero-sized stack so the stack is empty. + // + return true; + } + + // The stack is not empty. + // + return false; + } + + /* + * Determines the size of the stack + * + * @return int: Count of the number of elements in the stack. + */ + public int size() { + return count; + } + + /* + * Returns a string representation of the stack. + * + * @return String String representation of all elements in the stack. + */ + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + + AzureLinkedNode<E> current = top; + for (int i = 0; i < size(); i++) { + E element = current.getElement(); + sb.append(element.toString()); + current = current.getNext(); + + // Insert commas between strings except after the last string. + // + if (size() - 1 > i) { + sb.append(", "); + } + } + + // Return the string. 
+ // + return sb.toString(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a737026/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java index 5afbbbe..c091767 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java @@ -20,31 +20,42 @@ package org.apache.hadoop.fs.azure; import static org.apache.hadoop.fs.azure.NativeAzureFileSystem.PATH_DELIMITER; import java.io.BufferedInputStream; -import java.io.ByteArrayInputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.io.UnsupportedEncodingException; import java.net.URI; import java.net.URISyntaxException; +import java.net.URLDecoder; +import java.net.URLEncoder; import java.security.InvalidKeyException; import java.util.ArrayList; import java.util.Calendar; +import java.util.Collections; import java.util.Date; import java.util.EnumSet; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.LinkedList; +import java.util.Locale; import java.util.Map; +import java.util.Set; +import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.azure.StorageInterface.CloudBlobContainerWrapper; import 
org.apache.hadoop.fs.azure.StorageInterface.CloudBlobDirectoryWrapper; +import org.apache.hadoop.fs.azure.StorageInterface.CloudBlobWrapper; import org.apache.hadoop.fs.azure.StorageInterface.CloudBlockBlobWrapper; +import org.apache.hadoop.fs.azure.StorageInterface.CloudPageBlobWrapper; +import org.apache.hadoop.fs.azure.StorageInterfaceImpl.CloudPageBlobWrapperImpl; import org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation; import org.apache.hadoop.fs.azure.metrics.BandwidthGaugeUpdater; import org.apache.hadoop.fs.azure.metrics.ErrorMetricUpdater; @@ -72,7 +83,6 @@ import com.microsoft.windowsazure.storage.blob.DeleteSnapshotsOption; import com.microsoft.windowsazure.storage.blob.ListBlobItem; import com.microsoft.windowsazure.storage.core.Utility; - /** * Core implementation of Windows Azure Filesystem for Hadoop. * Provides the bridging logic between Hadoop's abstract filesystem and Azure Storage @@ -140,6 +150,33 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { static final String LINK_BACK_TO_UPLOAD_IN_PROGRESS_METADATA_KEY = "hdi_tmpupload"; static final String OLD_LINK_BACK_TO_UPLOAD_IN_PROGRESS_METADATA_KEY = "asv_tmpupload"; + /** + * Configuration key to indicate the set of directories in WASB where we + * should store files as page blobs instead of block blobs. + * + * Entries should be plain directory names (i.e. not URIs) with no leading or + * trailing slashes. Delimit the entries with commas. + */ + public static final String KEY_PAGE_BLOB_DIRECTORIES = + "fs.azure.page.blob.dir"; + /** + * The set of directories where we should store files as page blobs. + */ + private Set<String> pageBlobDirs; + + /** + * Configuration key to indicate the set of directories in WASB where + * we should do atomic folder rename synchronized with createNonRecursive. 
+ */ + public static final String KEY_ATOMIC_RENAME_DIRECTORIES = + "fs.azure.atomic.rename.dir"; + + /** + * The set of directories where we should apply atomic folder rename + * synchronized with createNonRecursive. + */ + private Set<String> atomicRenameDirs; + private static final String HTTP_SCHEME = "http"; private static final String HTTPS_SCHEME = "https"; private static final String WASB_AUTHORITY_DELIMITER = "@"; @@ -148,6 +185,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { private static final int DEFAULT_CONCURRENT_WRITES = 8; // Concurrent reads reads of data written out of band are disable by default. + // private static final boolean DEFAULT_READ_TOLERATE_CONCURRENT_APPEND = false; // Default block sizes @@ -155,6 +193,8 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { public static final int DEFAULT_UPLOAD_BLOCK_SIZE = 4 * 1024 * 1024; // Retry parameter defaults. + // + private static final int DEFAULT_MIN_BACKOFF_INTERVAL = 1 * 1000; // 1s private static final int DEFAULT_MAX_BACKOFF_INTERVAL = 30 * 1000; // 30s private static final int DEFAULT_BACKOFF_INTERVAL = 1 * 1000; // 1s @@ -169,6 +209,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { private static final int STORAGE_CONNECTION_TIMEOUT_DEFAULT = 90; + /** * MEMBER VARIABLES */ @@ -181,7 +222,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { private boolean connectingUsingSAS = false; private AzureFileSystemInstrumentation instrumentation; private BandwidthGaugeUpdater bandwidthGaugeUpdater; - private static final JSON PERMISSION_JSON_SERIALIZER = createPermissionJsonSerializer(); + private final static JSON PERMISSION_JSON_SERIALIZER = createPermissionJsonSerializer(); private boolean suppressRetryPolicy = false; private boolean canCreateOrModifyContainer = false; @@ -317,7 +358,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { public 
BandwidthGaugeUpdater getBandwidthGaugeUpdater() { return bandwidthGaugeUpdater; } - + /** * Check if concurrent reads and writes on the same blob are allowed. * @@ -333,19 +374,18 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { * session with an Azure session. It parses the scheme to ensure it matches * the storage protocol supported by this file system. * - * @param uri - * - URI for target storage blob. - * @param conf - * - reference to configuration object. + * @param uri - URI for target storage blob. + * @param conf - reference to configuration object. + * @param instrumentation - the metrics source that will keep track of operations here. * - * @throws IllegalArgumentException - * if URI or job object is null, or invalid scheme. + * @throws IllegalArgumentException if URI or job object is null, or invalid scheme. */ @Override - public void initialize(URI uri, Configuration conf, AzureFileSystemInstrumentation instrumentation) throws AzureException { - - if (null == this.storageInteractionLayer) { - this.storageInteractionLayer = new StorageInterfaceImpl(); + public void initialize(URI uri, Configuration conf, AzureFileSystemInstrumentation instrumentation) + throws IllegalArgumentException, AzureException, IOException { + + if (null == instrumentation) { + throw new IllegalArgumentException("Null instrumentation"); } this.instrumentation = instrumentation; @@ -377,6 +417,40 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { // Start an Azure storage session. // createAzureStorageSession(); + + // Extract the directories that should contain page blobs + pageBlobDirs = getDirectorySet(KEY_PAGE_BLOB_DIRECTORIES); + LOG.debug("Page blob directories: " + setToString(pageBlobDirs)); + + // Extract directories that should have atomic rename applied. 
+ atomicRenameDirs = getDirectorySet(KEY_ATOMIC_RENAME_DIRECTORIES); + String hbaseRoot; + try { + + // Add to this the hbase root directory, or /hbase is that is not set. + hbaseRoot = verifyAndConvertToStandardFormat( + sessionConfiguration.get("hbase.rootdir", "hbase")); + atomicRenameDirs.add(hbaseRoot); + } catch (URISyntaxException e) { + LOG.warn("Unable to initialize HBase root as an atomic rename directory."); + } + LOG.debug("Atomic rename directories: " + setToString(atomicRenameDirs)); + } + + /** + * Helper to format a string for log output from Set<String> + */ + private String setToString(Set<String> set) { + StringBuilder sb = new StringBuilder(); + int i = 1; + for (String s : set) { + sb.append("/" + s); + if (i != set.size()) { + sb.append(", "); + } + i++; + } + return sb.toString(); } /** @@ -400,8 +474,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { "Expected URI with a valid authority"); } - // Check if authority container the delimiter separating the account name - // from the + // Check if authority container the delimiter separating the account name from the // the container. // if (!authority.contains(WASB_AUTHORITY_DELIMITER)) { @@ -455,8 +528,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { // The URI has a valid authority. Extract the container name. It is the // second component of the WASB URI authority. if (!authority.contains(WASB_AUTHORITY_DELIMITER)) { - // The authority does not have a container name. Use the default container - // by + // The authority does not have a container name. Use the default container by // setting the container name to the default Azure root container. // return AZURE_ROOT_CONTAINER; @@ -491,9 +563,9 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { private String getHTTPScheme() { String sessionScheme = sessionUri.getScheme(); // Check if we're on a secure URI scheme: wasbs or the legacy asvs scheme. 
- if (sessionScheme != null - && (sessionScheme.equalsIgnoreCase("asvs") || sessionScheme - .equalsIgnoreCase("wasbs"))) { + if (sessionScheme != null && + (sessionScheme.equalsIgnoreCase("asvs") || + sessionScheme.equalsIgnoreCase("wasbs"))) { return HTTPS_SCHEME; } else { // At this point the scheme should be either null or asv or wasb. @@ -565,20 +637,22 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { Math.min(cpuCores, DEFAULT_CONCURRENT_WRITES)); // Set up the exponential retry policy. - minBackoff = sessionConfiguration.getInt(KEY_MIN_BACKOFF_INTERVAL, - DEFAULT_MIN_BACKOFF_INTERVAL); + // + minBackoff = sessionConfiguration.getInt( + KEY_MIN_BACKOFF_INTERVAL, DEFAULT_MIN_BACKOFF_INTERVAL); + + maxBackoff = sessionConfiguration.getInt( + KEY_MAX_BACKOFF_INTERVAL, DEFAULT_MAX_BACKOFF_INTERVAL); - maxBackoff = sessionConfiguration.getInt(KEY_MAX_BACKOFF_INTERVAL, - DEFAULT_MAX_BACKOFF_INTERVAL); + deltaBackoff = sessionConfiguration.getInt( + KEY_BACKOFF_INTERVAL, DEFAULT_BACKOFF_INTERVAL); - deltaBackoff = sessionConfiguration.getInt(KEY_BACKOFF_INTERVAL, - DEFAULT_BACKOFF_INTERVAL); + maxRetries = sessionConfiguration.getInt( + KEY_MAX_IO_RETRIES, DEFAULT_MAX_RETRY_ATTEMPTS); - maxRetries = sessionConfiguration.getInt(KEY_MAX_IO_RETRIES, - DEFAULT_MAX_RETRY_ATTEMPTS); + storageInteractionLayer.setRetryPolicyFactory( + new RetryExponentialRetry(minBackoff, deltaBackoff, maxBackoff, maxRetries)); - storageInteractionLayer.setRetryPolicyFactory(new RetryExponentialRetry( - minBackoff, deltaBackoff, maxBackoff, maxRetries)); // read the self-throttling config. 
selfThrottlingEnabled = sessionConfiguration.getBoolean( @@ -659,13 +733,15 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { StorageCredentials credentials, String containerName) throws URISyntaxException, StorageException, AzureException { + URI blobEndPoint; if (isStorageEmulatorAccount(accountName)) { isStorageEmulator = true; - CloudStorageAccount account = CloudStorageAccount - .getDevelopmentStorageAccount(); + CloudStorageAccount account = + CloudStorageAccount.getDevelopmentStorageAccount(); storageInteractionLayer.createBlobClient(account); } else { - URI blobEndPoint = new URI(getHTTPScheme() + "://" + accountName); + blobEndPoint = new URI(getHTTPScheme() + "://" + + accountName); storageInteractionLayer.createBlobClient(blobEndPoint, credentials); } suppressRetryPolicyInClientIfNeeded(); @@ -753,7 +829,8 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { * @throws AzureException * @throws IOException */ - private void createAzureStorageSession() throws AzureException { + private void createAzureStorageSession () + throws AzureException, IOException { // Make sure this object was properly initialized with references to // the sessionUri and sessionConfiguration. @@ -886,6 +963,106 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { } /** + * Trims a suffix/prefix from the given string. For example if + * s is given as "/xy" and toTrim is "/", this method returns "xy" + */ + private static String trim(String s, String toTrim) { + return StringUtils.removeEnd(StringUtils.removeStart(s, toTrim), + toTrim); + } + + /** + * Checks if the given rawDir belongs to this account/container, and + * if so returns the canonicalized path for it. Otherwise return null. 
+ */ + private String verifyAndConvertToStandardFormat(String rawDir) throws URISyntaxException { + URI asUri = new URI(rawDir); + if (asUri.getAuthority() == null + || asUri.getAuthority().toLowerCase(Locale.US).equalsIgnoreCase( + sessionUri.getAuthority().toLowerCase(Locale.US))) { + // Applies to me. + return trim(asUri.getPath(), "/"); + } else { + // Doen't apply to me. + return null; + } + } + + /** + * Take a comma-separated list of directories from a configuration variable + * and transform it to a set of directories. + */ + private Set<String> getDirectorySet(final String configVar) + throws AzureException { + String[] rawDirs = sessionConfiguration.getStrings(configVar, new String[0]); + Set<String> directorySet = new HashSet<String>(); + for (String currentDir : rawDirs) { + String myDir; + try { + myDir = verifyAndConvertToStandardFormat(currentDir); + } catch (URISyntaxException ex) { + throw new AzureException(String.format( + "The directory %s specified in the configuration entry %s is not" + + " a valid URI.", + currentDir, configVar)); + } + if (myDir != null) { + directorySet.add(myDir); + } + } + return directorySet; + } + + /** + * Checks if the given key in Azure Storage should be stored as a page + * blob instead of block blob. + * @throws URISyntaxException + */ + public boolean isPageBlobKey(String key) { + return isKeyForDirectorySet(key, pageBlobDirs); + } + + /** + * Checks if the given key in Azure storage should have synchronized + * atomic folder rename createNonRecursive implemented. 
+ */ + @Override + public boolean isAtomicRenameKey(String key) { + return isKeyForDirectorySet(key, atomicRenameDirs); + } + + public boolean isKeyForDirectorySet(String key, Set<String> dirSet) { + String defaultFS = FileSystem.getDefaultUri(sessionConfiguration).toString(); + for (String dir : dirSet) { + if (dir.isEmpty() || + key.startsWith(dir + "/")) { + return true; + } + + // Allow for blob directories with paths relative to the default file + // system. + // + try { + URI uriPageBlobDir = new URI (dir); + if (null == uriPageBlobDir.getAuthority()) { + // Concatenate the default file system prefix with the relative + // page blob directory path. + // + if (key.startsWith(trim(defaultFS, "/") + "/" + dir + "/")){ + return true; + } + } + } catch (URISyntaxException e) { + LOG.info(String.format( + "URI syntax error creating URI for %s", dir)); + } + } + return false; + } + + + + /** * This should be called from any method that does any modifications to the * underlying container: it makes sure to put the WASB current version in the * container's metadata if it's not already there. 
@@ -1032,15 +1209,15 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { private BlobRequestOptions getDownloadOptions() { BlobRequestOptions options = new BlobRequestOptions(); - options.setRetryPolicyFactory(new RetryExponentialRetry(minBackoff, - deltaBackoff, maxBackoff, maxRetries)); + options.setRetryPolicyFactory( + new RetryExponentialRetry(minBackoff, deltaBackoff, maxBackoff, maxRetries)); options.setUseTransactionalContentMD5(getUseTransactionalContentMD5()); return options; } @Override - public DataOutputStream storefile(String key, - PermissionStatus permissionStatus) throws AzureException { + public DataOutputStream storefile(String key, PermissionStatus permissionStatus) + throws AzureException { try { // Check if a session exists, if not create a session with the @@ -1066,19 +1243,20 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { checkContainer(ContainerAccessType.PureWrite); /** - * Note: Windows Azure Blob Storage does not allow the creation of - * arbitrary directory paths under the default $root directory. This is by - * design to eliminate ambiguity in specifying a implicit blob address. A - * blob in the $root container cannot include a / in its name and must be - * careful not to include a trailing '/' when referencing blobs in the - * $root container. A '/; in the $root container permits ambiguous blob - * names as in the following example involving two containers $root and - * mycontainer: http://myaccount.blob.core.windows.net/$root - * http://myaccount.blob.core.windows.net/mycontainer If the URL - * "mycontainer/somefile.txt were allowed in $root then the URL: - * http://myaccount.blob.core.windows.net/mycontainer/myblob.txt could - * mean either: (1) container=mycontainer; blob=myblob.txt (2) - * container=$root; blob=mycontainer/myblob.txt + * Note: Windows Azure Blob Storage does not allow the creation of arbitrary directory + * paths under the default $root directory. 
This is by design to eliminate + ambiguity in specifying an implicit blob address. A blob in the $root container + cannot include a / in its name and must be careful not to include a trailing + '/' when referencing blobs in the $root container. + A '/' in the $root container permits ambiguous blob names as in the following + example involving two containers $root and mycontainer: + http://myaccount.blob.core.windows.net/$root + http://myaccount.blob.core.windows.net/mycontainer + If the URL "mycontainer/somefile.txt" were allowed in $root then the URL: + http://myaccount.blob.core.windows.net/mycontainer/myblob.txt + could mean either: + (1) container=mycontainer; blob=myblob.txt + (2) container=$root; blob=mycontainer/myblob.txt * * To avoid this type of ambiguity the Azure blob storage prevents * arbitrary path under $root. For a simple and more consistent user @@ -1097,17 +1275,15 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { throw new AzureException(errMsg); } - // Get the block blob reference from the store's container and + // Get the blob reference from the store's container and // return it. - CloudBlockBlobWrapper blob = getBlobReference(key); + CloudBlobWrapper blob = getBlobReference(key); storePermissionStatus(blob, permissionStatus); // Create the output stream for the Azure blob. - OutputStream outputStream = blob.openOutputStream(getUploadOptions(), - getInstrumentedContext()); - - // Return to caller with DataOutput stream. - DataOutputStream dataOutStream = new DataOutputStream(outputStream); + // + OutputStream outputStream = openOutputStream(blob); + DataOutputStream dataOutStream = new SyncableDataOutputStream(outputStream); return dataOutStream; } catch (Exception e) { // Caught exception while attempting to open the blob output stream. 
@@ -1117,6 +1293,40 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { } /** + * Opens a new output stream to the given blob (page or block blob) + * to populate it from scratch with data. + */ + private OutputStream openOutputStream(final CloudBlobWrapper blob) + throws StorageException { + if (blob instanceof CloudPageBlobWrapperImpl){ + return new PageBlobOutputStream( + (CloudPageBlobWrapper)blob, getInstrumentedContext(), sessionConfiguration); + } else { + + // Handle both ClouldBlockBlobWrapperImpl and (only for the test code path) + // MockCloudBlockBlobWrapper. + return ((CloudBlockBlobWrapper) blob).openOutputStream(getUploadOptions(), + getInstrumentedContext()); + } + } + + /** + * Opens a new input stream for the given blob (page or block blob) + * to read its data. + */ + private InputStream openInputStream(CloudBlobWrapper blob) + throws StorageException, IOException { + if (blob instanceof CloudBlockBlobWrapper) { + return blob.openInputStream(getDownloadOptions(), + getInstrumentedContext(isConcurrentOOBAppendAllowed())); + } else { + return new PageBlobInputStream( + (CloudPageBlobWrapper) blob, getInstrumentedContext( + isConcurrentOOBAppendAllowed())); + } + } + + /** * Default permission to use when no permission metadata is found. * * @return The default permission to use. 
@@ -1125,7 +1335,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { return new PermissionStatus("", "", FsPermission.getDefault()); } - private static void storeMetadataAttribute(CloudBlockBlobWrapper blob, + private static void storeMetadataAttribute(CloudBlobWrapper blob, String key, String value) { HashMap<String, String> metadata = blob.getMetadata(); if (null == metadata) { @@ -1135,7 +1345,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { blob.setMetadata(metadata); } - private static String getMetadataAttribute(CloudBlockBlobWrapper blob, + private static String getMetadataAttribute(CloudBlobWrapper blob, String... keyAlternatives) { HashMap<String, String> metadata = blob.getMetadata(); if (null == metadata) { @@ -1149,7 +1359,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { return null; } - private static void removeMetadataAttribute(CloudBlockBlobWrapper blob, + private static void removeMetadataAttribute(CloudBlobWrapper blob, String key) { HashMap<String, String> metadata = blob.getMetadata(); if (metadata != null) { @@ -1158,7 +1368,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { } } - private void storePermissionStatus(CloudBlockBlobWrapper blob, + private static void storePermissionStatus(CloudBlobWrapper blob, PermissionStatus permissionStatus) { storeMetadataAttribute(blob, PERMISSION_METADATA_KEY, PERMISSION_JSON_SERIALIZER.toJSON(permissionStatus)); @@ -1166,39 +1376,55 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { removeMetadataAttribute(blob, OLD_PERMISSION_METADATA_KEY); } - private PermissionStatus getPermissionStatus(CloudBlockBlobWrapper blob) { + private PermissionStatus getPermissionStatus(CloudBlobWrapper blob) { String permissionMetadataValue = getMetadataAttribute(blob, PERMISSION_METADATA_KEY, OLD_PERMISSION_METADATA_KEY); if (permissionMetadataValue != null) { - return 
PermissionStatusJsonSerializer - .fromJSONString(permissionMetadataValue); + return PermissionStatusJsonSerializer.fromJSONString( + permissionMetadataValue); } else { return defaultPermissionNoBlobMetadata(); } } - private static void storeFolderAttribute(CloudBlockBlobWrapper blob) { + private static void storeFolderAttribute(CloudBlobWrapper blob) { storeMetadataAttribute(blob, IS_FOLDER_METADATA_KEY, "true"); // Remove the old metadata key if present removeMetadataAttribute(blob, OLD_IS_FOLDER_METADATA_KEY); } - private static void storeLinkAttribute(CloudBlockBlobWrapper blob, - String linkTarget) { - storeMetadataAttribute(blob, LINK_BACK_TO_UPLOAD_IN_PROGRESS_METADATA_KEY, - linkTarget); + private static void storeLinkAttribute(CloudBlobWrapper blob, + String linkTarget) throws UnsupportedEncodingException { + // We have to URL encode the link attribute as the link URI could + // have URI special characters which unless encoded will result + // in 403 errors from the server. This is due to metadata properties + // being sent in the HTTP header of the request which is in turn used + // on the server side to authorize the request. 
+ String encodedLinkTarget = null; + if (linkTarget != null) { + encodedLinkTarget = URLEncoder.encode(linkTarget, "UTF-8"); + } + storeMetadataAttribute(blob, + LINK_BACK_TO_UPLOAD_IN_PROGRESS_METADATA_KEY, + encodedLinkTarget); // Remove the old metadata key if present removeMetadataAttribute(blob, OLD_LINK_BACK_TO_UPLOAD_IN_PROGRESS_METADATA_KEY); } - private static String getLinkAttributeValue(CloudBlockBlobWrapper blob) { - return getMetadataAttribute(blob, + private static String getLinkAttributeValue(CloudBlobWrapper blob) + throws UnsupportedEncodingException { + String encodedLinkTarget = getMetadataAttribute(blob, LINK_BACK_TO_UPLOAD_IN_PROGRESS_METADATA_KEY, OLD_LINK_BACK_TO_UPLOAD_IN_PROGRESS_METADATA_KEY); + String linkTarget = null; + if (encodedLinkTarget != null) { + linkTarget = URLDecoder.decode(encodedLinkTarget, "UTF-8"); + } + return linkTarget; } - private static boolean retrieveFolderAttribute(CloudBlockBlobWrapper blob) { + private static boolean retrieveFolderAttribute(CloudBlobWrapper blob) { HashMap<String, String> metadata = blob.getMetadata(); return null != metadata && (metadata.containsKey(IS_FOLDER_METADATA_KEY) || metadata @@ -1255,11 +1481,10 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { try { checkContainer(ContainerAccessType.PureWrite); - CloudBlockBlobWrapper blob = getBlobReference(key); + CloudBlobWrapper blob = getBlobReference(key); storePermissionStatus(blob, permissionStatus); storeFolderAttribute(blob); - blob.upload(new ByteArrayInputStream(new byte[0]), - getInstrumentedContext()); + openOutputStream(blob).close(); } catch (Exception e) { // Caught exception while attempting upload. Re-throw as an Azure // storage exception. 
@@ -1293,11 +1518,10 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { try { checkContainer(ContainerAccessType.PureWrite); - CloudBlockBlobWrapper blob = getBlobReference(key); + CloudBlobWrapper blob = getBlobReference(key); storePermissionStatus(blob, permissionStatus); storeLinkAttribute(blob, tempBlobKey); - blob.upload(new ByteArrayInputStream(new byte[0]), - getInstrumentedContext()); + openOutputStream(blob).close(); } catch (Exception e) { // Caught exception while attempting upload. Re-throw as an Azure // storage exception. @@ -1322,7 +1546,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { try { checkContainer(ContainerAccessType.PureRead); - CloudBlockBlobWrapper blob = getBlobReference(key); + CloudBlobWrapper blob = getBlobReference(key); blob.downloadAttributes(getInstrumentedContext()); return getLinkAttributeValue(blob); } catch (Exception e) { @@ -1366,10 +1590,12 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { private Iterable<ListBlobItem> listRootBlobs(boolean includeMetadata) throws StorageException, URISyntaxException { return rootDirectory.listBlobs( + null, false, + includeMetadata ? + EnumSet.of(BlobListingDetails.METADATA) : + EnumSet.noneOf(BlobListingDetails.class), null, - false, - includeMetadata ? EnumSet.of(BlobListingDetails.METADATA) : EnumSet - .noneOf(BlobListingDetails.class), null, getInstrumentedContext()); + getInstrumentedContext()); } /** @@ -1392,11 +1618,14 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { private Iterable<ListBlobItem> listRootBlobs(String aPrefix, boolean includeMetadata) throws StorageException, URISyntaxException { - return rootDirectory.listBlobs( - aPrefix, + Iterable<ListBlobItem> list = rootDirectory.listBlobs(aPrefix, false, - includeMetadata ? EnumSet.of(BlobListingDetails.METADATA) : EnumSet - .noneOf(BlobListingDetails.class), null, getInstrumentedContext()); + includeMetadata ? 
+ EnumSet.of(BlobListingDetails.METADATA) : + EnumSet.noneOf(BlobListingDetails.class), + null, + getInstrumentedContext()); + return list; } /** @@ -1423,15 +1652,17 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { * @throws URISyntaxException * */ - private Iterable<ListBlobItem> listRootBlobs(String aPrefix, - boolean useFlatBlobListing, EnumSet<BlobListingDetails> listingDetails, - BlobRequestOptions options, OperationContext opContext) - throws StorageException, URISyntaxException { + private Iterable<ListBlobItem> listRootBlobs(String aPrefix, boolean useFlatBlobListing, + EnumSet<BlobListingDetails> listingDetails, BlobRequestOptions options, + OperationContext opContext) throws StorageException, URISyntaxException { - CloudBlobDirectoryWrapper directory = this.container - .getDirectoryReference(aPrefix); - return directory.listBlobs(null, useFlatBlobListing, listingDetails, - options, opContext); + CloudBlobDirectoryWrapper directory = this.container.getDirectoryReference(aPrefix); + return directory.listBlobs( + null, + useFlatBlobListing, + listingDetails, + options, + opContext); } /** @@ -1447,15 +1678,18 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { * @throws URISyntaxException * */ - private CloudBlockBlobWrapper getBlobReference(String aKey) + private CloudBlobWrapper getBlobReference(String aKey) throws StorageException, URISyntaxException { - CloudBlockBlobWrapper blob = this.container.getBlockBlobReference(aKey); - + CloudBlobWrapper blob = null; + if (isPageBlobKey(aKey)) { + blob = this.container.getPageBlobReference(aKey); + } else { + blob = this.container.getBlockBlobReference(aKey); blob.setStreamMinimumReadSizeInBytes(downloadBlockSizeBytes); blob.setWriteBlockSizeInBytes(uploadBlockSizeBytes); + } - // Return with block blob. 
return blob; } @@ -1492,7 +1726,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { * * @returns normKey */ - private String normalizeKey(CloudBlockBlobWrapper blob) { + private String normalizeKey(CloudBlobWrapper blob) { return normalizeKey(blob.getUri()); } @@ -1552,20 +1786,19 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { instrumentation, bandwidthGaugeUpdater); - // Bind operation context to receive send request callbacks on this - // operation. - // If reads concurrent to OOB writes are allowed, the interception will - // reset the conditional header on all Azure blob storage read requests. + // Bind operation context to receive send request callbacks on this operation. + // If reads concurrent to OOB writes are allowed, the interception will reset + // the conditional header on all Azure blob storage read requests. if (bindConcurrentOOBIo) { SendRequestIntercept.bind(storageInteractionLayer.getCredentials(), operationContext, true); } if (testHookOperationContext != null) { - operationContext = testHookOperationContext - .modifyOperationContext(operationContext); + operationContext = + testHookOperationContext.modifyOperationContext(operationContext); } - + ErrorMetricUpdater.hook(operationContext, instrumentation); // Return the operation context. @@ -1605,7 +1838,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { BlobMaterialization.Implicit); } - CloudBlockBlobWrapper blob = getBlobReference(key); + CloudBlobWrapper blob = getBlobReference(key); // Download attributes and return file metadata only if the blob // exists. @@ -1634,7 +1867,8 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { return new FileMetadata( key, // Always return denormalized key with metadata. 
- properties.getLength(), properties.getLastModified().getTime(), + getDataLength(blob, properties), + properties.getLastModified().getTime(), getPermissionStatus(blob)); } } @@ -1642,17 +1876,23 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { // There is no file with that key name, but maybe it is a folder. // Query the underlying folder/container to list the blobs stored // there under that key. - Iterable<ListBlobItem> objects = listRootBlobs(key, true, - EnumSet.of(BlobListingDetails.METADATA), null, + // + Iterable<ListBlobItem> objects = + listRootBlobs( + key, + true, + EnumSet.of(BlobListingDetails.METADATA), + null, getInstrumentedContext()); // Check if the directory/container has the blob items. for (ListBlobItem blobItem : objects) { - if (blobItem instanceof CloudBlockBlobWrapper) { + if (blobItem instanceof CloudBlockBlobWrapper + || blobItem instanceof CloudPageBlobWrapper) { LOG.debug("Found blob as a directory-using this file under it to infer its properties " + blobItem.getUri()); - blob = (CloudBlockBlobWrapper) blobItem; + blob = (CloudBlobWrapper) blobItem; // The key specifies a directory. Create a FileMetadata object which // specifies as such. BlobProperties properties = blob.getProperties(); @@ -1672,10 +1912,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { } @Override - public DataInputStream retrieve(String key) throws AzureException { - InputStream inStream = null; - BufferedInputStream inBufStream = null; - try { + public DataInputStream retrieve(String key) throws AzureException, IOException { try { // Check if a session exists, if not create a session with the // Azure storage server. @@ -1688,27 +1925,13 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { checkContainer(ContainerAccessType.PureRead); // Get blob reference and open the input buffer stream. 
- CloudBlockBlobWrapper blob = getBlobReference(key); - inStream = blob.openInputStream(getDownloadOptions(), - getInstrumentedContext(isConcurrentOOBAppendAllowed())); - - inBufStream = new BufferedInputStream(inStream); + CloudBlobWrapper blob = getBlobReference(key); + BufferedInputStream inBufStream = new BufferedInputStream( + openInputStream(blob)); // Return a data input stream. DataInputStream inDataStream = new DataInputStream(inBufStream); return inDataStream; - } - catch (Exception e){ - // close the streams on error. - // We use nested try-catch as stream.close() can throw IOException. - if(inBufStream != null){ - inBufStream.close(); - } - if(inStream != null){ - inStream.close(); - } - throw e; - } } catch (Exception e) { // Re-throw as an Azure storage exception. throw new AzureException(e); @@ -1717,11 +1940,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { @Override public DataInputStream retrieve(String key, long startByteOffset) - throws AzureException { - - InputStream in = null; - DataInputStream inDataStream = null; - try { + throws AzureException, IOException { try { // Check if a session exists, if not create a session with the // Azure storage server. @@ -1734,31 +1953,20 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { checkContainer(ContainerAccessType.PureRead); // Get blob reference and open the input buffer stream. - CloudBlockBlobWrapper blob = getBlobReference(key); + CloudBlobWrapper blob = getBlobReference(key); // Open input stream and seek to the start offset. - in = blob.openInputStream(getDownloadOptions(), - getInstrumentedContext(isConcurrentOOBAppendAllowed())); + InputStream in = blob.openInputStream( + getDownloadOptions(), getInstrumentedContext(isConcurrentOOBAppendAllowed())); // Create a data input stream. 
- inDataStream = new DataInputStream(in); - long skippedBytes = inDataStream.skip(startByteOffset); - if (skippedBytes != startByteOffset) { - throw new IOException("Couldn't skip the requested number of bytes"); - } + DataInputStream inDataStream = new DataInputStream(in); + + // Skip bytes and ignore return value. This is okay + // because if you try to skip too far you will be positioned + // at the end and reads will not return data. + inDataStream.skip(startByteOffset); return inDataStream; - } - catch (Exception e){ - // close the streams on error. - // We use nested try-catch as stream.close() can throw IOException. - if(inDataStream != null){ - inDataStream.close(); - } - if(in != null){ - in.close(); - } - throw e; - } } catch (Exception e) { // Re-throw as an Azure storage exception. throw new AzureException(e); @@ -1825,13 +2033,14 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { for (ListBlobItem blobItem : objects) { // Check that the maximum listing count is not exhausted. 
// - if (0 < maxListingCount && fileMetadata.size() >= maxListingCount) { + if (0 < maxListingCount + && fileMetadata.size() >= maxListingCount) { break; } - if (blobItem instanceof CloudBlockBlobWrapper) { + if (blobItem instanceof CloudBlockBlobWrapper || blobItem instanceof CloudPageBlobWrapper) { String blobKey = null; - CloudBlockBlobWrapper blob = (CloudBlockBlobWrapper) blobItem; + CloudBlobWrapper blob = (CloudBlobWrapper) blobItem; BlobProperties properties = blob.getProperties(); // Determine format of the blob name depending on whether an absolute @@ -1840,11 +2049,14 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { FileMetadata metadata; if (retrieveFolderAttribute(blob)) { - metadata = new FileMetadata(blobKey, properties.getLastModified() - .getTime(), getPermissionStatus(blob), + metadata = new FileMetadata(blobKey, + properties.getLastModified().getTime(), + getPermissionStatus(blob), BlobMaterialization.Explicit); } else { - metadata = new FileMetadata(blobKey, properties.getLength(), + metadata = new FileMetadata( + blobKey, + getDataLength(blob, properties), properties.getLastModified().getTime(), getPermissionStatus(blob)); } @@ -1890,9 +2102,11 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { } // Note: Original code indicated that this may be a hack. priorLastKey = null; - return new PartialListing(priorLastKey, + PartialListing listing = new PartialListing(priorLastKey, fileMetadata.toArray(new FileMetadata[] {}), - 0 == fileMetadata.size() ? new String[] {} : new String[] { prefix }); + 0 == fileMetadata.size() ? new String[] {} + : new String[] { prefix }); + return listing; } catch (Exception e) { // Re-throw as an Azure storage exception. // @@ -1919,7 +2133,9 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { final int maxListingDepth) throws Exception { // Push the blob directory onto the stack. 
- LinkedList<Iterator<ListBlobItem>> dirIteratorStack = new LinkedList<Iterator<ListBlobItem>>(); + // + AzureLinkedStack<Iterator<ListBlobItem>> dirIteratorStack = + new AzureLinkedStack<Iterator<ListBlobItem>>(); Iterable<ListBlobItem> blobItems = aCloudBlobDirectory.listBlobs(null, false, EnumSet.of(BlobListingDetails.METADATA), null, @@ -1958,9 +2174,10 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { // Add the file metadata to the list if this is not a blob // directory item. - if (blobItem instanceof CloudBlockBlobWrapper) { + // + if (blobItem instanceof CloudBlockBlobWrapper || blobItem instanceof CloudPageBlobWrapper) { String blobKey = null; - CloudBlockBlobWrapper blob = (CloudBlockBlobWrapper) blobItem; + CloudBlobWrapper blob = (CloudBlobWrapper) blobItem; BlobProperties properties = blob.getProperties(); // Determine format of the blob name depending on whether an absolute @@ -1969,11 +2186,14 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { FileMetadata metadata; if (retrieveFolderAttribute(blob)) { - metadata = new FileMetadata(blobKey, properties.getLastModified() - .getTime(), getPermissionStatus(blob), + metadata = new FileMetadata(blobKey, + properties.getLastModified().getTime(), + getPermissionStatus(blob), BlobMaterialization.Explicit); } else { - metadata = new FileMetadata(blobKey, properties.getLength(), + metadata = new FileMetadata( + blobKey, + getDataLength(blob, properties), properties.getLastModified().getTime(), getPermissionStatus(blob)); } @@ -2016,7 +2236,9 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { // Note: Something smarter should be done about permissions. Maybe // inherit the permissions of the first non-directory blob. // Also, getting a proper value for last-modified is tricky. 
- FileMetadata directoryMetadata = new FileMetadata(dirKey, 0, + // + FileMetadata directoryMetadata = new FileMetadata(dirKey, + 0, defaultPermissionNoBlobMetadata(), BlobMaterialization.Implicit); @@ -2050,26 +2272,48 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { } /** - * Deletes the given blob, taking special care that if we get a blob-not-found - * exception upon retrying the operation, we just swallow the error since what - * most probably happened is that the first operation succeeded on the server. - * - * @param blob - * The blob to delete. + * Return the actual data length of the blob with the specified properties. + * If it is a page blob, you can't rely on the length from the properties + * argument and you must get it from the file. Otherwise, you can. + */ + private long getDataLength(CloudBlobWrapper blob, BlobProperties properties) + throws AzureException { + if (blob instanceof CloudPageBlobWrapper) { + try { + return PageBlobInputStream.getPageBlobSize((CloudPageBlobWrapper) blob, + getInstrumentedContext( + isConcurrentOOBAppendAllowed())); + } catch (Exception e) { + throw new AzureException( + "Unexpected exception getting page blob actual data size.", e); + } + } + return properties.getLength(); + } + + /** + * Deletes the given blob, taking special care that if we get a + * blob-not-found exception upon retrying the operation, we just + * swallow the error since what most probably happened is that + * the first operation succeeded on the server. + * @param blob The blob to delete. + * @param lease The lease to use for the delete, or null if no + * lease is to be used. 
* @throws StorageException */ - private void safeDelete(CloudBlockBlobWrapper blob) throws StorageException { + private void safeDelete(CloudBlobWrapper blob, SelfRenewingLease lease) throws StorageException { OperationContext operationContext = getInstrumentedContext(); try { - blob.delete(operationContext); + blob.delete(operationContext, lease); } catch (StorageException e) { // On exception, check that if: // 1. It's a BlobNotFound exception AND // 2. It got there after one-or-more retries THEN // we swallow the exception. - if (e.getErrorCode() != null && e.getErrorCode().equals("BlobNotFound") - && operationContext.getRequestResults().size() > 1 - && operationContext.getRequestResults().get(0).getException() != null) { + if (e.getErrorCode() != null && + e.getErrorCode().equals("BlobNotFound") && + operationContext.getRequestResults().size() > 1 && + operationContext.getRequestResults().get(0).getException() != null) { if (LOG.isDebugEnabled()) { LOG.debug("Swallowing delete exception on retry: " + e.getMessage()); } @@ -2077,21 +2321,25 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { } else { throw e; } + } finally { + if (lease != null) { + lease.free(); + } } } @Override - public void delete(String key) throws IOException { + public void delete(String key, SelfRenewingLease lease) throws IOException { try { if (checkContainer(ContainerAccessType.ReadThenWrite) == ContainerState.DoesntExist) { // Container doesn't exist, no need to do anything return; } - // Get the blob reference an delete it. - CloudBlockBlobWrapper blob = getBlobReference(key); + // Get the blob reference and delete it. + CloudBlobWrapper blob = getBlobReference(key); if (blob.exists(getInstrumentedContext())) { - safeDelete(blob); + safeDelete(blob, lease); } } catch (Exception e) { // Re-throw as an Azure storage exception. 
@@ -2100,12 +2348,27 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { } @Override + public void delete(String key) throws IOException { + delete(key, null); + } + + @Override public void rename(String srcKey, String dstKey) throws IOException { + rename(srcKey, dstKey, false, null); + } + + @Override + public void rename(String srcKey, String dstKey, boolean acquireLease, + SelfRenewingLease existingLease) throws IOException { if (LOG.isDebugEnabled()) { LOG.debug("Moving " + srcKey + " to " + dstKey); } + if (acquireLease && existingLease != null) { + throw new IOException("Cannot acquire new lease if one already exists."); + } + try { // Attempts rename may occur before opening any streams so first, // check if a session exists, if not create a session with the Azure @@ -2120,52 +2383,76 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { checkContainer(ContainerAccessType.ReadThenWrite); // Get the source blob and assert its existence. If the source key // needs to be normalized then normalize it. - CloudBlockBlobWrapper srcBlob = getBlobReference(srcKey); + // + CloudBlobWrapper srcBlob = getBlobReference(srcKey); if (!srcBlob.exists(getInstrumentedContext())) { - throw new AzureException("Source blob " + srcKey + " does not exist."); + throw new AzureException ("Source blob " + srcKey + + " does not exist."); + } + + /** + * Conditionally get a lease on the source blob to prevent other writers + * from changing it. This is used for correctness in HBase when log files + * are renamed. It generally should do no harm other than take a little + * more time for other rename scenarios. When the HBase master renames a + * log file folder, the lease locks out other writers. This + * prevents a region server that the master thinks is dead, but is still + * alive, from committing additional updates. 
This is different than + * when HBase runs on HDFS, where the region server recovers the lease + * on a log file, to gain exclusive access to it, before it splits it. + */ + SelfRenewingLease lease = null; + if (acquireLease) { + lease = srcBlob.acquireLease(); + } else if (existingLease != null) { + lease = existingLease; } // Get the destination blob. The destination key always needs to be // normalized. - CloudBlockBlobWrapper dstBlob = getBlobReference(dstKey); + // + CloudBlobWrapper dstBlob = getBlobReference(dstKey); + + // TODO: Remove at the time when we move to Azure Java SDK 1.2+. + // This is the workaround provided by Azure Java SDK team to + // mitigate the issue with un-encoded x-ms-copy-source HTTP + // request header. Azure sdk version before 1.2+ does not encode this + // header what causes all URIs that have special (category "other") + // characters in the URI not to work with startCopyFromBlob when + // specified as source (requests fail with HTTP 403). + URI srcUri = new URI(srcBlob.getUri().toASCIIString()); // Rename the source blob to the destination blob by copying it to // the destination blob then deleting it. // - dstBlob.startCopyFromBlob(srcBlob, getInstrumentedContext()); + dstBlob.startCopyFromBlob(srcUri, getInstrumentedContext()); waitForCopyToComplete(dstBlob, getInstrumentedContext()); - safeDelete(srcBlob); + safeDelete(srcBlob, lease); } catch (Exception e) { // Re-throw exception as an Azure storage exception. 
throw new AzureException(e); } } - private void waitForCopyToComplete(CloudBlockBlobWrapper blob, - OperationContext opContext) throws AzureException { + private void waitForCopyToComplete(CloudBlobWrapper blob, OperationContext opContext){ boolean copyInProgress = true; - int exceptionCount = 0; while (copyInProgress) { try { blob.downloadAttributes(opContext); - } catch (StorageException se) { - exceptionCount++; - if(exceptionCount > 10){ - throw new AzureException("Too many storage exceptions during waitForCopyToComplete", se); } + catch (StorageException se){ } - // test for null because mocked filesystem doesn't know about copystates - // yet. - copyInProgress = (blob.getCopyState() != null && blob.getCopyState() - .getStatus() == CopyStatus.PENDING); + // test for null because mocked filesystem doesn't know about copystates yet. + copyInProgress = (blob.getCopyState() != null && blob.getCopyState().getStatus() == CopyStatus.PENDING); if (copyInProgress) { try { Thread.sleep(1000); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); + } + catch (InterruptedException ie){ + //ignore } } } @@ -2179,7 +2466,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { throws AzureException { try { checkContainer(ContainerAccessType.ReadThenWrite); - CloudBlockBlobWrapper blob = getBlobReference(key); + CloudBlobWrapper blob = getBlobReference(key); blob.downloadAttributes(getInstrumentedContext()); storePermissionStatus(blob, newPermission); blob.uploadMetadata(getInstrumentedContext()); @@ -2220,28 +2507,51 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { } } + /** + * Get a lease on the blob identified by key. This lease will be renewed + * indefinitely by a background thread. 
+ */ + @Override + public SelfRenewingLease acquireLease(String key) throws AzureException { + LOG.debug("acquiring lease on " + key); + try { + checkContainer(ContainerAccessType.ReadThenWrite); + CloudBlobWrapper blob = getBlobReference(key); + return blob.acquireLease(); + } + catch (Exception e) { + + // Caught exception while attempting to get lease. Re-throw as an + // Azure storage exception. + throw new AzureException(e); + } + } + @Override - public void updateFolderLastModifiedTime(String key, Date lastModified) + public void updateFolderLastModifiedTime(String key, Date lastModified, + SelfRenewingLease folderLease) throws AzureException { try { checkContainer(ContainerAccessType.ReadThenWrite); - CloudBlockBlobWrapper blob = getBlobReference(key); + CloudBlobWrapper blob = getBlobReference(key); blob.getProperties().setLastModified(lastModified); - blob.uploadProperties(getInstrumentedContext()); + blob.uploadProperties(getInstrumentedContext(), folderLease); } catch (Exception e) { - // Caught exception while attempting update the properties. Re-throw as an + + // Caught exception while attempting to update the properties. Re-throw as an // Azure storage exception. throw new AzureException(e); } } @Override - public void updateFolderLastModifiedTime(String key) throws AzureException { + public void updateFolderLastModifiedTime(String key, + SelfRenewingLease folderLease) throws AzureException { final Calendar lastModifiedCalendar = Calendar .getInstance(Utility.LOCALE_US); lastModifiedCalendar.setTimeZone(Utility.UTC_ZONE); Date lastModified = lastModifiedCalendar.getTime(); - updateFolderLastModifiedTime(key, lastModified); + updateFolderLastModifiedTime(key, lastModified, folderLease); } @Override
