http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a737026/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java index dae957e..076c48a 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java @@ -25,14 +25,21 @@ import java.io.InputStream; import java.io.OutputStream; import java.net.URI; import java.net.URISyntaxException; +import java.text.SimpleDateFormat; import java.util.ArrayList; -import java.util.Calendar; import java.util.Date; +import java.util.EnumSet; +import java.util.Iterator; import java.util.Set; +import java.util.TimeZone; import java.util.TreeSet; import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -40,6 +47,7 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.BufferedFSInputStream; +import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSInputStream; @@ -50,12 +58,26 @@ import org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation; import org.apache.hadoop.fs.azure.metrics.AzureFileSystemMetricsSystem; import 
org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.PermissionStatus; +import org.apache.hadoop.fs.azure.AzureException; +import org.apache.hadoop.fs.azure.StorageInterface.CloudBlobWrapper; +import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Progressable; + +import org.codehaus.jackson.JsonNode; +import org.codehaus.jackson.JsonParseException; +import org.codehaus.jackson.JsonParser; +import org.codehaus.jackson.map.JsonMappingException; +import org.codehaus.jackson.map.ObjectMapper; + import com.google.common.annotations.VisibleForTesting; -import com.microsoft.windowsazure.storage.core.Utility; +import com.microsoft.windowsazure.storage.AccessCondition; +import com.microsoft.windowsazure.storage.OperationContext; +import com.microsoft.windowsazure.storage.StorageException; +import com.microsoft.windowsazure.storage.blob.CloudBlob; +import com.microsoft.windowsazure.storage.core.*; /** * <p> @@ -68,6 +90,495 @@ import com.microsoft.windowsazure.storage.core.Utility; @InterfaceAudience.Public @InterfaceStability.Stable public class NativeAzureFileSystem extends FileSystem { + private static final int USER_WX_PERMISION = 0300; + + /** + * A description of a folder rename operation, including the source and + * destination keys, and descriptions of the files in the source folder. 
+ */ + public static class FolderRenamePending { + private SelfRenewingLease folderLease; + private String srcKey; + private String dstKey; + private FileMetadata[] fileMetadata = null; // descriptions of source files + private ArrayList<String> fileStrings = null; + private NativeAzureFileSystem fs; + private static final int MAX_RENAME_PENDING_FILE_SIZE = 10000000; + private static final int FORMATTING_BUFFER = 10000; + private boolean committed; + public static final String SUFFIX = "-RenamePending.json"; + + // Prepare in-memory information needed to do or redo a folder rename. + public FolderRenamePending(String srcKey, String dstKey, SelfRenewingLease lease, + NativeAzureFileSystem fs) throws IOException { + this.srcKey = srcKey; + this.dstKey = dstKey; + this.folderLease = lease; + this.fs = fs; + ArrayList<FileMetadata> fileMetadataList = new ArrayList<FileMetadata>(); + + // List all the files in the folder. + String priorLastKey = null; + do { + PartialListing listing = fs.getStoreInterface().listAll(srcKey, AZURE_LIST_ALL, + AZURE_UNBOUNDED_DEPTH, priorLastKey); + for(FileMetadata file : listing.getFiles()) { + fileMetadataList.add(file); + } + priorLastKey = listing.getPriorLastKey(); + } while (priorLastKey != null); + fileMetadata = fileMetadataList.toArray(new FileMetadata[fileMetadataList.size()]); + this.committed = true; + } + + // Prepare in-memory information needed to do or redo folder rename from + // a -RenamePending.json file read from storage. This constructor is to use during + // redo processing. 
+ public FolderRenamePending(Path redoFile, NativeAzureFileSystem fs) + throws IllegalArgumentException, IOException { + + this.fs = fs; + + // open redo file + Path f = redoFile; + FSDataInputStream input = fs.open(f); + byte[] bytes = new byte[MAX_RENAME_PENDING_FILE_SIZE]; + int l = input.read(bytes); + if (l < 0) { + throw new IOException( + "Error reading pending rename file contents -- no data available"); + } + if (l == MAX_RENAME_PENDING_FILE_SIZE) { + throw new IOException( + "Error reading pending rename file contents -- " + + "maximum file size exceeded"); + } + String contents = new String(bytes, 0, l); + + // parse the JSON + ObjectMapper objMapper = new ObjectMapper(); + objMapper.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true); + JsonNode json = null; + try { + json = objMapper.readValue(contents, JsonNode.class); + this.committed = true; + } catch (JsonMappingException e) { + + // The -RedoPending.json file is corrupted, so we assume it was + // not completely written + // and the redo operation did not commit. 
+ this.committed = false; + } catch (JsonParseException e) { + this.committed = false; + } catch (IOException e) { + this.committed = false; + } + + if (!this.committed) { + LOG.error("Deleting corruped rename pending file " + + redoFile + "\n" + contents); + + // delete the -RenamePending.json file + fs.delete(redoFile, false); + return; + } + + // initialize this object's fields + ArrayList<String> fileStrList = new ArrayList<String>(); + JsonNode oldFolderName = json.get("OldFolderName"); + JsonNode newFolderName = json.get("NewFolderName"); + if (oldFolderName == null || newFolderName == null) { + this.committed = false; + } else { + this.srcKey = oldFolderName.getTextValue(); + this.dstKey = newFolderName.getTextValue(); + if (this.srcKey == null || this.dstKey == null) { + this.committed = false; + } else { + JsonNode fileList = json.get("FileList"); + if (fileList == null) { + this.committed = false; + } else { + for (int i = 0; i < fileList.size(); i++) { + fileStrList.add(fileList.get(i).getTextValue()); + } + } + } + } + this.fileStrings = fileStrList; + } + + public FileMetadata[] getFiles() { + return fileMetadata; + } + + public SelfRenewingLease getFolderLease() { + return folderLease; + } + + /** + * Write to disk the information needed to redo folder rename, in JSON format. + * The file name will be wasb://<sourceFolderPrefix>/folderName-RenamePending.json + * The file format will be: + * { + * FormatVersion: "1.0", + * OperationTime: "<YYYY-MM-DD HH:MM:SS.MMM>", + * OldFolderName: "<key>", + * NewFolderName: "<key>", + * FileList: [ <string> , <string> , ... 
] + * } + * + * Here's a sample: + * { + * FormatVersion: "1.0", + * OperationUTCTime: "2014-07-01 23:50:35.572", + * OldFolderName: "user/ehans/folderToRename", + * NewFolderName: "user/ehans/renamedFolder", + * FileList: [ + * "innerFile", + * "innerFile2" + * ] + * } + * @throws IOException + */ + public void writeFile(FileSystem fs) throws IOException { + Path path = getRenamePendingFilePath(); + if (LOG.isDebugEnabled()){ + LOG.debug("Preparing to write atomic rename state to " + path.toString()); + } + OutputStream output = null; + + String contents = makeRenamePendingFileContents(); + + // Write file. + try { + output = fs.create(path); + output.write(contents.getBytes()); + } catch (IOException e) { + throw new IOException("Unable to write RenamePending file for folder rename from " + + srcKey + " to " + dstKey, e); + } finally { + IOUtils.cleanup(LOG, output); + } + } + + /** + * Return the contents of the JSON file to represent the operations + * to be performed for a folder rename. + */ + public String makeRenamePendingFileContents() { + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); + sdf.setTimeZone(TimeZone.getTimeZone("UTC")); + String time = sdf.format(new Date()); + + // Make file list string + StringBuilder builder = new StringBuilder(); + builder.append("[\n"); + for (int i = 0; i != fileMetadata.length; i++) { + if (i > 0) { + builder.append(",\n"); + } + builder.append(" "); + String noPrefix = StringUtils.removeStart(fileMetadata[i].getKey(), srcKey + "/"); + + // Quote string file names, escaping any possible " characters or other + // necessary characters in the name. + builder.append(quote(noPrefix)); + if (builder.length() >= + MAX_RENAME_PENDING_FILE_SIZE - FORMATTING_BUFFER) { + + // Give up now to avoid using too much memory. 
+ LOG.error("Internal error: Exceeded maximum rename pending file size of " + + MAX_RENAME_PENDING_FILE_SIZE + " bytes."); + + // return some bad JSON with an error message to make it human readable + return "exceeded maximum rename pending file size"; + } + } + builder.append("\n ]"); + String fileList = builder.toString(); + + // Make file contents as a string. Again, quote file names, escaping + // characters as appropriate. + String contents = "{\n" + + " FormatVersion: \"1.0\",\n" + + " OperationUTCTime: \"" + time + "\",\n" + + " OldFolderName: " + quote(srcKey) + ",\n" + + " NewFolderName: " + quote(dstKey) + ",\n" + + " FileList: " + fileList + "\n" + + "}\n"; + + return contents; + } + + /** + * This is an exact copy of org.codehaus.jettison.json.JSONObject.quote + * method. + * + * Produce a string in double quotes with backslash sequences in all the + * right places. A backslash will be inserted within </, allowing JSON + * text to be delivered in HTML. In JSON text, a string cannot contain a + * control character or an unescaped quote or backslash. + * @param string A String + * @return A String correctly formatted for insertion in a JSON text. 
+ */ + private String quote(String string) { + if (string == null || string.length() == 0) { + return "\"\""; + } + + char c = 0; + int i; + int len = string.length(); + StringBuilder sb = new StringBuilder(len + 4); + String t; + + sb.append('"'); + for (i = 0; i < len; i += 1) { + c = string.charAt(i); + switch (c) { + case '\\': + case '"': + sb.append('\\'); + sb.append(c); + break; + case '/': + sb.append('\\'); + sb.append(c); + break; + case '\b': + sb.append("\\b"); + break; + case '\t': + sb.append("\\t"); + break; + case '\n': + sb.append("\\n"); + break; + case '\f': + sb.append("\\f"); + break; + case '\r': + sb.append("\\r"); + break; + default: + if (c < ' ') { + t = "000" + Integer.toHexString(c); + sb.append("\\u" + t.substring(t.length() - 4)); + } else { + sb.append(c); + } + } + } + sb.append('"'); + return sb.toString(); + } + + public String getSrcKey() { + return srcKey; + } + + public String getDstKey() { + return dstKey; + } + + public FileMetadata getSourceMetadata() throws IOException { + return fs.getStoreInterface().retrieveMetadata(srcKey); + } + + /** + * Execute a folder rename. This is the execution path followed + * when everything is working normally. See redo() for the alternate + * execution path for the case where we're recovering from a folder rename + * failure. + * @throws IOException + */ + public void execute() throws IOException { + + for (FileMetadata file : this.getFiles()) { + + // Rename all materialized entries under the folder to point to the + // final destination. + if (file.getBlobMaterialization() == BlobMaterialization.Explicit) { + String srcName = file.getKey(); + String suffix = srcName.substring((this.getSrcKey()).length()); + String dstName = this.getDstKey() + suffix; + + // Rename gets exclusive access (via a lease) for files + // designated for atomic rename. + // The main use case is for HBase write-ahead log (WAL) and data + // folder processing correctness. See the rename code for details. 
+ boolean acquireLease = fs.getStoreInterface().isAtomicRenameKey(srcName); + fs.getStoreInterface().rename(srcName, dstName, acquireLease, null); + } + } + + // Rename the source folder 0-byte root file itself. + FileMetadata srcMetadata2 = this.getSourceMetadata(); + if (srcMetadata2.getBlobMaterialization() == + BlobMaterialization.Explicit) { + + // It already has a lease on it from the "prepare" phase so there's no + // need to get one now. Pass in existing lease to allow file delete. + fs.getStoreInterface().rename(this.getSrcKey(), this.getDstKey(), + false, folderLease); + } + + // Update the last-modified time of the parent folders of both source and + // destination. + fs.updateParentFolderLastModifiedTime(srcKey); + fs.updateParentFolderLastModifiedTime(dstKey); + } + + /** Clean up after execution of rename. + * @throws IOException */ + public void cleanup() throws IOException { + + if (fs.getStoreInterface().isAtomicRenameKey(srcKey)) { + + // Remove RenamePending file + fs.delete(getRenamePendingFilePath(), false); + + // Freeing source folder lease is not necessary since the source + // folder file was deleted. + } + } + + private Path getRenamePendingFilePath() { + String fileName = srcKey + SUFFIX; + Path fileNamePath = keyToPath(fileName); + Path path = fs.makeAbsolute(fileNamePath); + return path; + } + + /** + * Recover from a folder rename failure by redoing the intended work, + * as recorded in the -RenamePending.json file. + * + * @throws IOException + */ + public void redo() throws IOException { + + if (!committed) { + + // Nothing to do. The -RedoPending.json file should have already been + // deleted. + return; + } + + // Try to get a lease on source folder to block concurrent access to it. + // It may fail if the folder is already gone. We don't check if the + // source exists explicitly because that could recursively trigger redo + // and give an infinite recursion. 
+ SelfRenewingLease lease = null; + boolean sourceFolderGone = false; + try { + lease = fs.leaseSourceFolder(srcKey); + } catch (AzureException e) { + + // If the source folder was not found then somebody probably + // raced with us and finished the rename first, or the + // first rename failed right before deleting the rename pending + // file. + String errorCode = ""; + try { + StorageException se = (StorageException) e.getCause(); + errorCode = se.getErrorCode(); + } catch (Exception e2) { + ; // do nothing -- could not get errorCode + } + if (errorCode.equals("BlobNotFound")) { + sourceFolderGone = true; + } else { + throw new IOException( + "Unexpected error when trying to lease source folder name during " + + "folder rename redo", + e); + } + } + + if (!sourceFolderGone) { + // Make sure the target folder exists. + Path dst = fullPath(dstKey); + if (!fs.exists(dst)) { + fs.mkdirs(dst); + } + + // For each file inside the folder to be renamed, + // make sure it has been renamed. + for(String fileName : fileStrings) { + finishSingleFileRename(fileName); + } + + // Remove the source folder. Don't check explicitly if it exists, + // to avoid triggering redo recursively. + try { + fs.getStoreInterface().delete(srcKey, lease); + } catch (Exception e) { + LOG.info("Unable to delete source folder during folder rename redo. " + + "If the source folder is already gone, this is not an error " + + "condition. Continuing with redo.", e); + } + + // Update the last-modified time of the parent folders of both source + // and destination. + fs.updateParentFolderLastModifiedTime(srcKey); + fs.updateParentFolderLastModifiedTime(dstKey); + } + + // Remove the -RenamePending.json file. + fs.delete(getRenamePendingFilePath(), false); + } + + // See if the source file is still there, and if it is, rename it. 
+ private void finishSingleFileRename(String fileName) + throws IOException { + Path srcFile = fullPath(srcKey, fileName); + Path dstFile = fullPath(dstKey, fileName); + boolean srcExists = fs.exists(srcFile); + boolean dstExists = fs.exists(dstFile); + if (srcExists && !dstExists) { + + // Rename gets exclusive access (via a lease) for HBase write-ahead log + // (WAL) file processing correctness. See the rename code for details. + String srcName = fs.pathToKey(srcFile); + String dstName = fs.pathToKey(dstFile); + fs.getStoreInterface().rename(srcName, dstName, true, null); + } else if (srcExists && dstExists) { + + // Get a lease on source to block write access. + String srcName = fs.pathToKey(srcFile); + SelfRenewingLease lease = fs.acquireLease(srcFile); + + // Delete the file. This will free the lease too. + fs.getStoreInterface().delete(srcName, lease); + } else if (!srcExists && dstExists) { + + // The rename already finished, so do nothing. + ; + } else { + throw new IOException( + "Attempting to complete rename of file " + srcKey + "/" + fileName + + " during folder rename redo, and file was not found in source " + + "or destination."); + } + } + + // Return an absolute path for the specific fileName within the folder + // specified by folderKey. 
+ private Path fullPath(String folderKey, String fileName) { + return new Path(new Path(fs.getUri()), "/" + folderKey + "/" + fileName); + } + + private Path fullPath(String fileKey) { + return new Path(new Path(fs.getUri()), "/" + fileKey); + } + } + + private static final String TRAILING_PERIOD_PLACEHOLDER = "[[.]]"; + private static final Pattern TRAILING_PERIOD_PLACEHOLDER_PATTERN = + Pattern.compile("\\[\\[\\.\\]\\](?=$|/)"); + private static final Pattern TRAILING_PERIOD_PATTERN = Pattern.compile("\\.(?=$|/)"); @Override public String getScheme() { @@ -121,17 +632,53 @@ public class NativeAzureFileSystem extends FileSystem { */ static final String AZURE_DEFAULT_GROUP_DEFAULT = "supergroup"; - static final String AZURE_BLOCK_LOCATION_HOST_PROPERTY_NAME = "fs.azure.block.location.impersonatedhost"; - private static final String AZURE_BLOCK_LOCATION_HOST_DEFAULT = "localhost"; + static final String AZURE_BLOCK_LOCATION_HOST_PROPERTY_NAME = + "fs.azure.block.location.impersonatedhost"; + private static final String AZURE_BLOCK_LOCATION_HOST_DEFAULT = + "localhost"; + static final String AZURE_RINGBUFFER_CAPACITY_PROPERTY_NAME = + "fs.azure.ring.buffer.capacity"; + static final String AZURE_OUTPUT_STREAM_BUFFER_SIZE_PROPERTY_NAME = + "fs.azure.output.stream.buffer.size"; private class NativeAzureFsInputStream extends FSInputStream { private InputStream in; private final String key; private long pos = 0; + private boolean closed = false; + private boolean isPageBlob; - public NativeAzureFsInputStream(DataInputStream in, String key) { + // File length, valid only for streams over block blobs. 
+ private long fileLength; + + public NativeAzureFsInputStream(DataInputStream in, String key, long fileLength) { this.in = in; this.key = key; + this.isPageBlob = store.isPageBlobKey(key); + this.fileLength = fileLength; + } + + /** + * Return the size of the remaining available bytes + * if the size is less than or equal to {@link Integer#MAX_VALUE}, + * otherwise, return {@link Integer#MAX_VALUE}. + * + * This is to match the behavior of DFSInputStream.available(), + * which some clients may rely on (HBase write-ahead log reading in + * particular). + */ + @Override + public synchronized int available() throws IOException { + if (isPageBlob) { + return in.available(); + } else { + if (closed) { + throw new IOException("Stream closed"); + } + final long remaining = this.fileLength - pos; + return remaining <= Integer.MAX_VALUE ? + (int) remaining : Integer.MAX_VALUE; + } } /* @@ -140,7 +687,7 @@ public class NativeAzureFileSystem extends FileSystem { * because the end of the stream has been reached, the value -1 is returned. * This method blocks until input data is available, the end of the stream * is detected, or an exception is thrown. - * + * * @returns int An integer corresponding to the byte read. */ @Override @@ -169,13 +716,13 @@ public class NativeAzureFileSystem extends FileSystem { * one byte. If no byte is available because the stream is at end of file, * the value -1 is returned; otherwise, at least one byte is read and stored * into b. - * + * * @param b -- the buffer into which data is read - * + * * @param off -- the start offset in the array b at which data is written - * + * * @param len -- the maximum number of bytes read - * + * * @ returns int The total number of byes read into the buffer, or -1 if * there is no more data because the end of stream is reached. 
*/ @@ -196,15 +743,20 @@ public class NativeAzureFileSystem extends FileSystem { } @Override - public synchronized void close() throws IOException { + public void close() throws IOException { in.close(); + closed = true; } @Override public synchronized void seek(long pos) throws IOException { - in.close(); - in = store.retrieve(key, pos); - this.pos = pos; + in.close(); + in = store.retrieve(key); + this.pos = in.skip(pos); + if (LOG.isDebugEnabled()) { + LOG.debug(String.format("Seek to position %d. Bytes skipped %d", pos, + this.pos)); + } } @Override @@ -468,7 +1020,8 @@ public class NativeAzureFileSystem extends FileSystem { } @Override - public void initialize(URI uri, Configuration conf) throws IOException { + public void initialize(URI uri, Configuration conf) + throws IOException, IllegalArgumentException { // Check authority for the URI to guarantee that it is non-null. uri = reconstructAuthorityIfNeeded(uri, conf); if (null == uri.getAuthority()) { @@ -514,10 +1067,29 @@ public class NativeAzureFileSystem extends FileSystem { return actualStore; } - // Note: The logic for this method is confusing as to whether it strips the - // last slash or not (it adds it in the beginning, then strips it at the end). - // We should revisit that. - private String pathToKey(Path path) { + /** + * Azure Storage doesn't allow the blob names to end in a period, + * so encode this here to work around that limitation. + */ + private static String encodeTrailingPeriod(String toEncode) { + Matcher matcher = TRAILING_PERIOD_PATTERN.matcher(toEncode); + return matcher.replaceAll(TRAILING_PERIOD_PLACEHOLDER); + } + + /** + * Reverse the encoding done by encodeTrailingPeriod(). + */ + private static String decodeTrailingPeriod(String toDecode) { + Matcher matcher = TRAILING_PERIOD_PLACEHOLDER_PATTERN.matcher(toDecode); + return matcher.replaceAll("."); + } + + /** + * Convert the path to a key. 
By convention, any leading or trailing slash is + * removed, except for the special case of a single slash. + */ + @VisibleForTesting + public String pathToKey(Path path) { // Convert the path to a URI to parse the scheme, the authority, and the // path from the path object. URI tmpUri = path.toUri(); @@ -537,6 +1109,8 @@ public class NativeAzureFileSystem extends FileSystem { String key = null; key = newPath.toUri().getPath(); + key = removeTrailingSlash(key); + key = encodeTrailingPeriod(key); if (key.length() == 1) { return key; } else { @@ -544,14 +1118,34 @@ public class NativeAzureFileSystem extends FileSystem { } } + // Remove any trailing slash except for the case of a single slash. + private static String removeTrailingSlash(String key) { + if (key.length() == 0 || key.length() == 1) { + return key; + } + if (key.charAt(key.length() - 1) == '/') { + return key.substring(0, key.length() - 1); + } else { + return key; + } + } + private static Path keyToPath(String key) { if (key.equals("/")) { return new Path("/"); // container } - return new Path("/" + key); + return new Path("/" + decodeTrailingPeriod(key)); } - private Path makeAbsolute(Path path) { + /** + * Get the absolute version of the path (fully qualified). + * This is public for testing purposes. + * + * @param path + * @return fully qualified path + */ + @VisibleForTesting + public Path makeAbsolute(Path path) { if (path.isAbsolute()) { return path; } @@ -569,6 +1163,10 @@ public class NativeAzureFileSystem extends FileSystem { return actualStore; } + NativeFileSystemStore getStoreInterface() { + return store; + } + /** * Gets the metrics source for this file system. * This is mainly here for unit testing purposes. 
@@ -590,6 +1188,145 @@ public class NativeAzureFileSystem extends FileSystem { public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress) throws IOException { + return create(f, permission, overwrite, true, + bufferSize, replication, blockSize, progress, + (SelfRenewingLease) null); + } + + /** + * Get a self-renewing lease on the specified file. + */ + public SelfRenewingLease acquireLease(Path path) throws AzureException { + String fullKey = pathToKey(makeAbsolute(path)); + return getStore().acquireLease(fullKey); + } + + @Override + @SuppressWarnings("deprecation") + public FSDataOutputStream createNonRecursive(Path f, FsPermission permission, + boolean overwrite, int bufferSize, short replication, long blockSize, + Progressable progress) throws IOException { + + Path parent = f.getParent(); + + // Get exclusive access to folder if this is a directory designated + // for atomic rename. The primary use case of for HBase write-ahead + // log file management. + SelfRenewingLease lease = null; + if (store.isAtomicRenameKey(pathToKey(f))) { + try { + lease = acquireLease(parent); + } catch (AzureException e) { + + String errorCode = ""; + try { + StorageException e2 = (StorageException) e.getCause(); + errorCode = e2.getErrorCode(); + } catch (Exception e3) { + // do nothing if cast fails + } + if (errorCode.equals("BlobNotFound")) { + throw new FileNotFoundException("Cannot create file " + + f.getName() + " because parent folder does not exist."); + } + + LOG.warn("Got unexpected exception trying to get lease on " + + pathToKey(parent) + ". " + e.getMessage()); + throw e; + } + } + + // See if the parent folder exists. If not, throw error. + // The exists() check will push any pending rename operation forward, + // if there is one, and return false. 
+ // + // At this point, we have exclusive access to the source folder + // via the lease, so we will not conflict with an active folder + // rename operation. + if (!exists(parent)) { + try { + + // This'll let the keep-alive thread exit as soon as it wakes up. + lease.free(); + } catch (Exception e) { + LOG.warn("Unable to free lease because: " + e.getMessage()); + } + throw new FileNotFoundException("Cannot create file " + + f.getName() + " because parent folder does not exist."); + } + + // Create file inside folder. + FSDataOutputStream out = null; + try { + out = create(f, permission, overwrite, false, + bufferSize, replication, blockSize, progress, lease); + } finally { + // Release exclusive access to folder. + try { + if (lease != null) { + lease.free(); + } + } catch (Exception e) { + IOUtils.cleanup(LOG, out); + String msg = "Unable to free lease on " + parent.toUri(); + LOG.error(msg); + throw new IOException(msg, e); + } + } + return out; + } + + @Override + @SuppressWarnings("deprecation") + public FSDataOutputStream createNonRecursive(Path f, FsPermission permission, + EnumSet<CreateFlag> flags, int bufferSize, short replication, long blockSize, + Progressable progress) throws IOException { + + // Check if file should be appended or overwritten. Assume that the file + // is overwritten on if the CREATE and OVERWRITE create flags are set. Note + // that any other combinations of create flags will result in an open new or + // open with append. + final EnumSet<CreateFlag> createflags = + EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE); + boolean overwrite = flags.containsAll(createflags); + + // Delegate the create non-recursive call. 
+ return this.createNonRecursive(f, permission, overwrite, + bufferSize, replication, blockSize, progress); + } + + @Override + @SuppressWarnings("deprecation") + public FSDataOutputStream createNonRecursive(Path f, + boolean overwrite, int bufferSize, short replication, long blockSize, + Progressable progress) throws IOException { + return this.createNonRecursive(f, FsPermission.getFileDefault(), + overwrite, bufferSize, replication, blockSize, progress); + } + + + /** + * Create an Azure blob and return an output stream to use + * to write data to it. + * + * @param f + * @param permission + * @param overwrite + * @param createParent + * @param bufferSize + * @param replication + * @param blockSize + * @param progress + * @param parentFolderLease Lease on parent folder (or null if + * no lease). + * @return + * @throws IOException + */ + private FSDataOutputStream create(Path f, FsPermission permission, + boolean overwrite, boolean createParent, int bufferSize, + short replication, long blockSize, Progressable progress, + SelfRenewingLease parentFolderLease) + throws IOException { if (LOG.isDebugEnabled()) { LOG.debug("Creating file: " + f.toString()); @@ -620,45 +1357,60 @@ public class NativeAzureFileSystem extends FileSystem { // already exists. String parentKey = pathToKey(parentFolder); FileMetadata parentMetadata = store.retrieveMetadata(parentKey); - if (parentMetadata != null - && parentMetadata.isDir() - && parentMetadata.getBlobMaterialization() == BlobMaterialization.Explicit) { - store.updateFolderLastModifiedTime(parentKey); + if (parentMetadata != null && parentMetadata.isDir() && + parentMetadata.getBlobMaterialization() == BlobMaterialization.Explicit) { + store.updateFolderLastModifiedTime(parentKey, parentFolderLease); } else { // Make sure that the parent folder exists. 
- mkdirs(parentFolder, permission); + // Create it using inherited permissions from the first existing directory going up the path + Path firstExisting = parentFolder.getParent(); + FileMetadata metadata = store.retrieveMetadata(pathToKey(firstExisting)); + while(metadata == null) { + // Guaranteed to terminate properly because we will eventually hit root, which will return non-null metadata + firstExisting = firstExisting.getParent(); + metadata = store.retrieveMetadata(pathToKey(firstExisting)); + } + mkdirs(parentFolder, metadata.getPermissionStatus().getPermission(), true); } } - // Open the output blob stream based on the encoded key. - String keyEncoded = encodeKey(key); - // Mask the permission first (with the default permission mask as well). FsPermission masked = applyUMask(permission, UMaskApplyMode.NewFile); PermissionStatus permissionStatus = createPermissionStatus(masked); - // First create a blob at the real key, pointing back to the temporary file - // This accomplishes a few things: - // 1. Makes sure we can create a file there. - // 2. Makes it visible to other concurrent threads/processes/nodes what - // we're - // doing. - // 3. Makes it easier to restore/cleanup data in the event of us crashing. - store.storeEmptyLinkFile(key, keyEncoded, permissionStatus); - - // The key is encoded to point to a common container at the storage server. - // This reduces the number of splits on the server side when load balancing. - // Ingress to Azure storage can take advantage of earlier splits. We remove - // the root path to the key and prefix a random GUID to the tail (or leaf - // filename) of the key. Keys are thus broadly and randomly distributed over - // a single container to ease load balancing on the storage server. When the - // blob is committed it is renamed to its earlier key. Uncommitted blocks - // are not cleaned up and we leave it to Azure storage to garbage collect - // these - // blocks. 
- OutputStream bufOutStream = new NativeAzureFsOutputStream(store.storefile( - keyEncoded, permissionStatus), key, keyEncoded); - + OutputStream bufOutStream; + if (store.isPageBlobKey(key)) { + // Store page blobs directly in-place without renames. + bufOutStream = store.storefile(key, permissionStatus); + } else { + // This is a block blob, so open the output blob stream based on the + // encoded key. + // + String keyEncoded = encodeKey(key); + + + // First create a blob at the real key, pointing back to the temporary file + // This accomplishes a few things: + // 1. Makes sure we can create a file there. + // 2. Makes it visible to other concurrent threads/processes/nodes what + // we're + // doing. + // 3. Makes it easier to restore/cleanup data in the event of us crashing. + store.storeEmptyLinkFile(key, keyEncoded, permissionStatus); + + // The key is encoded to point to a common container at the storage server. + // This reduces the number of splits on the server side when load balancing. + // Ingress to Azure storage can take advantage of earlier splits. We remove + // the root path to the key and prefix a random GUID to the tail (or leaf + // filename) of the key. Keys are thus broadly and randomly distributed over + // a single container to ease load balancing on the storage server. When the + // blob is committed it is renamed to its earlier key. Uncommitted blocks + // are not cleaned up and we leave it to Azure storage to garbage collect + // these + // blocks. + bufOutStream = new NativeAzureFsOutputStream(store.storefile( + keyEncoded, permissionStatus), key, keyEncoded); + } // Construct the data output stream from the buffered output stream. 
FSDataOutputStream fsOut = new FSDataOutputStream(bufOutStream, statistics); @@ -678,6 +1430,28 @@ public class NativeAzureFileSystem extends FileSystem { @Override public boolean delete(Path f, boolean recursive) throws IOException { + return delete(f, recursive, false); + } + + /** + * Delete the specified file or folder. The parameter + * skipParentFolderLastModifidedTimeUpdate + * is used in the case of atomic folder rename redo. In that case, there is + * a lease on the parent folder, so (without reworking the code) modifying + * the parent folder update time will fail because of a conflict with the + * lease. Since we are going to delete the folder soon anyway so accurate + * modified time is not necessary, it's easier to just skip + * the modified time update. + * + * @param f + * @param recursive + * @param skipParentFolderLastModifidedTimeUpdate If true, don't update the folder last + * modified time. + * @return true if and only if the file is deleted + * @throws IOException + */ + public boolean delete(Path f, boolean recursive, + boolean skipParentFolderLastModifidedTimeUpdate) throws IOException { if (LOG.isDebugEnabled()) { LOG.debug("Deleting file: " + f.toString()); @@ -723,11 +1497,13 @@ public class NativeAzureFileSystem extends FileSystem { store.storeEmptyFolder(parentKey, createPermissionStatus(FsPermission.getDefault())); } else { - store.updateFolderLastModifiedTime(parentKey); + if (!skipParentFolderLastModifidedTimeUpdate) { + store.updateFolderLastModifiedTime(parentKey, null); + } } } - instrumentation.fileDeleted(); store.delete(key); + instrumentation.fileDeleted(); } else { // The path specifies a folder. Recursively delete all entries under the // folder. 
@@ -784,7 +1560,9 @@ public class NativeAzureFileSystem extends FileSystem { Path parent = absolutePath.getParent(); if (parent != null && parent.getParent() != null) { // not root String parentKey = pathToKey(parent); - store.updateFolderLastModifiedTime(parentKey); + if (!skipParentFolderLastModifidedTimeUpdate) { + store.updateFolderLastModifiedTime(parentKey, null); + } } instrumentation.directoryDeleted(); } @@ -818,6 +1596,13 @@ public class NativeAzureFileSystem extends FileSystem { LOG.debug("Path " + f.toString() + "is a folder."); } + // If a rename operation for the folder was pending, redo it. + // Then the file does not exist, so signal that. + if (conditionalRedoFolderRename(f)) { + throw new FileNotFoundException( + absolutePath + ": No such file or directory."); + } + // Return reference to the directory object. return newDirectory(meta, absolutePath); } @@ -832,9 +1617,38 @@ public class NativeAzureFileSystem extends FileSystem { } // File not found. Throw exception no such file or directory. - // Note: Should never get to this point since the root always exists. - throw new FileNotFoundException(absolutePath - + ": No such file or directory."); + // + throw new FileNotFoundException( + absolutePath + ": No such file or directory."); + } + + // Return true if there is a rename pending and we redo it, otherwise false. + private boolean conditionalRedoFolderRename(Path f) throws IOException { + + // Can't rename /, so return immediately in that case. + if (f.getName().equals("")) { + return false; + } + + // Check if there is a -RenamePending.json file for this folder, and if so, + // redo the rename. + Path absoluteRenamePendingFile = renamePendingFilePath(f); + if (exists(absoluteRenamePendingFile)) { + FolderRenamePending pending = + new FolderRenamePending(absoluteRenamePendingFile, this); + pending.redo(); + return true; + } else { + return false; + } + } + + // Return the path name that would be used for rename of folder with path f. 
+ private Path renamePendingFilePath(Path f) { + Path absPath = makeAbsolute(f); + String key = pathToKey(absPath); + key += "-RenamePending.json"; + return keyToPath(key); } @Override @@ -867,6 +1681,17 @@ public class NativeAzureFileSystem extends FileSystem { } String partialKey = null; PartialListing listing = store.list(key, AZURE_LIST_ALL, 1, partialKey); + + // For any -RenamePending.json files in the listing, + // push the rename forward. + boolean renamed = conditionalRedoFolderRenames(listing); + + // If any renames were redone, get another listing, + // since the current one may have changed due to the redo. + if (renamed) { + listing = store.list(key, AZURE_LIST_ALL, 1, partialKey); + } + for (FileMetadata fileMetadata : listing.getFiles()) { Path subpath = keyToPath(fileMetadata.getKey()); @@ -903,25 +1728,62 @@ public class NativeAzureFileSystem extends FileSystem { return status.toArray(new FileStatus[0]); } + // Redo any folder renames needed if there are rename pending files in the + // directory listing. Return true if one or more redo operations were done. + private boolean conditionalRedoFolderRenames(PartialListing listing) + throws IllegalArgumentException, IOException { + boolean renamed = false; + for (FileMetadata fileMetadata : listing.getFiles()) { + Path subpath = keyToPath(fileMetadata.getKey()); + if (isRenamePendingFile(subpath)) { + FolderRenamePending pending = + new FolderRenamePending(subpath, this); + pending.redo(); + renamed = true; + } + } + return renamed; + } + + // True if this is a folder rename pending file, else false. 
+ private boolean isRenamePendingFile(Path path) { + return path.toString().endsWith(FolderRenamePending.SUFFIX); + } + private FileStatus newFile(FileMetadata meta, Path path) { - return new FileStatus(meta.getLength(), false, 1, blockSize, - meta.getLastModified(), 0, meta.getPermissionStatus().getPermission(), - meta.getPermissionStatus().getUserName(), meta.getPermissionStatus() - .getGroupName(), + return new FileStatus ( + meta.getLength(), + false, + 1, + blockSize, + meta.getLastModified(), + 0, + meta.getPermissionStatus().getPermission(), + meta.getPermissionStatus().getUserName(), + meta.getPermissionStatus().getGroupName(), path.makeQualified(getUri(), getWorkingDirectory())); } private FileStatus newDirectory(FileMetadata meta, Path path) { - return new FileStatus(0, true, 1, blockSize, meta == null ? 0 - : meta.getLastModified(), 0, meta == null ? FsPermission.getDefault() - : meta.getPermissionStatus().getPermission(), meta == null ? "" : meta - .getPermissionStatus().getUserName(), meta == null ? "" : meta - .getPermissionStatus().getGroupName(), path.makeQualified(getUri(), - getWorkingDirectory())); + return new FileStatus ( + 0, + true, + 1, + blockSize, + meta == null ? 0 : meta.getLastModified(), + 0, + meta == null ? FsPermission.getDefault() : meta.getPermissionStatus().getPermission(), + meta == null ? "" : meta.getPermissionStatus().getUserName(), + meta == null ? 
"" : meta.getPermissionStatus().getGroupName(), + path.makeQualified(getUri(), getWorkingDirectory())); } private static enum UMaskApplyMode { - NewFile, NewDirectory, ChangeExistingFile, ChangeExistingDirectory, + NewFile, + NewDirectory, + NewDirectoryNoUmask, + ChangeExistingFile, + ChangeExistingDirectory, } /** @@ -958,13 +1820,19 @@ public class NativeAzureFileSystem extends FileSystem { private PermissionStatus createPermissionStatus(FsPermission permission) throws IOException { // Create the permission status for this file based on current user - return new PermissionStatus(UserGroupInformation.getCurrentUser() - .getShortUserName(), getConf().get(AZURE_DEFAULT_GROUP_PROPERTY_NAME, - AZURE_DEFAULT_GROUP_DEFAULT), permission); + return new PermissionStatus( + UserGroupInformation.getCurrentUser().getShortUserName(), + getConf().get(AZURE_DEFAULT_GROUP_PROPERTY_NAME, + AZURE_DEFAULT_GROUP_DEFAULT), + permission); } @Override public boolean mkdirs(Path f, FsPermission permission) throws IOException { + return mkdirs(f, permission, false); + } + + public boolean mkdirs(Path f, FsPermission permission, boolean noUmask) throws IOException { if (LOG.isDebugEnabled()) { LOG.debug("Creating directory: " + f.toString()); } @@ -975,24 +1843,31 @@ public class NativeAzureFileSystem extends FileSystem { } Path absolutePath = makeAbsolute(f); - PermissionStatus permissionStatus = createPermissionStatus(applyUMask( - permission, UMaskApplyMode.NewDirectory)); + PermissionStatus permissionStatus = null; + if(noUmask) { + // ensure owner still has wx permissions at the minimum + permissionStatus = createPermissionStatus( + applyUMask(FsPermission.createImmutable((short) (permission.toShort() | USER_WX_PERMISION)), + UMaskApplyMode.NewDirectoryNoUmask)); + } else { + permissionStatus = createPermissionStatus( + applyUMask(permission, UMaskApplyMode.NewDirectory)); + } + ArrayList<String> keysToCreateAsFolder = new ArrayList<String>(); ArrayList<String> keysToUpdateAsFolder = 
new ArrayList<String>(); boolean childCreated = false; // Check that there is no file in the parent chain of the given path. - // Stop when you get to the root - for (Path current = absolutePath, parent = current.getParent(); parent != null; current = parent, parent = current - .getParent()) { + for (Path current = absolutePath, parent = current.getParent(); + parent != null; // Stop when you get to the root + current = parent, parent = current.getParent()) { String currentKey = pathToKey(current); FileMetadata currentMetadata = store.retrieveMetadata(currentKey); if (currentMetadata != null && !currentMetadata.isDir()) { - throw new IOException("Cannot create directory " + f + " because " - + current + " is an existing file."); - } else if (currentMetadata == null - || (currentMetadata.isDir() && currentMetadata - .getBlobMaterialization() == BlobMaterialization.Implicit)) { + throw new IOException("Cannot create directory " + f + " because " + + current + " is an existing file."); + } else if (currentMetadata == null) { keysToCreateAsFolder.add(currentKey); childCreated = true; } else { @@ -1009,18 +1884,8 @@ public class NativeAzureFileSystem extends FileSystem { store.storeEmptyFolder(currentKey, permissionStatus); } - // Take the time after finishing mkdirs as the modified time, and update all - // the existing directories' modified time to it uniformly. 
- final Calendar lastModifiedCalendar = Calendar - .getInstance(Utility.LOCALE_US); - lastModifiedCalendar.setTimeZone(Utility.UTC_ZONE); - Date lastModified = lastModifiedCalendar.getTime(); - for (String key : keysToUpdateAsFolder) { - store.updateFolderLastModifiedTime(key, lastModified); - } - instrumentation.directoryCreated(); - + // otherwise throws exception return true; } @@ -1043,12 +1908,14 @@ public class NativeAzureFileSystem extends FileSystem { } return new FSDataInputStream(new BufferedFSInputStream( - new NativeAzureFsInputStream(store.retrieve(key), key), bufferSize)); + new NativeAzureFsInputStream(store.retrieve(key), key, meta.getLength()), bufferSize)); } @Override public boolean rename(Path src, Path dst) throws IOException { + FolderRenamePending renamePending = null; + if (LOG.isDebugEnabled()) { LOG.debug("Moving " + src + " to " + dst); } @@ -1065,91 +1932,28 @@ public class NativeAzureFileSystem extends FileSystem { return false; } - FileMetadata srcMetadata = store.retrieveMetadata(srcKey); - if (srcMetadata == null) { - // Source doesn't exist - if (LOG.isDebugEnabled()) { - LOG.debug("Source " + src + " doesn't exist, failing the rename."); - } - return false; - } - // Figure out the final destination Path absoluteDst = makeAbsolute(dst); String dstKey = pathToKey(absoluteDst); FileMetadata dstMetadata = store.retrieveMetadata(dstKey); - - // directory rename validations - if (srcMetadata.isDir()) { - - // rename dir to self is an error - if (srcKey.equals(dstKey)) { - if (LOG.isDebugEnabled()) { - LOG.debug("Renaming directory to itself is disallowed. path=" + src); - } - return false; - } - - // rename dir to (sub-)child of self is an error. see - // FileSystemContractBaseTest.testRenameChildDirForbidden - if (dstKey.startsWith(srcKey + PATH_DELIMITER)) { - - if (LOG.isDebugEnabled()) { - LOG.debug("Renaming directory to itself is disallowed. 
src=" + src - + " dest=" + dst); - } - return false; - } - } - - // file rename early checks - if (!srcMetadata.isDir()) { - if (srcKey.equals(dstKey)) { - // rename file to self is OK - if (LOG.isDebugEnabled()) { - LOG.debug("Renaming file to itself. This is allowed and is treated as no-op. path=" - + src); - } - return true; - } - } - - // More validations.. - // If target is dir but target already exists, alter the dst to be a - // subfolder. - // eg move("/a/file.txt", "/b") where "/b" already exists causes the target - // to be "/c/file.txt if (dstMetadata != null && dstMetadata.isDir()) { + // It's an existing directory. dstKey = pathToKey(makeAbsolute(new Path(dst, src.getName()))); - // Best would be to update dstMetadata, but it is not used further, so set - // it to null and skip the additional cost - dstMetadata = null; - // dstMetadata = store.retrieveMetadata(dstKey); if (LOG.isDebugEnabled()) { LOG.debug("Destination " + dst + " is a directory, adjusted the destination to be " + dstKey); } - - // rename dir to self is an error - if (srcKey.equals(dstKey)) { - if (LOG.isDebugEnabled()) { - LOG.debug("Renaming directory to itself is disallowed. path=" + src); - } - return false; - } - } else if (dstMetadata != null) { - // Otherwise, attempting to overwrite a file is error + // Attempting to overwrite a file using rename() if (LOG.isDebugEnabled()) { LOG.debug("Destination " + dst + " is an already existing file, failing the rename."); } return false; } else { - // Either dir or file and target doesn't exist.. Check that the parent - // directory exists. - FileMetadata parentOfDestMetadata = store - .retrieveMetadata(pathToKey(absoluteDst.getParent())); + // Check that the parent directory exists. 
+ FileMetadata parentOfDestMetadata = + store.retrieveMetadata(pathToKey(absoluteDst.getParent())); if (parentOfDestMetadata == null) { if (LOG.isDebugEnabled()) { LOG.debug("Parent of the destination " + dst @@ -1164,88 +1968,136 @@ public class NativeAzureFileSystem extends FileSystem { return false; } } - - // Validations complete, do the move. - if (!srcMetadata.isDir()) { + FileMetadata srcMetadata = store.retrieveMetadata(srcKey); + if (srcMetadata == null) { + // Source doesn't exist + if (LOG.isDebugEnabled()) { + LOG.debug("Source " + src + " doesn't exist, failing the rename."); + } + return false; + } else if (!srcMetadata.isDir()) { if (LOG.isDebugEnabled()) { LOG.debug("Source " + src + " found as a file, renaming."); } store.rename(srcKey, dstKey); } else { - // Move everything inside the folder. - String priorLastKey = null; - // Calculate the index of the part of the string to be moved. That - // is everything on the path up to the folder name. - do { - // List all blobs rooted at the source folder. - PartialListing listing = store.listAll(srcKey, AZURE_LIST_ALL, - AZURE_UNBOUNDED_DEPTH, priorLastKey); - - // Rename all the files in the folder. - for (FileMetadata file : listing.getFiles()) { - // Rename all materialized entries under the folder to point to the - // final destination. - if (file.getBlobMaterialization() == BlobMaterialization.Explicit) { - String srcName = file.getKey(); - String suffix = srcName.substring(srcKey.length()); - String dstName = dstKey + suffix; - store.rename(srcName, dstName); - } - } - priorLastKey = listing.getPriorLastKey(); - } while (priorLastKey != null); - // Rename the top level empty blob for the folder. - if (srcMetadata.getBlobMaterialization() == BlobMaterialization.Explicit) { - store.rename(srcKey, dstKey); + // Prepare for, execute and clean up after of all files in folder, and + // the root file, and update the last modified time of the source and + // target parent folders. 
The operation can be redone if it fails part + // way through, by applying the "Rename Pending" file. + + // The following code (internally) only does atomic rename preparation + // and lease management for page blob folders, limiting the scope of the + // operation to HBase log file folders, where atomic rename is required. + // In the future, we could generalize it easily to all folders. + renamePending = prepareAtomicFolderRename(srcKey, dstKey); + renamePending.execute(); + if (LOG.isDebugEnabled()) { + LOG.debug("Renamed " + src + " to " + dst + " successfully."); } + renamePending.cleanup(); + return true; } - // Update both source and destination parent folder last modified time. - Path srcParent = makeAbsolute(keyToPath(srcKey)).getParent(); - if (srcParent != null && srcParent.getParent() != null) { // not root - String srcParentKey = pathToKey(srcParent); + // Update the last-modified time of the parent folders of both source + // and destination. + updateParentFolderLastModifiedTime(srcKey); + updateParentFolderLastModifiedTime(dstKey); - // ensure the srcParent is a materialized folder - FileMetadata srcParentMetadata = store.retrieveMetadata(srcParentKey); - if (srcParentMetadata.isDir() - && srcParentMetadata.getBlobMaterialization() == BlobMaterialization.Implicit) { - store.storeEmptyFolder(srcParentKey, - createPermissionStatus(FsPermission.getDefault())); - } - - store.updateFolderLastModifiedTime(srcParentKey); + if (LOG.isDebugEnabled()) { + LOG.debug("Renamed " + src + " to " + dst + " successfully."); } + return true; + } - Path destParent = makeAbsolute(keyToPath(dstKey)).getParent(); - if (destParent != null && destParent.getParent() != null) { // not root - String dstParentKey = pathToKey(destParent); + /** + * Update the last-modified time of the parent folder of the file + * identified by key. 
+ * @param key + * @throws IOException + */ + private void updateParentFolderLastModifiedTime(String key) + throws IOException { + Path parent = makeAbsolute(keyToPath(key)).getParent(); + if (parent != null && parent.getParent() != null) { // not root + String parentKey = pathToKey(parent); - // ensure the dstParent is a materialized folder - FileMetadata dstParentMetadata = store.retrieveMetadata(dstParentKey); - if (dstParentMetadata.isDir() - && dstParentMetadata.getBlobMaterialization() == BlobMaterialization.Implicit) { - store.storeEmptyFolder(dstParentKey, - createPermissionStatus(FsPermission.getDefault())); - } + // ensure the parent is a materialized folder + FileMetadata parentMetadata = store.retrieveMetadata(parentKey); + // The metadata could be null if the implicit folder only contains a + // single file. In this case, the parent folder no longer exists if the + // file is renamed; so we can safely ignore the null pointer case. + if (parentMetadata != null) { + if (parentMetadata.isDir() + && parentMetadata.getBlobMaterialization() == BlobMaterialization.Implicit) { + store.storeEmptyFolder(parentKey, + createPermissionStatus(FsPermission.getDefault())); + } - store.updateFolderLastModifiedTime(dstParentKey); + store.updateFolderLastModifiedTime(parentKey, null); + } } + } - if (LOG.isDebugEnabled()) { - LOG.debug("Renamed " + src + " to " + dst + " successfully."); + /** + * If the source is a page blob folder, + * prepare to rename this folder atomically. This means to get exclusive + * access to the source folder, and record the actions to be performed for + * this rename in a "Rename Pending" file. This code was designed to + * meet the needs of HBase, which requires atomic rename of write-ahead log + * (WAL) folders for correctness. + * + * Before calling this method, the caller must ensure that the source is a + * folder. 
+ * + * For non-page-blob directories, prepare the in-memory information needed, + * but don't take the lease or write the redo file. This is done to limit the + * scope of atomic folder rename to HBase, at least at the time of writing + * this code. + * + * @param srcKey Source folder name. + * @param dstKey Destination folder name. + * @throws IOException + */ + private FolderRenamePending prepareAtomicFolderRename( + String srcKey, String dstKey) throws IOException { + + if (store.isAtomicRenameKey(srcKey)) { + + // Block unwanted concurrent access to source folder. + SelfRenewingLease lease = leaseSourceFolder(srcKey); + + // Prepare in-memory information needed to do or redo a folder rename. + FolderRenamePending renamePending = + new FolderRenamePending(srcKey, dstKey, lease, this); + + // Save it to persistent storage to help recover if the operation fails. + renamePending.writeFile(this); + return renamePending; + } else { + FolderRenamePending renamePending = + new FolderRenamePending(srcKey, dstKey, null, this); + return renamePending; } - return true; } /** - * Return an array containing hostnames, offset and size of portions of the - * given file. For WASB we'll just lie and give fake hosts to make sure we get - * many splits in MR jobs. + * Get a self-renewing Azure blob lease on the source folder zero-byte file. + */ + private SelfRenewingLease leaseSourceFolder(String srcKey) + throws AzureException { + return store.acquireLease(srcKey); + } + + /** + * Return an array containing hostnames, offset and size of + * portions of the given file. For WASB we'll just lie and give + * fake hosts to make sure we get many splits in MR jobs. 
*/ @Override - public BlockLocation[] getFileBlockLocations(FileStatus file, long start, - long len) throws IOException { + public BlockLocation[] getFileBlockLocations(FileStatus file, + long start, long len) throws IOException { if (file == null) { return null; } @@ -1306,11 +2158,12 @@ public class NativeAzureFileSystem extends FileSystem { if (metadata.getBlobMaterialization() == BlobMaterialization.Implicit) { // It's an implicit folder, need to materialize it. store.storeEmptyFolder(key, createPermissionStatus(permission)); - } else if (!metadata.getPermissionStatus().getPermission() - .equals(permission)) { - store.changePermissionStatus(key, new PermissionStatus(metadata - .getPermissionStatus().getUserName(), metadata.getPermissionStatus() - .getGroupName(), permission)); + } else if (!metadata.getPermissionStatus().getPermission(). + equals(permission)) { + store.changePermissionStatus(key, new PermissionStatus( + metadata.getPermissionStatus().getUserName(), + metadata.getPermissionStatus().getGroupName(), + permission)); } } @@ -1324,10 +2177,11 @@ public class NativeAzureFileSystem extends FileSystem { throw new FileNotFoundException("File doesn't exist: " + p); } PermissionStatus newPermissionStatus = new PermissionStatus( - username == null ? metadata.getPermissionStatus().getUserName() - : username, groupname == null ? metadata.getPermissionStatus() - .getGroupName() : groupname, metadata.getPermissionStatus() - .getPermission()); + username == null ? + metadata.getPermissionStatus().getUserName() : username, + groupname == null ? + metadata.getPermissionStatus().getGroupName() : groupname, + metadata.getPermissionStatus().getPermission()); if (metadata.getBlobMaterialization() == BlobMaterialization.Implicit) { // It's an implicit folder, need to materialize it. 
store.storeEmptyFolder(key, newPermissionStatus); @@ -1341,12 +2195,12 @@ public class NativeAzureFileSystem extends FileSystem { if (isClosed) { return; } - + // Call the base close() to close any resources there. super.close(); - // Close the store + // Close the store to close any resources there - e.g. the bandwidth + // updater thread would be stopped at this time. store.close(); - // Notify the metrics system that this file system is closed, which may // trigger one final metrics push to get the accurate final file system // metrics out. @@ -1364,16 +2218,17 @@ public class NativeAzureFileSystem extends FileSystem { } /** - * A handler that defines what to do with blobs whose upload was interrupted. + * A handler that defines what to do with blobs whose upload was + * interrupted. */ private abstract class DanglingFileHandler { abstract void handleFile(FileMetadata file, FileMetadata tempFile) - throws IOException; + throws IOException; } /** - * Handler implementation for just deleting dangling files and cleaning them - * up. + * Handler implementation for just deleting dangling files and cleaning + * them up. */ private class DanglingFileDeleter extends DanglingFileHandler { @Override @@ -1388,8 +2243,8 @@ public class NativeAzureFileSystem extends FileSystem { } /** - * Handler implementation for just moving dangling files to recovery location - * (/lost+found). + * Handler implementation for just moving dangling files to recovery + * location (/lost+found). 
*/ private class DanglingFileRecoverer extends DanglingFileHandler { private final Path destination; @@ -1405,8 +2260,8 @@ public class NativeAzureFileSystem extends FileSystem { LOG.debug("Recovering " + file.getKey()); } // Move to the final destination - String finalDestinationKey = pathToKey(new Path(destination, - file.getKey())); + String finalDestinationKey = + pathToKey(new Path(destination, file.getKey())); store.rename(tempFile.getKey(), finalDestinationKey); if (!finalDestinationKey.equals(file.getKey())) { // Delete the empty link file now that we've restored it.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a737026/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java index 4e1d0b6..0229cb7 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java @@ -53,6 +53,10 @@ interface NativeFileSystemStore { DataOutputStream storefile(String key, PermissionStatus permissionStatus) throws AzureException; + boolean isPageBlobKey(String key); + + boolean isAtomicRenameKey(String key); + void storeEmptyLinkFile(String key, String tempBlobKey, PermissionStatus permissionStatus) throws AzureException; @@ -74,9 +78,12 @@ interface NativeFileSystemStore { void rename(String srcKey, String dstKey) throws IOException; + void rename(String srcKey, String dstKey, boolean acquireLease, SelfRenewingLease existingLease) + throws IOException; + /** * Delete all keys with the given prefix. Used for testing. - * + * * @throws IOException */ @VisibleForTesting @@ -84,15 +91,20 @@ interface NativeFileSystemStore { /** * Diagnostic method to dump state to the console. 
- * + * * @throws IOException */ void dump() throws IOException; void close(); - void updateFolderLastModifiedTime(String key) throws AzureException; - - void updateFolderLastModifiedTime(String key, Date lastModified) + void updateFolderLastModifiedTime(String key, SelfRenewingLease folderLease) throws AzureException; + + void updateFolderLastModifiedTime(String key, Date lastModified, + SelfRenewingLease folderLease) throws AzureException; + + void delete(String key, SelfRenewingLease lease) throws IOException; + + SelfRenewingLease acquireLease(String key) throws AzureException; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a737026/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobFormatHelpers.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobFormatHelpers.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobFormatHelpers.java new file mode 100644 index 0000000..ad11aac --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobFormatHelpers.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azure; + +import java.nio.ByteBuffer; + +import com.microsoft.windowsazure.storage.blob.BlobRequestOptions; + +/** + * Constants and helper methods for ASV's custom data format in page blobs. + */ +final class PageBlobFormatHelpers { + public static final short PAGE_SIZE = 512; + public static final short PAGE_HEADER_SIZE = 2; + public static final short PAGE_DATA_SIZE = PAGE_SIZE - PAGE_HEADER_SIZE; + + // Hide constructor for utility class. + private PageBlobFormatHelpers() { + + } + + /** + * Stores the given short as a two-byte array. + */ + public static byte[] fromShort(short s) { + return ByteBuffer.allocate(2).putShort(s).array(); + } + + /** + * Retrieves a short from the given two bytes. + */ + public static short toShort(byte firstByte, byte secondByte) { + return ByteBuffer.wrap(new byte[] { firstByte, secondByte }) + .getShort(); + } + + public static BlobRequestOptions withMD5Checking() { + BlobRequestOptions options = new BlobRequestOptions(); + options.setUseTransactionalContentMD5(true); + return options; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a737026/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobInputStream.java new file mode 100644 index 0000000..62b47ee --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PageBlobInputStream.java @@ -0,0 +1,455 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.azure;

import static org.apache.hadoop.fs.azure.PageBlobFormatHelpers.PAGE_DATA_SIZE;
import static org.apache.hadoop.fs.azure.PageBlobFormatHelpers.PAGE_HEADER_SIZE;
import static org.apache.hadoop.fs.azure.PageBlobFormatHelpers.PAGE_SIZE;
import static org.apache.hadoop.fs.azure.PageBlobFormatHelpers.toShort;
import static org.apache.hadoop.fs.azure.PageBlobFormatHelpers.withMD5Checking;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.azure.StorageInterface.CloudPageBlobWrapper;

import com.microsoft.windowsazure.storage.OperationContext;
import com.microsoft.windowsazure.storage.StorageException;
import com.microsoft.windowsazure.storage.blob.BlobRequestOptions;
import com.microsoft.windowsazure.storage.blob.PageRange;

/**
 * An input stream that reads file data from a page blob stored
 * using ASV's custom format. In that format every {@code PAGE_SIZE}-byte
 * page begins with a {@code PAGE_HEADER_SIZE}-byte header whose first two
 * bytes record how many data bytes the page holds; only the last page may
 * be partially filled.
 *
 * Reads are buffered: pages are fetched from the service in chunks of up
 * to {@code MAX_PAGES_PER_DOWNLOAD} pages, and the stream then serves data
 * out of {@code currentBuffer}, skipping each page's header.
 */

final class PageBlobInputStream extends InputStream {
  private static final Log LOG = LogFactory.getLog(PageBlobInputStream.class);

  // The blob we're reading from.
  private final CloudPageBlobWrapper blob;
  // The operation context to use for storage requests.
  private final OperationContext opContext;
  // The number of pages remaining to be read from the server.
  private long numberOfPagesRemaining;
  // The current byte offset to start reading from the server next,
  // equivalent to (total number of pages we've read) * (page size).
  private long currentOffsetInBlob;
  // The buffer holding the current data we last read from the server.
  private byte[] currentBuffer;
  // The current byte offset we're at in the buffer.
  private int currentOffsetInBuffer;
  // Maximum number of pages to get per any one request (4 MB worth).
  private static final int MAX_PAGES_PER_DOWNLOAD =
      4 * 1024 * 1024 / PAGE_SIZE;
  // Whether the stream has been closed.
  private boolean closed = false;
  // Total stream size, or -1 if not initialized (computed lazily by
  // available() since it costs extra service requests).
  long pageBlobSize = -1;
  // Current position in stream of valid data.
  long filePosition = 0;

  /**
   * Helper method to extract the actual data size of a page blob.
   * This typically involves 2 service requests (one for page ranges, another
   * for the last page's data).
   *
   * @param blob The blob to get the size from.
   * @param opContext The operation context to use for the requests.
   * @return The total data size of the blob in bytes.
   * @throws IOException If the format is corrupt.
   * @throws StorageException If anything goes wrong in the requests.
   */
  public static long getPageBlobSize(CloudPageBlobWrapper blob,
      OperationContext opContext) throws IOException, StorageException {
    // Get the page ranges for the blob. There should be one range starting
    // at byte 0, but we tolerate (and ignore) ranges after the first one.
    ArrayList<PageRange> pageRanges =
        blob.downloadPageRanges(new BlobRequestOptions(), opContext);
    if (pageRanges.size() == 0) {
      return 0;
    }
    if (pageRanges.get(0).getStartOffset() != 0) {
      // Not expected: we always upload our page blobs as a contiguous range
      // starting at byte 0.
      throw badStartRangeException(blob, pageRanges.get(0));
    }
    long totalRawBlobSize = pageRanges.get(0).getEndOffset() + 1;

    // Get the last page, whose header records how much of it is real data.
    long lastPageStart = totalRawBlobSize - PAGE_SIZE;
    ByteArrayOutputStream baos =
        new ByteArrayOutputStream(PageBlobFormatHelpers.PAGE_SIZE);
    blob.downloadRange(lastPageStart, PAGE_SIZE, baos,
        new BlobRequestOptions(), opContext);

    byte[] lastPage = baos.toByteArray();
    short lastPageSize = getPageSize(blob, lastPage, 0);
    long totalNumberOfPages = totalRawBlobSize / PAGE_SIZE;
    // All pages but the last are full; add the last page's actual data size.
    return (totalNumberOfPages - 1) * PAGE_DATA_SIZE + lastPageSize;
  }

  /**
   * Constructs a stream over the given page blob.
   *
   * @param blob The page blob to read from.
   * @param opContext The operation context to use for storage requests.
   * @throws IOException If the blob's page ranges can't be fetched or don't
   *         start at byte 0.
   */
  public PageBlobInputStream(CloudPageBlobWrapper blob,
      OperationContext opContext)
      throws IOException {
    this.blob = blob;
    this.opContext = opContext;
    ArrayList<PageRange> allRanges;
    try {
      allRanges =
          blob.downloadPageRanges(new BlobRequestOptions(), opContext);
    } catch (StorageException e) {
      throw new IOException(e);
    }
    if (allRanges.size() > 0) {
      if (allRanges.get(0).getStartOffset() != 0) {
        throw badStartRangeException(blob, allRanges.get(0));
      }
      if (allRanges.size() > 1) {
        // We only ever write a single contiguous range; anything beyond the
        // first range is unexpected, so warn but keep reading range 0.
        LOG.warn(String.format(
            "Blob %s has %d page ranges beyond the first range. "
            + "Only reading the first range.",
            blob.getUri(), allRanges.size() - 1));
      }
      numberOfPagesRemaining =
          (allRanges.get(0).getEndOffset() + 1) / PAGE_SIZE;
    } else {
      numberOfPagesRemaining = 0;
    }
  }

  /** Return the size of the remaining available bytes
   * if the size is less than or equal to {@link Integer#MAX_VALUE},
   * otherwise, return {@link Integer#MAX_VALUE}.
   *
   * This is to match the behavior of DFSInputStream.available(),
   * which some clients may rely on (HBase write-ahead log reading in
   * particular).
   */
  @Override
  public synchronized int available() throws IOException {
    if (closed) {
      throw new IOException("Stream closed");
    }
    if (pageBlobSize == -1) {
      try {
        pageBlobSize = getPageBlobSize(blob, opContext);
      } catch (StorageException e) {
        throw new IOException("Unable to get page blob size.", e);
      }
    }

    final long remaining = pageBlobSize - filePosition;
    return remaining <= Integer.MAX_VALUE ?
        (int) remaining : Integer.MAX_VALUE;
  }

  @Override
  public synchronized void close() throws IOException {
    closed = true;
    // Release the (up to 4 MB) download buffer promptly rather than holding
    // it until the stream object is collected.
    currentBuffer = null;
  }

  /** Whether any unread bytes remain in the downloaded buffer. */
  private boolean dataAvailableInBuffer() {
    return currentBuffer != null
        && currentOffsetInBuffer < currentBuffer.length;
  }

  /**
   * Check our buffer and download more from the server if needed.
   * @return true if there's more data in the buffer, false if we're done.
   * @throws IOException If the download fails or the data is corrupt.
   */
  private synchronized boolean ensureDataInBuffer() throws IOException {
    if (dataAvailableInBuffer()) {
      // We still have some data in our buffer.
      return true;
    }
    currentBuffer = null;
    if (numberOfPagesRemaining == 0) {
      // No more data to read.
      return false;
    }
    final long pagesToRead = Math.min(MAX_PAGES_PER_DOWNLOAD,
        numberOfPagesRemaining);
    final int bufferSize = (int) (pagesToRead * PAGE_SIZE);

    // Download page to current buffer.
    try {
      // Create a byte array output stream to capture the results of the
      // download.
      ByteArrayOutputStream baos = new ByteArrayOutputStream(bufferSize);
      blob.downloadRange(currentOffsetInBlob, bufferSize, baos,
          withMD5Checking(), opContext);
      currentBuffer = baos.toByteArray();
    } catch (StorageException e) {
      throw new IOException(e);
    }
    numberOfPagesRemaining -= pagesToRead;
    currentOffsetInBlob += bufferSize;
    // Position past the first page's header, at its first data byte.
    currentOffsetInBuffer = PAGE_HEADER_SIZE;

    // Since we just downloaded a new buffer, validate its consistency.
    validateCurrentBufferConsistency();

    return true;
  }

  /**
   * Sanity-checks the freshly downloaded buffer: it must be a whole number
   * of pages, and only the very last page of the blob may be partially
   * filled.
   * @throws IOException If a short page is found anywhere but the end.
   */
  private void validateCurrentBufferConsistency()
      throws IOException {
    if (currentBuffer.length % PAGE_SIZE != 0) {
      throw new AssertionError("Unexpected buffer size: "
          + currentBuffer.length);
    }
    int numberOfPages = currentBuffer.length / PAGE_SIZE;
    for (int page = 0; page < numberOfPages; page++) {
      short currentPageSize = getPageSize(blob, currentBuffer,
          page * PAGE_SIZE);
      // Calculate the number of pages that exist after this one
      // in the blob.
      long totalPagesAfterCurrent =
          (numberOfPages - page - 1) + numberOfPagesRemaining;
      // Only the last page is allowed to be not filled completely.
      if (currentPageSize < PAGE_DATA_SIZE
          && totalPagesAfterCurrent > 0) {
        throw fileCorruptException(blob, String.format(
            "Page with partial data found in the middle (%d pages from the"
            + " end) that only has %d bytes of data.",
            totalPagesAfterCurrent, currentPageSize));
      }
    }
  }

  // Reads the page size from the page header at the given offset.
  private static short getPageSize(CloudPageBlobWrapper blob,
      byte[] data, int offset) throws IOException {
    short pageSize = toShort(data[offset], data[offset + 1]);
    if (pageSize < 0 || pageSize > PAGE_DATA_SIZE) {
      throw fileCorruptException(blob, String.format(
          "Unexpected page size in the header: %d.",
          pageSize));
    }
    return pageSize;
  }

  /**
   * Reads up to len bytes of data into the given buffer, skipping page
   * headers transparently.
   *
   * @param outputBuffer The buffer to read into.
   * @param offset The offset in outputBuffer at which to start storing data.
   * @param len The maximum number of bytes to read.
   * @return The number of bytes read, or -1 if the end of the stream has
   *         been reached.
   * @throws IOException If a download fails or the data is corrupt.
   */
  @Override
  public synchronized int read(byte[] outputBuffer, int offset, int len)
      throws IOException {
    // Per the InputStream contract a zero-length request returns 0 and must
    // not be reported as end-of-stream.
    if (len == 0) {
      return 0;
    }
    int numberOfBytesRead = 0;
    while (len > 0) {
      if (!ensureDataInBuffer()) {
        // No more data available.
        break;
      }
      int bytesRemainingInCurrentPage = getBytesRemainingInCurrentPage();
      int numBytesToRead = Math.min(len, bytesRemainingInCurrentPage);
      System.arraycopy(currentBuffer, currentOffsetInBuffer, outputBuffer,
          offset, numBytesToRead);
      numberOfBytesRead += numBytesToRead;
      offset += numBytesToRead;
      len -= numBytesToRead;
      if (numBytesToRead == bytesRemainingInCurrentPage) {
        // We've finished this page, move on to the next.
        advancePagesInBuffer(1);
      } else {
        currentOffsetInBuffer += numBytesToRead;
      }
    }
    if (numberOfBytesRead == 0) {
      // BUG FIX: signal end-of-stream with -1 as the InputStream contract
      // requires. Returning 0 here could send contract-abiding callers into
      // an infinite read loop.
      return -1;
    }
    filePosition += numberOfBytesRead;
    return numberOfBytesRead;
  }

  /**
   * Reads the next byte of data from the stream.
   *
   * @return The next byte as an unsigned value in the range 0-255, or -1 at
   *         end of stream.
   * @throws IOException If a download fails or the data is corrupt.
   */
  @Override
  public int read() throws IOException {
    byte[] oneByte = new byte[1];
    if (read(oneByte) < 0) {
      return -1;
    }
    // BUG FIX: mask to an unsigned value. Returning the raw signed byte made
    // every byte >= 0x80 come back negative, and 0xFF was indistinguishable
    // from the -1 end-of-stream marker.
    return oneByte[0] & 0xFF;
  }

  /**
   * Skips over and discards n bytes of data from this input stream.
   * @param n the number of bytes to be skipped.
   * @return the actual number of bytes skipped.
   */
  @Override
  public synchronized long skip(long n) throws IOException {
    long skipped = skipImpl(n);
    filePosition += skipped; // track the position in the stream
    return skipped;
  }

  /**
   * Does the real work of skip(): skips within the current buffer first,
   * then over whole pages without downloading them, then within the next
   * downloaded buffer. Does not update filePosition (skip() does that).
   */
  private long skipImpl(long n) throws IOException {

    if (n == 0) {
      return 0;
    }

    // First skip within the current buffer as much as possible.
    long skippedWithinBuffer = skipWithinBuffer(n);
    if (skippedWithinBuffer > n) {
      // TO CONSIDER: Using a contracts framework such as Google's cofoja for
      // these post-conditions.
      throw new AssertionError(String.format(
          "Bug in skipWithinBuffer: it skipped over %d bytes when asked to "
          + "skip %d bytes.", skippedWithinBuffer, n));
    }
    n -= skippedWithinBuffer;
    long skipped = skippedWithinBuffer;

    // Empty the current buffer, we're going beyond it.
    currentBuffer = null;

    // Skip over whole pages as necessary without retrieving them from the
    // server. Always leave at least one page un-skipped so the position
    // lands inside a page we can then download.
    long pagesToSkipOver = Math.min(
        n / PAGE_DATA_SIZE,
        numberOfPagesRemaining - 1);
    numberOfPagesRemaining -= pagesToSkipOver;
    currentOffsetInBlob += pagesToSkipOver * PAGE_SIZE;
    skipped += pagesToSkipOver * PAGE_DATA_SIZE;
    n -= pagesToSkipOver * PAGE_DATA_SIZE;
    if (n == 0) {
      return skipped;
    }

    // Now read in at the current position, and skip within current buffer.
    if (!ensureDataInBuffer()) {
      return skipped;
    }
    return skipped + skipWithinBuffer(n);
  }

  /**
   * Skip over n bytes within the current buffer or just over skip the whole
   * buffer if n is greater than the bytes remaining in the buffer.
   * @param n The number of data bytes to skip.
   * @return The number of bytes actually skipped.
   * @throws IOException if data corruption found in the buffer.
   */
  private long skipWithinBuffer(long n) throws IOException {
    if (!dataAvailableInBuffer()) {
      return 0;
    }
    long skipped = 0;
    // First skip within the current page.
    skipped = skipWithinCurrentPage(n);
    if (skipped > n) {
      throw new AssertionError(String.format(
          "Bug in skipWithinCurrentPage: it skipped over %d bytes when asked"
          + " to skip %d bytes.", skipped, n));
    }
    n -= skipped;
    if (n == 0 || !dataAvailableInBuffer()) {
      return skipped;
    }

    // Calculate how many whole pages (pages before the possibly partially
    // filled last page) remain.
    int currentPageIndex = currentOffsetInBuffer / PAGE_SIZE;
    int numberOfPagesInBuffer = currentBuffer.length / PAGE_SIZE;
    int wholePagesRemaining = numberOfPagesInBuffer - currentPageIndex - 1;

    if (n < (PAGE_DATA_SIZE * wholePagesRemaining)) {
      // I'm within one of the whole pages remaining, skip in there.
      advancePagesInBuffer((int) (n / PAGE_DATA_SIZE));
      currentOffsetInBuffer += n % PAGE_DATA_SIZE;
      return n + skipped;
    }

    // Skip over the whole pages.
    advancePagesInBuffer(wholePagesRemaining);
    skipped += wholePagesRemaining * PAGE_DATA_SIZE;
    n -= wholePagesRemaining * PAGE_DATA_SIZE;

    // At this point we know we need to skip to somewhere in the last page,
    // or just go to the end.
    return skipWithinCurrentPage(n) + skipped;
  }

  /**
   * Skip over n bytes within the current page or just over skip the whole
   * page if n is greater than the bytes remaining in the page.
   * @param n The number of data bytes to skip.
   * @return The number of bytes actually skipped.
   * @throws IOException if data corruption found in the buffer.
   */
  private long skipWithinCurrentPage(long n) throws IOException {
    int remainingBytesInCurrentPage = getBytesRemainingInCurrentPage();
    if (n < remainingBytesInCurrentPage) {
      currentOffsetInBuffer += n;
      return n;
    } else {
      advancePagesInBuffer(1);
      return remainingBytesInCurrentPage;
    }
  }

  /**
   * Gets the number of bytes remaining within the current page in the buffer.
   * @return The number of bytes remaining.
   * @throws IOException if data corruption found in the buffer.
   */
  private int getBytesRemainingInCurrentPage() throws IOException {
    if (!dataAvailableInBuffer()) {
      return 0;
    }
    // Calculate our current position relative to the start of the current
    // page.
    int currentDataOffsetInPage =
        (currentOffsetInBuffer % PAGE_SIZE) - PAGE_HEADER_SIZE;
    int pageBoundary = getCurrentPageStartInBuffer();
    // Get the data size of the current page from the header.
    short sizeOfCurrentPage = getPageSize(blob, currentBuffer, pageBoundary);
    return sizeOfCurrentPage - currentDataOffsetInPage;
  }

  /** Builds the IOException thrown when the first page range isn't at 0. */
  private static IOException badStartRangeException(CloudPageBlobWrapper blob,
      PageRange startRange) {
    return fileCorruptException(blob, String.format(
        "Page blobs for ASV should always use a page range starting at byte 0. "
        + "This starts at byte %d.",
        startRange.getStartOffset()));
  }

  /**
   * Moves the buffer offset forward by the given number of pages, landing
   * just past the header of the destination page.
   */
  private void advancePagesInBuffer(int numberOfPages) {
    currentOffsetInBuffer =
        getCurrentPageStartInBuffer()
        + (numberOfPages * PAGE_SIZE)
        + PAGE_HEADER_SIZE;
  }

  /** Returns the buffer offset of the start of the current page. */
  private int getCurrentPageStartInBuffer() {
    return PAGE_SIZE * (currentOffsetInBuffer / PAGE_SIZE);
  }

  /** Builds the IOException thrown when the blob format looks corrupt. */
  private static IOException fileCorruptException(CloudPageBlobWrapper blob,
      String reason) {
    return new IOException(String.format(
        "The page blob: '%s' is corrupt or has an unexpected format: %s.",
        blob.getUri(), reason));
  }
}
