http://git-wip-us.apache.org/repos/asf/hadoop/blob/1ecbf323/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java new file mode 100644 index 0000000..a60d084 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java @@ -0,0 +1,412 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3a.scale; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import com.amazonaws.event.ProgressEvent; +import com.amazonaws.event.ProgressEventType; +import com.amazonaws.event.ProgressListener; +import org.junit.FixMethodOrder; +import org.junit.Test; +import org.junit.runners.MethodSorters; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.StorageStatistics; +import org.apache.hadoop.fs.contract.ContractTestUtils; +import org.apache.hadoop.fs.s3a.S3AFileStatus; +import org.apache.hadoop.fs.s3a.Statistic; +import org.apache.hadoop.util.Progressable; + +import static org.apache.hadoop.fs.contract.ContractTestUtils.*; +import static org.apache.hadoop.fs.s3a.Constants.*; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.*; + +/** + * Scale test which creates a huge file. + * + * <b>Important:</b> the order in which these tests execute is fixed to + * alphabetical order. Test cases are numbered {@code test_123_} to impose + * an ordering based on the numbers. + * + * Having this ordering allows the tests to assume that the huge file + * exists. Even so: they should all have a {@link #assumeHugeFileExists()} + * check at the start, in case an individual test is executed. 
+ */
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      AbstractSTestS3AHugeFiles.class);
+
+  /** Size in bytes of the buffer handed to each write/read call. */
+  public static final int DEFAULT_UPLOAD_BLOCKSIZE = 64 * _1KB;
+
+  /** Partition size used when {@code KEY_HUGE_PARTITION_SIZE} is unset. */
+  public static final String DEFAULT_PARTITION_SIZE = "8M";
+
+  private Path scaleTestDir;
+  private Path hugefile;
+  private Path hugefileRenamed;
+
+  private int uploadBlockSize = DEFAULT_UPLOAD_BLOCKSIZE;
+
+  /**
+   * Partition size in bytes, assigned in {@link #createConfiguration()}.
+   * This field must not be given an initializer: the superclass creates the
+   * configuration while initializing its own {@code testTimeout} field,
+   * which happens before the field initializers of this class run, so an
+   * initializer here would overwrite the configured value.
+   */
+  private int partitionSize;
+
+  /**
+   * Set up the filesystem via the superclass, then work out the paths of
+   * the huge file and of its rename destination under the test path.
+   * @throws Exception on any failure of the superclass setup.
+   */
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+
+    final Path testPath = getTestPath();
+    scaleTestDir = new Path(testPath, "scale");
+    hugefile = new Path(scaleTestDir, "hugefile");
+    hugefileRenamed = new Path(scaleTestDir, "hugefileRenamed");
+  }
+
+  /**
+   * Deliberately a no-op so that the huge file survives from one ordered
+   * test case to the next.
+   */
+  @Override
+  public void tearDown() throws Exception {
+    // do nothing. Specifically: do not delete the test dir
+  }
+
+  /**
+   * Note that this can get called before test setup, and before the field
+   * initializers of this class and its subclasses have run.
+   * Implementations of {@link #getBlockOutputBufferName()} must therefore
+   * not depend on instance state.
+   * @return the configuration to use.
+   */
+  @Override
+  protected Configuration createConfiguration() {
+    Configuration conf = super.createConfiguration();
+    partitionSize = (int) getTestPropertyBytes(conf,
+        KEY_HUGE_PARTITION_SIZE,
+        DEFAULT_PARTITION_SIZE);
+    // the bound is inclusive: a partition of exactly the minimum multipart
+    // size is a valid setting and must not be rejected
+    assertTrue("Partition size too small: " + partitionSize,
+        partitionSize >= MULTIPART_MIN_SIZE);
+    conf.setLong(SOCKET_SEND_BUFFER, _1MB);
+    conf.setLong(SOCKET_RECV_BUFFER, _1MB);
+    conf.setLong(MIN_MULTIPART_THRESHOLD, partitionSize);
+    conf.setInt(MULTIPART_SIZE, partitionSize);
+    conf.set(USER_AGENT_PREFIX, "STestS3AHugeFileCreate");
+    conf.setBoolean(FAST_UPLOAD, true);
+    conf.set(FAST_UPLOAD_BUFFER, getBlockOutputBufferName());
+    return conf;
+  }
+
+  /**
+   * The name of the buffering mechanism to use.
+   * @return a buffering mechanism
+   */
+  protected abstract String getBlockOutputBufferName();
+
+  /**
+   * Create the huge file, logging progress as data is buffered and
+   * uploaded, then verify its length and that no part upload failed.
+   * @throws IOException on any filesystem failure.
+   */
+  @Test
+  public void test_010_CreateHugeFile() throws IOException {
+    assertFalse("Please run this test sequentially to avoid timeouts" +
+            " and bandwidth problems", isParallelExecution());
+    long filesize = getTestPropertyBytes(getConf(), KEY_HUGE_FILESIZE,
+        DEFAULT_HUGE_FILESIZE);
+    long filesizeMB = filesize / _1MB;
+
+    // clean up from any previous attempts
+    deleteHugeFile();
+
+    describe("Creating file %s of size %d MB" +
+            " with partition size %d buffered by %s",
+        hugefile, filesizeMB, partitionSize, getBlockOutputBufferName());
+
+    // now do a check of available upload time, with a pessimistic bandwidth
+    // (that of remote upload tests). If the test times out then not only is
+    // the test outcome lost, as the follow-on tests continue, they will
+    // overlap with the ongoing upload test, for much confusion.
+    int timeout = getTestTimeoutSeconds();
+    // assume 1 MB/s upload bandwidth
+    int bandwidth = _1MB;
+    long uploadTime = filesize / bandwidth;
+    assertTrue(String.format("Timeout set in %s seconds is too low;" +
+            " estimating upload time of %d seconds at 1 MB/s." +
+            " Rerun tests with -D%s=%d",
+            timeout, uploadTime, KEY_TEST_TIMEOUT, uploadTime * 2),
+        uploadTime < timeout);
+    assertEquals("File size set in " + KEY_HUGE_FILESIZE + " = " + filesize
+            + " is not a multiple of " + uploadBlockSize,
+        0, filesize % uploadBlockSize);
+
+    // one block of repeating 0..255 byte values, written again and again
+    byte[] data = new byte[uploadBlockSize];
+    for (int i = 0; i < uploadBlockSize; i++) {
+      data[i] = (byte) (i % 256);
+    }
+
+    long blocks = filesize / uploadBlockSize;
+    long blocksPerMB = _1MB / uploadBlockSize;
+
+    // perform the upload.
+    // there's lots of logging here, so that a tail -f on the output log
+    // can give a view of what is happening.
+    StorageStatistics storageStatistics = fs.getStorageStatistics();
+    String putRequests = Statistic.OBJECT_PUT_REQUESTS.getSymbol();
+    String putBytes = Statistic.OBJECT_PUT_BYTES.getSymbol();
+    Statistic putRequestsActive = Statistic.OBJECT_PUT_REQUESTS_ACTIVE;
+    Statistic putBytesPending = Statistic.OBJECT_PUT_BYTES_PENDING;
+
+    ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
+
+    long blocksPer10MB = blocksPerMB * 10;
+    ProgressCallback progress = new ProgressCallback(timer);
+    try (FSDataOutputStream out = fs.create(hugefile,
+        true,
+        uploadBlockSize,
+        progress)) {
+
+      for (long block = 1; block <= blocks; block++) {
+        out.write(data);
+        long written = block * uploadBlockSize;
+        // every 10 MB and on file upload @ 100%, print some stats
+        if (block % blocksPer10MB == 0 || written == filesize) {
+          long percentage = written * 100 / filesize;
+          double elapsedTime = timer.elapsedTime() / 1.0e9;
+          double writtenMB = 1.0 * written / _1MB;
+          LOG.info(String.format("[%02d%%] Buffered %.2f MB out of %d MB;" +
+                  " PUT %d bytes (%d pending) in %d operations (%d active);" +
+                  " elapsedTime=%.2fs; write to buffer bandwidth=%.2f MB/s",
+              percentage,
+              writtenMB,
+              filesizeMB,
+              storageStatistics.getLong(putBytes),
+              gaugeValue(putBytesPending),
+              storageStatistics.getLong(putRequests),
+              gaugeValue(putRequestsActive),
+              elapsedTime,
+              writtenMB / elapsedTime));
+        }
+      }
+      // now close the file
+      // the explicit close() is what gets timed; try-with-resources will
+      // then invoke close() a second time on the same stream
+      LOG.info("Closing file and completing write operation");
+      ContractTestUtils.NanoTimer closeTimer
+          = new ContractTestUtils.NanoTimer();
+      out.close();
+      closeTimer.end("time to close() output stream");
+    }
+
+    timer.end("time to write %d MB in blocks of %d",
+        filesizeMB, uploadBlockSize);
+    logFSState();
+    bandwidth(timer, filesize);
+    long putRequestCount = storageStatistics.getLong(putRequests);
+    long putByteCount = storageStatistics.getLong(putBytes);
+    if (putRequestCount > 0) {
+      // per-operation figures are only computed when at least one PUT was
+      // counted, so that the divisions cannot be by zero
+      LOG.info("PUT {} bytes in {} operations; {} MB/operation",
+          putByteCount, putRequestCount,
+          putByteCount / (putRequestCount * _1MB));
+      LOG.info("Time per PUT {} nS",
+          toHuman(timer.nanosPerOperation(putRequestCount)));
+    } else {
+      LOG.info("PUT {} bytes in {} operations",
+          putByteCount, putRequestCount);
+    }
+    assertEquals("active put requests in \n" + fs,
+        0, gaugeValue(putRequestsActive));
+    ContractTestUtils.assertPathExists(fs, "Huge file", hugefile);
+    S3AFileStatus status = fs.getFileStatus(hugefile);
+    ContractTestUtils.assertIsFile(hugefile, status);
+    assertEquals("File size in " + status, filesize, status.getLen());
+    progress.verifyNoFailures("Put file " + hugefile + " of size " + filesize);
+  }
+
+  /**
+   * Progress callback from AWS. Likely to come in on a different thread,
+   * hence the atomic counters.
+   * Static because it uses no state of the enclosing test instance.
+   */
+  private static final class ProgressCallback implements Progressable,
+      ProgressListener {
+    private final AtomicLong bytesTransferred = new AtomicLong(0);
+    private final AtomicInteger failures = new AtomicInteger(0);
+    private final ContractTestUtils.NanoTimer timer;
+
+    private ProgressCallback(NanoTimer timer) {
+      this.timer = timer;
+    }
+
+    @Override
+    public void progress() {
+      // nothing to do on the plain Progressable callback
+    }
+
+    /**
+     * Accumulate the byte count of byte-count events, count part failures
+     * and log the effective bandwidth whenever a part completes.
+     * @param progressEvent the event raised by the AWS SDK.
+     */
+    @Override
+    public void progressChanged(ProgressEvent progressEvent) {
+      ProgressEventType eventType = progressEvent.getEventType();
+      if (eventType.isByteCountEvent()) {
+        bytesTransferred.addAndGet(progressEvent.getBytesTransferred());
+      }
+      switch (eventType) {
+      case TRANSFER_PART_FAILED_EVENT:
+        // failure
+        failures.incrementAndGet();
+        LOG.warn("Transfer failure");
+        break;
+      case TRANSFER_PART_COMPLETED_EVENT:
+        // completion
+        long elapsedTime = timer.elapsedTime();
+        double elapsedTimeS = elapsedTime / 1.0e9;
+        long written = bytesTransferred.get();
+        long writtenMB = written / _1MB;
+        LOG.info(String.format(
+            "Event %s; total uploaded=%d MB in %.1fs;" +
+                " effective upload bandwidth = %.2f MB/s",
+            progressEvent,
+            writtenMB, elapsedTimeS, writtenMB / elapsedTimeS));
+        break;
+      default:
+        // byte-count events are frequent, so they only go to debug
+        if (eventType.isByteCountEvent()) {
+          LOG.debug("Event {}", progressEvent);
+        } else {
+          LOG.info("Event {}", progressEvent);
+        }
+        break;
+      }
+    }
+
+    @Override
+    public String toString() {
+      return "ProgressCallback{"
+          + "bytesTransferred=" + bytesTransferred
+          + ", failures=" + failures
+          + '}';
+    }
+
+    /**
+     * Assert that no part-failure event was counted.
+     * @param operation description of the operation, for the error text.
+     */
+    private void verifyNoFailures(String operation) {
+      assertEquals("Failures in " + operation +": " + this, 0, failures.get());
+    }
+  }
+
+  /**
+   * Require that the huge file exists and is a file.
+   * Despite the name this asserts rather than assumes: a missing file
+   * makes the calling test fail instead of being skipped.
+   * @throws IOException on any filesystem failure.
+   */
+  void assumeHugeFileExists() throws IOException {
+    ContractTestUtils.assertPathExists(fs, "huge file not created", hugefile);
+    ContractTestUtils.assertIsFile(fs, hugefile);
+  }
+
+  /** Log the string value of the filesystem. */
+  private void logFSState() {
+    LOG.info("File System state after operation:\n{}", fs);
+  }
+
+  /**
+   * Time three positioned reads: at the start of the file, near its end,
+   * and at the start again.
+   * @throws Throwable on any failure.
+   */
+  @Test
+  public void test_040_PositionedReadHugeFile() throws Throwable {
+    assumeHugeFileExists();
+    final String encryption = getConf().getTrimmed(
+        SERVER_SIDE_ENCRYPTION_ALGORITHM);
+    boolean encrypted = encryption != null;
+    if (encrypted) {
+      LOG.info("File is encrypted with algorithm {}", encryption);
+    }
+    String filetype = encrypted ? "encrypted file" : "file";
+    describe("Positioned reads of %s %s", filetype, hugefile);
+    S3AFileStatus status = fs.getFileStatus(hugefile);
+    long filesize = status.getLen();
+    int ops = 0;
+    final int bufferSize = 8192;
+    byte[] buffer = new byte[bufferSize];
+    long eof = filesize - 1;
+
+    ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
+    ContractTestUtils.NanoTimer readAtByte0, readAtByte0Again, readAtEOF;
+    try (FSDataInputStream in = fs.open(hugefile, uploadBlockSize)) {
+      readAtByte0 = new ContractTestUtils.NanoTimer();
+      in.readFully(0, buffer);
+      readAtByte0.end("time to read data at start of file");
+      ops++;
+
+      // reads the bufferSize bytes immediately preceding the final byte
+      readAtEOF = new ContractTestUtils.NanoTimer();
+      in.readFully(eof - bufferSize, buffer);
+      readAtEOF.end("time to read data at end of file");
+      ops++;
+
+      readAtByte0Again = new ContractTestUtils.NanoTimer();
+      in.readFully(0, buffer);
+      readAtByte0Again.end("time to read data at start of file again");
+      ops++;
+      LOG.info("Final stream state: {}", in);
+    }
+    long mb = Math.max(filesize / _1MB, 1);
+
+    logFSState();
+    timer.end("time to perform positioned reads of %s of %d MB ",
+        filetype, mb);
+    LOG.info("Time per positioned read = {} nS",
+        toHuman(timer.nanosPerOperation(ops)));
+  }
+
+  /**
+   * Read the huge file sequentially in blocks of the upload block size,
+   * logging the time taken and the bandwidth.
+   * @throws Throwable on any failure.
+   */
+  @Test
+  public void test_050_readHugeFile() throws Throwable {
+    assumeHugeFileExists();
+    describe("Reading %s", hugefile);
+    S3AFileStatus status = fs.getFileStatus(hugefile);
+    long filesize = status.getLen();
+    // only whole blocks are read; any trailing partial block is ignored
+    long blocks = filesize / uploadBlockSize;
+    byte[] data = new byte[uploadBlockSize];
+
+    ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
+    try (FSDataInputStream in = fs.open(hugefile, uploadBlockSize)) {
+      for (long block = 0; block < blocks; block++) {
+        in.readFully(data);
+      }
+      LOG.info("Final stream state: {}", in);
+    }
+
+    // never less than 1, so the per-MB figure cannot divide by zero
+    long mb = Math.max(filesize / _1MB, 1);
+    timer.end("time to read file of %d MB ", mb);
+    LOG.info("Time per MB to read = {} nS",
+        toHuman(timer.nanosPerOperation(mb)));
+    bandwidth(timer, filesize);
+    logFSState();
+  }
+
+  /**
+   * Rename the huge file, check the length of the destination, then rename
+   * it back so that later test cases still find it at the original path.
+   * @throws Throwable on any failure.
+   */
+  @Test
+  public void test_100_renameHugeFile() throws Throwable {
+    assumeHugeFileExists();
+    describe("renaming %s to %s", hugefile, hugefileRenamed);
+    S3AFileStatus status = fs.getFileStatus(hugefile);
+    long filesize = status.getLen();
+    // clear out any destination left behind by an earlier run
+    fs.delete(hugefileRenamed, false);
+    ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
+    fs.rename(hugefile, hugefileRenamed);
+    long mb = Math.max(filesize / _1MB, 1);
+    timer.end("time to rename file of %d MB", mb);
+    LOG.info("Time per MB to rename = {} nS",
+        toHuman(timer.nanosPerOperation(mb)));
+    bandwidth(timer, filesize);
+    logFSState();
+    S3AFileStatus destFileStatus = fs.getFileStatus(hugefileRenamed);
+    assertEquals(filesize, destFileStatus.getLen());
+
+    // rename back
+    ContractTestUtils.NanoTimer timer2 = new ContractTestUtils.NanoTimer();
+    fs.rename(hugefileRenamed, hugefile);
+    timer2.end("Renaming back");
+    LOG.info("Time per MB to rename = {} nS",
+        toHuman(timer2.nanosPerOperation(mb)));
+    bandwidth(timer2, filesize);
+  }
+
+  /**
+   * Final cleanup: delete the huge file, the renamed file if it exists,
+   * and then everything under the test path.
+   * @throws IOException on any filesystem failure.
+   */
+  @Test
+  public void test_999_DeleteHugeFiles() throws IOException {
+    deleteHugeFile();
+    ContractTestUtils.NanoTimer timer2 = new ContractTestUtils.NanoTimer();
+
+    fs.delete(hugefileRenamed, false);
+    timer2.end("time to delete %s", hugefileRenamed);
+    ContractTestUtils.rm(fs, getTestPath(), true, true);
+  }
+
+  /**
+   * Delete the huge file, logging how long the call took.
+   * @throws IOException on any filesystem failure.
+   */
+  protected void deleteHugeFile() throws IOException {
+    describe("Deleting %s", hugefile);
+    NanoTimer timer = new NanoTimer();
+    fs.delete(hugefile, false);
+    timer.end("time to delete %s", hugefile);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1ecbf323/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADeleteManyFiles.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADeleteManyFiles.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADeleteManyFiles.java index 542dc12..ab431b8 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADeleteManyFiles.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADeleteManyFiles.java @@ -107,20 +107,9 @@ public class ITestS3ADeleteManyFiles extends S3AScaleTestBase { @Test public void testOpenCreate() throws IOException { - Path dir = new Path("/tests3a"); - ContractTestUtils.createAndVerifyFile(fs, dir, 1024); - ContractTestUtils.createAndVerifyFile(fs, dir, 5 * 1024 * 1024); - ContractTestUtils.createAndVerifyFile(fs, dir, 20 * 1024 * 1024); - - - /* - Enable to test the multipart upload - try { - ContractTestUtils.createAndVerifyFile(fs, dir, - (long)6 * 1024 * 1024 * 1024); - } catch (IOException e) { - fail(e.getMessage()); - } - */ + final Path scaleTestDir = getTestPath(); + final Path srcDir = new Path(scaleTestDir, "opencreate"); + ContractTestUtils.createAndVerifyFile(fs, srcDir, 1024); + ContractTestUtils.createAndVerifyFile(fs, srcDir, 50 * 1024); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/1ecbf323/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesArrayBlocks.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesArrayBlocks.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesArrayBlocks.java new file mode 100644 index 0000000..d6f15c8 --- /dev/null +++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesArrayBlocks.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.scale; + +import org.apache.hadoop.fs.s3a.Constants; + +/** + * Use {@link Constants#FAST_UPLOAD_BUFFER_ARRAY} for buffering. 
+ */ +public class ITestS3AHugeFilesArrayBlocks extends AbstractSTestS3AHugeFiles { + + protected String getBlockOutputBufferName() { + return Constants.FAST_UPLOAD_BUFFER_ARRAY; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/1ecbf323/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesByteBufferBlocks.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesByteBufferBlocks.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesByteBufferBlocks.java new file mode 100644 index 0000000..b1323c4 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesByteBufferBlocks.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.scale; + +import org.apache.hadoop.fs.s3a.Constants; + +import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BYTEBUFFER; + +/** + * Use {@link Constants#FAST_UPLOAD_BYTEBUFFER} for buffering. 
+ */ +public class ITestS3AHugeFilesByteBufferBlocks + extends AbstractSTestS3AHugeFiles { + + protected String getBlockOutputBufferName() { + return FAST_UPLOAD_BYTEBUFFER; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/1ecbf323/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesClassicOutput.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesClassicOutput.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesClassicOutput.java new file mode 100644 index 0000000..45eef24 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesClassicOutput.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.scale; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.s3a.Constants; + +/** + * Use classic output for writing things; tweaks the configuration to do + * this after it has been set up in the superclass. 
+ * The generator test has been copied and re + */ +public class ITestS3AHugeFilesClassicOutput extends AbstractSTestS3AHugeFiles { + + @Override + protected Configuration createConfiguration() { + final Configuration conf = super.createConfiguration(); + conf.setBoolean(Constants.FAST_UPLOAD, false); + return conf; + } + + protected String getBlockOutputBufferName() { + return "classic"; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/1ecbf323/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesDiskBlocks.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesDiskBlocks.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesDiskBlocks.java new file mode 100644 index 0000000..2be5769 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesDiskBlocks.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3a.scale; + +import org.apache.hadoop.fs.s3a.Constants; + +/** + * Use {@link Constants#FAST_UPLOAD_BUFFER_DISK} for buffering. + */ +public class ITestS3AHugeFilesDiskBlocks extends AbstractSTestS3AHugeFiles { + + protected String getBlockOutputBufferName() { + return Constants.FAST_UPLOAD_BUFFER_DISK; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/1ecbf323/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/S3AScaleTestBase.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/S3AScaleTestBase.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/S3AScaleTestBase.java index d861a16..af6d468 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/S3AScaleTestBase.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/S3AScaleTestBase.java @@ -20,18 +20,18 @@ package org.apache.hadoop.fs.s3a.scale; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.contract.ContractTestUtils; import org.apache.hadoop.fs.s3a.S3AFileSystem; -import org.apache.hadoop.fs.Path; - import org.apache.hadoop.fs.s3a.S3AInputStream; import org.apache.hadoop.fs.s3a.S3AInstrumentation; import org.apache.hadoop.fs.s3a.S3ATestConstants; -import org.apache.hadoop.fs.s3a.S3ATestUtils; +import org.apache.hadoop.fs.s3a.Statistic; +import org.apache.hadoop.metrics2.lib.MutableGaugeLong; import org.junit.After; import org.junit.Assert; +import org.junit.Assume; import org.junit.Before; -import org.junit.BeforeClass; import org.junit.Rule; import org.junit.rules.TestName; import org.junit.rules.Timeout; @@ -40,6 +40,8 @@ import org.slf4j.LoggerFactory; import java.io.InputStream; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.*; + /** * Base class for scale 
tests; here is where the common scale configuration * keys are defined. @@ -47,71 +49,18 @@ import java.io.InputStream; public class S3AScaleTestBase extends Assert implements S3ATestConstants { @Rule - public TestName methodName = new TestName(); + public final TestName methodName = new TestName(); @Rule - public Timeout testTimeout = new Timeout(30 * 60 * 1000); + public Timeout testTimeout = createTestTimeout(); - @BeforeClass - public static void nameThread() { + @Before + public void nameThread() { Thread.currentThread().setName("JUnit"); } - /** - * The number of operations to perform: {@value}. - */ - public static final String KEY_OPERATION_COUNT = - SCALE_TEST + "operation.count"; - - /** - * The number of directory operations to perform: {@value}. - */ - public static final String KEY_DIRECTORY_COUNT = - SCALE_TEST + "directory.count"; - - /** - * The readahead buffer: {@value}. - */ - public static final String KEY_READ_BUFFER_SIZE = - S3A_SCALE_TEST + "read.buffer.size"; - - public static final int DEFAULT_READ_BUFFER_SIZE = 16384; - - /** - * Key for a multi MB test file: {@value}. - */ - public static final String KEY_CSVTEST_FILE = - S3A_SCALE_TEST + "csvfile"; - /** - * Default path for the multi MB test file: {@value}. - */ - public static final String DEFAULT_CSVTEST_FILE - = "s3a://landsat-pds/scene_list.gz"; - - /** - * Endpoint for the S3 CSV/scale tests. This defaults to - * being us-east. - */ - public static final String KEY_CSVTEST_ENDPOINT = - S3A_SCALE_TEST + "csvfile.endpoint"; - - /** - * Endpoint for the S3 CSV/scale tests. This defaults to - * being us-east. - */ - public static final String DEFAULT_CSVTEST_ENDPOINT = - "s3.amazonaws.com"; - - /** - * The default number of operations to perform: {@value}. - */ - public static final long DEFAULT_OPERATION_COUNT = 2005; - - /** - * Default number of directories to create when performing - * directory performance/scale tests. 
- */ - public static final int DEFAULT_DIRECTORY_COUNT = 2; + public static final int _1KB = 1024; + public static final int _1MB = _1KB * _1KB; protected S3AFileSystem fs; @@ -120,6 +69,8 @@ public class S3AScaleTestBase extends Assert implements S3ATestConstants { private Configuration conf; + private boolean enabled; + /** * Configuration generator. May be overridden to inject * some custom options. @@ -137,11 +88,33 @@ public class S3AScaleTestBase extends Assert implements S3ATestConstants { return conf; } + /** + * Setup. This triggers creation of the configuration. + */ @Before public void setUp() throws Exception { - conf = createConfiguration(); + demandCreateConfiguration(); LOG.debug("Scale test operation count = {}", getOperationCount()); - fs = S3ATestUtils.createTestFileSystem(conf); + // multipart purges are disabled on the scale tests + fs = createTestFileSystem(conf, false); + // check for the test being enabled + enabled = getTestPropertyBool( + getConf(), + KEY_SCALE_TESTS_ENABLED, + DEFAULT_SCALE_TESTS_ENABLED); + Assume.assumeTrue("Scale test disabled: to enable set property " + + KEY_SCALE_TESTS_ENABLED, enabled); + } + + /** + * Create the configuration if it is not already set up. + * @return the configuration. + */ + private synchronized Configuration demandCreateConfiguration() { + if (conf == null) { + conf = createConfiguration(); + } + return conf; } @After @@ -160,7 +133,27 @@ public class S3AScaleTestBase extends Assert implements S3ATestConstants { } /** - * Describe a test in the logs + * Create the timeout for tests. Some large tests may need a larger value. + * @return the test timeout to use + */ + protected Timeout createTestTimeout() { + demandCreateConfiguration(); + return new Timeout( + getTestTimeoutSeconds() * 1000); + } + + /** + * Get the test timeout in seconds. + * @return the test timeout as set in system properties or the default. 
+ */ + protected static int getTestTimeoutSeconds() { + return getTestPropertyInt(null, + KEY_TEST_TIMEOUT, + DEFAULT_TEST_TIMEOUT); + } + + /** + * Describe a test in the logs. * @param text text to print * @param args arguments to format in the printing */ @@ -189,4 +182,30 @@ public class S3AScaleTestBase extends Assert implements S3ATestConstants { } } + /** + * Get the gauge value of a statistic. Raises an assertion if + * there is no such gauge. + * @param statistic statistic to look up + * @return the value. + */ + public long gaugeValue(Statistic statistic) { + S3AInstrumentation instrumentation = fs.getInstrumentation(); + MutableGaugeLong gauge = instrumentation.lookupGauge(statistic.getSymbol()); + assertNotNull("No gauge " + statistic + + " in " + instrumentation.dump("", " = ", "\n", true), gauge); + return gauge.value(); + } + + protected boolean isEnabled() { + return enabled; + } + + /** + * Flag to indicate that this test is being used sequentially. This + * is used by some of the scale tests to validate test time expectations. + * @return true if the build indicates this test is being run in parallel. + */ + protected boolean isParallelExecution() { + return Boolean.getBoolean(S3ATestConstants.KEY_PARALLEL_TEST_EXECUTION); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org