http://git-wip-us.apache.org/repos/asf/hadoop/blob/9f6b08f8/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemLive.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemLive.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemLive.java new file mode 100644 index 0000000..f969968 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeAzureFileSystemLive.java @@ -0,0 +1,236 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azure; + +import java.util.concurrent.CountDownLatch; + +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; + +import org.junit.Test; + +import com.microsoft.azure.storage.StorageException; + +/** + * Tests the Native Azure file system (WASB) against an actual blob store. 
+ */
+public class ITestNativeAzureFileSystemLive extends
+    NativeAzureFileSystemBaseTest {
+
+  /**
+   * Supplies the storage account used by the inherited test cases.
+   *
+   * @return the account returned by {@code AzureBlobStorageTestAccount.create()}
+   * @throws Exception if the account cannot be created
+   */
+  @Override
+  protected AzureBlobStorageTestAccount createTestAccount() throws Exception {
+    return AzureBlobStorageTestAccount.create();
+  }
+
+  /**
+   * Creates a source and a destination file, renames the source key onto the
+   * destination key through the store interface, and checks that afterwards
+   * only the destination path exists.
+   * NOTE(review): the meaning of the trailing {@code true, null} arguments is
+   * defined by the store interface, which is not visible here.
+   */
+  @Test
+  public void testLazyRenamePendingCanOverwriteExistingFile()
+      throws Exception {
+    final String srcFile = "srcFile";
+    final String dstFile = "dstFile";
+    FSDataOutputStream srcStream = null;
+    FSDataOutputStream dstStream = null;
+    try {
+      Path srcPath = path(srcFile);
+      srcStream = fs.create(srcPath);
+      assertTrue(fs.exists(srcPath));
+      Path dstPath = path(dstFile);
+      dstStream = fs.create(dstPath);
+      assertTrue(fs.exists(dstPath));
+      NativeAzureFileSystem nfs = fs;
+      final String fullSrcKey = nfs.pathToKey(nfs.makeAbsolute(srcPath));
+      final String fullDstKey = nfs.pathToKey(nfs.makeAbsolute(dstPath));
+      nfs.getStoreInterface().rename(fullSrcKey, fullDstKey, true, null);
+      assertTrue(fs.exists(dstPath));
+      assertFalse(fs.exists(srcPath));
+    } finally {
+      // Release both streams even when one of the assertions above fails.
+      IOUtils.cleanupWithLogger(null, srcStream);
+      IOUtils.cleanupWithLogger(null, dstStream);
+    }
+  }
+
+  /**
+   * Tests store.delete() on a blob while a background thread is holding a
+   * lease on it. The background thread keeps the lease for longer than the
+   * lease acquisition retry interval before freeing it, so the delete has to
+   * cope with the lease being held; once delete returns, the path must no
+   * longer exist.
+   * This is a scenario that would happen in HMaster startup when it tries to
+   * clean up the temp dirs while the HMaster process which was killed earlier
+   * held lease on the blob when doing some DDL operation
+   */
+  @Test
+  public void testDeleteThrowsExceptionWithLeaseExistsErrorMessage()
+      throws Exception {
+    LOG.info("Starting test");
+    // Create the file; close the stream straight away so it is not leaked.
+    Path path = methodPath();
+    fs.create(path).close();
+    assertPathExists("test file", path);
+    NativeAzureFileSystem nfs = fs;
+    final String fullKey = nfs.pathToKey(nfs.makeAbsolute(path));
+    final AzureNativeFileSystemStore store = nfs.getStore();
+
+    // Acquire the lease on the file in a background thread
+    final CountDownLatch leaseAttemptComplete = new CountDownLatch(1);
+    final CountDownLatch beginningDeleteAttempt = new CountDownLatch(1);
+    Thread t = new Thread() {
+      @Override
+      public void run() {
+        // Acquire the lease and then signal the main test thread.
+        SelfRenewingLease lease = null;
+        try {
+          lease = store.acquireLease(fullKey);
+          LOG.info("Lease acquired: " + lease.getLeaseID());
+        } catch (AzureException e) {
+          LOG.warn("Lease acqusition thread unable to acquire lease", e);
+        } finally {
+          // Always release the main thread, even if no lease was obtained.
+          leaseAttemptComplete.countDown();
+        }
+
+        // Wait for the main test thread to signal it will attempt the delete.
+        try {
+          beginningDeleteAttempt.await();
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        }
+
+        // Keep holding the lease past the lease acquisition retry interval, so
+        // the test covers the case of delete retrying to acquire the lease.
+        try {
+          Thread.sleep(SelfRenewingLease.LEASE_ACQUIRE_RETRY_INTERVAL * 3);
+        } catch (InterruptedException ex) {
+          Thread.currentThread().interrupt();
+        }
+
+        try {
+          if (lease != null) {
+            LOG.info("Freeing lease");
+            lease.free();
+          }
+        } catch (StorageException se) {
+          LOG.warn("Unable to free lease.", se);
+        }
+      }
+    };
+
+    // Start the background thread and wait for it to signal the lease is held.
+    t.start();
+    try {
+      leaseAttemptComplete.await();
+    } catch (InterruptedException ex) {
+      Thread.currentThread().interrupt();
+    }
+
+    // Try to delete the same file
+    beginningDeleteAttempt.countDown();
+    store.delete(fullKey);
+
+    // At this point file SHOULD BE DELETED
+    assertPathDoesNotExist("Leased path", path);
+  }
+
+  /**
+   * Check that isPageBlobKey works as expected. This assumes that
+   * in the test configuration, the list of supported page blob directories
+   * only includes "pageBlobs". That's why this test is made specific
+   * to this subclass.
+   */
+  @Test
+  public void testIsPageBlobKey() {
+    AzureNativeFileSystemStore store = fs.getStore();
+
+    // Use literal strings so it's easier to understand the tests.
+    // In case the constant changes, we want to know about it so we can update this test.
+    assertEquals("pageBlobs",
+        AzureBlobStorageTestAccount.DEFAULT_PAGE_BLOB_DIRECTORY);
+
+    // URI prefix for test environment.
+    String uriPrefix = "file:///";
+
+    // negative tests
+    String[] negativeKeys = {"", "/", "bar", "bar/", "bar/pageBlobs",
+        "bar/pageBlobs/foo", "bar/pageBlobs/foo/", "/pageBlobs/", "/pageBlobs",
+        "pageBlobs", "pageBlobsxyz/"};
+    for (String key : negativeKeys) {
+      assertFalse("Unexpected page blob key: " + key,
+          store.isPageBlobKey(key));
+      assertFalse("Unexpected page blob key: " + uriPrefix + key,
+          store.isPageBlobKey(uriPrefix + key));
+    }
+
+    // positive tests
+    String[] positiveKeys = {"pageBlobs/", "pageBlobs/foo/",
+        "pageBlobs/foo/bar/"};
+    for (String key : positiveKeys) {
+      assertTrue("Expected page blob key: " + key,
+          store.isPageBlobKey(key));
+      assertTrue("Expected page blob key: " + uriPrefix + key,
+          store.isPageBlobKey(uriPrefix + key));
+    }
+  }
+
+  /**
+   * Test that isAtomicRenameKey() works as expected.
+   */
+  @Test
+  public void testIsAtomicRenameKey() {
+
+    AzureNativeFileSystemStore store = fs.getStore();
+
+    // We want to know if the default configuration changes so we can fix
+    // this test.
+    assertEquals("/atomicRenameDir1,/atomicRenameDir2",
+        AzureBlobStorageTestAccount.DEFAULT_ATOMIC_RENAME_DIRECTORIES);
+
+    // URI prefix for test environment.
+    String uriPrefix = "file:///";
+
+    // negative tests
+    String[] negativeKeys = {"", "/", "bar", "bar/", "bar/hbase",
+        "bar/hbase/foo", "bar/hbase/foo/", "/hbase/", "/hbase", "hbase",
+        "hbasexyz/", "foo/atomicRenameDir1/"};
+    for (String key : negativeKeys) {
+      assertFalse("Unexpected atomic rename key: " + key,
+          store.isAtomicRenameKey(key));
+      assertFalse("Unexpected atomic rename key: " + uriPrefix + key,
+          store.isAtomicRenameKey(uriPrefix + key));
+    }
+
+    // Positive tests. The directories for atomic rename are /hbase
+    // plus the ones in the configuration (DEFAULT_ATOMIC_RENAME_DIRECTORIES
+    // for this test).
+    String[] positiveKeys = {"hbase/", "hbase/foo/", "hbase/foo/bar/",
+        "atomicRenameDir1/foo/", "atomicRenameDir2/bar/"};
+    for (String key : positiveKeys) {
+      assertTrue("Expected atomic rename key: " + key,
+          store.isAtomicRenameKey(key));
+      assertTrue("Expected atomic rename key: " + uriPrefix + key,
+          store.isAtomicRenameKey(uriPrefix + key));
+    }
+  }
+
+  /**
+   * Tests fs.mkdir() function to create a target blob while another thread
+   * is holding the lease on the blob. mkdir should not fail since the blob
+   * already exists.
+   * This is a scenario that would happen in HBase distributed log splitting.
+   * Multiple threads will try to create and update "recovered.edits" folder
+   * under the same path.
+   */
+  @Test
+  public void testMkdirOnExistingFolderWithLease() throws Exception {
+    // Create the folder
+    Path path = methodPath();
+    fs.mkdirs(path);
+    NativeAzureFileSystem nfs = fs;
+    String fullKey = nfs.pathToKey(nfs.makeAbsolute(path));
+    AzureNativeFileSystemStore store = nfs.getStore();
+    // Acquire the lease on the folder
+    SelfRenewingLease lease = store.acquireLease(fullKey);
+    try {
+      // Check the ID itself: a boxed boolean would never be null.
+      assertNotNull("lease ID", lease.getLeaseID());
+      // Try to create the same folder
+      store.storeEmptyFolder(fullKey,
+          nfs.createPermissionStatus(FsPermission.getDirDefault()));
+    } finally {
+      // Free the lease even if the folder creation above fails.
+      lease.free();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9f6b08f8/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestOutOfBandAzureBlobOperationsLive.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestOutOfBandAzureBlobOperationsLive.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestOutOfBandAzureBlobOperationsLive.java new file mode 100644 index 0000000..b63aaf0 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestOutOfBandAzureBlobOperationsLive.java @@ -0,0 +1,185 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azure; + +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.security.UserGroupInformation; +import org.junit.Test; + +import com.microsoft.azure.storage.blob.BlobOutputStream; +import com.microsoft.azure.storage.blob.CloudBlockBlob; + +/** + * Live blob operations. 
+ */
+public class ITestOutOfBandAzureBlobOperationsLive extends AbstractWasbTestBase {
+
+  @Override
+  protected AzureBlobStorageTestAccount createTestAccount() throws Exception {
+    return AzureBlobStorageTestAccount.create();
+  }
+
+  /**
+   * Builds the blob key prefix {@code user/<short user name>/} for the
+   * current user. Blobs created directly through the storage client must
+   * spell this prefix out, whereas the relative paths handed to the WASB
+   * driver in these tests get the working directory prepended implicitly.
+   */
+  private static String workingDirKeyPrefix() throws Exception {
+    return "user/"
+        + UserGroupInformation.getCurrentUser().getShortUserName() + "/";
+  }
+
+  /**
+   * Creates an empty block blob under the given key by going straight to the
+   * test account's blob reference, i.e. out-of-band with respect to the
+   * file system under test.
+   */
+  private void createBlobOutOfBand(String blobKey) throws Exception {
+    CloudBlockBlob outOfBandBlob = testAccount.getBlobReference(blobKey);
+    BlobOutputStream out = outOfBandBlob.openOutputStream();
+    out.close();
+  }
+
+  // scenario for this particular test described at MONARCH-HADOOP-764
+  // creating a file out-of-band would confuse mkdirs("<oobfilesUncleFolder>")
+  // eg oob creation of "user/<name>/testFolder/a/input/file"
+  // Then wasb creation of "user/<name>/testFolder/a/output" fails
+  @Test
+  public void outOfBandFolder_uncleMkdirs() throws Exception {
+    createBlobOutOfBand(workingDirKeyPrefix() + "testFolder1/a/input/file");
+    assertTrue(fs.exists(new Path("testFolder1/a/input/file")));
+
+    assertTrue(fs.mkdirs(new Path("testFolder1/a/output")));
+  }
+
+  // scenario for this particular test described at MONARCH-HADOOP-764
+  // delete the parent folder of a file that was created out-of-band
+  @Test
+  public void outOfBandFolder_parentDelete() throws Exception {
+    createBlobOutOfBand(workingDirKeyPrefix() + "testFolder2/a/input/file");
+    assertTrue(fs.exists(new Path("testFolder2/a/input/file")));
+
+    assertTrue(fs.delete(new Path("testFolder2/a/input"), true));
+  }
+
+  // delete a root-level file that was created out-of-band
+  @Test
+  public void outOfBandFolder_rootFileDelete() throws Exception {
+    createBlobOutOfBand("fileY");
+
+    Path rootFile = new Path("/fileY");
+    assertTrue(fs.exists(rootFile));
+    assertTrue(fs.delete(new Path("/fileY"), true));
+  }
+
+  // delete a first-level folder that only exists implicitly, through a file
+  // created out-of-band beneath it
+  @Test
+  public void outOfBandFolder_firstLevelFolderDelete() throws Exception {
+    createBlobOutOfBand("folderW/file");
+
+    assertTrue(fs.exists(new Path("/folderW")));
+    assertTrue(fs.exists(new Path("/folderW/file")));
+    assertTrue(fs.delete(new Path("/folderW"), true));
+  }
+
+  // scenario for this particular test described at MONARCH-HADOOP-764
+  // create a sibling next to a file that was created out-of-band
+  @Test
+  public void outOfBandFolder_siblingCreate() throws Exception {
+    createBlobOutOfBand(workingDirKeyPrefix() + "testFolder3/a/input/file");
+    assertTrue(fs.exists(new Path("testFolder3/a/input/file")));
+
+    FSDataOutputStream sibling =
+        fs.create(new Path("testFolder3/a/input/file2"));
+    sibling.close();
+  }
+
+  // scenario for this particular test described at MONARCH-HADOOP-764
+  // creating a new file in the root folder
+  @Test
+  public void outOfBandFolder_create_rootDir() throws Exception {
+    FSDataOutputStream rootFile = fs.create(new Path("/newInRoot"));
+    rootFile.close();
+  }
+
+  // scenario for this particular test described at MONARCH-HADOOP-764
+  // rename a file created out-of-band to a new name in the same folder
+  @Test
+  public void outOfBandFolder_rename() throws Exception {
+    createBlobOutOfBand(workingDirKeyPrefix() + "testFolder4/a/input/file");
+
+    Path source = new Path("testFolder4/a/input/file");
+    assertTrue(fs.exists(source));
+
+    fs.rename(source, new Path("testFolder4/a/input/file2"));
+  }
+
+  // Verify that you can rename a file which is the only file in an implicit folder in the
+  // WASB file system.
+  // scenario for this particular test described at MONARCH-HADOOP-892
+  @Test
+  public void outOfBandSingleFile_rename() throws Exception {
+    createBlobOutOfBand(workingDirKeyPrefix() + "testFolder5/a/input/file");
+
+    Path source = new Path("testFolder5/a/input/file");
+    assertTrue(fs.exists(source));
+
+    fs.rename(source, new Path("testFolder5/file2"));
+  }
+
+  // WASB must force explicit parent directories in create, delete, mkdirs, rename.
+  // scenario for this particular test described at MONARCH-HADOOP-764
+  // Here the blob key has no working-directory prefix and the file system
+  // paths used are absolute, root-level paths.
+  @Test
+  public void outOfBandFolder_rename_rootLevelFiles() throws Exception {
+    createBlobOutOfBand("fileX");
+
+    Path source = new Path("/fileX");
+    assertTrue(fs.exists(source));
+
+    fs.rename(source, new Path("/fileXrename"));
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9f6b08f8/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestReadAndSeekPageBlobAfterWrite.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestReadAndSeekPageBlobAfterWrite.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestReadAndSeekPageBlobAfterWrite.java
new file mode 100644
index 0000000..f2af116
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestReadAndSeekPageBlobAfterWrite.java
@@ -0,0 +1,341 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azure; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.Arrays; +import java.util.Random; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.azure.integration.AbstractAzureScaleTest; +import org.apache.hadoop.util.Time; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.hadoop.fs.azure.integration.AzureTestUtils .*; + +/** + * Write data into a page blob and verify you can read back all of it + * or just a part of it. 
+ */
+public class ITestReadAndSeekPageBlobAfterWrite extends AbstractAzureScaleTest {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ITestReadAndSeekPageBlobAfterWrite.class);
+
+  private FileSystem fs;
+  private byte[] randomData;
+
+  // Page blob physical page size
+  private static final int PAGE_SIZE = PageBlobFormatHelpers.PAGE_SIZE;
+
+  // Size of data on page (excluding header)
+  private static final int PAGE_DATA_SIZE = PAGE_SIZE - PageBlobFormatHelpers.PAGE_HEADER_SIZE;
+  private static final int MAX_BYTES = 33554432; // maximum bytes in a file that we'll test
+  private static final int MAX_PAGES = MAX_BYTES / PAGE_SIZE; // maximum number of pages we'll test
+  private Random rand = new Random();
+
+  // A key with a prefix under /pageBlobs, which for the test file system will
+  // force use of a page blob.
+  private static final String KEY = "/pageBlobs/file.dat";
+
+  // path of page blob file to read and write
+  private Path blobPath;
+
+  /**
+   * Fetches the file system from the test account, fills
+   * {@code randomData} with {@code PAGE_SIZE * MAX_PAGES} random bytes and
+   * picks the blob path used by every test in this class.
+   */
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+    fs = getTestAccount().getFileSystem();
+    // Make sure we are using an integral number of pages.
+    assertEquals(0, MAX_BYTES % PAGE_SIZE);
+
+    // load an in-memory array of random data
+    randomData = new byte[PAGE_SIZE * MAX_PAGES];
+    rand.nextBytes(randomData);
+
+    blobPath = blobPath("ITestReadAndSeekPageBlobAfterWrite");
+  }
+
+  /**
+   * Removes the test blob before delegating to the superclass teardown.
+   */
+  @Override
+  public void tearDown() throws Exception {
+    deleteQuietly(fs, blobPath, true);
+    super.tearDown();
+  }
+
+  /**
+   * Make sure the file name (key) is a page blob file name. If anybody changes that,
+   * we need to come back and update this test class.
+   */
+  @Test
+  public void testIsPageBlobFileName() {
+    AzureNativeFileSystemStore store = ((NativeAzureFileSystem) fs).getStore();
+    // The path starts with "/", so element 0 is empty and element 1 is the
+    // first directory component.
+    String[] a = blobPath.toUri().getPath().split("/");
+    String key2 = a[1] + "/";
+    assertTrue("Not a page blob: " + blobPath, store.isPageBlobKey(key2));
+  }
+
+  /**
+   * For a set of different file sizes, write some random data to a page blob,
+   * read it back, and compare that what was read is the same as what was written.
+   */
+  @Test
+  public void testReadAfterWriteRandomData() throws IOException {
+
+    // local shorthand
+    final int pds = PAGE_DATA_SIZE;
+
+    // Test for sizes at and near page boundaries
+    int[] dataSizes = {
+
+        // on first page
+        0, 1, 2, 3,
+
+        // Near first physical page boundary (because the implementation
+        // stores PDS + the page header size bytes on each page).
+        pds - 1, pds, pds + 1, pds + 2, pds + 3,
+
+        // near second physical page boundary
+        (2 * pds) - 1, (2 * pds), (2 * pds) + 1, (2 * pds) + 2, (2 * pds) + 3,
+
+        // near tenth physical page boundary
+        (10 * pds) - 1, (10 * pds), (10 * pds) + 1, (10 * pds) + 2, (10 * pds) + 3,
+
+        // test one big size, >> 4MB (an internal buffer size in the code)
+        MAX_BYTES
+    };
+
+    for (int dataSize : dataSizes) {
+      testReadAfterWriteRandomData(dataSize);
+    }
+  }
+
+  // Write "size" bytes of random data, then read them back and verify them.
+  private void testReadAfterWriteRandomData(int size) throws IOException {
+    writeRandomData(size);
+    readRandomDataAndVerify(size);
+  }
+
+  /**
+   * Read "size" bytes of data and verify that what was read and what was written
+   * are the same. A single read() call is expected to return all "size" bytes.
+   */
+  private void readRandomDataAndVerify(int size) throws AzureException, IOException {
+    byte[] b = new byte[size];
+    int bytesRead;
+    // try-with-resources so the stream is closed even when read() throws
+    try (FSDataInputStream stream = fs.open(blobPath)) {
+      bytesRead = stream.read(b);
+    }
+    assertEquals(size, bytesRead);
+
+    // compare the data read to the data written
+    assertTrue(comparePrefix(randomData, b, size));
+  }
+
+  // return true if the beginning "size" values of the arrays are the same;
+  // false if either array holds fewer than "size" values
+  private boolean comparePrefix(byte[] a, byte[] b, int size) {
+    if (a.length < size || b.length < size) {
+      return false;
+    }
+    for (int i = 0; i < size; i++) {
+      if (a[i] != b[i]) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  // Write a specified amount of random data to the file path for this test class.
+  private void writeRandomData(int size) throws IOException {
+    try (OutputStream output = fs.create(blobPath)) {
+      output.write(randomData, 0, size);
+    }
+  }
+
+  /**
+   * Write data to a page blob, open it, seek, and then read a range of data.
+   * Then compare that the data read from that range is the same as the data originally written.
+   */
+  @Test
+  public void testPageBlobSeekAndReadAfterWrite() throws IOException {
+    writeRandomData(PAGE_SIZE * MAX_PAGES);
+    int recordSize = 100;
+    byte[] b = new byte[recordSize];
+
+    try (FSDataInputStream stream = fs.open(blobPath)) {
+      // Seek to a boundary around the middle of the 6th page
+      int seekPosition = 5 * PAGE_SIZE + 250;
+      stream.seek(seekPosition);
+
+      // Read a record's worth of bytes and verify results
+      int bytesRead = stream.read(b);
+      verifyReadRandomData(b, bytesRead, seekPosition, recordSize);
+
+      // Seek to another spot and read a record greater than a page
+      seekPosition = 10 * PAGE_SIZE + 250;
+      stream.seek(seekPosition);
+      recordSize = 1000;
+      b = new byte[recordSize];
+      bytesRead = stream.read(b);
+      verifyReadRandomData(b, bytesRead, seekPosition, recordSize);
+
+      // Read the last 100 bytes of the file
+      recordSize = 100;
+      seekPosition = PAGE_SIZE * MAX_PAGES - recordSize;
+      stream.seek(seekPosition);
+      b = new byte[recordSize];
+      bytesRead = stream.read(b);
+      verifyReadRandomData(b, bytesRead, seekPosition, recordSize);
+
+      // Read past the end of the file and we should get only partial data.
+      recordSize = 100;
+      seekPosition = PAGE_SIZE * MAX_PAGES - recordSize + 50;
+      stream.seek(seekPosition);
+      b = new byte[recordSize];
+      bytesRead = stream.read(b);
+      assertEquals(50, bytesRead);
+
+      // compare last 50 bytes written with those read
+      byte[] tail = Arrays.copyOfRange(randomData, seekPosition, randomData.length);
+      assertTrue(comparePrefix(tail, b, 50));
+    }
+  }
+
+  // Verify that reading a record of data after seeking gives the expected data:
+  // bytesRead must equal recordSize, and the first recordSize bytes of b must
+  // match randomData starting at seekPosition.
+  private void verifyReadRandomData(byte[] b, int bytesRead, int seekPosition, int recordSize) {
+    assertEquals(recordSize, bytesRead);
+    // Copy exactly recordSize bytes; one extra byte is never compared and
+    // would run past the end of randomData for a record at the end of the file.
+    byte[] originalRecordData =
+        Arrays.copyOfRange(randomData, seekPosition, seekPosition + recordSize);
+    assertTrue(comparePrefix(originalRecordData, b, recordSize));
+  }
+
+  // Test many small flushed writes interspersed with periodic hflush calls.
+  // For manual testing, increase NUM_WRITES to a large number.
+  // The goal for a long-running manual test is to make sure that it finishes
+  // and the close() call does not time out. It also facilitates debugging into
+  // hflush/hsync.
+  @Test
+  public void testManySmallWritesWithHFlush() throws IOException {
+    writeAndReadOneFile(50, 100, 20);
+  }
+
+  /**
+   * Write a total of numWrites * recordLength data to a file, read it back,
+   * and check to make sure what was read is the same as what was written.
+   * hflush is called to force the data to storage after every write whose
+   * zero-based index is a multiple of syncInterval (so also after the very
+   * first write).
+   */
+  private void writeAndReadOneFile(int numWrites,
+      int recordLength, int syncInterval) throws IOException {
+
+    // A lower bound on the minimum time we think it will take to do
+    // a write to Azure storage.
+    final long MINIMUM_EXPECTED_TIME = 20;
+    LOG.info("Writing " + numWrites * recordLength + " bytes to " + blobPath.getName());
+    FSDataOutputStream output = fs.create(blobPath);
+    int writesSinceHFlush = 0;
+    try {
+
+      // Do a flush and hflush to exercise case for empty write queue in PageBlobOutputStream,
+      // to test concurrent execution gates.
+      output.flush();
+      output.hflush();
+      for (int i = 0; i < numWrites; i++) {
+        output.write(randomData, i * recordLength, recordLength);
+        writesSinceHFlush++;
+        output.flush();
+        if ((i % syncInterval) == 0) {
+          output.hflush();
+          writesSinceHFlush = 0;
+        }
+      }
+    } finally {
+      long start = Time.monotonicNow();
+      output.close();
+      long end = Time.monotonicNow();
+      LOG.debug("close duration = " + (end - start) + " msec.");
+      // With writes still pending, close() has to push them to storage, so
+      // it is expected to take at least MINIMUM_EXPECTED_TIME.
+      if (writesSinceHFlush > 0) {
+        assertTrue(String.format(
+            "close duration with >= 1 pending write is %d, less than minimum expected of %d",
+            end - start, MINIMUM_EXPECTED_TIME),
+            end - start >= MINIMUM_EXPECTED_TIME);
+      }
+    }
+
+    // Read the data back and check it. readFully either fills the whole
+    // buffer or throws, so a short read can no longer go unnoticed.
+    final int size = numWrites * recordLength;
+    byte[] b = new byte[size];
+    try (FSDataInputStream stream = fs.open(blobPath)) {
+      stream.seek(0);
+      stream.readFully(b, 0, size);
+      verifyReadRandomData(b, size, 0, size);
+    }
+
+    // delete the file
+    fs.delete(blobPath, false);
+  }
+
+  // Test writing to a large file repeatedly as a stress test.
+  // Set the repetitions to a larger number for manual testing
+  // for a longer stress run.
+  @Test
+  public void testLargeFileStress() throws IOException {
+    int numWrites = 32;
+    int recordSize = 1024 * 1024;
+    int syncInterval = 10;
+    int repetitions = 1;
+    for (int i = 0; i < repetitions; i++) {
+      writeAndReadOneFile(numWrites, recordSize, syncInterval);
+    }
+  }
+
+  // Write to a file repeatedly to verify that it extends.
+  // The page blob file should start out at 128MB and finish at 256MB.
+  // NOTE(review): this method carries no @Test annotation, so it is presumably
+  // not picked up by the JUnit 4 runner -- confirm whether that is intentional.
+  public void testFileSizeExtension() throws IOException {
+    final int writeSize = 1024 * 1024;
+    final int numWrites = 129;
+    final byte dataByte = 5;
+    byte[] data = new byte[writeSize];
+    Arrays.fill(data, dataByte);
+    try (FSDataOutputStream output = fs.create(blobPath)) {
+      for (int i = 0; i < numWrites; i++) {
+        output.write(data);
+        output.hflush();
+        LOG.debug("total writes = " + (i + 1));
+      }
+    }
+
+    // Show that we wrote more than the default page blob file size.
+    assertTrue(numWrites * writeSize > PageBlobOutputStream.PAGE_BLOB_MIN_SIZE);
+
+    // Verify we can list the new size. That will prove we expanded the file.
+    FileStatus[] status = fs.listStatus(blobPath);
+    // Arrays.toString so the failure message shows the entries rather than
+    // the array's identity hash.
+    assertEquals("File size hasn't changed " + Arrays.toString(status),
+        numWrites * writeSize, status[0].getLen());
+    LOG.debug("Total bytes written to " + blobPath + " = " + status[0].getLen());
+    fs.delete(blobPath, false);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9f6b08f8/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestWasbRemoteCallHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestWasbRemoteCallHelper.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestWasbRemoteCallHelper.java
new file mode 100644
index 0000000..062bc36
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestWasbRemoteCallHelper.java
@@ -0,0 +1,568 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.hadoop.fs.azure; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.contract.ContractTestUtils; +import org.apache.hadoop.io.retry.RetryUtils; +import org.apache.http.Header; +import org.apache.http.HttpResponse; +import org.apache.http.HttpEntity; +import org.apache.http.HttpStatus; +import org.apache.http.StatusLine; +import org.apache.http.ProtocolVersion; +import org.apache.http.ParseException; +import org.apache.http.HeaderElement; +import org.apache.http.client.HttpClient; +import org.apache.http.client.methods.HttpGet; +import org.hamcrest.Description; +import org.hamcrest.TypeSafeMatcher; +import org.junit.Assume; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.mockito.ArgumentMatcher; +import org.mockito.Mockito; + +import java.io.ByteArrayInputStream; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.nio.charset.StandardCharsets; + +import static org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.KEY_USE_SECURE_MODE; +import static org.mockito.Matchers.argThat; +import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.times; + +/** + * Test class to hold all WasbRemoteCallHelper tests. 
+ */ +public class ITestWasbRemoteCallHelper + extends AbstractWasbTestBase { + public static final String EMPTY_STRING = ""; + private static final int INVALID_HTTP_STATUS_CODE_999 = 999; + + @Override + protected AzureBlobStorageTestAccount createTestAccount() throws Exception { + Configuration conf = new Configuration(); + conf.set(NativeAzureFileSystem.KEY_AZURE_AUTHORIZATION, "true"); + conf.set(RemoteWasbAuthorizerImpl.KEY_REMOTE_AUTH_SERVICE_URLS, "http://localhost1/,http://localhost2/,http://localhost:8080"); + return AzureBlobStorageTestAccount.create(conf); + } + + @Override + public void setUp() throws Exception { + super.setUp(); + boolean useSecureMode = fs.getConf().getBoolean(KEY_USE_SECURE_MODE, false); + boolean useAuthorization = fs.getConf() + .getBoolean(NativeAzureFileSystem.KEY_AZURE_AUTHORIZATION, false); + Assume.assumeTrue("Test valid when both SecureMode and Authorization are enabled .. skipping", + useSecureMode && useAuthorization); + } + + @Rule + public ExpectedException expectedEx = ExpectedException.none(); + + /** + * Test invalid status-code. + * @throws Throwable + */ + @Test // (expected = WasbAuthorizationException.class) + public void testInvalidStatusCode() throws Throwable { + + setupExpectations(); + + // set up mocks + HttpClient mockHttpClient = Mockito.mock(HttpClient.class); + HttpResponse mockHttpResponse = Mockito.mock(HttpResponse.class); + Mockito.when(mockHttpClient.execute(Mockito.<HttpGet>any())) + .thenReturn(mockHttpResponse); + Mockito.when(mockHttpResponse.getStatusLine()) + .thenReturn(newStatusLine(INVALID_HTTP_STATUS_CODE_999)); + // finished setting up mocks + + performop(mockHttpClient); + } + + /** + * Test invalid Content-Type. 
+ * @throws Throwable + */ + @Test // (expected = WasbAuthorizationException.class) + public void testInvalidContentType() throws Throwable { + + setupExpectations(); + + // set up mocks + HttpClient mockHttpClient = Mockito.mock(HttpClient.class); + HttpResponse mockHttpResponse = Mockito.mock(HttpResponse.class); + Mockito.when(mockHttpClient.execute(Mockito.<HttpGet>any())).thenReturn(mockHttpResponse); + Mockito.when(mockHttpResponse.getStatusLine()).thenReturn(newStatusLine(HttpStatus.SC_OK)); + Mockito.when(mockHttpResponse.getFirstHeader("Content-Type")) + .thenReturn(newHeader("Content-Type", "text/plain")); + // finished setting up mocks + + performop(mockHttpClient); + } + + /** + * Test missing Content-Length. + * @throws Throwable + */ + @Test // (expected = WasbAuthorizationException.class) + public void testMissingContentLength() throws Throwable { + + setupExpectations(); + + // set up mocks + HttpClient mockHttpClient = Mockito.mock(HttpClient.class); + HttpResponse mockHttpResponse = Mockito.mock(HttpResponse.class); + Mockito.when(mockHttpClient.execute(Mockito.<HttpGet>any())).thenReturn(mockHttpResponse); + Mockito.when(mockHttpResponse.getStatusLine()).thenReturn(newStatusLine(HttpStatus.SC_OK)); + Mockito.when(mockHttpResponse.getFirstHeader("Content-Type")) + .thenReturn(newHeader("Content-Type", "application/json")); + // finished setting up mocks + + performop(mockHttpClient); + } + + /** + * Test Content-Length exceeds max. 
+ * @throws Throwable + */ + @Test // (expected = WasbAuthorizationException.class) + public void testContentLengthExceedsMax() throws Throwable { + + setupExpectations(); + + // set up mocks + HttpClient mockHttpClient = Mockito.mock(HttpClient.class); + HttpResponse mockHttpResponse = Mockito.mock(HttpResponse.class); + Mockito.when(mockHttpClient.execute(Mockito.<HttpGet>any())).thenReturn(mockHttpResponse); + Mockito.when(mockHttpResponse.getStatusLine()).thenReturn(newStatusLine(HttpStatus.SC_OK)); + Mockito.when(mockHttpResponse.getFirstHeader("Content-Type")) + .thenReturn(newHeader("Content-Type", "application/json")); + Mockito.when(mockHttpResponse.getFirstHeader("Content-Length")) + .thenReturn(newHeader("Content-Length", "2048")); + // finished setting up mocks + + performop(mockHttpClient); + } + + /** + * Test invalid Content-Length value + * @throws Throwable + */ + @Test // (expected = WasbAuthorizationException.class) + public void testInvalidContentLengthValue() throws Throwable { + + setupExpectations(); + + // set up mocks + HttpClient mockHttpClient = Mockito.mock(HttpClient.class); + HttpResponse mockHttpResponse = Mockito.mock(HttpResponse.class); + Mockito.when(mockHttpClient.execute(Mockito.<HttpGet>any())).thenReturn(mockHttpResponse); + Mockito.when(mockHttpResponse.getStatusLine()).thenReturn(newStatusLine(HttpStatus.SC_OK)); + Mockito.when(mockHttpResponse.getFirstHeader("Content-Type")) + .thenReturn(newHeader("Content-Type", "application/json")); + Mockito.when(mockHttpResponse.getFirstHeader("Content-Length")) + .thenReturn(newHeader("Content-Length", "20abc48")); + // finished setting up mocks + + performop(mockHttpClient); + } + + /** + * Test valid JSON response. 
+ * @throws Throwable + */ + @Test + public void testValidJSONResponse() throws Throwable { + + // set up mocks + HttpClient mockHttpClient = Mockito.mock(HttpClient.class); + + HttpResponse mockHttpResponse = Mockito.mock(HttpResponse.class); + HttpEntity mockHttpEntity = Mockito.mock(HttpEntity.class); + + Mockito.when(mockHttpClient.execute(Mockito.<HttpGet>any())).thenReturn(mockHttpResponse); + Mockito.when(mockHttpResponse.getStatusLine()).thenReturn(newStatusLine(HttpStatus.SC_OK)); + Mockito.when(mockHttpResponse.getFirstHeader("Content-Type")) + .thenReturn(newHeader("Content-Type", "application/json")); + Mockito.when(mockHttpResponse.getFirstHeader("Content-Length")) + .thenReturn(newHeader("Content-Length", "1024")); + Mockito.when(mockHttpResponse.getEntity()).thenReturn(mockHttpEntity); + Mockito.when(mockHttpEntity.getContent()) + .thenReturn(new ByteArrayInputStream(validJsonResponse().getBytes(StandardCharsets.UTF_8))) + .thenReturn(new ByteArrayInputStream(validJsonResponse().getBytes(StandardCharsets.UTF_8))) + .thenReturn(new ByteArrayInputStream(validJsonResponse().getBytes(StandardCharsets.UTF_8))); + // finished setting up mocks + + performop(mockHttpClient); + } + + /** + * Test malformed JSON response. 
+ * @throws Throwable + */ + @Test // (expected = WasbAuthorizationException.class) + public void testMalFormedJSONResponse() throws Throwable { + + expectedEx.expect(WasbAuthorizationException.class); + expectedEx.expectMessage("com.fasterxml.jackson.core.JsonParseException: Unexpected end-of-input in FIELD_NAME"); + + // set up mocks + HttpClient mockHttpClient = Mockito.mock(HttpClient.class); + + HttpResponse mockHttpResponse = Mockito.mock(HttpResponse.class); + HttpEntity mockHttpEntity = Mockito.mock(HttpEntity.class); + + Mockito.when(mockHttpClient.execute(Mockito.<HttpGet>any())).thenReturn(mockHttpResponse); + Mockito.when(mockHttpResponse.getStatusLine()).thenReturn(newStatusLine(HttpStatus.SC_OK)); + Mockito.when(mockHttpResponse.getFirstHeader("Content-Type")) + .thenReturn(newHeader("Content-Type", "application/json")); + Mockito.when(mockHttpResponse.getFirstHeader("Content-Length")) + .thenReturn(newHeader("Content-Length", "1024")); + Mockito.when(mockHttpResponse.getEntity()).thenReturn(mockHttpEntity); + Mockito.when(mockHttpEntity.getContent()) + .thenReturn(new ByteArrayInputStream(malformedJsonResponse().getBytes(StandardCharsets.UTF_8))); + // finished setting up mocks + + performop(mockHttpClient); + } + + /** + * Test valid JSON response failure response code. 
+ * @throws Throwable + */ + @Test // (expected = WasbAuthorizationException.class) + public void testFailureCodeJSONResponse() throws Throwable { + + expectedEx.expect(WasbAuthorizationException.class); + expectedEx.expectMessage("Remote authorization service encountered an error Unauthorized"); + + // set up mocks + HttpClient mockHttpClient = Mockito.mock(HttpClient.class); + + HttpResponse mockHttpResponse = Mockito.mock(HttpResponse.class); + HttpEntity mockHttpEntity = Mockito.mock(HttpEntity.class); + + Mockito.when(mockHttpClient.execute(Mockito.<HttpGet>any())).thenReturn(mockHttpResponse); + Mockito.when(mockHttpResponse.getStatusLine()).thenReturn(newStatusLine(HttpStatus.SC_OK)); + Mockito.when(mockHttpResponse.getFirstHeader("Content-Type")) + .thenReturn(newHeader("Content-Type", "application/json")); + Mockito.when(mockHttpResponse.getFirstHeader("Content-Length")) + .thenReturn(newHeader("Content-Length", "1024")); + Mockito.when(mockHttpResponse.getEntity()).thenReturn(mockHttpEntity); + Mockito.when(mockHttpEntity.getContent()) + .thenReturn(new ByteArrayInputStream(failureCodeJsonResponse().getBytes(StandardCharsets.UTF_8))); + // finished setting up mocks + + performop(mockHttpClient); + } + + @Test + public void testWhenOneInstanceIsDown() throws Throwable { + + boolean isAuthorizationCachingEnabled = fs.getConf().getBoolean(CachingAuthorizer.KEY_AUTH_SERVICE_CACHING_ENABLE, false); + + // set up mocks + HttpClient mockHttpClient = Mockito.mock(HttpClient.class); + HttpEntity mockHttpEntity = Mockito.mock(HttpEntity.class); + + HttpResponse mockHttpResponseService1 = Mockito.mock(HttpResponse.class); + Mockito.when(mockHttpResponseService1.getStatusLine()) + .thenReturn(newStatusLine(HttpStatus.SC_INTERNAL_SERVER_ERROR)); + Mockito.when(mockHttpResponseService1.getFirstHeader("Content-Type")) + .thenReturn(newHeader("Content-Type", "application/json")); + Mockito.when(mockHttpResponseService1.getFirstHeader("Content-Length")) + 
.thenReturn(newHeader("Content-Length", "1024")); + Mockito.when(mockHttpResponseService1.getEntity()) + .thenReturn(mockHttpEntity); + + HttpResponse mockHttpResponseService2 = Mockito.mock(HttpResponse.class); + Mockito.when(mockHttpResponseService2.getStatusLine()) + .thenReturn(newStatusLine(HttpStatus.SC_OK)); + Mockito.when(mockHttpResponseService2.getFirstHeader("Content-Type")) + .thenReturn(newHeader("Content-Type", "application/json")); + Mockito.when(mockHttpResponseService2.getFirstHeader("Content-Length")) + .thenReturn(newHeader("Content-Length", "1024")); + Mockito.when(mockHttpResponseService2.getEntity()) + .thenReturn(mockHttpEntity); + + HttpResponse mockHttpResponseServiceLocal = Mockito.mock(HttpResponse.class); + Mockito.when(mockHttpResponseServiceLocal.getStatusLine()) + .thenReturn(newStatusLine(HttpStatus.SC_INTERNAL_SERVER_ERROR)); + Mockito.when(mockHttpResponseServiceLocal.getFirstHeader("Content-Type")) + .thenReturn(newHeader("Content-Type", "application/json")); + Mockito.when(mockHttpResponseServiceLocal.getFirstHeader("Content-Length")) + .thenReturn(newHeader("Content-Length", "1024")); + Mockito.when(mockHttpResponseServiceLocal.getEntity()) + .thenReturn(mockHttpEntity); + + + + class HttpGetForService1 extends ArgumentMatcher<HttpGet>{ + @Override public boolean matches(Object o) { + return checkHttpGetMatchHost((HttpGet) o, "localhost1"); + } + } + class HttpGetForService2 extends ArgumentMatcher<HttpGet>{ + @Override public boolean matches(Object o) { + return checkHttpGetMatchHost((HttpGet) o, "localhost2"); + } + } + class HttpGetForServiceLocal extends ArgumentMatcher<HttpGet>{ + @Override public boolean matches(Object o) { + try { + return checkHttpGetMatchHost((HttpGet) o, InetAddress.getLocalHost().getCanonicalHostName()); + } catch (UnknownHostException e) { + return checkHttpGetMatchHost((HttpGet) o, "localhost"); + } + } + } + Mockito.when(mockHttpClient.execute(argThat(new HttpGetForService1()))) + 
.thenReturn(mockHttpResponseService1); + Mockito.when(mockHttpClient.execute(argThat(new HttpGetForService2()))) + .thenReturn(mockHttpResponseService2); + Mockito.when(mockHttpClient.execute(argThat(new HttpGetForServiceLocal()))) + .thenReturn(mockHttpResponseServiceLocal); + + //Need 2 times because performop() does 2 fs operations. + Mockito.when(mockHttpEntity.getContent()) + .thenReturn(new ByteArrayInputStream(validJsonResponse() + .getBytes(StandardCharsets.UTF_8))) + .thenReturn(new ByteArrayInputStream(validJsonResponse() + .getBytes(StandardCharsets.UTF_8))) + .thenReturn(new ByteArrayInputStream(validJsonResponse() + .getBytes(StandardCharsets.UTF_8))); + // finished setting up mocks + + performop(mockHttpClient); + + int expectedNumberOfInvocations = isAuthorizationCachingEnabled ? 1 : 2; + Mockito.verify(mockHttpClient, times(expectedNumberOfInvocations)).execute(Mockito.argThat(new HttpGetForServiceLocal())); + Mockito.verify(mockHttpClient, times(expectedNumberOfInvocations)).execute(Mockito.argThat(new HttpGetForService2())); + } + + @Test + public void testWhenServiceInstancesAreDown() throws Throwable { + //expectedEx.expect(WasbAuthorizationException.class); + // set up mocks + HttpClient mockHttpClient = Mockito.mock(HttpClient.class); + HttpEntity mockHttpEntity = Mockito.mock(HttpEntity.class); + + HttpResponse mockHttpResponseService1 = Mockito.mock(HttpResponse.class); + Mockito.when(mockHttpResponseService1.getStatusLine()) + .thenReturn(newStatusLine(HttpStatus.SC_INTERNAL_SERVER_ERROR)); + Mockito.when(mockHttpResponseService1.getFirstHeader("Content-Type")) + .thenReturn(newHeader("Content-Type", "application/json")); + Mockito.when(mockHttpResponseService1.getFirstHeader("Content-Length")) + .thenReturn(newHeader("Content-Length", "1024")); + Mockito.when(mockHttpResponseService1.getEntity()) + .thenReturn(mockHttpEntity); + + HttpResponse mockHttpResponseService2 = Mockito.mock(HttpResponse.class); + 
Mockito.when(mockHttpResponseService2.getStatusLine()) + .thenReturn(newStatusLine( + HttpStatus.SC_INTERNAL_SERVER_ERROR)); + Mockito.when(mockHttpResponseService2.getFirstHeader("Content-Type")) + .thenReturn(newHeader("Content-Type", "application/json")); + Mockito.when(mockHttpResponseService2.getFirstHeader("Content-Length")) + .thenReturn(newHeader("Content-Length", "1024")); + Mockito.when(mockHttpResponseService2.getEntity()) + .thenReturn(mockHttpEntity); + + HttpResponse mockHttpResponseService3 = Mockito.mock(HttpResponse.class); + Mockito.when(mockHttpResponseService3.getStatusLine()) + .thenReturn(newStatusLine( + HttpStatus.SC_INTERNAL_SERVER_ERROR)); + Mockito.when(mockHttpResponseService3.getFirstHeader("Content-Type")) + .thenReturn(newHeader("Content-Type", "application/json")); + Mockito.when(mockHttpResponseService3.getFirstHeader("Content-Length")) + .thenReturn(newHeader("Content-Length", "1024")); + Mockito.when(mockHttpResponseService3.getEntity()) + .thenReturn(mockHttpEntity); + + class HttpGetForService1 extends ArgumentMatcher<HttpGet>{ + @Override public boolean matches(Object o) { + return checkHttpGetMatchHost((HttpGet) o, "localhost1"); + } + } + class HttpGetForService2 extends ArgumentMatcher<HttpGet>{ + @Override public boolean matches(Object o) { + return checkHttpGetMatchHost((HttpGet) o, "localhost2"); + } + } + class HttpGetForService3 extends ArgumentMatcher<HttpGet> { + @Override public boolean matches(Object o){ + try { + return checkHttpGetMatchHost((HttpGet) o, InetAddress.getLocalHost().getCanonicalHostName()); + } catch (UnknownHostException e) { + return checkHttpGetMatchHost((HttpGet) o, "localhost"); + } + } + } + Mockito.when(mockHttpClient.execute(argThat(new HttpGetForService1()))) + .thenReturn(mockHttpResponseService1); + Mockito.when(mockHttpClient.execute(argThat(new HttpGetForService2()))) + .thenReturn(mockHttpResponseService2); + Mockito.when(mockHttpClient.execute(argThat(new HttpGetForService3()))) + 
.thenReturn(mockHttpResponseService3); + + //Need 3 times because performop() does 3 fs operations. + Mockito.when(mockHttpEntity.getContent()) + .thenReturn(new ByteArrayInputStream( + validJsonResponse().getBytes(StandardCharsets.UTF_8))) + .thenReturn(new ByteArrayInputStream( + validJsonResponse().getBytes(StandardCharsets.UTF_8))) + .thenReturn(new ByteArrayInputStream( + validJsonResponse().getBytes(StandardCharsets.UTF_8))); + // finished setting up mocks + try { + performop(mockHttpClient); + }catch (WasbAuthorizationException e){ + e.printStackTrace(); + Mockito.verify(mockHttpClient, atLeast(2)) + .execute(argThat(new HttpGetForService1())); + Mockito.verify(mockHttpClient, atLeast(2)) + .execute(argThat(new HttpGetForService2())); + Mockito.verify(mockHttpClient, atLeast(3)) + .execute(argThat(new HttpGetForService3())); + Mockito.verify(mockHttpClient, times(7)).execute(Mockito.<HttpGet>any()); + } + } + + private void setupExpectations() { + expectedEx.expect(WasbAuthorizationException.class); + + class MatchesPattern extends TypeSafeMatcher<String> { + private String pattern; + + MatchesPattern(String pattern) { + this.pattern = pattern; + } + + @Override protected boolean matchesSafely(String item) { + return item.matches(pattern); + } + + @Override public void describeTo(Description description) { + description.appendText("matches pattern ").appendValue(pattern); + } + + @Override protected void describeMismatchSafely(String item, + Description mismatchDescription) { + mismatchDescription.appendText("does not match"); + } + } + + expectedEx.expectMessage(new MatchesPattern( + "org\\.apache\\.hadoop\\.fs\\.azure\\.WasbRemoteCallException: " + + "Encountered error while making remote call to " + + "http:\\/\\/localhost1\\/,http:\\/\\/localhost2\\/,http:\\/\\/localhost:8080 retried 6 time\\(s\\)\\.")); + } + + private void performop(HttpClient mockHttpClient) throws Throwable { + + Path testPath = new Path("/", "test.dat"); + + RemoteWasbAuthorizerImpl 
authorizer = new RemoteWasbAuthorizerImpl(); + authorizer.init(fs.getConf()); + WasbRemoteCallHelper mockWasbRemoteCallHelper = new WasbRemoteCallHelper( + RetryUtils.getMultipleLinearRandomRetry(new Configuration(), + EMPTY_STRING, true, + EMPTY_STRING, "1000,3,10000,2")); + mockWasbRemoteCallHelper.updateHttpClient(mockHttpClient); + authorizer.updateWasbRemoteCallHelper(mockWasbRemoteCallHelper); + fs.updateWasbAuthorizer(authorizer); + + fs.create(testPath); + ContractTestUtils.assertPathExists(fs, "testPath was not created", testPath); + fs.delete(testPath, false); + } + + private String validJsonResponse() { + return "{" + + "\"responseCode\": 0," + + "\"authorizationResult\": true," + + "\"responseMessage\": \"Authorized\"" + + "}"; + } + + private String malformedJsonResponse() { + return "{" + + "\"responseCode\": 0," + + "\"authorizationResult\": true," + + "\"responseMessage\":"; + } + + private String failureCodeJsonResponse() { + return "{" + + "\"responseCode\": 1," + + "\"authorizationResult\": false," + + "\"responseMessage\": \"Unauthorized\"" + + "}"; + } + + private StatusLine newStatusLine(int statusCode) { + return new StatusLine() { + @Override + public ProtocolVersion getProtocolVersion() { + return new ProtocolVersion("HTTP", 1, 1); + } + + @Override + public int getStatusCode() { + return statusCode; + } + + @Override + public String getReasonPhrase() { + return "Reason Phrase"; + } + }; + } + + private Header newHeader(String name, String value) { + return new Header() { + @Override + public String getName() { + return name; + } + + @Override + public String getValue() { + return value; + } + + @Override + public HeaderElement[] getElements() throws ParseException { + return new HeaderElement[0]; + } + }; + } + + /** Check that a HttpGet request is with given remote host. 
* A null request, a request without a URI, or a URI without a host never
   * matches.
   */
  private static boolean checkHttpGetMatchHost(HttpGet g, String h) {
    if (g == null || g.getURI() == null) {
      return false;
    }
    // URI.getHost() may be null; compare from the expected side so a
    // host-less URI yields false instead of a NullPointerException.
    return h != null && h.equals(g.getURI().getHost());
  }

}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9f6b08f8/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestWasbUriAndConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestWasbUriAndConfiguration.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestWasbUriAndConfiguration.java new file mode 100644 index 0000000..bee0220 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestWasbUriAndConfiguration.java @@ -0,0 +1,610 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.azure; + +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY; +import static org.junit.Assume.assumeNotNull; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.URI; +import java.util.Date; +import java.util.EnumSet; +import java.io.File; + +import org.apache.hadoop.fs.azure.integration.AzureTestUtils; +import org.apache.hadoop.security.ProviderUtils; +import org.apache.hadoop.security.alias.CredentialProvider; +import org.apache.hadoop.security.alias.CredentialProviderFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.AbstractFileSystem; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.UnsupportedFileSystemException; +import org.apache.hadoop.fs.azure.AzureBlobStorageTestAccount.CreateOptions; +import org.junit.After; +import org.junit.Assert; +import org.junit.Assume; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import com.microsoft.azure.storage.blob.CloudBlobContainer; +import com.microsoft.azure.storage.blob.CloudBlockBlob; + +public class ITestWasbUriAndConfiguration extends AbstractWasbTestWithTimeout { + + private static final int FILE_SIZE = 4096; + private static final String PATH_DELIMITER = "/"; + + protected String accountName; + protected String accountKey; + protected static Configuration conf = null; + private boolean runningInSASMode = false; + @Rule + public final TemporaryFolder tempDir = new TemporaryFolder(); + + private AzureBlobStorageTestAccount testAccount; + + @After + public void tearDown() throws Exception { + testAccount = AzureTestUtils.cleanupTestAccount(testAccount); + } + + @Before + public void setMode() { + runningInSASMode = 
AzureBlobStorageTestAccount.createTestConfiguration(). + getBoolean(AzureNativeFileSystemStore.KEY_USE_SECURE_MODE, false); + } + + private boolean validateIOStreams(Path filePath) throws IOException { + // Capture the file system from the test account. + FileSystem fs = testAccount.getFileSystem(); + return validateIOStreams(fs, filePath); + } + + private boolean validateIOStreams(FileSystem fs, Path filePath) + throws IOException { + + // Create and write a file + OutputStream outputStream = fs.create(filePath); + outputStream.write(new byte[FILE_SIZE]); + outputStream.close(); + + // Return true if the the count is equivalent to the file size. + return (FILE_SIZE == readInputStream(fs, filePath)); + } + + private int readInputStream(Path filePath) throws IOException { + // Capture the file system from the test account. + FileSystem fs = testAccount.getFileSystem(); + return readInputStream(fs, filePath); + } + + private int readInputStream(FileSystem fs, Path filePath) throws IOException { + // Read the file + InputStream inputStream = fs.open(filePath); + int count = 0; + while (inputStream.read() >= 0) { + count++; + } + inputStream.close(); + + // Return true if the the count is equivalent to the file size. + return count; + } + + // Positive tests to exercise making a connection with to Azure account using + // account key. + @Test + public void testConnectUsingKey() throws Exception { + + testAccount = AzureBlobStorageTestAccount.create(); + assumeNotNull(testAccount); + + // Validate input and output on the connection. + assertTrue(validateIOStreams(new Path("/wasb_scheme"))); + } + + @Test + public void testConnectUsingSAS() throws Exception { + + Assume.assumeFalse(runningInSASMode); + // Create the test account with SAS credentials. + testAccount = AzureBlobStorageTestAccount.create("", + EnumSet.of(CreateOptions.UseSas, CreateOptions.CreateContainer)); + assumeNotNull(testAccount); + // Validate input and output on the connection. 
+ // NOTE: As of 4/15/2013, Azure Storage has a deficiency that prevents the + // full scenario from working (CopyFromBlob doesn't work with SAS), so + // just do a minor check until that is corrected. + assertFalse(testAccount.getFileSystem().exists(new Path("/IDontExist"))); + //assertTrue(validateIOStreams(new Path("/sastest.txt"))); + } + + @Test + public void testConnectUsingSASReadonly() throws Exception { + + Assume.assumeFalse(runningInSASMode); + // Create the test account with SAS credentials. + testAccount = AzureBlobStorageTestAccount.create("", EnumSet.of( + CreateOptions.UseSas, CreateOptions.CreateContainer, + CreateOptions.Readonly)); + assumeNotNull(testAccount); + + // Create a blob in there + final String blobKey = "blobForReadonly"; + CloudBlobContainer container = testAccount.getRealContainer(); + CloudBlockBlob blob = container.getBlockBlobReference(blobKey); + ByteArrayInputStream inputStream = new ByteArrayInputStream(new byte[] { 1, + 2, 3 }); + blob.upload(inputStream, 3); + inputStream.close(); + + // Make sure we can read it from the file system + Path filePath = new Path("/" + blobKey); + FileSystem fs = testAccount.getFileSystem(); + assertTrue(fs.exists(filePath)); + byte[] obtained = new byte[3]; + DataInputStream obtainedInputStream = fs.open(filePath); + obtainedInputStream.readFully(obtained); + obtainedInputStream.close(); + assertEquals(3, obtained[2]); + } + + @Test + public void testConnectUsingAnonymous() throws Exception { + + // Create test account with anonymous credentials + testAccount = AzureBlobStorageTestAccount.createAnonymous("testWasb.txt", + FILE_SIZE); + assumeNotNull(testAccount); + + // Read the file from the public folder using anonymous credentials. 
+ assertEquals(FILE_SIZE, readInputStream(new Path("/testWasb.txt"))); + } + + @Test + public void testConnectToEmulator() throws Exception { + testAccount = AzureBlobStorageTestAccount.createForEmulator(); + assumeNotNull(testAccount); + assertTrue(validateIOStreams(new Path("/testFile"))); + } + + /** + * Tests that we can connect to fully qualified accounts outside of + * blob.core.windows.net + */ + @Test + public void testConnectToFullyQualifiedAccountMock() throws Exception { + Configuration conf = new Configuration(); + AzureBlobStorageTestAccount.setMockAccountKey(conf, + "mockAccount.mock.authority.net"); + AzureNativeFileSystemStore store = new AzureNativeFileSystemStore(); + MockStorageInterface mockStorage = new MockStorageInterface(); + store.setAzureStorageInteractionLayer(mockStorage); + NativeAzureFileSystem fs = new NativeAzureFileSystem(store); + fs.initialize( + new URI("wasb://mockcontai...@mockaccount.mock.authority.net"), conf); + fs.createNewFile(new Path("/x")); + assertTrue(mockStorage.getBackingStore().exists( + "http://mockAccount.mock.authority.net/mockContainer/x")); + fs.close(); + } + + public void testConnectToRoot() throws Exception { + + // Set up blob names. + final String blobPrefix = String.format("wasbtests-%s-%tQ-blob", + System.getProperty("user.name"), new Date()); + final String inblobName = blobPrefix + "_In" + ".txt"; + final String outblobName = blobPrefix + "_Out" + ".txt"; + + // Create test account with default root access. + testAccount = AzureBlobStorageTestAccount.createRoot(inblobName, FILE_SIZE); + assumeNotNull(testAccount); + + // Read the file from the default container. + assertEquals(FILE_SIZE, readInputStream(new Path(PATH_DELIMITER + + inblobName))); + + try { + // Capture file system. + FileSystem fs = testAccount.getFileSystem(); + + // Create output path and open an output stream to the root folder. 
+ Path outputPath = new Path(PATH_DELIMITER + outblobName); + OutputStream outputStream = fs.create(outputPath); + fail("Expected an AzureException when writing to root folder."); + outputStream.write(new byte[FILE_SIZE]); + outputStream.close(); + } catch (AzureException e) { + assertTrue(true); + } catch (Exception e) { + String errMsg = String.format( + "Expected AzureException but got %s instead.", e); + assertTrue(errMsg, false); + } + } + + // Positive tests to exercise throttling I/O path. Connections are made to an + // Azure account using account key. + // + public void testConnectWithThrottling() throws Exception { + + testAccount = AzureBlobStorageTestAccount.createThrottled(); + + // Validate input and output on the connection. + assertTrue(validateIOStreams(new Path("/wasb_scheme"))); + } + + /** + * Creates a file and writes a single byte with the given value in it. + */ + private static void writeSingleByte(FileSystem fs, Path testFile, int toWrite) + throws Exception { + OutputStream outputStream = fs.create(testFile); + outputStream.write(toWrite); + outputStream.close(); + } + + /** + * Reads the file given and makes sure that it's a single-byte file with the + * given value in it. 
+ */ + private static void assertSingleByteValue(FileSystem fs, Path testFile, + int expectedValue) throws Exception { + InputStream inputStream = fs.open(testFile); + int byteRead = inputStream.read(); + assertTrue("File unexpectedly empty: " + testFile, byteRead >= 0); + assertTrue("File has more than a single byte: " + testFile, + inputStream.read() < 0); + inputStream.close(); + assertEquals("Unxpected content in: " + testFile, expectedValue, byteRead); + } + + @Test + public void testMultipleContainers() throws Exception { + AzureBlobStorageTestAccount firstAccount = AzureBlobStorageTestAccount + .create("first"), secondAccount = AzureBlobStorageTestAccount + .create("second"); + assumeNotNull(firstAccount); + assumeNotNull(secondAccount); + try { + FileSystem firstFs = firstAccount.getFileSystem(), + secondFs = secondAccount.getFileSystem(); + Path testFile = new Path("/testWasb"); + assertTrue(validateIOStreams(firstFs, testFile)); + assertTrue(validateIOStreams(secondFs, testFile)); + // Make sure that we're really dealing with two file systems here. 
+ writeSingleByte(firstFs, testFile, 5); + writeSingleByte(secondFs, testFile, 7); + assertSingleByteValue(firstFs, testFile, 5); + assertSingleByteValue(secondFs, testFile, 7); + } finally { + firstAccount.cleanup(); + secondAccount.cleanup(); + } + } + + @Test + public void testDefaultKeyProvider() throws Exception { + Configuration conf = new Configuration(); + String account = "testacct"; + String key = "testkey"; + + conf.set(SimpleKeyProvider.KEY_ACCOUNT_KEY_PREFIX + account, key); + + String result = AzureNativeFileSystemStore.getAccountKeyFromConfiguration( + account, conf); + assertEquals(key, result); + } + + @Test + public void testCredsFromCredentialProvider() throws Exception { + + Assume.assumeFalse(runningInSASMode); + String account = "testacct"; + String key = "testkey"; + // set up conf to have a cred provider + final Configuration conf = new Configuration(); + final File file = tempDir.newFile("test.jks"); + final URI jks = ProviderUtils.nestURIForLocalJavaKeyStoreProvider( + file.toURI()); + conf.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH, + jks.toString()); + + provisionAccountKey(conf, account, key); + + // also add to configuration as clear text that should be overridden + conf.set(SimpleKeyProvider.KEY_ACCOUNT_KEY_PREFIX + account, + key + "cleartext"); + + String result = AzureNativeFileSystemStore.getAccountKeyFromConfiguration( + account, conf); + // result should contain the credential provider key not the config key + assertEquals("AccountKey incorrect.", key, result); + } + + void provisionAccountKey( + final Configuration conf, String account, String key) throws Exception { + // add our creds to the provider + final CredentialProvider provider = + CredentialProviderFactory.getProviders(conf).get(0); + provider.createCredentialEntry( + SimpleKeyProvider.KEY_ACCOUNT_KEY_PREFIX + account, key.toCharArray()); + provider.flush(); + } + + @Test + public void testValidKeyProvider() throws Exception { + Configuration conf = new 
Configuration(); + String account = "testacct"; + String key = "testkey"; + + conf.set(SimpleKeyProvider.KEY_ACCOUNT_KEY_PREFIX + account, key); + conf.setClass("fs.azure.account.keyprovider." + account, + SimpleKeyProvider.class, KeyProvider.class); + String result = AzureNativeFileSystemStore.getAccountKeyFromConfiguration( + account, conf); + assertEquals(key, result); + } + + @Test + public void testInvalidKeyProviderNonexistantClass() throws Exception { + Configuration conf = new Configuration(); + String account = "testacct"; + + conf.set("fs.azure.account.keyprovider." + account, + "org.apache.Nonexistant.Class"); + try { + AzureNativeFileSystemStore.getAccountKeyFromConfiguration(account, conf); + Assert.fail("Nonexistant key provider class should have thrown a " + + "KeyProviderException"); + } catch (KeyProviderException e) { + } + } + + @Test + public void testInvalidKeyProviderWrongClass() throws Exception { + Configuration conf = new Configuration(); + String account = "testacct"; + + conf.set("fs.azure.account.keyprovider." + account, "java.lang.String"); + try { + AzureNativeFileSystemStore.getAccountKeyFromConfiguration(account, conf); + Assert.fail("Key provider class that doesn't implement KeyProvider " + + "should have thrown a KeyProviderException"); + } catch (KeyProviderException e) { + } + } + + /** + * Tests the cases when the URI is specified with no authority, i.e. + * wasb:///path/to/file. + */ + @Test + public void testNoUriAuthority() throws Exception { + // For any combination of default FS being asv(s)/wasb(s)://c@a/ and + // the actual URI being asv(s)/wasb(s):///, it should work. 
+ + String[] wasbAliases = new String[] { "wasb", "wasbs" }; + for (String defaultScheme : wasbAliases) { + for (String wantedScheme : wasbAliases) { + testAccount = AzureBlobStorageTestAccount.createMock(); + Configuration conf = testAccount.getFileSystem().getConf(); + String authority = testAccount.getFileSystem().getUri().getAuthority(); + URI defaultUri = new URI(defaultScheme, authority, null, null, null); + conf.set(FS_DEFAULT_NAME_KEY, defaultUri.toString()); + // Add references to file system implementations for wasb and wasbs. + conf.addResource("azure-test.xml"); + URI wantedUri = new URI(wantedScheme + ":///random/path"); + NativeAzureFileSystem obtained = (NativeAzureFileSystem) FileSystem + .get(wantedUri, conf); + assertNotNull(obtained); + assertEquals(new URI(wantedScheme, authority, null, null, null), + obtained.getUri()); + // Make sure makeQualified works as expected + Path qualified = obtained.makeQualified(new Path(wantedUri)); + assertEquals(new URI(wantedScheme, authority, wantedUri.getPath(), + null, null), qualified.toUri()); + // Cleanup for the next iteration to not cache anything in FS + testAccount.cleanup(); + FileSystem.closeAll(); + } + } + // If the default FS is not a WASB FS, then specifying a URI without + // authority for the Azure file system should throw. 
+ testAccount = AzureBlobStorageTestAccount.createMock(); + Configuration conf = testAccount.getFileSystem().getConf(); + conf.set(FS_DEFAULT_NAME_KEY, "file:///"); + try { + FileSystem.get(new URI("wasb:///random/path"), conf); + fail("Should've thrown."); + } catch (IllegalArgumentException e) { + } + } + + @Test + public void testWasbAsDefaultFileSystemHasNoPort() throws Exception { + try { + testAccount = AzureBlobStorageTestAccount.createMock(); + Configuration conf = testAccount.getFileSystem().getConf(); + String authority = testAccount.getFileSystem().getUri().getAuthority(); + URI defaultUri = new URI("wasb", authority, null, null, null); + conf.set(FS_DEFAULT_NAME_KEY, defaultUri.toString()); + conf.addResource("azure-test.xml"); + + FileSystem fs = FileSystem.get(conf); + assertTrue(fs instanceof NativeAzureFileSystem); + assertEquals(-1, fs.getUri().getPort()); + + AbstractFileSystem afs = FileContext.getFileContext(conf) + .getDefaultFileSystem(); + assertTrue(afs instanceof Wasb); + assertEquals(-1, afs.getUri().getPort()); + } finally { + testAccount.cleanup(); + FileSystem.closeAll(); + } + } + + /** + * Tests the cases when the scheme specified is 'wasbs'. 
+ */ + @Test + public void testAbstractFileSystemImplementationForWasbsScheme() throws Exception { + try { + testAccount = AzureBlobStorageTestAccount.createMock(); + Configuration conf = testAccount.getFileSystem().getConf(); + String authority = testAccount.getFileSystem().getUri().getAuthority(); + URI defaultUri = new URI("wasbs", authority, null, null, null); + conf.set(FS_DEFAULT_NAME_KEY, defaultUri.toString()); + conf.set("fs.AbstractFileSystem.wasbs.impl", "org.apache.hadoop.fs.azure.Wasbs"); + conf.addResource("azure-test.xml"); + + FileSystem fs = FileSystem.get(conf); + assertTrue(fs instanceof NativeAzureFileSystem); + assertEquals("wasbs", fs.getScheme()); + + AbstractFileSystem afs = FileContext.getFileContext(conf) + .getDefaultFileSystem(); + assertTrue(afs instanceof Wasbs); + assertEquals(-1, afs.getUri().getPort()); + assertEquals("wasbs", afs.getUri().getScheme()); + } finally { + testAccount.cleanup(); + FileSystem.closeAll(); + } + } + + @Test + public void testNoAbstractFileSystemImplementationSpecifiedForWasbsScheme() throws Exception { + try { + testAccount = AzureBlobStorageTestAccount.createMock(); + Configuration conf = testAccount.getFileSystem().getConf(); + String authority = testAccount.getFileSystem().getUri().getAuthority(); + URI defaultUri = new URI("wasbs", authority, null, null, null); + conf.set(FS_DEFAULT_NAME_KEY, defaultUri.toString()); + + FileSystem fs = FileSystem.get(conf); + assertTrue(fs instanceof NativeAzureFileSystem); + assertEquals("wasbs", fs.getScheme()); + + // should throw if 'fs.AbstractFileSystem.wasbs.impl'' is not specified + try{ + FileContext.getFileContext(conf).getDefaultFileSystem(); + fail("Should've thrown."); + }catch(UnsupportedFileSystemException e){ + } + + } finally { + testAccount.cleanup(); + FileSystem.closeAll(); + } + } + + @Test + public void testCredentialProviderPathExclusions() throws Exception { + String providerPath = + "user:///,jceks://wasb/user/hrt_qa/sqoopdbpasswd.jceks," + + 
"jceks://h...@nn1.example.com/my/path/test.jceks"; + Configuration config = new Configuration(); + config.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH, + providerPath); + String newPath = "user:///,jceks://h...@nn1.example.com/my/path/test.jceks"; + + excludeAndTestExpectations(config, newPath); + } + + @Test + public void testExcludeAllProviderTypesFromConfig() throws Exception { + String providerPath = + "jceks://wasb/tmp/test.jceks," + + "jceks://wasb@/my/path/test.jceks"; + Configuration config = new Configuration(); + config.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH, + providerPath); + String newPath = null; + + excludeAndTestExpectations(config, newPath); + } + + void excludeAndTestExpectations(Configuration config, String newPath) + throws Exception { + Configuration conf = ProviderUtils.excludeIncompatibleCredentialProviders( + config, NativeAzureFileSystem.class); + String effectivePath = conf.get( + CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH, null); + assertEquals(newPath, effectivePath); + } + + @Test + public void testUserAgentConfig() throws Exception { + // Set the user agent + try { + testAccount = AzureBlobStorageTestAccount.createMock(); + Configuration conf = testAccount.getFileSystem().getConf(); + String authority = testAccount.getFileSystem().getUri().getAuthority(); + URI defaultUri = new URI("wasbs", authority, null, null, null); + conf.set(FS_DEFAULT_NAME_KEY, defaultUri.toString()); + conf.set("fs.AbstractFileSystem.wasbs.impl", "org.apache.hadoop.fs.azure.Wasbs"); + + conf.set(AzureNativeFileSystemStore.USER_AGENT_ID_KEY, "TestClient"); + + FileSystem fs = FileSystem.get(conf); + AbstractFileSystem afs = FileContext.getFileContext(conf).getDefaultFileSystem(); + + assertTrue(afs instanceof Wasbs); + assertEquals(-1, afs.getUri().getPort()); + assertEquals("wasbs", afs.getUri().getScheme()); + + } finally { + testAccount.cleanup(); + FileSystem.closeAll(); + } + + // Unset the user agent + try { + testAccount 
= AzureBlobStorageTestAccount.createMock(); + Configuration conf = testAccount.getFileSystem().getConf(); + String authority = testAccount.getFileSystem().getUri().getAuthority(); + URI defaultUri = new URI("wasbs", authority, null, null, null); + conf.set(FS_DEFAULT_NAME_KEY, defaultUri.toString()); + conf.set("fs.AbstractFileSystem.wasbs.impl", "org.apache.hadoop.fs.azure.Wasbs"); + + conf.unset(AzureNativeFileSystemStore.USER_AGENT_ID_KEY); + + FileSystem fs = FileSystem.get(conf); + AbstractFileSystem afs = FileContext.getFileContext(conf).getDefaultFileSystem(); + assertTrue(afs instanceof Wasbs); + assertEquals(-1, afs.getUri().getPort()); + assertEquals("wasbs", afs.getUri().getScheme()); + + } finally { + testAccount.cleanup(); + FileSystem.closeAll(); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/9f6b08f8/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockWasbAuthorizerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockWasbAuthorizerImpl.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockWasbAuthorizerImpl.java index 9fbab49..7354499 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockWasbAuthorizerImpl.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockWasbAuthorizerImpl.java @@ -38,11 +38,12 @@ public class MockWasbAuthorizerImpl implements WasbAuthorizerInterface { private boolean performOwnerMatch; private CachingAuthorizer<CachedAuthorizerEntry, Boolean> cache; - // The full qualified URL to the root directory + // The full qualified URL to the root directory private String qualifiedPrefixUrl; public MockWasbAuthorizerImpl(NativeAzureFileSystem fs) { - qualifiedPrefixUrl = new Path("/").makeQualified(fs.getUri(), fs.getWorkingDirectory()) + qualifiedPrefixUrl = new Path("/").makeQualified(fs.getUri(), + 
fs.getWorkingDirectory()) .toString().replaceAll("/$", ""); cache = new CachingAuthorizer<>(TimeUnit.MINUTES.convert(5L, TimeUnit.MINUTES), "AUTHORIZATION"); } @@ -64,19 +65,23 @@ public class MockWasbAuthorizerImpl implements WasbAuthorizerInterface { public void addAuthRule(String wasbAbsolutePath, String accessType, boolean access) { - wasbAbsolutePath = qualifiedPrefixUrl + wasbAbsolutePath; - AuthorizationComponent component = wasbAbsolutePath.endsWith("*") - ? new AuthorizationComponent("^" + wasbAbsolutePath.replace("*", ".*"), accessType) + wasbAbsolutePath = qualifiedPrefixUrl + wasbAbsolutePath; + AuthorizationComponent component = wasbAbsolutePath.endsWith("*") + ? new AuthorizationComponent("^" + wasbAbsolutePath.replace("*", ".*"), + accessType) : new AuthorizationComponent(wasbAbsolutePath, accessType); this.authRules.put(component, access); } @Override - public boolean authorize(String wasbAbsolutePath, String accessType, String owner) + public boolean authorize(String wasbAbsolutePath, + String accessType, + String owner) throws WasbAuthorizationException { - if (wasbAbsolutePath.endsWith(NativeAzureFileSystem.FolderRenamePending.SUFFIX)) { + if (wasbAbsolutePath.endsWith( + NativeAzureFileSystem.FolderRenamePending.SUFFIX)) { return true; } @@ -108,20 +113,23 @@ public class MockWasbAuthorizerImpl implements WasbAuthorizerInterface { // In case of root("/"), owner match does not happen because owner is returned as empty string. // we try to force owner match just for purpose of tests to make sure all operations work seemlessly with owner. 
if (this.performOwnerMatch - && StringUtils.equalsIgnoreCase(wasbAbsolutePath, qualifiedPrefixUrl + "/")) { + && StringUtils.equalsIgnoreCase(wasbAbsolutePath, + qualifiedPrefixUrl + "/")) { owner = currentUserShortName; } boolean shouldEvaluateOwnerAccess = owner != null && !owner.isEmpty() - && this.performOwnerMatch; + && this.performOwnerMatch; - boolean isOwnerMatch = StringUtils.equalsIgnoreCase(currentUserShortName, owner); + boolean isOwnerMatch = StringUtils.equalsIgnoreCase(currentUserShortName, + owner); AuthorizationComponent component = new AuthorizationComponent(wasbAbsolutePath, accessType); if (authRules.containsKey(component)) { - return shouldEvaluateOwnerAccess ? isOwnerMatch && authRules.get(component) : authRules.get(component); + return shouldEvaluateOwnerAccess ? isOwnerMatch && authRules.get( + component) : authRules.get(component); } else { // Regex-pattern match if we don't have a straight match for (Map.Entry<AuthorizationComponent, Boolean> entry : authRules.entrySet()) { @@ -129,8 +137,11 @@ public class MockWasbAuthorizerImpl implements WasbAuthorizerInterface { String keyPath = key.getWasbAbsolutePath(); String keyAccess = key.getAccessType(); - if (keyPath.endsWith("*") && Pattern.matches(keyPath, wasbAbsolutePath) && keyAccess.equals(accessType)) { - return shouldEvaluateOwnerAccess ? isOwnerMatch && entry.getValue() : entry.getValue(); + if (keyPath.endsWith("*") && Pattern.matches(keyPath, wasbAbsolutePath) + && keyAccess.equals(accessType)) { + return shouldEvaluateOwnerAccess + ? 
isOwnerMatch && entry.getValue() + : entry.getValue(); } } return false; @@ -141,47 +152,47 @@ public class MockWasbAuthorizerImpl implements WasbAuthorizerInterface { authRules.clear(); cache.clear(); } -} -class AuthorizationComponent { + private static class AuthorizationComponent { - private String wasbAbsolutePath; - private String accessType; + private final String wasbAbsolutePath; + private final String accessType; - public AuthorizationComponent(String wasbAbsolutePath, - String accessType) { - this.wasbAbsolutePath = wasbAbsolutePath; - this.accessType = accessType; - } + AuthorizationComponent(String wasbAbsolutePath, + String accessType) { + this.wasbAbsolutePath = wasbAbsolutePath; + this.accessType = accessType; + } - @Override - public int hashCode() { - return this.wasbAbsolutePath.hashCode() ^ this.accessType.hashCode(); - } + @Override + public int hashCode() { + return this.wasbAbsolutePath.hashCode() ^ this.accessType.hashCode(); + } - @Override - public boolean equals(Object obj) { + @Override + public boolean equals(Object obj) { - if (obj == this) { - return true; - } + if (obj == this) { + return true; + } - if (obj == null - || !(obj instanceof AuthorizationComponent)) { - return false; - } + if (obj == null + || !(obj instanceof AuthorizationComponent)) { + return false; + } - return ((AuthorizationComponent)obj). - getWasbAbsolutePath().equals(this.wasbAbsolutePath) - && ((AuthorizationComponent)obj). - getAccessType().equals(this.accessType); - } + return ((AuthorizationComponent) obj). + getWasbAbsolutePath().equals(this.wasbAbsolutePath) + && ((AuthorizationComponent) obj). 
+ getAccessType().equals(this.accessType); + } - public String getWasbAbsolutePath() { - return this.wasbAbsolutePath; - } + public String getWasbAbsolutePath() { + return this.wasbAbsolutePath; + } - public String getAccessType() { - return accessType; + public String getAccessType() { + return accessType; + } } -} \ No newline at end of file +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org