http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a737026/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java index b03997c..047ea1b 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java @@ -33,7 +33,7 @@ import java.util.TimeZone; import org.apache.commons.httpclient.URIException; import org.apache.commons.httpclient.util.URIUtil; -import org.apache.commons.io.output.ByteArrayOutputStream; +import org.apache.commons.lang.NotImplementedException; import com.microsoft.windowsazure.storage.CloudStorageAccount; import com.microsoft.windowsazure.storage.OperationContext; @@ -44,10 +44,15 @@ import com.microsoft.windowsazure.storage.StorageUri; import com.microsoft.windowsazure.storage.blob.BlobListingDetails; import com.microsoft.windowsazure.storage.blob.BlobProperties; import com.microsoft.windowsazure.storage.blob.BlobRequestOptions; +import com.microsoft.windowsazure.storage.blob.CloudBlob; import com.microsoft.windowsazure.storage.blob.CloudBlobContainer; import com.microsoft.windowsazure.storage.blob.CloudBlobDirectory; import com.microsoft.windowsazure.storage.blob.CopyState; import com.microsoft.windowsazure.storage.blob.ListBlobItem; +import com.microsoft.windowsazure.storage.blob.PageRange; + +import javax.ws.rs.core.UriBuilder; +import javax.ws.rs.core.UriBuilderException; /** * A mock implementation of the Azure Storage interaction layer for unit tests. @@ -55,7 +60,8 @@ import com.microsoft.windowsazure.storage.blob.ListBlobItem; */ public class MockStorageInterface extends StorageInterface { private InMemoryBlockBlobStore backingStore; - private final ArrayList<PreExistingContainer> preExistingContainers = new ArrayList<MockStorageInterface.PreExistingContainer>(); + private final ArrayList<PreExistingContainer> preExistingContainers = + new ArrayList<MockStorageInterface.PreExistingContainer>(); private String baseUriString; public InMemoryBlockBlobStore getBackingStore() { @@ -107,6 +113,33 @@ public class MockStorageInterface extends StorageInterface { return null; } + /** + * Utility function used to convert a given URI to a decoded string + * representation sent to the backing store. URIs coming as input + * to this class will be encoded by the URI class, and we want + * the underlying storage to store keys in their original UTF-8 form. 
+ */ + private static String convertUriToDecodedString(URI uri) { + try { + String result = URIUtil.decode(uri.toString()); + return result; + } catch (URIException e) { + throw new AssertionError("Failed to decode URI: " + uri.toString()); + } + } + + private static URI convertKeyToEncodedUri(String key) { + try { + String encodedKey = URIUtil.encodePath(key); + URI uri = new URI(encodedKey); + return uri; + } catch (URISyntaxException e) { + throw new AssertionError("Failed to encode key: " + key); + } catch (URIException e) { + throw new AssertionError("Failed to encode key: " + key); + } + } + @Override public CloudBlobContainerWrapper getContainerReference(String name) throws URISyntaxException, StorageException { @@ -196,6 +229,12 @@ public class MockStorageInterface extends StorageInterface { false)), null, 0); } + @Override + public CloudPageBlobWrapper getPageBlobReference(String blobAddressUri) + throws URISyntaxException, StorageException { + return new MockCloudPageBlobWrapper(new URI(blobAddressUri), null, 0); + } + // helper to create full URIs for directory and blob. // use withTrailingSlash=true to get a good path for a directory. private String fullUriString(String relativePath, boolean withTrailingSlash) { @@ -260,24 +299,41 @@ public class MockStorageInterface extends StorageInterface { BlobRequestOptions options, OperationContext opContext) throws URISyntaxException, StorageException { ArrayList<ListBlobItem> ret = new ArrayList<ListBlobItem>(); - String fullPrefix = prefix == null ? uri.toString() : new URI( - uri.getScheme(), uri.getAuthority(), uri.getPath() + prefix, - uri.getQuery(), uri.getFragment()).toString(); - boolean includeMetadata = listingDetails - .contains(BlobListingDetails.METADATA); + URI searchUri = null; + if (prefix == null) { + searchUri = uri; + } else { + try { + searchUri = UriBuilder.fromUri(uri).path(prefix).build(); + } catch (UriBuilderException e) { + throw new AssertionError("Failed to encode path: " + prefix); + } + } + + String fullPrefix = convertUriToDecodedString(searchUri); + boolean includeMetadata = listingDetails.contains(BlobListingDetails.METADATA); HashSet<String> addedDirectories = new HashSet<String>(); - for (InMemoryBlockBlobStore.ListBlobEntry current : backingStore - .listBlobs(fullPrefix, includeMetadata)) { + for (InMemoryBlockBlobStore.ListBlobEntry current : backingStore.listBlobs( + fullPrefix, includeMetadata)) { int indexOfSlash = current.getKey().indexOf('/', fullPrefix.length()); if (useFlatBlobListing || indexOfSlash < 0) { - ret.add(new MockCloudBlockBlobWrapper(new URI(current.getKey()), - current.getMetadata(), current.getContentLength())); + if (current.isPageBlob()) { + ret.add(new MockCloudPageBlobWrapper( + convertKeyToEncodedUri(current.getKey()), + current.getMetadata(), + current.getContentLength())); + } else { + ret.add(new MockCloudBlockBlobWrapper( + convertKeyToEncodedUri(current.getKey()), + current.getMetadata(), + current.getContentLength())); + } } else { String directoryName = current.getKey().substring(0, indexOfSlash); if (!addedDirectories.contains(directoryName)) { addedDirectories.add(current.getKey()); - ret.add(new MockCloudBlobDirectoryWrapper(new URI(directoryName - + "/"))); + ret.add(new MockCloudBlobDirectoryWrapper(new URI( + directoryName + "/"))); } } } @@ -286,35 +342,35 @@ public class MockStorageInterface extends StorageInterface { @Override public StorageUri getStorageUri() { - throw new UnsupportedOperationException(); + throw new NotImplementedException(); } - } - class 
MockCloudBlockBlobWrapper extends CloudBlockBlobWrapper { - private URI uri; - private HashMap<String, String> metadata = new HashMap<String, String>(); - private BlobProperties properties; + abstract class MockCloudBlobWrapper implements CloudBlobWrapper { + protected final URI uri; + protected HashMap<String, String> metadata = + new HashMap<String, String>(); + protected BlobProperties properties; - public MockCloudBlockBlobWrapper(URI uri, HashMap<String, String> metadata, + protected MockCloudBlobWrapper(URI uri, HashMap<String, String> metadata, int length) { this.uri = uri; this.metadata = metadata; this.properties = new BlobProperties(); this.properties.setLength(length); - this.properties.setLastModified(Calendar.getInstance( - TimeZone.getTimeZone("UTC")).getTime()); + this.properties.setLastModified( + Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTime()); } - private void refreshProperties(boolean getMetadata) { - if (backingStore.exists(uri.toString())) { - byte[] content = backingStore.getContent(uri.toString()); + protected void refreshProperties(boolean getMetadata) { + if (backingStore.exists(convertUriToDecodedString(uri))) { + byte[] content = backingStore.getContent(convertUriToDecodedString(uri)); properties = new BlobProperties(); properties.setLength(content.length); - properties.setLastModified(Calendar.getInstance( - TimeZone.getTimeZone("UTC")).getTime()); + properties.setLastModified( + Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTime()); if (getMetadata) { - metadata = backingStore.getMetadata(uri.toString()); + metadata = backingStore.getMetadata(convertUriToDecodedString(uri)); } } } @@ -347,26 +403,27 @@ public class MockStorageInterface extends StorageInterface { } @Override - public void startCopyFromBlob(CloudBlockBlobWrapper sourceBlob, + public void startCopyFromBlob(URI source, OperationContext opContext) throws StorageException, URISyntaxException { - backingStore.copy(sourceBlob.getUri().toString(), uri.toString()); - // it would be best if backingStore.properties.CopyState were tracked - // If implemented, update azureNativeFileSystemStore.waitForCopyToComplete + backingStore.copy(convertUriToDecodedString(source), convertUriToDecodedString(uri)); + //TODO: set the backingStore.properties.CopyState and + // update azureNativeFileSystemStore.waitForCopyToComplete } @Override public CopyState getCopyState() { - return this.properties.getCopyState(); + return this.properties.getCopyState(); } @Override - public void delete(OperationContext opContext) throws StorageException { - backingStore.delete(uri.toString()); + public void delete(OperationContext opContext, SelfRenewingLease lease) + throws StorageException { + backingStore.delete(convertUriToDecodedString(uri)); } @Override public boolean exists(OperationContext opContext) throws StorageException { - return backingStore.exists(uri.toString()); + return backingStore.exists(convertUriToDecodedString(uri)); } @Override @@ -383,37 +440,90 @@ public class MockStorageInterface extends StorageInterface { @Override public InputStream openInputStream(BlobRequestOptions options, OperationContext opContext) throws StorageException { - return new ByteArrayInputStream(backingStore.getContent(uri.toString())); + return new ByteArrayInputStream( + backingStore.getContent(convertUriToDecodedString(uri))); + } + + @Override + public void uploadMetadata(OperationContext opContext) + throws StorageException { + backingStore.setMetadata(convertUriToDecodedString(uri), metadata); + } + + @Override + 
public void downloadRange(long offset, long length, OutputStream os, + BlobRequestOptions options, OperationContext opContext) + throws StorageException { + throw new NotImplementedException(); + } + } + + class MockCloudBlockBlobWrapper extends MockCloudBlobWrapper + implements CloudBlockBlobWrapper { + public MockCloudBlockBlobWrapper(URI uri, HashMap<String, String> metadata, + int length) { + super(uri, metadata, length); } @Override public OutputStream openOutputStream(BlobRequestOptions options, OperationContext opContext) throws StorageException { - return backingStore.upload(uri.toString(), metadata); + return backingStore.uploadBlockBlob(convertUriToDecodedString(uri), + metadata); } @Override - public void upload(InputStream sourceStream, OperationContext opContext) - throws StorageException, IOException { - ByteArrayOutputStream allContent = new ByteArrayOutputStream(); - allContent.write(sourceStream); - backingStore.setContent(uri.toString(), allContent.toByteArray(), - metadata); - refreshProperties(false); - allContent.close(); + public void setStreamMinimumReadSizeInBytes(int minimumReadSizeBytes) { } @Override - public void uploadMetadata(OperationContext opContext) - throws StorageException { - backingStore.setContent(uri.toString(), - backingStore.getContent(uri.toString()), metadata); + public void setWriteBlockSizeInBytes(int writeBlockSizeBytes) { } @Override - public void uploadProperties(OperationContext opContext) - throws StorageException { - refreshProperties(false); + public StorageUri getStorageUri() { + return null; + } + + @Override + public void uploadProperties(OperationContext context, SelfRenewingLease lease) { + } + + @Override + public SelfRenewingLease acquireLease() { + return null; + } + + @Override + public CloudBlob getBlob() { + return null; + } + } + + class MockCloudPageBlobWrapper extends MockCloudBlobWrapper + implements CloudPageBlobWrapper { + public MockCloudPageBlobWrapper(URI uri, HashMap<String, String> metadata, + int length) { + super(uri, metadata, length); + } + + @Override + public void create(long length, BlobRequestOptions options, + OperationContext opContext) throws StorageException { + throw new NotImplementedException(); + } + + @Override + public void uploadPages(InputStream sourceStream, long offset, long length, + BlobRequestOptions options, OperationContext opContext) + throws StorageException, IOException { + throw new NotImplementedException(); + } + + @Override + public ArrayList<PageRange> downloadPageRanges(BlobRequestOptions options, + OperationContext opContext) throws StorageException { + throw new NotImplementedException(); } @Override @@ -426,8 +536,23 @@ public class MockStorageInterface extends StorageInterface { @Override public StorageUri getStorageUri() { - throw new UnsupportedOperationException(); + throw new NotImplementedException(); } + @Override + public void uploadProperties(OperationContext opContext, + SelfRenewingLease lease) + throws StorageException { + } + + @Override + public SelfRenewingLease acquireLease() { + return null; + } + + @Override + public CloudBlob getBlob() { + return null; + } } }
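A note on the mock's key handling above: backing-store keys are kept in decoded UTF-8 form, a URI is only materialized by encoding the key (convertKeyToEncodedUri), and every store operation first decodes the wrapper's URI back to a key (convertUriToDecodedString). Below is a minimal sketch (not part of the patch) of that round trip, assuming only commons-httpclient's URIUtil as imported in the diff; the sample key is made up:

    import java.net.URI;
    import org.apache.commons.httpclient.util.URIUtil;

    public class KeyUriRoundTripSketch {
      public static void main(String[] args) throws Exception {
        // A backing-store key with a space and a non-ASCII character,
        // held in decoded UTF-8 form (sample value only).
        String key = "folder to list/\u963f";
        // Encode only when a URI is needed, as convertKeyToEncodedUri does.
        URI uri = new URI(URIUtil.encodePath(key));
        // Decode before going back to the store, as convertUriToDecodedString
        // does; the round trip must give back the original key.
        String decoded = URIUtil.decode(uri.toString());
        if (!decoded.equals(key)) {
          throw new AssertionError("Round trip changed the key: " + decoded);
        }
      }
    }

This is also why listBlobs in the patch builds its search prefix with UriBuilder and then decodes it: prefix comparisons against store keys have to happen in decoded space.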
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a737026/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemBaseTest.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemBaseTest.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemBaseTest.java index e731b21..01cf713 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemBaseTest.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemBaseTest.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.junit.Assume.assumeNotNull; import java.io.BufferedReader; @@ -32,10 +33,15 @@ import java.io.InputStream; import java.io.InputStreamReader; import java.io.OutputStream; import java.io.OutputStreamWriter; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Calendar; import java.util.Date; import java.util.TimeZone; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; @@ -49,6 +55,13 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.apache.hadoop.fs.azure.AzureException; +import org.apache.hadoop.fs.azure.NativeAzureFileSystem.FolderRenamePending; + +import com.microsoft.windowsazure.storage.AccessCondition; +import com.microsoft.windowsazure.storage.StorageException; +import com.microsoft.windowsazure.storage.blob.CloudBlob; + /* * Tests the Native Azure file system (WASB) against an actual blob store if * provided in the environment. 
@@ -59,12 +72,13 @@ import org.junit.Test; */ public abstract class NativeAzureFileSystemBaseTest { - private FileSystem fs; + protected FileSystem fs; private AzureBlobStorageTestAccount testAccount; private final long modifiedTimeErrorMargin = 5 * 1000; // Give it +/-5 seconds - protected abstract AzureBlobStorageTestAccount createTestAccount() - throws Exception; + protected abstract AzureBlobStorageTestAccount createTestAccount() throws Exception; + + public static final Log LOG = LogFactory.getLog(NativeAzureFileSystemBaseTest.class); @Before public void setUp() throws Exception { @@ -140,7 +154,7 @@ public abstract class NativeAzureFileSystemBaseTest { private void testOwnership(Path pathUnderTest) throws IOException { FileStatus ret = fs.getFileStatus(pathUnderTest); UserGroupInformation currentUser = UserGroupInformation.getCurrentUser(); - assertEquals(ret.getOwner(), currentUser.getShortUserName()); + assertTrue(ret.getOwner().equals(currentUser.getShortUserName())); fs.delete(pathUnderTest, true); } @@ -177,18 +191,29 @@ public abstract class NativeAzureFileSystemBaseTest { fs.delete(testFolder, true); } - @Test - public void testDeepFileCreation() throws Exception { - Path testFile = new Path("deep/file/creation/test"); - FsPermission permission = FsPermission.createImmutable((short) 644); + void testDeepFileCreationBase(String testFilePath, String firstDirPath, String middleDirPath, + short permissionShort, short umaskedPermissionShort) throws Exception { + Path testFile = new Path(testFilePath); + Path firstDir = new Path(firstDirPath); + Path middleDir = new Path(middleDirPath); + FsPermission permission = FsPermission.createImmutable(permissionShort); + FsPermission umaskedPermission = FsPermission.createImmutable(umaskedPermissionShort); + createEmptyFile(testFile, permission); + FsPermission rootPerm = fs.getFileStatus(firstDir.getParent()).getPermission(); + FsPermission inheritPerm = FsPermission.createImmutable((short)(rootPerm.toShort() | 0300)); assertTrue(fs.exists(testFile)); - assertTrue(fs.exists(new Path("deep"))); - assertTrue(fs.exists(new Path("deep/file/creation"))); - FileStatus ret = fs.getFileStatus(new Path("deep/file")); - assertTrue(ret.isDirectory()); - assertEqualsIgnoreStickyBit(permission, ret.getPermission()); - assertTrue(fs.delete(new Path("deep"), true)); + assertTrue(fs.exists(firstDir)); + assertTrue(fs.exists(middleDir)); + // verify that the indirectly created directory inherited its permissions from the root directory + FileStatus directoryStatus = fs.getFileStatus(middleDir); + assertTrue(directoryStatus.isDirectory()); + assertEqualsIgnoreStickyBit(inheritPerm, directoryStatus.getPermission()); + // verify that the file itself has the permissions as specified + FileStatus fileStatus = fs.getFileStatus(testFile); + assertFalse(fileStatus.isDirectory()); + assertEqualsIgnoreStickyBit(umaskedPermission, fileStatus.getPermission()); + assertTrue(fs.delete(firstDir, true)); assertFalse(fs.exists(testFile)); // An alternative test scenario would've been to delete the file first, @@ -196,6 +221,22 @@ public abstract class NativeAzureFileSystemBaseTest { // doesn't actually work as expected right now. } + @Test + public void testDeepFileCreation() throws Exception { + // normal permissions in user home + testDeepFileCreationBase("deep/file/creation/test", "deep", "deep/file/creation", (short)0644, (short)0644); + // extra permissions in user home. umask will change the actual permissions. 
+ testDeepFileCreationBase("deep/file/creation/test", "deep", "deep/file/creation", (short)0777, (short)0755); + // normal permissions in root + testDeepFileCreationBase("/deep/file/creation/test", "/deep", "/deep/file/creation", (short)0644, (short)0644); + // less permissions in root + testDeepFileCreationBase("/deep/file/creation/test", "/deep", "/deep/file/creation", (short)0700, (short)0700); + // one indirectly created directory in root + testDeepFileCreationBase("/deep/file", "/deep", "/deep", (short)0644, (short)0644); + // one indirectly created directory in user home + testDeepFileCreationBase("deep/file", "deep", "deep", (short)0644, (short)0644); + } + private static enum RenameVariation { NormalFileName, SourceInAFolder, SourceWithSpace, SourceWithPlusAndPercent } @@ -206,20 +247,20 @@ public abstract class NativeAzureFileSystemBaseTest { System.out.printf("Rename variation: %s\n", variation); Path originalFile; switch (variation) { - case NormalFileName: - originalFile = new Path("fileToRename"); - break; - case SourceInAFolder: - originalFile = new Path("file/to/rename"); - break; - case SourceWithSpace: - originalFile = new Path("file to rename"); - break; - case SourceWithPlusAndPercent: - originalFile = new Path("file+to%rename"); - break; - default: - throw new Exception("Unknown variation"); + case NormalFileName: + originalFile = new Path("fileToRename"); + break; + case SourceInAFolder: + originalFile = new Path("file/to/rename"); + break; + case SourceWithSpace: + originalFile = new Path("file to rename"); + break; + case SourceWithPlusAndPercent: + originalFile = new Path("file+to%rename"); + break; + default: + throw new Exception("Unknown variation"); } Path destinationFile = new Path("file/resting/destination"); assertTrue(fs.createNewFile(originalFile)); @@ -227,7 +268,8 @@ public abstract class NativeAzureFileSystemBaseTest { assertFalse(fs.rename(originalFile, destinationFile)); // Parent directory // doesn't exist assertTrue(fs.mkdirs(destinationFile.getParent())); - assertTrue(fs.rename(originalFile, destinationFile)); + boolean result = fs.rename(originalFile, destinationFile); + assertTrue(result); assertTrue(fs.exists(destinationFile)); assertFalse(fs.exists(originalFile)); fs.delete(destinationFile.getParent(), true); @@ -239,10 +281,10 @@ public abstract class NativeAzureFileSystemBaseTest { Path testFile = new Path("deep/file/rename/test"); FsPermission permission = FsPermission.createImmutable((short) 644); createEmptyFile(testFile, permission); - assertTrue(fs.rename(new Path("deep/file"), new Path("deep/renamed"))); + boolean renameResult = fs.rename(new Path("deep/file"), new Path("deep/renamed")); + assertTrue(renameResult); assertFalse(fs.exists(testFile)); - FileStatus newStatus = fs - .getFileStatus(new Path("deep/renamed/rename/test")); + FileStatus newStatus = fs.getFileStatus(new Path("deep/renamed/rename/test")); assertNotNull(newStatus); assertEqualsIgnoreStickyBit(permission, newStatus.getPermission()); assertTrue(fs.delete(new Path("deep"), true)); @@ -256,21 +298,25 @@ public abstract class NativeAzureFileSystemBaseTest { public void testRenameFolder() throws Exception { for (RenameFolderVariation variation : RenameFolderVariation.values()) { Path originalFolder = new Path("folderToRename"); - if (variation != RenameFolderVariation.CreateJustInnerFile){ + if (variation != RenameFolderVariation.CreateJustInnerFile) { assertTrue(fs.mkdirs(originalFolder)); } Path innerFile = new Path(originalFolder, "innerFile"); - if (variation != 
RenameFolderVariation.CreateJustFolder){ + Path innerFile2 = new Path(originalFolder, "innerFile2"); + if (variation != RenameFolderVariation.CreateJustFolder) { assertTrue(fs.createNewFile(innerFile)); + assertTrue(fs.createNewFile(innerFile2)); } Path destination = new Path("renamedFolder"); assertTrue(fs.rename(originalFolder, destination)); assertTrue(fs.exists(destination)); - if (variation != RenameFolderVariation.CreateJustFolder){ + if (variation != RenameFolderVariation.CreateJustFolder) { assertTrue(fs.exists(new Path(destination, innerFile.getName()))); + assertTrue(fs.exists(new Path(destination, innerFile2.getName()))); } assertFalse(fs.exists(originalFolder)); assertFalse(fs.exists(innerFile)); + assertFalse(fs.exists(innerFile2)); fs.delete(destination, true); } } @@ -365,6 +411,43 @@ public abstract class NativeAzureFileSystemBaseTest { } @Test + public void testChineseCharacters() throws Exception { + // Create a file and a folder with Chinese (non-ASCII) characters + String chinese = "" + '\u963f' + '\u4db5'; + String fileName = "filename" + chinese; + String directoryName = chinese; + fs.create(new Path(directoryName, fileName)).close(); + FileStatus[] listing = fs.listStatus(new Path(directoryName)); + assertEquals(1, listing.length); + assertEquals(fileName, listing[0].getPath().getName()); + FileStatus status = fs.getFileStatus(new Path(directoryName, fileName)); + assertEquals(fileName, status.getPath().getName()); + InputStream stream = fs.open(new Path(directoryName, fileName)); + assertNotNull(stream); + stream.close(); + assertTrue(fs.delete(new Path(directoryName, fileName), true)); + assertTrue(fs.delete(new Path(directoryName), true)); + } + + @Test + public void testChineseCharactersFolderRename() throws Exception { + // Create a file and a folder with Chinese (non-ASCII) characters + String chinese = "" + '\u963f' + '\u4db5'; + String fileName = "filename" + chinese; + String srcDirectoryName = chinese; + String targetDirectoryName = "target" + chinese; + fs.create(new Path(srcDirectoryName, fileName)).close(); + fs.rename(new Path(srcDirectoryName), new Path(targetDirectoryName)); + FileStatus[] listing = fs.listStatus(new Path(targetDirectoryName)); + assertEquals(1, listing.length); + assertEquals(fileName, listing[0].getPath().getName()); + FileStatus status = fs.getFileStatus(new Path(targetDirectoryName, fileName)); + assertEquals(fileName, status.getPath().getName()); + assertTrue(fs.delete(new Path(targetDirectoryName, fileName), true)); + assertTrue(fs.delete(new Path(targetDirectoryName), true)); + } + + @Test public void testReadingDirectoryAsFile() throws Exception { Path dir = new Path("/x"); assertTrue(fs.mkdirs(dir)); @@ -403,7 +486,12 @@ public abstract class NativeAzureFileSystemBaseTest { assertEquals("supergroup", newStatus.getGroup()); assertEquals(UserGroupInformation.getCurrentUser().getShortUserName(), newStatus.getOwner()); - assertEquals(1, newStatus.getLen()); + + // Don't check the file length for page blobs. Only block blobs + // provide the actual length of bytes written. + if (!(this instanceof TestNativeAzureFSPageBlobLive)) { + assertEquals(1, newStatus.getLen()); + } } @Test @@ -429,7 +517,12 @@ public abstract class NativeAzureFileSystemBaseTest { assertNotNull(newStatus); assertEquals("newUser", newStatus.getOwner()); assertEquals("supergroup", newStatus.getGroup()); - assertEquals(1, newStatus.getLen()); + + // File length is only reported to be the size of bytes written to the file for block blobs. 
+ // So only check it for block blobs, not page blobs. + if (!(this instanceof TestNativeAzureFSPageBlobLive)) { + assertEquals(1, newStatus.getLen()); + } fs.setOwner(newFile, null, "newGroup"); newStatus = fs.getFileStatus(newFile); assertNotNull(newStatus); @@ -506,14 +599,570 @@ public abstract class NativeAzureFileSystemBaseTest { testModifiedTime(destFolder); } + /** + * Verify we can get file status of a directory with various forms of + * the directory file name, including the nonstandard but legal form + * ending in "/.". Check that we're getting status for a directory. + */ @Test public void testListSlash() throws Exception { Path testFolder = new Path("/testFolder"); Path testFile = new Path(testFolder, "testFile"); assertTrue(fs.mkdirs(testFolder)); assertTrue(fs.createNewFile(testFile)); - FileStatus status = fs.getFileStatus(new Path("/testFolder/.")); - assertNotNull(status); + FileStatus status; + status = fs.getFileStatus(new Path("/testFolder")); + assertTrue(status.isDirectory()); + status = fs.getFileStatus(new Path("/testFolder/")); + assertTrue(status.isDirectory()); + status = fs.getFileStatus(new Path("/testFolder/.")); + assertTrue(status.isDirectory()); + } + + @Test + public void testCannotCreatePageBlobByDefault() throws Exception { + + // Verify that the page blob directory list configuration setting + // is not set in the default configuration. + Configuration conf = new Configuration(); + String[] rawPageBlobDirs = + conf.getStrings(AzureNativeFileSystemStore.KEY_PAGE_BLOB_DIRECTORIES); + assertTrue(rawPageBlobDirs == null); + } + + /* + * Set up a situation where a folder rename is partway finished. + * Then apply redo to finish the rename. + * + * The original source folder *would* have had contents + * folderToRename (0 byte dummy file for directory) + * folderToRename/innerFile + * folderToRename/innerFile2 + * + * The actual source folder (after partial rename and failure) + * + * folderToRename + * folderToRename/innerFile2 + * + * The actual target folder (after partial rename and failure) + * + * renamedFolder + * renamedFolder/innerFile + */ + @Test + public void testRedoRenameFolder() throws IOException { + // create original folder + String srcKey = "folderToRename"; + Path originalFolder = new Path(srcKey); + assertTrue(fs.mkdirs(originalFolder)); + Path innerFile = new Path(originalFolder, "innerFile"); + assertTrue(fs.createNewFile(innerFile)); + Path innerFile2 = new Path(originalFolder, "innerFile2"); + assertTrue(fs.createNewFile(innerFile2)); + + String dstKey = "renamedFolder"; + + // propose (but don't do) the rename + Path home = fs.getHomeDirectory(); + String relativeHomeDir = getRelativePath(home.toString()); + NativeAzureFileSystem.FolderRenamePending pending = + new NativeAzureFileSystem.FolderRenamePending( + relativeHomeDir + "/" + srcKey, + relativeHomeDir + "/" + dstKey, null, + (NativeAzureFileSystem) fs); + + // get the rename pending file contents + String renameDescription = pending.makeRenamePendingFileContents(); + + // Remove one file from source folder to simulate a partially done + // rename operation. + assertTrue(fs.delete(innerFile, false)); + + // Create the destination folder with just one file in it, again + // to simulate a partially done rename. + Path destination = new Path(dstKey); + Path innerDest = new Path(destination, "innerFile"); + assertTrue(fs.createNewFile(innerDest)); + + // Create a rename-pending file and write rename information to it. 
+ final String renamePendingStr = "folderToRename-RenamePending.json"; + Path renamePendingFile = new Path(renamePendingStr); + FSDataOutputStream out = fs.create(renamePendingFile, true); + assertTrue(out != null); + writeString(out, renameDescription); + + // Redo the rename operation based on the contents of the -RenamePending.json file. + // Trigger the redo by checking for existence of the original folder. It must appear + // to not exist. + assertFalse(fs.exists(originalFolder)); + + // Verify that the target is there, and the source is gone. + assertTrue(fs.exists(destination)); + assertTrue(fs.exists(new Path(destination, innerFile.getName()))); + assertTrue(fs.exists(new Path(destination, innerFile2.getName()))); + assertFalse(fs.exists(originalFolder)); + assertFalse(fs.exists(innerFile)); + assertFalse(fs.exists(innerFile2)); + + // Verify that there's no RenamePending file left. + assertFalse(fs.exists(renamePendingFile)); + + // Verify that we can list the target directory. + FileStatus[] listed = fs.listStatus(destination); + assertEquals(2, listed.length); + + // List the home directory and show the contents is a directory. + Path root = fs.getHomeDirectory(); + listed = fs.listStatus(root); + assertEquals(1, listed.length); + assertTrue(listed[0].isDirectory()); + } + + /** + * If there is a folder to be renamed inside a parent folder, + * then when you list the parent folder, you should only see + * the final result, after the rename. + */ + @Test + public void testRedoRenameFolderInFolderListing() throws IOException { + + // create original folder + String parent = "parent"; + Path parentFolder = new Path(parent); + assertTrue(fs.mkdirs(parentFolder)); + Path inner = new Path(parentFolder, "innerFolder"); + assertTrue(fs.mkdirs(inner)); + Path inner2 = new Path(parentFolder, "innerFolder2"); + assertTrue(fs.mkdirs(inner2)); + Path innerFile = new Path(inner2, "file"); + assertTrue(fs.createNewFile(innerFile)); + + Path inner2renamed = new Path(parentFolder, "innerFolder2Renamed"); + + // propose (but don't do) the rename of innerFolder2 + Path home = fs.getHomeDirectory(); + String relativeHomeDir = getRelativePath(home.toString()); + NativeAzureFileSystem.FolderRenamePending pending = + new NativeAzureFileSystem.FolderRenamePending( + relativeHomeDir + "/" + inner2, + relativeHomeDir + "/" + inner2renamed, null, + (NativeAzureFileSystem) fs); + + // Create a rename-pending file and write rename information to it. + final String renamePendingStr = inner2 + FolderRenamePending.SUFFIX; + Path renamePendingFile = new Path(renamePendingStr); + FSDataOutputStream out = fs.create(renamePendingFile, true); + assertTrue(out != null); + writeString(out, pending.makeRenamePendingFileContents()); + + // Redo the rename operation based on the contents of the + // -RenamePending.json file. Trigger the redo by checking for existence of + // the original folder. It must appear to not exist. + FileStatus[] listed = fs.listStatus(parentFolder); + assertEquals(2, listed.length); + assertTrue(listed[0].isDirectory()); + assertTrue(listed[1].isDirectory()); + + // The rename pending file is not a directory, so at this point we know the + // redo has been done. + assertFalse(fs.exists(inner2)); // verify original folder is gone + assertTrue(fs.exists(inner2renamed)); // verify the target is there + assertTrue(fs.exists(new Path(inner2renamed, "file"))); + } + + /** + * Test the situation where a rename pending file exists but the rename + * is really done. 
This could happen if the rename process died just + * before deleting the rename pending file. It exercises a non-standard + * code path in redo(). + */ + @Test + public void testRenameRedoFolderAlreadyDone() throws IOException { + // create only destination folder + String orig = "originalFolder"; + String dest = "renamedFolder"; + Path destPath = new Path(dest); + assertTrue(fs.mkdirs(destPath)); + + // propose (but don't do) the rename of originalFolder + Path home = fs.getHomeDirectory(); + String relativeHomeDir = getRelativePath(home.toString()); + NativeAzureFileSystem.FolderRenamePending pending = + new NativeAzureFileSystem.FolderRenamePending( + relativeHomeDir + "/" + orig, + relativeHomeDir + "/" + dest, null, + (NativeAzureFileSystem) fs); + + // Create a rename-pending file and write rename information to it. + final String renamePendingStr = orig + FolderRenamePending.SUFFIX; + Path renamePendingFile = new Path(renamePendingStr); + FSDataOutputStream out = fs.create(renamePendingFile, true); + assertTrue(out != null); + writeString(out, pending.makeRenamePendingFileContents()); + + try { + pending.redo(); + } catch (Exception e) { + fail(); + } + + // Make sure rename pending file is gone. + FileStatus[] listed = fs.listStatus(new Path("/")); + assertEquals(1, listed.length); + assertTrue(listed[0].isDirectory()); + } + + @Test + public void testRedoFolderRenameAll() throws IllegalArgumentException, IOException { + { + FileFolder original = new FileFolder("folderToRename"); + original.add("innerFile").add("innerFile2"); + FileFolder partialSrc = original.copy(); + FileFolder partialDst = original.copy(); + partialDst.setName("renamedFolder"); + partialSrc.setPresent(0, false); + partialDst.setPresent(1, false); + + testRenameRedoFolderSituation(original, partialSrc, partialDst); + } + { + FileFolder original = new FileFolder("folderToRename"); + original.add("file1").add("file2").add("file3"); + FileFolder partialSrc = original.copy(); + FileFolder partialDst = original.copy(); + partialDst.setName("renamedFolder"); + + // Set up this state before the redo: + // folderToRename: file1 file3 + // renamedFolder: file1 file2 + // This gives code coverage for all 3 expected cases for individual file + // redo. + partialSrc.setPresent(1, false); + partialDst.setPresent(2, false); + + testRenameRedoFolderSituation(original, partialSrc, partialDst); + } + { + // Simulate a situation with a folder with a large number of files in it. + // For the first half of the files, they will be in the destination + // but not the source. For the second half, they will be in the source + // but not the destination. There will be one file in the middle that is + // in both source and destination. Then trigger redo and verify. + // For testing larger folder sizes, temporarily edit the SIZE value. + final int SIZE = 5; + assertTrue(SIZE >= 3); + // Try a lot of files in the folder. 
+ FileFolder original = new FileFolder("folderToRename"); + for (int i = 0; i < SIZE; i++) { + original.add("file" + Integer.toString(i)); + } + FileFolder partialSrc = original.copy(); + FileFolder partialDst = original.copy(); + partialDst.setName("renamedFolder"); + for (int i = 0; i < SIZE; i++) { + partialSrc.setPresent(i, i >= SIZE / 2); + partialDst.setPresent(i, i <= SIZE / 2); + } + + testRenameRedoFolderSituation(original, partialSrc, partialDst); + } + { + // Do a nested folder, like so: + // folderToRename: + // nestedFolder: a, b, c + // p + // q + // + // Then delete file 'a' from the source and add it to destination. + // Then trigger redo. + + FileFolder original = new FileFolder("folderToRename"); + FileFolder nested = new FileFolder("nestedFolder"); + nested.add("a").add("b").add("c"); + original.add(nested).add("p").add("q"); + + FileFolder partialSrc = original.copy(); + FileFolder partialDst = original.copy(); + partialDst.setName("renamedFolder"); + + // logically remove 'a' from source + partialSrc.getMember(0).setPresent(0, false); + + // logically eliminate b, c from destination + partialDst.getMember(0).setPresent(1, false); + partialDst.getMember(0).setPresent(2, false); + + testRenameRedoFolderSituation(original, partialSrc, partialDst); + } + } + + private void testRenameRedoFolderSituation( + FileFolder fullSrc, + FileFolder partialSrc, + FileFolder partialDst) throws IllegalArgumentException, IOException { + + // make file folder tree for source + fullSrc.create(); + + // set up rename pending file + fullSrc.makeRenamePending(partialDst); + + // prune away some files (as marked) from source to simulate partial rename + partialSrc.prune(); + + // Create only the files indicated for the destination to indicate a partial rename. + partialDst.create(); + + // trigger redo + assertFalse(fullSrc.exists()); + + // verify correct results + partialDst.verifyExists(); + fullSrc.verifyGone(); + + // delete the new folder to leave no garbage behind + fs.delete(new Path(partialDst.getName()), true); + } + + // Mock up of a generalized folder (which can also be a leaf-level file) + // for rename redo testing. + private class FileFolder { + private String name; + + // For rename testing, indicates whether an expected + // file is present in the source or target folder. + private boolean present; + ArrayList<FileFolder> members; // Null if a leaf file, otherwise not null. + + // Make a new, empty folder (not a regular leaf file). + public FileFolder(String name) { + this.name = name; + this.present = true; + members = new ArrayList<FileFolder>(); + } + + public FileFolder getMember(int i) { + return members.get(i); + } + + // Verify a folder and all its contents are gone. This is only to + // be called on the root of a FileFolder. 
+ public void verifyGone() throws IllegalArgumentException, IOException { + assertFalse(fs.exists(new Path(name))); + assertTrue(isFolder()); + verifyGone(new Path(name), members); + } + + private void verifyGone(Path prefix, ArrayList<FileFolder> members2) throws IOException { + for (FileFolder f : members2) { + f.verifyGone(prefix); + } + } + + private void verifyGone(Path prefix) throws IOException { + assertFalse(fs.exists(new Path(prefix, name))); + if (isLeaf()) { + return; + } + for (FileFolder f : members) { + f.verifyGone(new Path(prefix, name)); + } + } + + public void verifyExists() throws IllegalArgumentException, IOException { + + // verify the root is present + assertTrue(fs.exists(new Path(name))); + assertTrue(isFolder()); + + // check the members + verifyExists(new Path(name), members); + } + + private void verifyExists(Path prefix, ArrayList<FileFolder> members2) throws IOException { + for (FileFolder f : members2) { + f.verifyExists(prefix); + } + } + + private void verifyExists(Path prefix) throws IOException { + + // verify this file/folder is present + assertTrue(fs.exists(new Path(prefix, name))); + + // verify members are present + if (isLeaf()) { + return; + } + + for (FileFolder f : members) { + f.verifyExists(new Path(prefix, name)); + } + } + + public boolean exists() throws IOException { + return fs.exists(new Path(name)); + } + + // Make a rename pending file for the situation where we rename + // this object (the source) to the specified destination. + public void makeRenamePending(FileFolder dst) throws IOException { + + // Propose (but don't do) the rename. + Path home = fs.getHomeDirectory(); + String relativeHomeDir = getRelativePath(home.toString()); + NativeAzureFileSystem.FolderRenamePending pending = + new NativeAzureFileSystem.FolderRenamePending( + relativeHomeDir + "/" + this.getName(), + relativeHomeDir + "/" + dst.getName(), null, + (NativeAzureFileSystem) fs); + + // Get the rename pending file contents. + String renameDescription = pending.makeRenamePendingFileContents(); + + // Create a rename-pending file and write rename information to it. + final String renamePendingStr = this.getName() + "-RenamePending.json"; + Path renamePendingFile = new Path(renamePendingStr); + FSDataOutputStream out = fs.create(renamePendingFile, true); + assertTrue(out != null); + writeString(out, renameDescription); + } + + // set whether a child is present or not + public void setPresent(int i, boolean b) { + members.get(i).setPresent(b); + } + + // Make an uninitialized folder + private FileFolder() { + this.present = true; + } + + public void setPresent(boolean value) { + present = value; + } + + public FileFolder makeLeaf(String name) { + FileFolder f = new FileFolder(); + f.setName(name); + return f; + } + + void setName(String name) { + this.name = name; + } + + public String getName() { + return name; + } + + public boolean isLeaf() { + return members == null; + } + + public boolean isFolder() { + return members != null; + } + + FileFolder add(FileFolder folder) { + members.add(folder); + return this; + } + + // Add a leaf file (by convention, if you pass a string argument, you get a leaf). + FileFolder add(String file) { + FileFolder leaf = makeLeaf(file); + members.add(leaf); + return this; + } + + public FileFolder copy() { + if (isLeaf()) { + return makeLeaf(name); + } else { + FileFolder f = new FileFolder(name); + for (FileFolder member : members) { + f.add(member.copy()); + } + return f; + } + } + + // Create the folder structure. 
+ public void create() throws IllegalArgumentException, IOException { + create(null); + } + + private void create(Path prefix) throws IllegalArgumentException, IOException { + if (isFolder()) { + if (present) { + assertTrue(fs.mkdirs(makePath(prefix, name))); + } + create(makePath(prefix, name), members); + } else if (isLeaf()) { + if (present) { + assertTrue(fs.createNewFile(makePath(prefix, name))); + } + } else { + assertTrue("The object must be a (leaf) file or a folder.", false); + } + } + + private void create(Path prefix, ArrayList<FileFolder> members2) throws IllegalArgumentException, IOException { + for (FileFolder f : members2) { + f.create(prefix); + } + } + + private Path makePath(Path prefix, String name) { + if (prefix == null) { + return new Path(name); + } else { + return new Path(prefix, name); + } + } + + // Remove the files marked as not present. + public void prune() throws IOException { + prune(null); + } + + private void prune(Path prefix) throws IOException { + Path path = null; + if (prefix == null) { + path = new Path(name); + } else { + path = new Path(prefix, name); + } + if (isLeaf() && !present) { + assertTrue(fs.delete(path, false)); + } else if (isFolder() && !present) { + assertTrue(fs.delete(path, true)); + } else if (isFolder()) { + for (FileFolder f : members) { + f.prune(path); + } + } + } + } + + private String getRelativePath(String path) { + // example input: wasb://somecontainer@someaccount.blob.core.windows.net/user/ehans/folderToRename + // example result: user/ehans/folderToRename + + // Find the third / position and return input substring after that. + int slashCount = 0; // number of slashes so far + int i; + for (i = 0; i < path.length(); i++) { + if (path.charAt(i) == '/') { + slashCount++; + if (slashCount == 3) { + return path.substring(i + 1, path.length()); + } + } + } + throw new RuntimeException("Incorrect path prefix -- expected wasb://.../..."); } @Test @@ -523,6 +1172,84 @@ public abstract class NativeAzureFileSystemBaseTest { fs.close(); } + // Test the available() method for the input stream returned by fs.open(). + // This works for both page and block blobs. + int FILE_SIZE = 4 * 1024 * 1024 + 1; // Make this 1 bigger than internal + // buffer used in BlobInputStream + // to exercise that case. + int MAX_STRIDE = FILE_SIZE + 1; + Path PATH = new Path("/available.dat"); + @Test + public void testAvailable() throws IOException { + + // write FILE_SIZE bytes to page blob + FSDataOutputStream out = fs.create(PATH); + byte[] data = new byte[FILE_SIZE]; + Arrays.fill(data, (byte) 5); + out.write(data, 0, FILE_SIZE); + out.close(); + + // Test available() for different read sizes + verifyAvailable(1); + verifyAvailable(100); + verifyAvailable(5000); + verifyAvailable(FILE_SIZE); + verifyAvailable(MAX_STRIDE); + + fs.delete(PATH, false); + } + + // Verify that available() for the input stream is always >= 1 unless we've + // consumed all the input, and then it is 0. This is to match expectations by + // HBase which were set based on behavior of DFSInputStream.available(). 
+ private void verifyAvailable(int readStride) throws IOException { + FSDataInputStream in = fs.open(PATH); + try { + byte[] inputBuffer = new byte[MAX_STRIDE]; + int position = 0; + int bytesRead = 0; + while(bytesRead != FILE_SIZE) { + bytesRead += in.read(inputBuffer, position, readStride); + int available = in.available(); + if (bytesRead < FILE_SIZE) { + if (available < 1) { + fail(String.format( + "expected available > 0 but got: " + + "position = %d, bytesRead = %d, in.available() = %d", + position, bytesRead, available)); + } + } + } + int available = in.available(); + assertTrue(available == 0); + } finally { + in.close(); + } + } + + @Test + public void testGetFileSizeFromListing() throws IOException { + Path path = new Path("file.dat"); + final int PAGE_SIZE = 512; + final int FILE_SIZE = PAGE_SIZE + 1; + + // write FILE_SIZE bytes to page blob + FSDataOutputStream out = fs.create(path); + byte[] data = new byte[FILE_SIZE]; + Arrays.fill(data, (byte) 5); + out.write(data, 0, FILE_SIZE); + out.close(); + + // list the file to get its properties + FileStatus[] status = fs.listStatus(path); + assertEquals(1, status.length); + + // The file length should report the number of bytes + // written for either page or block blobs (subclasses + // of this test class will exercise both). + assertEquals(FILE_SIZE, status[0].getLen()); + } + private boolean testModifiedTime(Path testPath, long time) throws Exception { FileStatus fileStatus = fs.getFileStatus(testPath); final long errorMargin = modifiedTimeErrorMargin; @@ -530,16 +1257,45 @@ public abstract class NativeAzureFileSystemBaseTest { return (lastModified > (time - errorMargin) && lastModified < (time + errorMargin)); } + @SuppressWarnings("deprecation") + @Test + public void testCreateNonRecursive() throws Exception { + Path testFolder = new Path("/testFolder"); + Path testFile = new Path(testFolder, "testFile"); + try { + fs.createNonRecursive(testFile, true, 1024, (short)1, 1024, null); + assertTrue("Should've thrown", false); + } catch (FileNotFoundException e) { + } + fs.mkdirs(testFolder); + fs.createNonRecursive(testFile, true, 1024, (short)1, 1024, null) + .close(); + assertTrue(fs.exists(testFile)); + } + + public void testFileEndingInDot() throws Exception { + Path testFolder = new Path("/testFolder."); + Path testFile = new Path(testFolder, "testFile."); + assertTrue(fs.mkdirs(testFolder)); + assertTrue(fs.createNewFile(testFile)); + assertTrue(fs.exists(testFile)); + FileStatus[] listed = fs.listStatus(testFolder); + assertEquals(1, listed.length); + assertEquals("testFile.", listed[0].getPath().getName()); + } private void testModifiedTime(Path testPath) throws Exception { Calendar utc = Calendar.getInstance(TimeZone.getTimeZone("UTC")); long currentUtcTime = utc.getTime().getTime(); FileStatus fileStatus = fs.getFileStatus(testPath); - assertTrue("Modification time " - + new Date(fileStatus.getModificationTime()) + " is not close to now: " - + utc.getTime(), testModifiedTime(testPath, currentUtcTime)); + final long errorMargin = 10 * 1000; // Give it +/-10 seconds + assertTrue("Modification time " + + new Date(fileStatus.getModificationTime()) + " is not close to now: " + + utc.getTime(), + fileStatus.getModificationTime() > (currentUtcTime - errorMargin) && + fileStatus.getModificationTime() < (currentUtcTime + errorMargin)); } - private void createEmptyFile(Path testFile, FsPermission permission) + private void createEmptyFile(Path testFile, FsPermission permission) throws IOException { FSDataOutputStream outputStream 
= fs.create(testFile, permission, true, 4096, (short) 1, 1024, null); @@ -563,7 +1319,7 @@ public abstract class NativeAzureFileSystemBaseTest { final int BUFFER_SIZE = 1024; char[] buffer = new char[BUFFER_SIZE]; int count = reader.read(buffer, 0, BUFFER_SIZE); - if (count >= BUFFER_SIZE) { + if (count > BUFFER_SIZE) { throw new IOException("Exceeded buffer size"); } inputStream.close(); @@ -578,7 +1334,6 @@ public abstract class NativeAzureFileSystemBaseTest { throws IOException { FSDataOutputStream outputStream = fs.create(path, true); writeString(outputStream, value); - outputStream.close(); } private void writeString(FSDataOutputStream outputStream, String value) @@ -588,4 +1343,175 @@ public abstract class NativeAzureFileSystemBaseTest { writer.write(value); writer.close(); } + + @Test + // Acquire and free a Lease object. Wait for more than the lease + // timeout, to make sure the lease renews itself. + public void testSelfRenewingLease() throws IllegalArgumentException, IOException, + InterruptedException, StorageException { + + SelfRenewingLease lease; + final String FILE_KEY = "file"; + fs.create(new Path(FILE_KEY)); + NativeAzureFileSystem nfs = (NativeAzureFileSystem) fs; + String fullKey = nfs.pathToKey(nfs.makeAbsolute(new Path(FILE_KEY))); + AzureNativeFileSystemStore store = nfs.getStore(); + lease = store.acquireLease(fullKey); + assertTrue(lease.getLeaseID() != null); + + // The sleep time for the keep-alive thread is 40 seconds, so sleep just + // a little beyond that, to make sure the keep-alive thread wakes up + // and renews the lease. + Thread.sleep(42000); + lease.free(); + + // Check that the lease is really freed. + CloudBlob blob = lease.getCloudBlob(); + + // Try to acquire it again, using direct Azure blob access. + // If that succeeds, then the lease was already freed. + String differentLeaseID = null; + try { + differentLeaseID = blob.acquireLease(15, null); + } catch (Exception e) { + e.printStackTrace(); + fail("Caught exception trying to directly re-acquire lease from Azure"); + } finally { + assertTrue(differentLeaseID != null); + AccessCondition accessCondition = AccessCondition.generateEmptyCondition(); + accessCondition.setLeaseID(differentLeaseID); + blob.releaseLease(accessCondition); + } + } + + @Test + // Acquire a SelfRenewingLease object. Wait for more than the lease + // timeout, to make sure the lease renews itself. Delete the file. + // That will automatically free the lease. + // (that should work without any failures). + public void testSelfRenewingLeaseFileDelete() + throws IllegalArgumentException, IOException, + InterruptedException, StorageException { + + SelfRenewingLease lease; + final String FILE_KEY = "file"; + final Path path = new Path(FILE_KEY); + fs.create(path); + NativeAzureFileSystem nfs = (NativeAzureFileSystem) fs; + String fullKey = nfs.pathToKey(nfs.makeAbsolute(path)); + lease = nfs.getStore().acquireLease(fullKey); + assertTrue(lease.getLeaseID() != null); + + // The sleep time for the keep-alive thread is 40 seconds, so sleep just + // a little beyond that, to make sure the keep-alive thread wakes up + // and renews the lease. + Thread.sleep(42000); + + nfs.getStore().delete(fullKey, lease); + + // Check that the file is really gone and the lease is freed. + assertTrue(!fs.exists(path)); + assertTrue(lease.isFreed()); + } + + // Variables to check assertions in next test. + private long firstEndTime; + private long secondStartTime; + + // Create two threads. One will get a lease on a file. 
+ // The second one will try to get the lease and thus block. + // Then the first one will free the lease and the second + // one will get it and proceed. + @Test + public void testLeaseAsDistributedLock() throws IllegalArgumentException, + IOException { + final String LEASE_LOCK_FILE_KEY = "file"; + fs.create(new Path(LEASE_LOCK_FILE_KEY)); + NativeAzureFileSystem nfs = (NativeAzureFileSystem) fs; + String fullKey = nfs.pathToKey(nfs.makeAbsolute(new Path(LEASE_LOCK_FILE_KEY))); + + Thread first = new Thread(new LeaseLockAction("first-thread", fullKey)); + first.start(); + Thread second = new Thread(new LeaseLockAction("second-thread", fullKey)); + second.start(); + try { + + // Wait for the two threads to finish. + first.join(); + second.join(); + assertTrue(firstEndTime < secondStartTime); + } catch (InterruptedException e) { + fail("Unable to wait for threads to finish"); + Thread.currentThread().interrupt(); + } + } + + private class LeaseLockAction implements Runnable { + private String name; + private String key; + + LeaseLockAction(String name, String key) { + this.name = name; + this.key = key; + } + + @Override + public void run() { + LOG.info("starting thread " + name); + SelfRenewingLease lease = null; + NativeAzureFileSystem nfs = (NativeAzureFileSystem) fs; + + if (name.equals("first-thread")) { + try { + lease = nfs.getStore().acquireLease(key); + LOG.info(name + " acquired lease " + lease.getLeaseID()); + } catch (AzureException e) { + assertTrue("Unanticipated exception", false); + } + assertTrue(lease != null); + try { + + // Sleep long enough for the lease to renew once. + Thread.sleep(SelfRenewingLease.LEASE_RENEWAL_PERIOD + 2000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + try { + firstEndTime = System.currentTimeMillis(); + lease.free(); + LOG.info(name + " freed lease " + lease.getLeaseID()); + } catch (StorageException e) { + fail("Unanticipated exception"); + } + } else if (name.equals("second-thread")) { + try { + + // sleep 2 sec to let first thread get ahead of this one + Thread.sleep(2000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + try { + LOG.info(name + " before getting lease"); + lease = nfs.getStore().acquireLease(key); + secondStartTime = System.currentTimeMillis(); + LOG.info(name + " acquired lease " + lease.getLeaseID()); + } catch (AzureException e) { + assertTrue("Unanticipated exception", false); + } + assertTrue(lease != null); + try { + lease.free(); + LOG.info(name + " freed lease " + lease.getLeaseID()); + } catch (StorageException e) { + assertTrue("Unanticipated exception", false); + } + } else { + assertTrue("Unknown thread name", false); + } + + LOG.info(name + " is exiting."); + } + + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a737026/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/RunningLiveWasbTests.txt ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/RunningLiveWasbTests.txt b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/RunningLiveWasbTests.txt new file mode 100644 index 0000000..54ba4d8 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/RunningLiveWasbTests.txt @@ -0,0 +1,22 @@ +======================================================================== +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. 
See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +========================================================================= + +In order to run Windows Azure Storage Blob (WASB) unit tests against a live +Azure Storage account, you need to provide test account details in a configuration +file called azure-test.xml. See hadoop-tools/hadoop-azure/README.txt for details +on configuration, and how to run the tests. \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a737026/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureConcurrentOutOfBandIo.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureConcurrentOutOfBandIo.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureConcurrentOutOfBandIo.java index c10ac0f..0894cf5 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureConcurrentOutOfBandIo.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureConcurrentOutOfBandIo.java @@ -22,11 +22,10 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; import static org.junit.Assume.assumeNotNull; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; +import java.io.*; import java.util.Arrays; +import org.apache.hadoop.fs.azure.AzureException; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.PermissionStatus; import org.junit.After; @@ -100,13 +99,14 @@ public class TestAzureConcurrentOutOfBandIo { public void run() { byte[] dataBlockWrite = new byte[UPLOAD_BLOCK_SIZE]; - DataOutputStream outputStream = null; + OutputStream outputStream = null; try { for (int i = 0; !done; i++) { // Write two 4 MB blocks to the blob. // - outputStream = writerStorageAccount.getStore().storefile(key, + outputStream = writerStorageAccount.getStore().storefile( + key, new PermissionStatus("", "", FsPermission.getDefault())); Arrays.fill(dataBlockWrite, (byte) (i % 256)); @@ -124,7 +124,7 @@ public class TestAzureConcurrentOutOfBandIo { } catch (IOException e) { System.out .println("DatablockWriter thread encountered an I/O exception." - + e.getMessage()); + + e.getMessage()); } } } @@ -137,30 +137,29 @@ public class TestAzureConcurrentOutOfBandIo { // Write to blob to make sure it exists. // - // Write five 4 MB blocks to the blob. To ensure there is data in the blob - // before reading. This eliminates the race between the reader and writer - // threads. 
- DataOutputStream outputStream = testAccount.getStore().storefile( - "WASB_String.txt", - new PermissionStatus("", "", FsPermission.getDefault())); - Arrays.fill(dataBlockWrite, (byte) 255); - for (int i = 0; i < NUMBER_OF_BLOCKS; i++) { - outputStream.write(dataBlockWrite); - } - - outputStream.flush(); - outputStream.close(); - - // Start writing blocks to Azure store using the DataBlockWriter thread. + // Write five 4 MB blocks to the blob. To ensure there is data in the blob before + // reading. This eliminates the race between the reader and writer threads. + OutputStream outputStream = testAccount.getStore().storefile( + "WASB_String.txt", + new PermissionStatus("", "", FsPermission.getDefault())); + Arrays.fill(dataBlockWrite, (byte) 255); + for (int i = 0; i < NUMBER_OF_BLOCKS; i++) { + outputStream.write(dataBlockWrite); + } + + outputStream.flush(); + outputStream.close(); + + // Start writing blocks to Azure store using the DataBlockWriter thread. DataBlockWriter writeBlockTask = new DataBlockWriter(testAccount, "WASB_String.txt"); - writeBlockTask.startWriting(); - int count = 0; - DataInputStream inputStream = null; + writeBlockTask.startWriting(); + int count = 0; + DataInputStream inputStream = null; - for (int i = 0; i < 5; i++) { - try { - inputStream = testAccount.getStore().retrieve("WASB_String.txt", 0); + for (int i = 0; i < 5; i++) { + try { + inputStream = testAccount.getStore().retrieve("WASB_String.txt"); count = 0; int c = 0; @@ -173,17 +172,17 @@ public class TestAzureConcurrentOutOfBandIo { // Counting the number of bytes. count += c; } - } catch (IOException e) { - System.out.println(e.getCause().toString()); - e.printStackTrace(); - fail(); - } - - // Close the stream. - if (null != inputStream) { - inputStream.close(); - } - } + } catch (IOException e) { + System.out.println(e.getCause().toString()); + e.printStackTrace(); + fail(); + } + + // Close the stream. + if (null != inputStream){ + inputStream.close(); + } + } // Stop writing blocks. 
writeBlockTask.stopWriting(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a737026/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureFileSystemErrorConditions.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureFileSystemErrorConditions.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureFileSystemErrorConditions.java index 6e89822..febb605 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureFileSystemErrorConditions.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureFileSystemErrorConditions.java @@ -32,7 +32,6 @@ import java.util.Arrays; import java.util.HashMap; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.TestHookOperationContext; @@ -65,19 +64,18 @@ public class TestAzureFileSystemErrorConditions { */ @Test public void testAccessUnauthorizedPublicContainer() throws Exception { - Configuration conf = new Configuration(); - AzureBlobStorageTestAccount.addWasbToConfiguration(conf); Path noAccessPath = new Path( "wasb://nonExistentContainer@hopefullyNonExistentAccount/someFile"); NativeAzureFileSystem.suppressRetryPolicy(); try { - FileSystem.get(noAccessPath.toUri(), conf).open(noAccessPath); + FileSystem.get(noAccessPath.toUri(), new Configuration()) + .open(noAccessPath); assertTrue("Should've thrown.", false); } catch (AzureException ex) { - assertTrue("Unexpected message in exception " + ex, ex.getMessage() - .contains( - "Unable to access container nonExistentContainer in account" - + " hopefullyNonExistentAccount")); + assertTrue("Unexpected message in exception " + ex, + ex.getMessage().contains( + "Unable to access container nonExistentContainer in account" + + " hopefullyNonExistentAccount")); } finally { NativeAzureFileSystem.resumeRetryPolicy(); } @@ -104,11 +102,11 @@ public class TestAzureFileSystemErrorConditions { fs.listStatus(new Path("/")); passed = true; } catch (AzureException ex) { - assertTrue("Unexpected exception message: " + ex, ex.getMessage() - .contains("unsupported version: 2090-04-05.")); + assertTrue("Unexpected exception message: " + ex, + ex.getMessage().contains("unsupported version: 2090-04-05.")); } - assertFalse( - "Should've thrown an exception because of the wrong version.", passed); + assertFalse("Should've thrown an exception because of the wrong version.", + passed); } finally { fs.close(); } @@ -118,8 +116,7 @@ public class TestAzureFileSystemErrorConditions { boolean isTargetConnection(HttpURLConnection connection); } - private class TransientErrorInjector extends - StorageEvent<SendingRequestEvent> { + private class TransientErrorInjector extends StorageEvent<SendingRequestEvent> { final ConnectionRecognizer connectionRecognizer; private boolean injectedErrorOnce = false; @@ -129,8 +126,7 @@ public class TestAzureFileSystemErrorConditions { @Override public void eventOccurred(SendingRequestEvent eventArg) { - HttpURLConnection connection = (HttpURLConnection) eventArg - .getConnectionObject(); + HttpURLConnection connection = (HttpURLConnection)eventArg.getConnectionObject(); if (!connectionRecognizer.isTargetConnection(connection)) { return; } @@ -157,8 +153,8 @@ public class TestAzureFileSystemErrorConditions { @Test 
public void testTransientErrorOnDelete() throws Exception { // Need to do this test against a live storage account - AzureBlobStorageTestAccount testAccount = AzureBlobStorageTestAccount - .create(); + AzureBlobStorageTestAccount testAccount = + AzureBlobStorageTestAccount.create(); assumeNotNull(testAccount); try { NativeAzureFileSystem fs = testAccount.getFileSystem(); @@ -179,7 +175,7 @@ public class TestAzureFileSystemErrorConditions { private void writeAllThreeFile(NativeAzureFileSystem fs, Path testFile) throws IOException { byte[] buffer = new byte[ALL_THREE_FILE_SIZE]; - Arrays.fill(buffer, (byte) 3); + Arrays.fill(buffer, (byte)3); OutputStream stream = fs.create(testFile); stream.write(buffer); stream.close(); @@ -189,7 +185,8 @@ public class TestAzureFileSystemErrorConditions { throws IOException { byte[] buffer = new byte[ALL_THREE_FILE_SIZE]; InputStream inStream = fs.open(testFile); - assertEquals(buffer.length, inStream.read(buffer, 0, buffer.length)); + assertEquals(buffer.length, + inStream.read(buffer, 0, buffer.length)); inStream.close(); for (int i = 0; i < buffer.length; i++) { assertEquals(3, buffer[i]); @@ -199,8 +196,8 @@ public class TestAzureFileSystemErrorConditions { @Test public void testTransientErrorOnCommitBlockList() throws Exception { // Need to do this test against a live storage account - AzureBlobStorageTestAccount testAccount = AzureBlobStorageTestAccount - .create(); + AzureBlobStorageTestAccount testAccount = + AzureBlobStorageTestAccount.create(); assumeNotNull(testAccount); try { NativeAzureFileSystem fs = testAccount.getFileSystem(); @@ -222,8 +219,8 @@ public class TestAzureFileSystemErrorConditions { @Test public void testTransientErrorOnRead() throws Exception { // Need to do this test against a live storage account - AzureBlobStorageTestAccount testAccount = AzureBlobStorageTestAccount - .create(); + AzureBlobStorageTestAccount testAccount = + AzureBlobStorageTestAccount.create(); assumeNotNull(testAccount); try { NativeAzureFileSystem fs = testAccount.getFileSystem(); @@ -240,16 +237,4 @@ public class TestAzureFileSystemErrorConditions { testAccount.cleanup(); } } - - // Tests an error during stream creation (in this case in the seek() implementation - // to verify the close-stream-on-error logic. 
- @Test (expected=AzureException.class) - public void testErrorDuringRetrieve() throws Exception { - NativeAzureFileSystem fs = AzureBlobStorageTestAccount.createMock().getFileSystem(); - Path testFile = new Path("/testErrorDuringRetrieve"); - writeAllThreeFile(fs, testFile); - - FSDataInputStream stream = fs.open(testFile); - stream.seek(Integer.MAX_VALUE); - } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a737026/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobDataValidation.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobDataValidation.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobDataValidation.java index b585c56..25bd338 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobDataValidation.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobDataValidation.java @@ -128,7 +128,7 @@ public class TestBlobDataValidation { if (!expectMd5Stored) { throw ex; } - StorageException cause = (StorageException) ex.getCause(); + StorageException cause = (StorageException)ex.getCause(); assertNotNull(cause); assertTrue("Unexpected cause: " + cause, cause.getErrorCode().equals(StorageErrorCodeStrings.INVALID_MD5)); @@ -212,13 +212,13 @@ public class TestBlobDataValidation { // validate the data as expected, but the HttpURLConnection wasn't // pluggable enough for me to do that. testAccount.getFileSystem().getStore() - .addTestHookToOperationContext(new TestHookOperationContext() { - @Override + .addTestHookToOperationContext(new TestHookOperationContext() { + @Override public OperationContext modifyOperationContext( OperationContext original) { - original.getResponseReceivedEventHandler().addListener( - new ContentMD5Checker(expectMd5Checked)); - return original; + original.getResponseReceivedEventHandler().addListener( + new ContentMD5Checker(expectMd5Checked)); + return original; } }); http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a737026/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobMetadata.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobMetadata.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobMetadata.java index b75fc38..6c49926 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobMetadata.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobMetadata.java @@ -69,7 +69,8 @@ public class TestBlobMetadata { throws Exception { return String.format( "{\"owner\":\"%s\",\"group\":\"%s\",\"permissions\":\"%s\"}", - getExpectedOwner(), NativeAzureFileSystem.AZURE_DEFAULT_GROUP_DEFAULT, + getExpectedOwner(), + NativeAzureFileSystem.AZURE_DEFAULT_GROUP_DEFAULT, permissionString); } @@ -80,8 +81,8 @@ public class TestBlobMetadata { public void testContainerVersionMetadata() throws Exception { // Do a write operation to trigger version stamp fs.createNewFile(new Path("/foo")); - HashMap<String, String> containerMetadata = backingStore - .getContainerMetadata(); + HashMap<String, String> containerMetadata = + backingStore.getContainerMetadata(); assertNotNull(containerMetadata); assertEquals(AzureNativeFileSystemStore.CURRENT_WASB_VERSION, containerMetadata.get(AzureNativeFileSystemStore.VERSION_METADATA_KEY)); 
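[Editor's note] The test-hook pattern in TestBlobDataValidation above is the same mechanism TestAzureFileSystemErrorConditions uses for error injection: the store exposes addTestHookToOperationContext(), and the hook attaches a StorageEvent listener to the OperationContext so every HTTP exchange can be observed or tampered with. A minimal sketch of a read-only listener in that style, assuming the SDK pairs getResponseReceivedEventHandler() with a ResponseReceivedEvent type analogous to the SendingRequestEvent used by TransientErrorInjector; the class name Md5HeaderLogger is hypothetical:

    // Sketch: log the Content-MD5 response header for every storage response,
    // in the style of ContentMD5Checker above. Md5HeaderLogger is illustrative.
    private static class Md5HeaderLogger
        extends StorageEvent<ResponseReceivedEvent> {
      @Override
      public void eventOccurred(ResponseReceivedEvent eventArg) {
        HttpURLConnection connection =
            (HttpURLConnection) eventArg.getConnectionObject();
        System.out.println("Content-MD5: "
            + connection.getHeaderField("Content-MD5"));
      }
    }

It would be wired up exactly as the test does for ContentMD5Checker, by calling original.getResponseReceivedEventHandler().addListener(new Md5HeaderLogger()) inside the modifyOperationContext() hook.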
@@ -226,26 +227,32 @@ public class TestBlobMetadata { @Test public void testOldPermissionMetadata() throws Exception { Path selfishFile = new Path("/noOneElse"); - HashMap<String, String> metadata = new HashMap<String, String>(); - metadata.put("asv_permission", getExpectedPermissionString("rw-------")); - backingStore.setContent(AzureBlobStorageTestAccount.toMockUri(selfishFile), - new byte[] {}, metadata); - FsPermission justMe = new FsPermission(FsAction.READ_WRITE, FsAction.NONE, - FsAction.NONE); + HashMap<String, String> metadata = + new HashMap<String, String>(); + metadata.put("asv_permission", + getExpectedPermissionString("rw-------")); + backingStore.setContent( + AzureBlobStorageTestAccount.toMockUri(selfishFile), + new byte[] { }, + metadata, false, 0); + FsPermission justMe = new FsPermission( + FsAction.READ_WRITE, FsAction.NONE, FsAction.NONE); FileStatus retrievedStatus = fs.getFileStatus(selfishFile); assertNotNull(retrievedStatus); assertEquals(justMe, retrievedStatus.getPermission()); assertEquals(getExpectedOwner(), retrievedStatus.getOwner()); assertEquals(NativeAzureFileSystem.AZURE_DEFAULT_GROUP_DEFAULT, retrievedStatus.getGroup()); - FsPermission meAndYou = new FsPermission(FsAction.READ_WRITE, - FsAction.READ_WRITE, FsAction.NONE); + FsPermission meAndYou = new FsPermission( + FsAction.READ_WRITE, FsAction.READ_WRITE, FsAction.NONE); fs.setPermission(selfishFile, meAndYou); - metadata = backingStore.getMetadata(AzureBlobStorageTestAccount - .toMockUri(selfishFile)); + metadata = + backingStore.getMetadata( + AzureBlobStorageTestAccount.toMockUri(selfishFile)); assertNotNull(metadata); String storedPermission = metadata.get("hdi_permission"); - assertEquals(getExpectedPermissionString("rw-rw----"), storedPermission); + assertEquals(getExpectedPermissionString("rw-rw----"), + storedPermission); assertNull(metadata.get("asv_permission")); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a737026/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobTypeSpeedDifference.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobTypeSpeedDifference.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobTypeSpeedDifference.java new file mode 100644 index 0000000..afb16ef --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobTypeSpeedDifference.java @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.azure; + +import java.io.*; +import java.util.*; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.*; +import org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation; + +import junit.framework.*; + +import org.junit.Test; + + +/** + * A simple benchmark to find out the difference in speed between block + * and page blobs. + */ +public class TestBlobTypeSpeedDifference extends TestCase { + /** + * Writes data to the given stream of the given size, flushing every + * x bytes. + */ + private static void writeTestFile(OutputStream writeStream, + long size, long flushInterval) throws IOException { + int bufferSize = (int) Math.min(1000, flushInterval); + byte[] buffer = new byte[bufferSize]; + Arrays.fill(buffer, (byte) 7); + int bytesWritten = 0; + int bytesUnflushed = 0; + while (bytesWritten < size) { + int numberToWrite = (int) Math.min(bufferSize, size - bytesWritten); + writeStream.write(buffer, 0, numberToWrite); + bytesWritten += numberToWrite; + bytesUnflushed += numberToWrite; + if (bytesUnflushed >= flushInterval) { + writeStream.flush(); + bytesUnflushed = 0; + } + } + } + + private static class TestResult { + final long timeTakenInMs; + final long totalNumberOfRequests; + + TestResult(long timeTakenInMs, long totalNumberOfRequests) { + this.timeTakenInMs = timeTakenInMs; + this.totalNumberOfRequests = totalNumberOfRequests; + } + } + + /** + * Writes data to the given file of the given size, flushing every + * x bytes. Measure performance of that and return it. + */ + private static TestResult writeTestFile(NativeAzureFileSystem fs, Path path, + long size, long flushInterval) throws IOException { + AzureFileSystemInstrumentation instrumentation = + fs.getInstrumentation(); + long initialRequests = instrumentation.getCurrentWebResponses(); + Date start = new Date(); + OutputStream output = fs.create(path); + writeTestFile(output, size, flushInterval); + output.close(); + long finalRequests = instrumentation.getCurrentWebResponses(); + return new TestResult(new Date().getTime() - start.getTime(), + finalRequests - initialRequests); + } + + /** + * Writes data to a block blob of the given size, flushing every + * x bytes. Measure performance of that and return it. + */ + private static TestResult writeBlockBlobTestFile(NativeAzureFileSystem fs, + long size, long flushInterval) throws IOException { + return writeTestFile(fs, new Path("/blockBlob"), size, flushInterval); + } + + /** + * Writes data to a page blob of the given size, flushing every + * x bytes. Measure performance of that and return it. + */ + private static TestResult writePageBlobTestFile(NativeAzureFileSystem fs, + long size, long flushInterval) throws IOException { + return writeTestFile(fs, + AzureBlobStorageTestAccount.pageBlobPath("pageBlob"), + size, flushInterval); + } + + /** + * Runs the benchmark over a small 10 KB file, flushing every 500 bytes. + */ + @Test + public void testTenKbFileFrequentFlush() throws Exception { + AzureBlobStorageTestAccount testAccount = + AzureBlobStorageTestAccount.create(); + if (testAccount == null) { + return; + } + try { + testForSizeAndFlushInterval(testAccount.getFileSystem(), 10 * 1000, 500); + } finally { + testAccount.cleanup(); + } + } + + /** + * Runs the benchmark for the given file size and flush frequency. 
+ */ + private static void testForSizeAndFlushInterval(NativeAzureFileSystem fs, + final long size, final long flushInterval) throws IOException { + for (int i = 0; i < 5; i++) { + TestResult pageBlobResults = writePageBlobTestFile(fs, size, flushInterval); + System.out.printf( + "Page blob upload took %d ms. Total number of requests: %d.\n", + pageBlobResults.timeTakenInMs, pageBlobResults.totalNumberOfRequests); + TestResult blockBlobResults = writeBlockBlobTestFile(fs, size, flushInterval); + System.out.printf( + "Block blob upload took %d ms. Total number of requests: %d.\n", + blockBlobResults.timeTakenInMs, blockBlobResults.totalNumberOfRequests); + } + } + + /** + * Runs the benchmark for the given file size and flush frequency from the + * command line. + */ + public static void main(String argv[]) throws Exception { + Configuration conf = new Configuration(); + long size = 10 * 1000 * 1000; + long flushInterval = 2000; + if (argv.length > 0) { + size = Long.parseLong(argv[0]); + } + if (argv.length > 1) { + flushInterval = Long.parseLong(argv[1]); + } + testForSizeAndFlushInterval((NativeAzureFileSystem)FileSystem.get(conf), + size, flushInterval); + } +}
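[Editor's note] TestResult above records elapsed time and request count but not rate, and throughput is often the more readable number when comparing the two blob types. A small sketch that could sit in the same class, using only the TestResult fields defined earlier; printThroughput() is a hypothetical helper, not part of the patch:

    // Sketch: express a benchmark run as MB/s instead of raw milliseconds.
    // printThroughput() is illustrative; the TestResult fields are from the
    // benchmark class above.
    private static void printThroughput(String label, TestResult result, long bytes) {
      double seconds = result.timeTakenInMs / 1000.0;
      double mbPerSec = (bytes / (1024.0 * 1024.0)) / seconds;
      System.out.printf("%s: %.2f MB/s across %d requests.%n",
          label, mbPerSec, result.totalNumberOfRequests);
    }

A sub-millisecond run would divide by zero, but at the 10 MB default size used by main() that should not arise.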
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a737026/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestNativeAzureFSPageBlobLive.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestNativeAzureFSPageBlobLive.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestNativeAzureFSPageBlobLive.java new file mode 100644 index 0000000..208cff3 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestNativeAzureFSPageBlobLive.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs.azure; + +import org.apache.hadoop.conf.Configuration; + +/** + * Run the base Azure file system tests strictly on page blobs to make sure fundamental + * operations on page blob files and folders work as expected. + * These operations include create, delete, rename, list, and so on. + */ +public class TestNativeAzureFSPageBlobLive extends + NativeAzureFileSystemBaseTest { + + @Override + protected AzureBlobStorageTestAccount createTestAccount() + throws Exception { + Configuration conf = new Configuration(); + + // Configure the page blob directories key so every file created is a page blob. + conf.set(AzureNativeFileSystemStore.KEY_PAGE_BLOB_DIRECTORIES, "/"); + + // Configure the atomic rename directories key so every folder will have + // atomic rename applied. + conf.set(AzureNativeFileSystemStore.KEY_ATOMIC_RENAME_DIRECTORIES, "/"); + return AzureBlobStorageTestAccount.create(conf); + } +}
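[Editor's note] Setting both keys to "/" as above routes every file and folder through the page blob and atomic-rename code paths, which is exactly what a conformance run wants. The key names suggest they are meant to scope that behavior to chosen folders in ordinary configurations; a hedged sketch of such usage, where the directory names are purely illustrative:

    // Sketch: restrict page blobs and atomic rename to specific folders
    // rather than the whole file system. The paths are illustrative only.
    Configuration conf = new Configuration();
    // Files created under this directory become page blobs.
    conf.set(AzureNativeFileSystemStore.KEY_PAGE_BLOB_DIRECTORIES, "/writeAheadLogs");
    // Folder renames under this directory get atomic-rename handling.
    conf.set(AzureNativeFileSystemStore.KEY_ATOMIC_RENAME_DIRECTORIES, "/transactions");
    AzureBlobStorageTestAccount account = AzureBlobStorageTestAccount.create(conf);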