http://git-wip-us.apache.org/repos/asf/hadoop/blob/cda68de9/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3ADeleteManyFiles.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3ADeleteManyFiles.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3ADeleteManyFiles.java deleted file mode 100644 index 5e07dcb..0000000 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3ADeleteManyFiles.java +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.s3a.scale; - -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.contract.ContractTestUtils; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorCompletionService; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; - -/** - * Test some scalable operations related to file renaming and deletion. - */ -public class TestS3ADeleteManyFiles extends S3AScaleTestBase { - private static final Logger LOG = - LoggerFactory.getLogger(TestS3ADeleteManyFiles.class); - - /** - * CAUTION: If this test starts failing, please make sure that the - * {@link org.apache.hadoop.fs.s3a.Constants#MAX_THREADS} configuration is not - * set too low. Alternatively, consider reducing the - * <code>scale.test.operation.count</code> parameter in - * <code>getOperationCount()</code>. - * - * @see #getOperationCount() - */ - @Test - public void testBulkRenameAndDelete() throws Throwable { - final Path scaleTestDir = getTestPath(); - final Path srcDir = new Path(scaleTestDir, "src"); - final Path finalDir = new Path(scaleTestDir, "final"); - final long count = getOperationCount(); - ContractTestUtils.rm(fs, scaleTestDir, true, false); - - fs.mkdirs(srcDir); - fs.mkdirs(finalDir); - - int testBufferSize = fs.getConf() - .getInt(ContractTestUtils.IO_CHUNK_BUFFER_SIZE, - ContractTestUtils.DEFAULT_IO_CHUNK_BUFFER_SIZE); - // use Executor to speed up file creation - ExecutorService exec = Executors.newFixedThreadPool(16); - final ExecutorCompletionService<Boolean> completionService = - new ExecutorCompletionService<>(exec); - try { - final byte[] data = ContractTestUtils.dataset(testBufferSize, 'a', 'z'); - - for (int i = 0; i < count; ++i) { - final String fileName = "foo-" + i; - completionService.submit(new Callable<Boolean>() { - @Override - public Boolean call() throws IOException { - ContractTestUtils.createFile(fs, new Path(srcDir, fileName), - false, data); - return fs.exists(new Path(srcDir, fileName)); - } - }); - } - for (int i = 0; i < count; ++i) { - final Future<Boolean> future = completionService.take(); - try { - if (!future.get()) { - LOG.warn("cannot create file"); - } - } catch (ExecutionException e) { - LOG.warn("Error while uploading file", e.getCause()); - throw e; - } - } - } finally { - exec.shutdown(); - } - - int nSrcFiles = fs.listStatus(srcDir).length; - fs.rename(srcDir, finalDir); - assertEquals(nSrcFiles, fs.listStatus(finalDir).length); - ContractTestUtils.assertPathDoesNotExist(fs, "not deleted after rename", - new Path(srcDir, "foo-" + 0)); - ContractTestUtils.assertPathDoesNotExist(fs, "not deleted after rename", - new Path(srcDir, "foo-" + count / 2)); - ContractTestUtils.assertPathDoesNotExist(fs, "not deleted after rename", - new Path(srcDir, "foo-" + (count - 1))); - ContractTestUtils.assertPathExists(fs, "not renamed to dest dir", - new Path(finalDir, "foo-" + 0)); - ContractTestUtils.assertPathExists(fs, "not renamed to dest dir", - new Path(finalDir, "foo-" + count/2)); - ContractTestUtils.assertPathExists(fs, "not renamed to dest dir", - new Path(finalDir, "foo-" + (count-1))); - - ContractTestUtils.assertDeleted(fs, finalDir, true, false); - } - - @Test - public void testOpenCreate() throws IOException { - Path dir = new Path("/tests3a"); - ContractTestUtils.createAndVerifyFile(fs, dir, 1024); - ContractTestUtils.createAndVerifyFile(fs, dir, 5 * 1024 * 1024); - ContractTestUtils.createAndVerifyFile(fs, dir, 20 * 1024 * 1024); - - - /* - Enable to test the multipart upload - try { - ContractTestUtils.createAndVerifyFile(fs, dir, - (long)6 * 1024 * 1024 * 1024); - } catch (IOException e) { - fail(e.getMessage()); - } - */ - } -}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cda68de9/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3ADirectoryPerformance.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3ADirectoryPerformance.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3ADirectoryPerformance.java deleted file mode 100644 index 35ea3ad..0000000 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3ADirectoryPerformance.java +++ /dev/null @@ -1,187 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.s3a.scale; - -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.s3a.Statistic; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; - -import static org.apache.hadoop.fs.s3a.Statistic.*; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.*; -import static org.apache.hadoop.fs.contract.ContractTestUtils.*; - -/** - * Test the performance of listing files/directories. - */ -public class TestS3ADirectoryPerformance extends S3AScaleTestBase { - private static final Logger LOG = LoggerFactory.getLogger( - TestS3ADirectoryPerformance.class); - - @Test - public void testListOperations() throws Throwable { - describe("Test recursive list operations"); - final Path scaleTestDir = getTestPath(); - final Path listDir = new Path(scaleTestDir, "lists"); - - // scale factor. - int scale = getConf().getInt(KEY_DIRECTORY_COUNT, DEFAULT_DIRECTORY_COUNT); - int width = scale; - int depth = scale; - int files = scale; - MetricDiff metadataRequests = new MetricDiff(fs, OBJECT_METADATA_REQUESTS); - MetricDiff listRequests = new MetricDiff(fs, OBJECT_LIST_REQUESTS); - MetricDiff listContinueRequests = - new MetricDiff(fs, OBJECT_CONTINUE_LIST_REQUESTS); - MetricDiff listStatusCalls = new MetricDiff(fs, INVOCATION_LIST_FILES); - MetricDiff getFileStatusCalls = - new MetricDiff(fs, INVOCATION_GET_FILE_STATUS); - NanoTimer createTimer = new NanoTimer(); - TreeScanResults created = - createSubdirs(fs, listDir, depth, width, files, 0); - // add some empty directories - int emptyDepth = 1 * scale; - int emptyWidth = 3 * scale; - - created.add(createSubdirs(fs, listDir, emptyDepth, emptyWidth, 0, - 0, "empty", "f-", "")); - createTimer.end("Time to create %s", created); - LOG.info("Time per operation: {}", - toHuman(createTimer.nanosPerOperation(created.totalCount()))); - printThenReset(LOG, - metadataRequests, - listRequests, - listContinueRequests, - listStatusCalls, - getFileStatusCalls); - - describe("Listing files via treewalk"); - try { - // Scan the directory via an explicit tree walk. - // This is the baseline for any listing speedups. - NanoTimer treeWalkTimer = new NanoTimer(); - TreeScanResults treewalkResults = treeWalk(fs, listDir); - treeWalkTimer.end("List status via treewalk of %s", created); - - printThenReset(LOG, - metadataRequests, - listRequests, - listContinueRequests, - listStatusCalls, - getFileStatusCalls); - assertEquals("Files found in listFiles(recursive=true) " + - " created=" + created + " listed=" + treewalkResults, - created.getFileCount(), treewalkResults.getFileCount()); - - describe("Listing files via listFiles(recursive=true)"); - // listFiles() does the recursion internally - NanoTimer listFilesRecursiveTimer = new NanoTimer(); - - TreeScanResults listFilesResults = new TreeScanResults( - fs.listFiles(listDir, true)); - - listFilesRecursiveTimer.end("listFiles(recursive=true) of %s", created); - assertEquals("Files found in listFiles(recursive=true) " + - " created=" + created + " listed=" + listFilesResults, - created.getFileCount(), listFilesResults.getFileCount()); - - // only two list operations should have taken place - print(LOG, - metadataRequests, - listRequests, - listContinueRequests, - listStatusCalls, - getFileStatusCalls); - assertEquals(listRequests.toString(), 2, listRequests.diff()); - reset(metadataRequests, - listRequests, - listContinueRequests, - listStatusCalls, - getFileStatusCalls); - - - } finally { - describe("deletion"); - // deletion at the end of the run - NanoTimer deleteTimer = new NanoTimer(); - fs.delete(listDir, true); - deleteTimer.end("Deleting directory tree"); - printThenReset(LOG, - metadataRequests, - listRequests, - listContinueRequests, - listStatusCalls, - getFileStatusCalls); - } - } - - @Test - public void testTimeToStatEmptyDirectory() throws Throwable { - describe("Time to stat an empty directory"); - Path path = new Path(getTestPath(), "empty"); - fs.mkdirs(path); - timeToStatPath(path); - } - - @Test - public void testTimeToStatNonEmptyDirectory() throws Throwable { - describe("Time to stat a non-empty directory"); - Path path = new Path(getTestPath(), "dir"); - fs.mkdirs(path); - touch(fs, new Path(path, "file")); - timeToStatPath(path); - } - - @Test - public void testTimeToStatFile() throws Throwable { - describe("Time to stat a simple file"); - Path path = new Path(getTestPath(), "file"); - touch(fs, path); - timeToStatPath(path); - } - - @Test - public void testTimeToStatRoot() throws Throwable { - describe("Time to stat the root path"); - timeToStatPath(new Path("/")); - } - - private void timeToStatPath(Path path) throws IOException { - describe("Timing getFileStatus(\"%s\")", path); - MetricDiff metadataRequests = - new MetricDiff(fs, Statistic.OBJECT_METADATA_REQUESTS); - MetricDiff listRequests = - new MetricDiff(fs, Statistic.OBJECT_LIST_REQUESTS); - long attempts = getOperationCount(); - NanoTimer timer = new NanoTimer(); - for (long l = 0; l < attempts; l++) { - fs.getFileStatus(path); - } - timer.end("Time to execute %d getFileStatusCalls", attempts); - LOG.info("Time per call: {}", toHuman(timer.nanosPerOperation(attempts))); - LOG.info("metadata: {}", metadataRequests); - LOG.info("metadata per operation {}", metadataRequests.diff() / attempts); - LOG.info("listObjects: {}", listRequests); - LOG.info("listObjects: per operation {}", listRequests.diff() / attempts); - } - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/cda68de9/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3AInputStreamPerformance.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3AInputStreamPerformance.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3AInputStreamPerformance.java deleted file mode 100644 index d6d9d66..0000000 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3AInputStreamPerformance.java +++ /dev/null @@ -1,534 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.s3a.scale; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.contract.ContractTestUtils; -import org.apache.hadoop.fs.s3a.S3AFileStatus; -import org.apache.hadoop.fs.s3a.S3AFileSystem; -import org.apache.hadoop.fs.s3a.S3AInputPolicy; -import org.apache.hadoop.fs.s3a.S3AInputStream; -import org.apache.hadoop.fs.s3a.S3AInstrumentation; -import org.apache.hadoop.fs.s3a.S3ATestUtils; -import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.compress.CompressionCodec; -import org.apache.hadoop.io.compress.CompressionCodecFactory; -import org.apache.hadoop.util.LineReader; -import org.junit.After; -import org.junit.Assume; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.EOFException; -import java.io.IOException; - -import static org.apache.hadoop.fs.contract.ContractTestUtils.*; -import static org.apache.hadoop.fs.s3a.Constants.*; - -/** - * Look at the performance of S3a operations. - */ -public class TestS3AInputStreamPerformance extends S3AScaleTestBase { - private static final Logger LOG = LoggerFactory.getLogger( - TestS3AInputStreamPerformance.class); - - private S3AFileSystem s3aFS; - private Path testData; - private S3AFileStatus testDataStatus; - private FSDataInputStream in; - private S3AInstrumentation.InputStreamStatistics streamStatistics; - public static final int BLOCK_SIZE = 32 * 1024; - public static final int BIG_BLOCK_SIZE = 256 * 1024; - - /** Tests only run if the there is a named test file that can be read. */ - private boolean testDataAvailable = true; - private String assumptionMessage = "test file"; - - /** - * Open the FS and the test data. The input stream is always set up here. - * @throws IOException IO Problems. - */ - @Before - public void openFS() throws IOException { - Configuration conf = getConf(); - conf.setInt(SOCKET_SEND_BUFFER, 16 * 1024); - conf.setInt(SOCKET_RECV_BUFFER, 16 * 1024); - String testFile = conf.getTrimmed(KEY_CSVTEST_FILE, DEFAULT_CSVTEST_FILE); - if (testFile.isEmpty()) { - assumptionMessage = "Empty test property: " + KEY_CSVTEST_FILE; - testDataAvailable = false; - } else { - S3ATestUtils.useCSVDataEndpoint(conf); - testData = new Path(testFile); - Path path = this.testData; - bindS3aFS(path); - try { - testDataStatus = s3aFS.getFileStatus(this.testData); - } catch (IOException e) { - LOG.warn("Failed to read file {} specified in {}", - testFile, KEY_CSVTEST_FILE, e); - throw e; - } - } - } - - private void bindS3aFS(Path path) throws IOException { - s3aFS = (S3AFileSystem) FileSystem.newInstance(path.toUri(), getConf()); - } - - /** - * Cleanup: close the stream, close the FS. - */ - @After - public void cleanup() { - describe("cleanup"); - IOUtils.closeStream(in); - IOUtils.closeStream(s3aFS); - } - - /** - * Declare that the test requires the CSV test dataset. - */ - private void requireCSVTestData() { - Assume.assumeTrue(assumptionMessage, testDataAvailable); - } - - /** - * Open the test file with the read buffer specified in the setting. - * {@link #KEY_READ_BUFFER_SIZE}; use the {@code Normal} policy - * @return the stream, wrapping an S3a one - * @throws IOException IO problems - */ - FSDataInputStream openTestFile() throws IOException { - return openTestFile(S3AInputPolicy.Normal, 0); - } - - /** - * Open the test file with the read buffer specified in the setting - * {@link #KEY_READ_BUFFER_SIZE}. - * This includes the {@link #requireCSVTestData()} assumption; so - * if called before any FS op, will automatically skip the test - * if the CSV file is absent. - * - * @param inputPolicy input policy to use - * @param readahead readahead/buffer size - * @return the stream, wrapping an S3a one - * @throws IOException IO problems - */ - FSDataInputStream openTestFile(S3AInputPolicy inputPolicy, long readahead) - throws IOException { - requireCSVTestData(); - return openDataFile(s3aFS, this.testData, inputPolicy, readahead); - } - - /** - * Open a test file with the read buffer specified in the setting - * {@link #KEY_READ_BUFFER_SIZE}. - * - * @param path path to open - * @param inputPolicy input policy to use - * @param readahead readahead/buffer size - * @return the stream, wrapping an S3a one - * @throws IOException IO problems - */ - private FSDataInputStream openDataFile(S3AFileSystem fs, - Path path, - S3AInputPolicy inputPolicy, - long readahead) throws IOException { - int bufferSize = getConf().getInt(KEY_READ_BUFFER_SIZE, - DEFAULT_READ_BUFFER_SIZE); - S3AInputPolicy policy = fs.getInputPolicy(); - fs.setInputPolicy(inputPolicy); - try { - FSDataInputStream stream = fs.open(path, bufferSize); - if (readahead >= 0) { - stream.setReadahead(readahead); - } - streamStatistics = getInputStreamStatistics(stream); - return stream; - } finally { - fs.setInputPolicy(policy); - } - } - - /** - * Assert that the stream was only ever opened once. - */ - protected void assertStreamOpenedExactlyOnce() { - assertOpenOperationCount(1); - } - - /** - * Make an assertion count about the number of open operations. - * @param expected the expected number - */ - private void assertOpenOperationCount(long expected) { - assertEquals("open operations in\n" + in, - expected, streamStatistics.openOperations); - } - - /** - * Log how long an IOP took, by dividing the total time by the - * count of operations, printing in a human-readable form. - * @param operation operation being measured - * @param timer timing data - * @param count IOP count. - */ - protected void logTimePerIOP(String operation, - NanoTimer timer, - long count) { - LOG.info("Time per {}: {} nS", - operation, toHuman(timer.duration() / count)); - } - - @Test - public void testTimeToOpenAndReadWholeFileBlocks() throws Throwable { - requireCSVTestData(); - int blockSize = _1MB; - describe("Open the test file %s and read it in blocks of size %d", - testData, blockSize); - long len = testDataStatus.getLen(); - in = openTestFile(); - byte[] block = new byte[blockSize]; - NanoTimer timer2 = new NanoTimer(); - long count = 0; - // implicitly rounding down here - long blockCount = len / blockSize; - for (long i = 0; i < blockCount; i++) { - int offset = 0; - int remaining = blockSize; - NanoTimer blockTimer = new NanoTimer(); - int reads = 0; - while (remaining > 0) { - int bytesRead = in.read(block, offset, remaining); - reads ++; - if (bytesRead == 1) { - break; - } - remaining -= bytesRead; - offset += bytesRead; - count += bytesRead; - } - blockTimer.end("Reading block %d in %d reads", i, reads); - } - timer2.end("Time to read %d bytes in %d blocks", len, blockCount ); - bandwidth(timer2, count); - logStreamStatistics(); - } - - @Test - public void testLazySeekEnabled() throws Throwable { - describe("Verify that seeks do not trigger any IO"); - in = openTestFile(); - long len = testDataStatus.getLen(); - NanoTimer timer = new NanoTimer(); - long blockCount = len / BLOCK_SIZE; - for (long i = 0; i < blockCount; i++) { - in.seek(in.getPos() + BLOCK_SIZE - 1); - } - in.seek(0); - blockCount++; - timer.end("Time to execute %d seeks", blockCount); - logTimePerIOP("seek()", timer, blockCount); - logStreamStatistics(); - assertOpenOperationCount(0); - assertEquals("bytes read", 0, streamStatistics.bytesRead); - } - - @Test - public void testReadaheadOutOfRange() throws Throwable { - try { - in = openTestFile(); - in.setReadahead(-1L); - fail("Stream should have rejected the request "+ in); - } catch (IllegalArgumentException e) { - // expected - } - } - - @Test - public void testReadWithNormalPolicy() throws Throwable { - describe("Read big blocks with a big readahead"); - executeSeekReadSequence(BIG_BLOCK_SIZE, BIG_BLOCK_SIZE * 2, - S3AInputPolicy.Normal); - assertStreamOpenedExactlyOnce(); - } - - @Test - public void testDecompressionSequential128K() throws Throwable { - describe("Decompress with a 128K readahead"); - executeDecompression(128 * 1024, S3AInputPolicy.Sequential); - assertStreamOpenedExactlyOnce(); - } - - /** - * Execute a decompression + line read with the given input policy. - * @param readahead byte readahead - * @param inputPolicy read policy - * @throws IOException IO Problems - */ - private void executeDecompression(long readahead, - S3AInputPolicy inputPolicy) throws IOException { - CompressionCodecFactory factory - = new CompressionCodecFactory(getConf()); - CompressionCodec codec = factory.getCodec(testData); - long bytesRead = 0; - int lines = 0; - - FSDataInputStream objectIn = openTestFile(inputPolicy, readahead); - ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer(); - try (LineReader lineReader = new LineReader( - codec.createInputStream(objectIn), getConf())) { - Text line = new Text(); - int read; - while ((read = lineReader.readLine(line)) > 0) { - bytesRead += read; - lines++; - } - } catch (EOFException eof) { - // done - } - timer.end("Time to read %d lines [%d bytes expanded, %d raw]" + - " with readahead = %d", - lines, - bytesRead, - testDataStatus.getLen(), - readahead); - logTimePerIOP("line read", timer, lines); - logStreamStatistics(); - } - - private void logStreamStatistics() { - LOG.info(String.format("Stream Statistics%n{}"), streamStatistics); - } - - /** - * Execute a seek+read sequence. - * @param blockSize block size for seeks - * @param readahead what the readahead value of the stream should be - * @throws IOException IO problems - */ - protected void executeSeekReadSequence(long blockSize, - long readahead, - S3AInputPolicy policy) throws IOException { - in = openTestFile(policy, readahead); - long len = testDataStatus.getLen(); - NanoTimer timer = new NanoTimer(); - long blockCount = len / blockSize; - LOG.info("Reading {} blocks, readahead = {}", - blockCount, readahead); - for (long i = 0; i < blockCount; i++) { - in.seek(in.getPos() + blockSize - 1); - // this is the read - assertTrue(in.read() >= 0); - } - timer.end("Time to execute %d seeks of distance %d with readahead = %d", - blockCount, - blockSize, - readahead); - logTimePerIOP("seek(pos + " + blockCount+"); read()", timer, blockCount); - LOG.info("Effective bandwidth {} MB/S", - timer.bandwidthDescription(streamStatistics.bytesRead - - streamStatistics.bytesSkippedOnSeek)); - logStreamStatistics(); - } - - public static final int _4K = 4 * 1024; - public static final int _8K = 8 * 1024; - public static final int _16K = 16 * 1024; - public static final int _32K = 32 * 1024; - public static final int _64K = 64 * 1024; - public static final int _128K = 128 * 1024; - public static final int _256K = 256 * 1024; - public static final int _1MB = 1024 * 1024; - public static final int _2MB = 2 * _1MB; - public static final int _10MB = _1MB * 10; - public static final int _5MB = _1MB * 5; - - private static final int[][] RANDOM_IO_SEQUENCE = { - {_2MB, _128K}, - {_128K, _128K}, - {_5MB, _64K}, - {_1MB, _1MB}, - }; - - @Test - public void testRandomIORandomPolicy() throws Throwable { - executeRandomIO(S3AInputPolicy.Random, (long) RANDOM_IO_SEQUENCE.length); - assertEquals("streams aborted in " + streamStatistics, - 0, streamStatistics.aborted); - } - - @Test - public void testRandomIONormalPolicy() throws Throwable { - long expectedOpenCount = RANDOM_IO_SEQUENCE.length; - executeRandomIO(S3AInputPolicy.Normal, expectedOpenCount); - assertEquals("streams aborted in " + streamStatistics, - 4, streamStatistics.aborted); - } - - /** - * Execute the random IO {@code readFully(pos, bytes[])} sequence defined by - * {@link #RANDOM_IO_SEQUENCE}. The stream is closed afterwards; that's used - * in the timing too - * @param policy read policy - * @param expectedOpenCount expected number of stream openings - * @throws IOException IO problems - * @return the timer - */ - private ContractTestUtils.NanoTimer executeRandomIO(S3AInputPolicy policy, - long expectedOpenCount) - throws IOException { - describe("Random IO with policy \"%s\"", policy); - byte[] buffer = new byte[_1MB]; - long totalBytesRead = 0; - - in = openTestFile(policy, 0); - ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer(); - for (int[] action : RANDOM_IO_SEQUENCE) { - int position = action[0]; - int range = action[1]; - in.readFully(position, buffer, 0, range); - totalBytesRead += range; - } - int reads = RANDOM_IO_SEQUENCE.length; - timer.end("Time to execute %d reads of total size %d bytes", - reads, - totalBytesRead); - in.close(); - assertOpenOperationCount(expectedOpenCount); - logTimePerIOP("byte read", timer, totalBytesRead); - LOG.info("Effective bandwidth {} MB/S", - timer.bandwidthDescription(streamStatistics.bytesRead - - streamStatistics.bytesSkippedOnSeek)); - logStreamStatistics(); - return timer; - } - - S3AInputStream getS3aStream() { - return (S3AInputStream) in.getWrappedStream(); - } - - @Test - public void testRandomReadOverBuffer() throws Throwable { - describe("read over a buffer, making sure that the requests" + - " spans readahead ranges"); - int datasetLen = _32K; - Path dataFile = new Path(getTestPath(), "testReadOverBuffer.bin"); - byte[] sourceData = dataset(datasetLen, 0, 64); - // relies on the field 'fs' referring to the R/W FS - writeDataset(fs, dataFile, sourceData, datasetLen, _16K, true); - byte[] buffer = new byte[datasetLen]; - int readahead = _8K; - int halfReadahead = _4K; - in = openDataFile(fs, dataFile, S3AInputPolicy.Random, readahead); - - LOG.info("Starting initial reads"); - S3AInputStream s3aStream = getS3aStream(); - assertEquals(readahead, s3aStream.getReadahead()); - byte[] oneByte = new byte[1]; - assertEquals(1, in.read(0, oneByte, 0, 1)); - // make some assertions about the current state - assertEquals("remaining in\n" + in, - readahead - 1, s3aStream.remainingInCurrentRequest()); - assertEquals("range start in\n" + in, - 0, s3aStream.getContentRangeStart()); - assertEquals("range finish in\n" + in, - readahead, s3aStream.getContentRangeFinish()); - - assertStreamOpenedExactlyOnce(); - - describe("Starting sequence of positioned read calls over\n%s", in); - NanoTimer readTimer = new NanoTimer(); - int currentPos = halfReadahead; - int offset = currentPos; - int bytesRead = 0; - int readOps = 0; - - // make multiple read() calls - while (bytesRead < halfReadahead) { - int length = buffer.length - offset; - int read = in.read(currentPos, buffer, offset, length); - bytesRead += read; - offset += read; - readOps++; - assertEquals("open operations on request #" + readOps - + " after reading " + bytesRead - + " current position in stream " + currentPos - + " in\n" + fs - + "\n " + in, - 1, streamStatistics.openOperations); - for (int i = currentPos; i < currentPos + read; i++) { - assertEquals("Wrong value from byte " + i, - sourceData[i], buffer[i]); - } - currentPos += read; - } - assertStreamOpenedExactlyOnce(); - // assert at the end of the original block - assertEquals(readahead, currentPos); - readTimer.end("read %d in %d operations", bytesRead, readOps); - bandwidth(readTimer, bytesRead); - LOG.info("Time per byte(): {} nS", - toHuman(readTimer.nanosPerOperation(bytesRead))); - LOG.info("Time per read(): {} nS", - toHuman(readTimer.nanosPerOperation(readOps))); - - describe("read last byte"); - // read one more - int read = in.read(currentPos, buffer, bytesRead, 1); - assertTrue("-1 from last read", read >= 0); - assertOpenOperationCount(2); - assertEquals("Wrong value from read ", sourceData[currentPos], - (int) buffer[currentPos]); - currentPos++; - - - // now scan all the way to the end of the file, using single byte read() - // calls - describe("read() to EOF over \n%s", in); - long readCount = 0; - NanoTimer timer = new NanoTimer(); - LOG.info("seeking"); - in.seek(currentPos); - LOG.info("reading"); - while(currentPos < datasetLen) { - int r = in.read(); - assertTrue("Negative read() at position " + currentPos + " in\n" + in, - r >= 0); - buffer[currentPos] = (byte)r; - assertEquals("Wrong value from read from\n" + in, - sourceData[currentPos], r); - currentPos++; - readCount++; - } - timer.end("read %d bytes", readCount); - bandwidth(timer, readCount); - LOG.info("Time per read(): {} nS", - toHuman(timer.nanosPerOperation(readCount))); - - assertEquals("last read in " + in, -1, in.read()); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/cda68de9/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/yarn/ITestS3A.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/yarn/ITestS3A.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/yarn/ITestS3A.java new file mode 100644 index 0000000..ca57da6 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/yarn/ITestS3A.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs.s3a.yarn; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CreateFlag; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.FsStatus; +import org.apache.hadoop.fs.Path; + +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; + +import java.util.EnumSet; +import org.apache.hadoop.fs.s3a.S3ATestUtils; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +/** + * S3A tests through the {@link FileContext} API. + */ +public class ITestS3A { + private FileContext fc; + + @Rule + public final Timeout testTimeout = new Timeout(90000); + + @Before + public void setUp() throws Exception { + Configuration conf = new Configuration(); + fc = S3ATestUtils.createTestFileContext(conf); + } + + @After + public void tearDown() throws Exception { + if (fc != null) { + fc.delete(getTestPath(), true); + } + } + + protected Path getTestPath() { + return new Path("/tests3afc"); + } + + @Test + public void testS3AStatus() throws Exception { + FsStatus fsStatus = fc.getFsStatus(null); + assertNotNull(fsStatus); + assertTrue("Used capacity should be positive: " + fsStatus.getUsed(), + fsStatus.getUsed() >= 0); + assertTrue("Remaining capacity should be positive: " + fsStatus + .getRemaining(), + fsStatus.getRemaining() >= 0); + assertTrue("Capacity should be positive: " + fsStatus.getCapacity(), + fsStatus.getCapacity() >= 0); + } + + @Test + public void testS3ACreateFileInSubDir() throws Exception { + Path dirPath = getTestPath(); + fc.mkdir(dirPath, FileContext.DIR_DEFAULT_PERM, true); + Path filePath = new Path(dirPath, "file"); + try (FSDataOutputStream file = fc.create(filePath, EnumSet.of(CreateFlag + .CREATE))) { + file.write(666); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/cda68de9/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/yarn/ITestS3AMiniYarnCluster.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/yarn/ITestS3AMiniYarnCluster.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/yarn/ITestS3AMiniYarnCluster.java new file mode 100644 index 0000000..772d8c7 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/yarn/ITestS3AMiniYarnCluster.java @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.fs.s3a.yarn; + +import java.io.IOException; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.examples.WordCount; +import org.apache.hadoop.fs.CreateFlag; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.S3AFileSystem; +import org.apache.hadoop.fs.s3a.S3ATestUtils; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.MiniYARNCluster; + +import org.junit.After; +import static org.junit.Assert.assertTrue; +import org.junit.Before; +import org.junit.Test; +import static org.junit.Assert.assertEquals; + +/** + * Tests that S3A is usable through a YARN application. + */ +public class ITestS3AMiniYarnCluster { + + private final Configuration conf = new YarnConfiguration(); + private S3AFileSystem fs; + private MiniYARNCluster yarnCluster; + private final String rootPath = "/tests/MiniClusterWordCount/"; + + @Before + public void beforeTest() throws IOException { + fs = S3ATestUtils.createTestFileSystem(conf); + fs.mkdirs(new Path(rootPath + "input/")); + + yarnCluster = new MiniYARNCluster("MiniClusterWordCount", // testName + 1, // number of node managers + 1, // number of local log dirs per node manager + 1); // number of hdfs dirs per node manager + yarnCluster.init(conf); + yarnCluster.start(); + } + + @After + public void afterTest() throws IOException { + fs.delete(new Path(rootPath), true); + yarnCluster.stop(); + } + + @Test + public void testWithMiniCluster() throws Exception { + Path input = new Path(rootPath + "input/in.txt"); + input = input.makeQualified(fs.getUri(), fs.getWorkingDirectory()); + Path output = new Path(rootPath + "output/"); + output = output.makeQualified(fs.getUri(), fs.getWorkingDirectory()); + + writeStringToFile(input, "first line\nsecond line\nthird line"); + + Job job = Job.getInstance(conf, "word count"); + job.setJarByClass(WordCount.class); + job.setMapperClass(WordCount.TokenizerMapper.class); + job.setCombinerClass(WordCount.IntSumReducer.class); + job.setReducerClass(WordCount.IntSumReducer.class); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(IntWritable.class); + FileInputFormat.addInputPath(job, input); + FileOutputFormat.setOutputPath(job, output); + + int exitCode = (job.waitForCompletion(true) ? 0 : 1); + assertEquals("Returned error code.", 0, exitCode); + + assertTrue(fs.exists(new Path(output, "_SUCCESS"))); + String outputAsStr = readStringFromFile(new Path(output, "part-r-00000")); + Map<String, Integer> resAsMap = getResultAsMap(outputAsStr); + + assertEquals(4, resAsMap.size()); + assertEquals(1, (int) resAsMap.get("first")); + assertEquals(1, (int) resAsMap.get("second")); + assertEquals(1, (int) resAsMap.get("third")); + assertEquals(3, (int) resAsMap.get("line")); + } + + /** + * helper method. + */ + private Map<String, Integer> getResultAsMap(String outputAsStr) + throws IOException { + Map<String, Integer> result = new HashMap<>(); + for (String line : outputAsStr.split("\n")) { + String[] tokens = line.split("\t"); + result.put(tokens[0], Integer.parseInt(tokens[1])); + } + return result; + } + + /** + * helper method. + */ + private void writeStringToFile(Path path, String string) throws IOException { + FileContext fc = S3ATestUtils.createTestFileContext(conf); + try (FSDataOutputStream file = fc.create(path, + EnumSet.of(CreateFlag.CREATE))) { + file.write(string.getBytes()); + } + } + + /** + * helper method. + */ + private String readStringFromFile(Path path) { + try (FSDataInputStream in = fs.open(path)) { + long bytesLen = fs.getFileStatus(path).getLen(); + byte[] buffer = new byte[(int) bytesLen]; + IOUtils.readFully(in, buffer, 0, buffer.length); + return new String(buffer); + } catch (IOException e) { + throw new RuntimeException("Failed to read from [" + path + "]", e); + } + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/cda68de9/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/yarn/TestS3A.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/yarn/TestS3A.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/yarn/TestS3A.java deleted file mode 100644 index a22dd28..0000000 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/yarn/TestS3A.java +++ /dev/null @@ -1,85 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.fs.s3a.yarn; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.CreateFlag; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileContext; -import org.apache.hadoop.fs.FsStatus; -import org.apache.hadoop.fs.Path; - -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.Timeout; - -import java.util.EnumSet; -import org.apache.hadoop.fs.s3a.S3ATestUtils; - -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; - -public class TestS3A { - private FileContext fc; - - @Rule - public final Timeout testTimeout = new Timeout(90000); - - @Before - public void setUp() throws Exception { - Configuration conf = new Configuration(); - fc = S3ATestUtils.createTestFileContext(conf); - } - - @After - public void tearDown() throws Exception { - if (fc != null) { - fc.delete(getTestPath(), true); - } - } - - protected Path getTestPath() { - return new Path("/tests3afc"); - } - - @Test - public void testS3AStatus() throws Exception { - FsStatus fsStatus = fc.getFsStatus(null); - assertNotNull(fsStatus); - assertTrue("Used capacity should be positive: " + fsStatus.getUsed(), - fsStatus.getUsed() >= 0); - assertTrue("Remaining capacity should be positive: " + fsStatus - .getRemaining(), - fsStatus.getRemaining() >= 0); - assertTrue("Capacity should be positive: " + fsStatus.getCapacity(), - fsStatus.getCapacity() >= 0); - } - - @Test - public void testS3ACreateFileInSubDir() throws Exception { - Path dirPath = getTestPath(); - fc.mkdir(dirPath,FileContext.DIR_DEFAULT_PERM,true); - Path filePath = new Path(dirPath, "file"); - try (FSDataOutputStream file = fc.create(filePath, EnumSet.of(CreateFlag - .CREATE))) { - file.write(666); - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/cda68de9/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/yarn/TestS3AMiniYarnCluster.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/yarn/TestS3AMiniYarnCluster.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/yarn/TestS3AMiniYarnCluster.java deleted file mode 100644 index 990d79f..0000000 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/yarn/TestS3AMiniYarnCluster.java +++ /dev/null @@ -1,144 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.hadoop.fs.s3a.yarn; - -import java.io.IOException; -import java.util.EnumSet; -import java.util.HashMap; -import java.util.Map; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.examples.WordCount; -import org.apache.hadoop.fs.CreateFlag; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileContext; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.s3a.S3AFileSystem; -import org.apache.hadoop.fs.s3a.S3ATestUtils; -import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.server.MiniYARNCluster; - -import org.junit.After; -import static org.junit.Assert.assertTrue; -import org.junit.Before; -import org.junit.Test; -import static org.junit.Assert.assertEquals; - -public class TestS3AMiniYarnCluster { - - private final Configuration conf = new YarnConfiguration(); - private S3AFileSystem fs; - private MiniYARNCluster yarnCluster; - private final String rootPath = "/tests/MiniClusterWordCount/"; - - @Before - public void beforeTest() throws IOException { - fs = S3ATestUtils.createTestFileSystem(conf); - fs.mkdirs(new Path(rootPath + "input/")); - - yarnCluster = new MiniYARNCluster("MiniClusterWordCount", // testName - 1, // number of node managers - 1, // number of local log dirs per node manager - 1); // number of hdfs dirs per node manager - yarnCluster.init(conf); - yarnCluster.start(); - } - - @After - public void afterTest() throws IOException { - fs.delete(new Path(rootPath), true); - yarnCluster.stop(); - } - - @Test - public void testWithMiniCluster() throws Exception { - Path input = new Path(rootPath + "input/in.txt"); - input = input.makeQualified(fs.getUri(), fs.getWorkingDirectory()); - Path output = new Path(rootPath + "output/"); - output = output.makeQualified(fs.getUri(), fs.getWorkingDirectory()); - - writeStringToFile(input, "first line\nsecond line\nthird line"); - - Job job = Job.getInstance(conf, "word count"); - job.setJarByClass(WordCount.class); - job.setMapperClass(WordCount.TokenizerMapper.class); - job.setCombinerClass(WordCount.IntSumReducer.class); - job.setReducerClass(WordCount.IntSumReducer.class); - job.setOutputKeyClass(Text.class); - job.setOutputValueClass(IntWritable.class); - FileInputFormat.addInputPath(job, input); - FileOutputFormat.setOutputPath(job, output); - - int exitCode = (job.waitForCompletion(true) ? 0 : 1); - assertEquals("Returned error code.", 0, exitCode); - - assertTrue(fs.exists(new Path(output, "_SUCCESS"))); - String outputAsStr = readStringFromFile(new Path(output, "part-r-00000")); - Map<String, Integer> resAsMap = getResultAsMap(outputAsStr); - - assertEquals(4, resAsMap.size()); - assertEquals(1, (int) resAsMap.get("first")); - assertEquals(1, (int) resAsMap.get("second")); - assertEquals(1, (int) resAsMap.get("third")); - assertEquals(3, (int) resAsMap.get("line")); - } - - /** - * helper method - */ - private Map<String, Integer> getResultAsMap(String outputAsStr) throws IOException { - Map<String, Integer> result = new HashMap<>(); - for (String line : outputAsStr.split("\n")) { - String[] tokens = line.split("\t"); - result.put(tokens[0], Integer.parseInt(tokens[1])); - } - return result; - } - - /** - * helper method - */ - private void writeStringToFile(Path path, String string) throws IOException { - FileContext fc = S3ATestUtils.createTestFileContext(conf); - try (FSDataOutputStream file = fc.create(path, - EnumSet.of(CreateFlag.CREATE))) { - file.write(string.getBytes()); - } - } - - /** - * helper method - */ - private String readStringFromFile(Path path) { - try (FSDataInputStream in = fs.open(path)) { - long bytesLen = fs.getFileStatus(path).getLen(); - byte[] buffer = new byte[(int) bytesLen]; - IOUtils.readFully(in, buffer, 0, buffer.length); - return new String(buffer); - } catch (IOException e) { - throw new RuntimeException("Failed to read from [" + path + "]", e); - } - } - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/cda68de9/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3native/ITestInMemoryNativeS3FileSystemContract.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3native/ITestInMemoryNativeS3FileSystemContract.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3native/ITestInMemoryNativeS3FileSystemContract.java new file mode 100644 index 0000000..adbf950 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3native/ITestInMemoryNativeS3FileSystemContract.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3native; + +import java.io.IOException; + +/** + * S3N basic contract tests through mock in-memory S3 implementation. + */ +public class ITestInMemoryNativeS3FileSystemContract + extends NativeS3FileSystemContractBaseTest { + + @Override + NativeFileSystemStore getNativeFileSystemStore() throws IOException { + return new InMemoryNativeFileSystemStore(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/cda68de9/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3native/ITestJets3tNativeFileSystemStore.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3native/ITestJets3tNativeFileSystemStore.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3native/ITestJets3tNativeFileSystemStore.java new file mode 100644 index 0000000..cfe622c --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3native/ITestJets3tNativeFileSystemStore.java @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3native; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; + +import static org.junit.Assert.*; +import static org.junit.Assume.*; + +import org.junit.Before; +import org.junit.After; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.URI; +import java.security.DigestInputStream; +import java.security.DigestOutputStream; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; + +/** + * S3N tests through live S3 service. + */ +public class ITestJets3tNativeFileSystemStore { + private Configuration conf; + private Jets3tNativeFileSystemStore store; + private NativeS3FileSystem fs; + + @Before + public void setUp() throws Exception { + conf = new Configuration(); + store = new Jets3tNativeFileSystemStore(); + fs = new NativeS3FileSystem(store); + conf.setBoolean("fs.s3n.multipart.uploads.enabled", true); + conf.setLong("fs.s3n.multipart.uploads.block.size", 64 * 1024 * 1024); + fs.initialize(URI.create(conf.get("test.fs.s3n.name")), conf); + } + + @After + public void tearDown() throws Exception { + try { + store.purge("test"); + } catch (Exception e) {} + } + + @BeforeClass + public static void checkSettings() throws Exception { + Configuration conf = new Configuration(); + assumeNotNull(conf.get("fs.s3n.awsAccessKeyId")); + assumeNotNull(conf.get("fs.s3n.awsSecretAccessKey")); + assumeNotNull(conf.get("test.fs.s3n.name")); + } + + protected void writeRenameReadCompare(Path path, long len) + throws IOException, NoSuchAlgorithmException { + // If len > fs.s3n.multipart.uploads.block.size, + // we'll use a multipart upload copy + MessageDigest digest = MessageDigest.getInstance("MD5"); + OutputStream out = new BufferedOutputStream( + new DigestOutputStream(fs.create(path, false), digest)); + for (long i = 0; i < len; i++) { + out.write('Q'); + } + out.flush(); + out.close(); + + assertTrue("Exists", fs.exists(path)); + + // Depending on if this file is over 5 GB or not, + // rename will cause a multipart upload copy + Path copyPath = path.suffix(".copy"); + fs.rename(path, copyPath); + + assertTrue("Copy exists", fs.exists(copyPath)); + + // Download file from S3 and compare the digest against the original + MessageDigest digest2 = MessageDigest.getInstance("MD5"); + InputStream in = new BufferedInputStream( + new DigestInputStream(fs.open(copyPath), digest2)); + long copyLen = 0; + while (in.read() != -1) { + copyLen++; + } + in.close(); + + assertEquals("Copy length matches original", len, copyLen); + assertArrayEquals("Digests match", digest.digest(), digest2.digest()); + } + + @Test + public void testSmallUpload() throws IOException, NoSuchAlgorithmException { + // Regular upload, regular copy + writeRenameReadCompare(new Path("/test/small"), 16384); + } + + @Test + public void testMediumUpload() throws IOException, NoSuchAlgorithmException { + // Multipart upload, regular copy + writeRenameReadCompare(new Path("/test/medium"), 33554432); // 100 MB + } + + /* + Enable Multipart upload to run this test + @Test + public void testExtraLargeUpload() + throws IOException, NoSuchAlgorithmException { + // Multipart upload, multipart copy + writeRenameReadCompare(new Path("/test/xlarge"), 5368709121L); // 5GB+1byte + } + */ +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/cda68de9/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3native/ITestJets3tNativeS3FileSystemContract.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3native/ITestJets3tNativeS3FileSystemContract.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3native/ITestJets3tNativeS3FileSystemContract.java new file mode 100644 index 0000000..e51eaf6 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3native/ITestJets3tNativeS3FileSystemContract.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3native; + +import java.io.IOException; + +/** + * S3N basic contract tests through live S3 service. + */ +public class ITestJets3tNativeS3FileSystemContract + extends NativeS3FileSystemContractBaseTest { + + @Override + NativeFileSystemStore getNativeFileSystemStore() throws IOException { + return new Jets3tNativeFileSystemStore(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/cda68de9/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3native/TestInMemoryNativeS3FileSystemContract.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3native/TestInMemoryNativeS3FileSystemContract.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3native/TestInMemoryNativeS3FileSystemContract.java deleted file mode 100644 index 664d39e..0000000 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3native/TestInMemoryNativeS3FileSystemContract.java +++ /dev/null @@ -1,30 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.s3native; - -import java.io.IOException; - -public class TestInMemoryNativeS3FileSystemContract - extends NativeS3FileSystemContractBaseTest { - - @Override - NativeFileSystemStore getNativeFileSystemStore() throws IOException { - return new InMemoryNativeFileSystemStore(); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/cda68de9/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3native/TestJets3tNativeFileSystemStore.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3native/TestJets3tNativeFileSystemStore.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3native/TestJets3tNativeFileSystemStore.java deleted file mode 100644 index dbd476e..0000000 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3native/TestJets3tNativeFileSystemStore.java +++ /dev/null @@ -1,129 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.s3native; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; - -import static org.junit.Assert.*; -import static org.junit.Assume.*; - -import org.junit.Before; -import org.junit.After; -import org.junit.BeforeClass; -import org.junit.Test; - -import java.io.BufferedInputStream; -import java.io.BufferedOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.net.URI; -import java.security.DigestInputStream; -import java.security.DigestOutputStream; -import java.security.MessageDigest; -import java.security.NoSuchAlgorithmException; - - -public class TestJets3tNativeFileSystemStore { - private Configuration conf; - private Jets3tNativeFileSystemStore store; - private NativeS3FileSystem fs; - - @Before - public void setUp() throws Exception { - conf = new Configuration(); - store = new Jets3tNativeFileSystemStore(); - fs = new NativeS3FileSystem(store); - conf.setBoolean("fs.s3n.multipart.uploads.enabled", true); - conf.setLong("fs.s3n.multipart.uploads.block.size", 64 * 1024 * 1024); - fs.initialize(URI.create(conf.get("test.fs.s3n.name")), conf); - } - - @After - public void tearDown() throws Exception { - try { - store.purge("test"); - } catch (Exception e) {} - } - - @BeforeClass - public static void checkSettings() throws Exception { - Configuration conf = new Configuration(); - assumeNotNull(conf.get("fs.s3n.awsAccessKeyId")); - assumeNotNull(conf.get("fs.s3n.awsSecretAccessKey")); - assumeNotNull(conf.get("test.fs.s3n.name")); - } - - protected void writeRenameReadCompare(Path path, long len) - throws IOException, NoSuchAlgorithmException { - // If len > fs.s3n.multipart.uploads.block.size, - // we'll use a multipart upload copy - MessageDigest digest = MessageDigest.getInstance("MD5"); - OutputStream out = new BufferedOutputStream( - new DigestOutputStream(fs.create(path, false), digest)); - for (long i = 0; i < len; i++) { - out.write('Q'); - } - out.flush(); - out.close(); - - assertTrue("Exists", fs.exists(path)); - - // Depending on if this file is over 5 GB or not, - // rename will cause a multipart upload copy - Path copyPath = path.suffix(".copy"); - fs.rename(path, copyPath); - - assertTrue("Copy exists", fs.exists(copyPath)); - - // Download file from S3 and compare the digest against the original - MessageDigest digest2 = MessageDigest.getInstance("MD5"); - InputStream in = new BufferedInputStream( - new DigestInputStream(fs.open(copyPath), digest2)); - long copyLen = 0; - while (in.read() != -1) {copyLen++;} - in.close(); - - assertEquals("Copy length matches original", len, copyLen); - assertArrayEquals("Digests match", digest.digest(), digest2.digest()); - } - - @Test - public void testSmallUpload() throws IOException, NoSuchAlgorithmException { - // Regular upload, regular copy - writeRenameReadCompare(new Path("/test/small"), 16384); - } - - @Test - public void testMediumUpload() throws IOException, NoSuchAlgorithmException { - // Multipart upload, regular copy - writeRenameReadCompare(new Path("/test/medium"), 33554432); // 100 MB - } - - /* - Enable Multipart upload to run this test - @Test - public void testExtraLargeUpload() - throws IOException, NoSuchAlgorithmException { - // Multipart upload, multipart copy - writeRenameReadCompare(new Path("/test/xlarge"), 5368709121L); // 5GB+1byte - } - */ -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/cda68de9/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3native/TestJets3tNativeS3FileSystemContract.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3native/TestJets3tNativeS3FileSystemContract.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3native/TestJets3tNativeS3FileSystemContract.java deleted file mode 100644 index 42d6f06..0000000 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3native/TestJets3tNativeS3FileSystemContract.java +++ /dev/null @@ -1,30 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.s3native; - -import java.io.IOException; - -public class TestJets3tNativeS3FileSystemContract - extends NativeS3FileSystemContractBaseTest { - - @Override - NativeFileSystemStore getNativeFileSystemStore() throws IOException { - return new Jets3tNativeFileSystemStore(); - } -} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org