http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingCommitter.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingCommitter.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingCommitter.java new file mode 100644 index 0000000..2c348f5 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingCommitter.java @@ -0,0 +1,696 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.commit.staging; + +import java.io.File; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; + +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.model.AbortMultipartUploadRequest; +import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest; +import com.google.common.collect.Sets; +import org.hamcrest.core.StringStartsWith; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.commons.lang3.tuple.Pair; +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.AWSClientIOException; +import org.apache.hadoop.fs.s3a.MockS3AFileSystem; +import org.apache.hadoop.fs.s3a.S3AFileSystem; +import org.apache.hadoop.fs.s3a.commit.files.PendingSet; +import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.JobStatus; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.TaskID; +import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.mapreduce.task.JobContextImpl; +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; + +import static org.apache.hadoop.fs.s3a.Constants.*; +import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*; +import static 
org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.*; +import static org.apache.hadoop.fs.s3a.commit.staging.StagingCommitterConstants.*; +import static org.apache.hadoop.fs.contract.ContractTestUtils.*; +import static org.apache.hadoop.fs.s3a.commit.staging.Paths.*; +import static org.apache.hadoop.fs.s3a.commit.staging.StagingTestBase.*; +import static org.apache.hadoop.test.LambdaTestUtils.*; + +/** + * The main unit test suite of the staging committer. + * Parameterized on thread count and unique filename policy. + */ +@RunWith(Parameterized.class) +public class TestStagingCommitter extends StagingTestBase.MiniDFSTest { + + private static final JobID JOB_ID = new JobID("job", 1); + private static final TaskAttemptID AID = new TaskAttemptID( + new TaskID(JOB_ID, TaskType.REDUCE, 2), 3); + private static final Logger LOG = + LoggerFactory.getLogger(TestStagingCommitter.class); + + private final int numThreads; + private final boolean uniqueFilenames; + private JobContext job = null; + private TaskAttemptContext tac = null; + private Configuration conf = null; + private MockedStagingCommitter jobCommitter = null; + private MockedStagingCommitter committer = null; + + // created in Before + private S3AFileSystem mockFS = null; + private MockS3AFileSystem wrapperFS = null; + + // created in Before + private StagingTestBase.ClientResults results = null; + private StagingTestBase.ClientErrors errors = null; + private AmazonS3 mockClient = null; + private File tmpDir; + + /** + * Describe a test in the logs. + * @param text text to print + * @param args arguments to format in the printing + */ + protected void describe(String text, Object... args) { + LOG.info("\n\n: {}\n", String.format(text, args)); + } + + /** + * Test array for parameterized test runs: how many threads to use + * and whether unique filenames are enabled. + * @return a list of parameter tuples. 
+ */ + @Parameterized.Parameters + public static Collection<Object[]> params() { + return Arrays.asList(new Object[][] { + {0, false}, + {1, true}, + {3, true}, + }); + } + + public TestStagingCommitter(int numThreads, boolean uniqueFilenames) { + this.numThreads = numThreads; + this.uniqueFilenames = uniqueFilenames; + } + + @Before + public void setupCommitter() throws Exception { + JobConf jobConf = getConfiguration(); + jobConf.setInt(FS_S3A_COMMITTER_THREADS, numThreads); + jobConf.setBoolean(FS_S3A_COMMITTER_STAGING_UNIQUE_FILENAMES, + uniqueFilenames); + jobConf.set(FS_S3A_COMMITTER_STAGING_UUID, + UUID.randomUUID().toString()); + + this.results = new StagingTestBase.ClientResults(); + this.errors = new StagingTestBase.ClientErrors(); + this.mockClient = newMockS3Client(results, errors); + this.mockFS = createAndBindMockFSInstance(jobConf, + Pair.of(results, errors)); + this.wrapperFS = lookupWrapperFS(jobConf); + // and bind the FS + wrapperFS.setAmazonS3Client(mockClient); + + + this.job = new JobContextImpl(jobConf, JOB_ID); + this.tac = new TaskAttemptContextImpl( + new Configuration(job.getConfiguration()), AID); + + this.jobCommitter = new MockedStagingCommitter(OUTPUT_PATH, tac); + jobCommitter.setupJob(job); + + // get the task's configuration copy so modifications take effect + this.conf = tac.getConfiguration(); + this.conf.setInt(MULTIPART_SIZE, 100); + + tmpDir = File.createTempFile("testStagingCommitter", ""); + tmpDir.delete(); + tmpDir.mkdirs(); + + String tmp = tmpDir.getCanonicalPath(); + this.conf.set(BUFFER_DIR, + String.format("%s/local-0/, %s/local-1 ", tmp, tmp)); + + this.committer = new MockedStagingCommitter(OUTPUT_PATH, tac); + Paths.resetTempFolderCache(); + } + + @After + public void cleanup() { + try { + if (tmpDir != null) { + FileUtils.deleteDirectory(tmpDir); + } + } catch (IOException ignored) { + + } + } + + @Test + public void testUUIDPropagation() throws Exception { + Configuration config = new Configuration(); + String jobUUID = addUUID(config); + assertEquals("Upload UUID", jobUUID, + StagingCommitter.getUploadUUID(config, JOB_ID)); + } + + private String addUUID(Configuration config) { + String jobUUID = UUID.randomUUID().toString(); + config.set(FS_S3A_COMMITTER_STAGING_UUID, jobUUID); + return jobUUID; + } + + @Test + public void testAttemptPathConstructionNoSchema() throws Exception { + Configuration config = new Configuration(); + final String jobUUID = addUUID(config); + config.set(BUFFER_DIR, "/tmp/mr-local-0,/tmp/mr-local-1"); + String commonPath = "file:/tmp/mr-local-"; + + assertThat("Missing scheme should produce local file paths", + getLocalTaskAttemptTempDir(config, + jobUUID, tac.getTaskAttemptID()).toString(), + StringStartsWith.startsWith(commonPath)); + } + + @Test + public void testAttemptPathConstructionWithSchema() throws Exception { + Configuration config = new Configuration(); + final String jobUUID = addUUID(config); + String commonPath = "file:/tmp/mr-local-"; + + config.set(BUFFER_DIR, + "file:/tmp/mr-local-0,file:/tmp/mr-local-1"); + + assertThat("Path should be the same with file scheme", + getLocalTaskAttemptTempDir(config, + jobUUID, tac.getTaskAttemptID()).toString(), + StringStartsWith.startsWith(commonPath)); + } + + @Test + public void testAttemptPathConstructionWrongSchema() throws Exception { + Configuration config = new Configuration(); + final String jobUUID = addUUID(config); + config.set(BUFFER_DIR, + "hdfs://nn:8020/tmp/mr-local-0,hdfs://nn:8020/tmp/mr-local-1"); + intercept(IllegalArgumentException.class, 
"Wrong FS", + () -> getLocalTaskAttemptTempDir(config, jobUUID, + tac.getTaskAttemptID())); + } + + @Test + public void testCommitPathConstruction() throws Exception { + Path committedTaskPath = committer.getCommittedTaskPath(tac); + assertEquals("Path should be in HDFS: " + committedTaskPath, + "hdfs", committedTaskPath.toUri().getScheme()); + String ending = STAGING_UPLOADS + "/_temporary/0/task_job_0001_r_000002"; + assertTrue("Did not end with \"" + ending +"\" :" + committedTaskPath, + committedTaskPath.toString().endsWith(ending)); + } + + @Test + public void testSingleTaskCommit() throws Exception { + Path file = new Path(commitTask(committer, tac, 1).iterator().next()); + + List<String> uploads = results.getUploads(); + assertEquals("Should initiate one upload: " + results, 1, uploads.size()); + + Path committedPath = committer.getCommittedTaskPath(tac); + FileSystem dfs = committedPath.getFileSystem(conf); + + assertEquals("Should commit to HDFS: "+ committer, getDFS(), dfs); + + FileStatus[] stats = dfs.listStatus(committedPath); + assertEquals("Should produce one commit file: " + results, 1, stats.length); + assertEquals("Should name the commits file with the task ID: " + results, + "task_job_0001_r_000002", stats[0].getPath().getName()); + + PendingSet pending = PendingSet.load(dfs, stats[0].getPath()); + assertEquals("Should have one pending commit", 1, pending.size()); + SinglePendingCommit commit = pending.getCommits().get(0); + assertEquals("Should write to the correct bucket:" + results, + BUCKET, commit.getBucket()); + assertEquals("Should write to the correct key: " + results, + OUTPUT_PREFIX + "/" + file.getName(), commit.getDestinationKey()); + + assertValidUpload(results.getTagsByUpload(), commit); + } + + /** + * This originally verified that empty files weren't PUT. They are now. 
+ * @throws Exception on a failure + */ + @Test + public void testSingleTaskEmptyFileCommit() throws Exception { + committer.setupTask(tac); + + Path attemptPath = committer.getTaskAttemptPath(tac); + + String rand = UUID.randomUUID().toString(); + writeOutputFile(tac.getTaskAttemptID(), attemptPath, rand, 0); + + committer.commitTask(tac); + + List<String> uploads = results.getUploads(); + assertEquals("Should initiate one upload", 1, uploads.size()); + + Path committedPath = committer.getCommittedTaskPath(tac); + FileSystem dfs = committedPath.getFileSystem(conf); + + assertEquals("Should commit to HDFS", getDFS(), dfs); + + assertIsFile(dfs, committedPath); + FileStatus[] stats = dfs.listStatus(committedPath); + assertEquals("Should produce one commit file", 1, stats.length); + assertEquals("Should name the commits file with the task ID", + "task_job_0001_r_000002", stats[0].getPath().getName()); + + PendingSet pending = PendingSet.load(dfs, + stats[0].getPath()); + assertEquals("Should have one pending commit", 1, pending.size()); + } + + @Test + public void testSingleTaskMultiFileCommit() throws Exception { + int numFiles = 3; + Set<String> files = commitTask(committer, tac, numFiles); + + List<String> uploads = results.getUploads(); + assertEquals("Should initiate multiple uploads", numFiles, uploads.size()); + + Path committedPath = committer.getCommittedTaskPath(tac); + FileSystem dfs = committedPath.getFileSystem(conf); + + assertEquals("Should commit to HDFS", getDFS(), dfs); + assertIsFile(dfs, committedPath); + FileStatus[] stats = dfs.listStatus(committedPath); + assertEquals("Should produce one commit file", 1, stats.length); + assertEquals("Should name the commits file with the task ID", + "task_job_0001_r_000002", stats[0].getPath().getName()); + + List<SinglePendingCommit> pending = + PendingSet.load(dfs, stats[0].getPath()).getCommits(); + assertEquals("Should have correct number of pending commits", + files.size(), pending.size()); + + Set<String> keys = Sets.newHashSet(); + for (SinglePendingCommit commit : pending) { + assertEquals("Should write to the correct bucket: " + commit, + BUCKET, commit.getBucket()); + assertValidUpload(results.getTagsByUpload(), commit); + keys.add(commit.getDestinationKey()); + } + + assertEquals("Should write to the correct key", + files, keys); + } + + @Test + public void testTaskInitializeFailure() throws Exception { + committer.setupTask(tac); + + errors.failOnInit(1); + + Path attemptPath = committer.getTaskAttemptPath(tac); + FileSystem fs = attemptPath.getFileSystem(conf); + + writeOutputFile(tac.getTaskAttemptID(), attemptPath, + UUID.randomUUID().toString(), 10); + writeOutputFile(tac.getTaskAttemptID(), attemptPath, + UUID.randomUUID().toString(), 10); + + intercept(AWSClientIOException.class, + "Fail on init 1", + "Should fail during init", + () -> committer.commitTask(tac)); + + assertEquals("Should have initialized one file upload", + 1, results.getUploads().size()); + assertEquals("Should abort the upload", + new HashSet<>(results.getUploads()), + getAbortedIds(results.getAborts())); + assertPathDoesNotExist(fs, + "Should remove the attempt path", + attemptPath); + } + + @Test + public void testTaskSingleFileUploadFailure() throws Exception { + describe("Set up a single file upload to fail on upload 2"); + committer.setupTask(tac); + + errors.failOnUpload(2); + + Path attemptPath = committer.getTaskAttemptPath(tac); + FileSystem fs = attemptPath.getFileSystem(conf); + + writeOutputFile(tac.getTaskAttemptID(), attemptPath, + 
UUID.randomUUID().toString(), 10); + + intercept((Class<? extends Exception>) AWSClientIOException.class, + "Fail on upload 2", + "Should fail during upload", + () -> { + committer.commitTask(tac); + return committer.toString(); + }); + + assertEquals("Should have attempted one file upload", + 1, results.getUploads().size()); + assertEquals("Should abort the upload", + results.getUploads().get(0), + results.getAborts().get(0).getUploadId()); + assertPathDoesNotExist(fs, "Should remove the attempt path", + attemptPath); + } + + @Test + public void testTaskMultiFileUploadFailure() throws Exception { + committer.setupTask(tac); + + errors.failOnUpload(5); + + Path attemptPath = committer.getTaskAttemptPath(tac); + FileSystem fs = attemptPath.getFileSystem(conf); + + writeOutputFile(tac.getTaskAttemptID(), attemptPath, + UUID.randomUUID().toString(), 10); + writeOutputFile(tac.getTaskAttemptID(), attemptPath, + UUID.randomUUID().toString(), 10); + + intercept((Class<? extends Exception>) AWSClientIOException.class, + "Fail on upload 5", + "Should fail during upload", + () -> { + committer.commitTask(tac); + return committer.toString(); + }); + + assertEquals("Should have attempted two file uploads", + 2, results.getUploads().size()); + assertEquals("Should abort the upload", + new HashSet<>(results.getUploads()), + getAbortedIds(results.getAborts())); + assertPathDoesNotExist(fs, "Should remove the attempt path", + attemptPath); + } + + @Test + public void testTaskUploadAndAbortFailure() throws Exception { + committer.setupTask(tac); + + errors.failOnUpload(5); + errors.failOnAbort(0); + + Path attemptPath = committer.getTaskAttemptPath(tac); + FileSystem fs = attemptPath.getFileSystem(conf); + + writeOutputFile(tac.getTaskAttemptID(), attemptPath, + UUID.randomUUID().toString(), 10); + writeOutputFile(tac.getTaskAttemptID(), attemptPath, + UUID.randomUUID().toString(), 10); + + intercept((Class<? 
extends Exception>) AWSClientIOException.class, + "Fail on upload 5", + "Should suppress abort failure, propagate upload failure", + ()-> { + committer.commitTask(tac); + return committer.toString(); + }); + + assertEquals("Should have attempted two file uploads", + 2, results.getUploads().size()); + assertEquals("Should not have succeeded with any aborts", + new HashSet<>(), + getAbortedIds(results.getAborts())); + assertPathDoesNotExist(fs, "Should remove the attempt path", attemptPath); + } + + @Test + public void testSingleTaskAbort() throws Exception { + committer.setupTask(tac); + + Path attemptPath = committer.getTaskAttemptPath(tac); + FileSystem fs = attemptPath.getFileSystem(conf); + + Path outPath = writeOutputFile( + tac.getTaskAttemptID(), attemptPath, UUID.randomUUID().toString(), 10); + + committer.abortTask(tac); + + assertEquals("Should not upload anything", + 0, results.getUploads().size()); + assertEquals("Should not upload anything", + 0, results.getParts().size()); + assertPathDoesNotExist(fs, "Should remove all attempt data", outPath); + assertPathDoesNotExist(fs, "Should remove the attempt path", attemptPath); + + } + + @Test + public void testJobCommit() throws Exception { + Path jobAttemptPath = jobCommitter.getJobAttemptPath(job); + FileSystem fs = jobAttemptPath.getFileSystem(conf); + + Set<String> uploads = runTasks(job, 4, 3); + assertNotEquals(0, uploads.size()); + + assertPathExists(fs, "No job attempt path", jobAttemptPath); + + jobCommitter.commitJob(job); + assertEquals("Should have aborted no uploads", + 0, results.getAborts().size()); + + assertEquals("Should have deleted no uploads", + 0, results.getDeletes().size()); + + assertEquals("Should have committed all uploads", + uploads, getCommittedIds(results.getCommits())); + + assertPathDoesNotExist(fs, "jobAttemptPath not deleted", jobAttemptPath); + + } + + @Test + public void testJobCommitFailure() throws Exception { + Path jobAttemptPath = jobCommitter.getJobAttemptPath(job); + FileSystem fs = jobAttemptPath.getFileSystem(conf); + + Set<String> uploads = runTasks(job, 4, 3); + + assertPathExists(fs, "No job attempt path", jobAttemptPath); + + errors.failOnCommit(5); + setMockLogLevel(MockS3AFileSystem.LOG_NAME); + + intercept(AWSClientIOException.class, + "Fail on commit 5", + "Should propagate the commit failure", + () -> { + jobCommitter.commitJob(job); + return jobCommitter.toString(); + }); + + assertEquals("Should have succeeded to commit some uploads", + 5, results.getCommits().size()); + + assertEquals("Should have deleted the files that succeeded", + 5, results.getDeletes().size()); + + Set<String> commits = results.getCommits() + .stream() + .map((commit) -> commit.getBucketName() + commit.getKey()) + .collect(Collectors.toSet()); + + Set<String> deletes = results.getDeletes() + .stream() + .map((delete) -> delete.getBucketName() + delete.getKey()) + .collect(Collectors.toSet()); + + assertEquals("Committed and deleted objects should match", + commits, deletes); + + assertEquals("Mismatch in aborted upload count", + 7, results.getAborts().size()); + + Set<String> uploadIds = getCommittedIds(results.getCommits()); + uploadIds.addAll(getAbortedIds(results.getAborts())); + + assertEquals("Should have committed/deleted or aborted all uploads", + uploads, uploadIds); + + assertPathDoesNotExist(fs, "jobAttemptPath not deleted", jobAttemptPath); + } + + @Test + public void testJobAbort() throws Exception { + Path jobAttemptPath = jobCommitter.getJobAttemptPath(job); + FileSystem fs = 
jobAttemptPath.getFileSystem(conf); + + Set<String> uploads = runTasks(job, 4, 3); + + assertPathExists(fs, "No job attempt path", jobAttemptPath); + jobCommitter.abortJob(job, JobStatus.State.KILLED); + assertEquals("Should have committed no uploads: " + jobCommitter, + 0, results.getCommits().size()); + + assertEquals("Should have deleted no uploads: " + jobCommitter, + 0, results.getDeletes().size()); + + assertEquals("Should have aborted all uploads: " + jobCommitter, + uploads, getAbortedIds(results.getAborts())); + + assertPathDoesNotExist(fs, "jobAttemptPath not deleted", jobAttemptPath); + } + + /** + * Run tasks, return the uploaded dataset. The upload data is + * extracted from the {@link #results} field; this is reset + * before the operation. + * @param jobContext job ctx + * @param numTasks number of tasks to run + * @param numFiles number of files for each task to generate + * @return a set of all uploads + * @throws IOException on a failure. + */ + private Set<String> runTasks(JobContext jobContext, + int numTasks, int numFiles) + throws IOException { + results.resetUploads(); + Set<String> uploads = Sets.newHashSet(); + + for (int taskId = 0; taskId < numTasks; taskId += 1) { + TaskAttemptID attemptID = new TaskAttemptID( + new TaskID(JOB_ID, TaskType.REDUCE, taskId), + (taskId * 37) % numTasks); + TaskAttemptContext attempt = new TaskAttemptContextImpl( + new Configuration(jobContext.getConfiguration()), attemptID); + MockedStagingCommitter taskCommitter = new MockedStagingCommitter( + OUTPUT_PATH, attempt); + commitTask(taskCommitter, attempt, numFiles); + } + + uploads.addAll(results.getUploads()); + return uploads; + } + + private static Set<String> getAbortedIds( + List<AbortMultipartUploadRequest> aborts) { + return aborts.stream() + .map(AbortMultipartUploadRequest::getUploadId) + .collect(Collectors.toSet()); + } + + private static Set<String> getCommittedIds( + List<CompleteMultipartUploadRequest> commits) { + return commits.stream() + .map(CompleteMultipartUploadRequest::getUploadId) + .collect(Collectors.toSet()); + } + + private Set<String> commitTask(StagingCommitter staging, + TaskAttemptContext attempt, + int numFiles) + throws IOException { + Path attemptPath = staging.getTaskAttemptPath(attempt); + + Set<String> files = Sets.newHashSet(); + for (int i = 0; i < numFiles; i += 1) { + Path outPath = writeOutputFile( + attempt.getTaskAttemptID(), attemptPath, UUID.randomUUID().toString(), + 10 * (i + 1)); + files.add(OUTPUT_PREFIX + + "/" + outPath.getName() + + (uniqueFilenames ? ("-" + staging.getUUID()) : "")); + } + + staging.commitTask(attempt); + + return files; + } + + private static void assertValidUpload(Map<String, List<String>> parts, + SinglePendingCommit commit) { + assertTrue("Should commit a valid uploadId", + parts.containsKey(commit.getUploadId())); + + List<String> tags = parts.get(commit.getUploadId()); + assertEquals("Should commit the correct number of file parts", + tags.size(), commit.getPartCount()); + + for (int i = 0; i < tags.size(); i += 1) { + assertEquals("Should commit the correct part tags", + tags.get(i), commit.getEtags().get(i)); + } + } + + private static Path writeOutputFile(TaskAttemptID id, Path dest, + String content, long copies) + throws IOException { + String fileName = ((id.getTaskType() == TaskType.REDUCE) ? 
"r_" : "m_") + + id.getTaskID().getId() + "_" + id.getId() + "_" + + UUID.randomUUID().toString(); + Path outPath = new Path(dest, fileName); + FileSystem fs = outPath.getFileSystem(getConfiguration()); + + try (OutputStream out = fs.create(outPath)) { + byte[] bytes = content.getBytes(StandardCharsets.UTF_8); + for (int i = 0; i < copies; i += 1) { + out.write(bytes); + } + } + + return outPath; + } + + /** + * Used during debugging mock test failures; cranks up logging of method + * calls. + * @param level log level + */ + private void setMockLogLevel(int level) { + wrapperFS.setLogEvents(level); + } +}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingDirectoryOutputCommitter.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingDirectoryOutputCommitter.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingDirectoryOutputCommitter.java new file mode 100644 index 0000000..4f0189e --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingDirectoryOutputCommitter.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.commit.staging; + +import org.junit.Test; +import org.mockito.Mockito; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.PathExistsException; +import org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants; + +import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*; +import static org.apache.hadoop.fs.s3a.commit.staging.StagingTestBase.*; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; +import static org.mockito.Mockito.*; + +/** Mocking test of directory committer. 
*/ +public class TestStagingDirectoryOutputCommitter + extends StagingTestBase.JobCommitterTest<DirectoryStagingCommitter> { + + @Override + DirectoryStagingCommitter newJobCommitter() throws Exception { + return new DirectoryStagingCommitter(OUTPUT_PATH, + createTaskAttemptForJob()); + } + + @Test + public void testBadConflictMode() throws Throwable { + getJob().getConfiguration().set( + FS_S3A_COMMITTER_STAGING_CONFLICT_MODE, "merge"); + intercept(IllegalArgumentException.class, + "MERGE", "committer conflict", + this::newJobCommitter); + } + + @Test + public void testDefaultConflictResolution() throws Exception { + getJob().getConfiguration().unset( + FS_S3A_COMMITTER_STAGING_CONFLICT_MODE); + verifyFailureConflictOutcome(); + } + @Test + public void testFailConflictResolution() throws Exception { + getJob().getConfiguration().set( + FS_S3A_COMMITTER_STAGING_CONFLICT_MODE, CONFLICT_MODE_FAIL); + verifyFailureConflictOutcome(); + } + + protected void verifyFailureConflictOutcome() throws Exception { + FileSystem mockS3 = getMockS3A(); + pathExists(mockS3, OUTPUT_PATH); + final DirectoryStagingCommitter committer = newJobCommitter(); + + intercept(PathExistsException.class, + InternalCommitterConstants.E_DEST_EXISTS, + "Should throw an exception because the path exists", + () -> committer.setupJob(getJob())); + + intercept(PathExistsException.class, + InternalCommitterConstants.E_DEST_EXISTS, + "Should throw an exception because the path exists", + () -> committer.commitJob(getJob())); + + reset(mockS3); + pathDoesNotExist(mockS3, OUTPUT_PATH); + + committer.setupJob(getJob()); + verifyExistenceChecked(mockS3, OUTPUT_PATH); + verifyNoMoreInteractions(mockS3); + + reset(mockS3); + pathDoesNotExist(mockS3, OUTPUT_PATH); + committer.commitJob(getJob()); + verifyExistenceChecked(mockS3, OUTPUT_PATH); + verifyCompletion(mockS3); + } + + @Test + public void testAppendConflictResolution() throws Exception { + FileSystem mockS3 = getMockS3A(); + + pathExists(mockS3, OUTPUT_PATH); + + getJob().getConfiguration().set( + FS_S3A_COMMITTER_STAGING_CONFLICT_MODE, CONFLICT_MODE_APPEND); + + final DirectoryStagingCommitter committer = newJobCommitter(); + + committer.setupJob(getJob()); + verifyNoMoreInteractions(mockS3); + + Mockito.reset(mockS3); + pathExists(mockS3, OUTPUT_PATH); + + committer.commitJob(getJob()); + verifyCompletion(mockS3); + } + + @Test + public void testReplaceConflictResolution() throws Exception { + FileSystem mockS3 = getMockS3A(); + + pathExists(mockS3, OUTPUT_PATH); + + getJob().getConfiguration().set( + FS_S3A_COMMITTER_STAGING_CONFLICT_MODE, CONFLICT_MODE_REPLACE); + + final DirectoryStagingCommitter committer = newJobCommitter(); + + committer.setupJob(getJob()); + verifyNoMoreInteractions(mockS3); + + Mockito.reset(mockS3); + pathExists(mockS3, OUTPUT_PATH); + canDelete(mockS3, OUTPUT_PATH); + + committer.commitJob(getJob()); + verifyDeleted(mockS3, OUTPUT_PATH); + verifyCompletion(mockS3); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingPartitionedFileListing.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingPartitionedFileListing.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingPartitionedFileListing.java new file mode 100644 index 0000000..139b4e3 --- /dev/null +++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingPartitionedFileListing.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.commit.staging; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; + +import com.google.common.collect.Lists; +import org.junit.After; +import org.junit.Test; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import static org.apache.hadoop.fs.contract.ContractTestUtils.*; +import static org.apache.hadoop.fs.s3a.S3AUtils.*; +import static org.apache.hadoop.fs.s3a.commit.staging.StagingTestBase.*; +import static org.hamcrest.CoreMatchers.allOf; +import static org.hamcrest.CoreMatchers.hasItem; + +/** + * Test partitioned staging committer's logic for putting data in the right + * place. 
+ */ +public class TestStagingPartitionedFileListing + extends TaskCommitterTest<PartitionedStagingCommitter> { + + @Override + PartitionedStagingCommitter newJobCommitter() throws IOException { + return new PartitionedStagingCommitter(OUTPUT_PATH, + createTaskAttemptForJob()); + } + + @Override + PartitionedStagingCommitter newTaskCommitter() throws IOException { + return new PartitionedStagingCommitter(OUTPUT_PATH, getTAC()); + } + + private FileSystem attemptFS; + private Path attemptPath; + + @After + public void cleanupAttempt() { + cleanup("teardown", attemptFS, attemptPath); + } + + @Test + public void testTaskOutputListing() throws Exception { + PartitionedStagingCommitter committer = newTaskCommitter(); + + // create files in the attempt path that should be found by getTaskOutput + attemptPath = committer.getTaskAttemptPath(getTAC()); + attemptFS = attemptPath.getFileSystem( + getTAC().getConfiguration()); + attemptFS.delete(attemptPath, true); + + try { + List<String> expectedFiles = Lists.newArrayList(); + for (String dateint : Arrays.asList("20161115", "20161116")) { + for (String hour : Arrays.asList("13", "14")) { + String relative = "dateint=" + dateint + "/hour=" + hour + + "/" + UUID.randomUUID().toString() + ".parquet"; + expectedFiles.add(relative); + attemptFS.create(new Path(attemptPath, relative)).close(); + } + } + + List<String> actualFiles = committer.getTaskOutput(getTAC()) + .stream() + .map(stat -> Paths.getRelativePath(attemptPath, + stat.getPath())) + .collect(Collectors.toList()); + Collections.sort(expectedFiles); + Collections.sort(actualFiles); + assertEquals("File sets should match", expectedFiles, actualFiles); + } finally { + deleteQuietly(attemptFS, attemptPath, true); + } + + } + + @Test + public void testTaskOutputListingWithHiddenFiles() throws Exception { + PartitionedStagingCommitter committer = newTaskCommitter(); + + // create files in the attempt path that should be found by getTaskOutput + attemptPath = committer.getTaskAttemptPath(getTAC()); + attemptFS = attemptPath.getFileSystem( + getTAC().getConfiguration()); + attemptFS.delete(attemptPath, true); + + try { + List<String> expectedFiles = Lists.newArrayList(); + for (String dateint : Arrays.asList("20161115", "20161116")) { + String metadata = "dateint=" + dateint + "/" + "_metadata"; + attemptFS.create(new Path(attemptPath, metadata)).close(); + + for (String hour : Arrays.asList("13", "14")) { + String relative = "dateint=" + dateint + "/hour=" + hour + + "/" + UUID.randomUUID().toString() + ".parquet"; + expectedFiles.add(relative); + attemptFS.create(new Path(attemptPath, relative)).close(); + + String partial = "dateint=" + dateint + "/hour=" + hour + + "/." 
+ UUID.randomUUID().toString() + ".partial"; + attemptFS.create(new Path(attemptPath, partial)).close(); + } + } + + List<String> actualFiles = committer.getTaskOutput(getTAC()).stream() + .map(stat -> Paths.getRelativePath(attemptPath, stat.getPath())) + .collect(Collectors.toList()); + Collections.sort(expectedFiles); + Collections.sort(actualFiles); + assertEquals("File sets should match", expectedFiles, actualFiles); + } finally { + deleteQuietly(attemptFS, attemptPath, true); + } + + } + + @Test + public void testPartitionsResolution() throws Throwable { + + File tempDir = getTempDir(); + File partitionsDir = new File(tempDir, "partitions"); + + attemptPath = new Path(partitionsDir.toURI()); + attemptFS = FileSystem.getLocal(getJob().getConfiguration()); + deleteQuietly(attemptFS, attemptPath, true); + attemptFS.mkdirs(attemptPath); + // initial partitioning -> empty + assertTrue(Paths.getPartitions(attemptPath, new ArrayList<>(0)).isEmpty()); + String oct2017 = "year=2017/month=10"; + Path octLog = new Path(attemptPath, oct2017 + "/log-2017-10-04.txt"); + touch(attemptFS, octLog); + assertThat(listPartitions(attemptFS, attemptPath), hasItem(oct2017)); + + // add a root entry and it ends up under the table_root entry + Path rootFile = new Path(attemptPath, "root.txt"); + touch(attemptFS, rootFile); + assertThat(listPartitions(attemptFS, attemptPath), + allOf(hasItem(oct2017), + hasItem(StagingCommitterConstants.TABLE_ROOT))); + } + + /** + * List files in a filesystem using {@code listFiles()}, + * then get all the partitions. + * @param fs filesystem + * @param base base of tree + * @return a list of partitions + * @throws IOException failure + */ + private Set<String> listPartitions(FileSystem fs, Path base) + throws IOException { + List<FileStatus> statusList = mapLocatedFiles( + fs.listFiles(base, true), s -> (FileStatus) s); + return Paths.getPartitions(base, statusList); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingPartitionedJobCommit.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingPartitionedJobCommit.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingPartitionedJobCommit.java new file mode 100644 index 0000000..4df3912 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingPartitionedJobCommit.java @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3a.commit.staging; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.UUID; + +import com.google.common.collect.Lists; +import org.junit.Test; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.MockS3AFileSystem; +import org.apache.hadoop.fs.s3a.S3AFileSystem; +import org.apache.hadoop.fs.s3a.commit.PathCommitException; +import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import static org.apache.hadoop.test.LambdaTestUtils.intercept; +import static org.mockito.Mockito.*; +import static org.apache.hadoop.fs.s3a.commit.staging.StagingTestBase.*; +import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*; + +/** Mocking test of partitioned committer. */ +public class TestStagingPartitionedJobCommit + extends StagingTestBase.JobCommitterTest<PartitionedStagingCommitter> { + + @Override + public void setupJob() throws Exception { + super.setupJob(); + getWrapperFS().setLogEvents(MockS3AFileSystem.LOG_NAME); + } + + @Override + PartitionedStagingCommitter newJobCommitter() throws IOException { + return new PartitionedStagingCommitterForTesting(createTaskAttemptForJob()); + } + + /** + * Subclass of the Partitioned Staging committer used in the test cases. + */ + private static final class PartitionedStagingCommitterForTesting + extends PartitionedCommitterForTesting { + + private boolean aborted = false; + + private PartitionedStagingCommitterForTesting(TaskAttemptContext context) + throws IOException { + super(OUTPUT_PATH, context); + } + + @Override + protected List<SinglePendingCommit> listPendingUploadsToCommit( + JobContext context) throws IOException { + List<SinglePendingCommit> pending = Lists.newArrayList(); + + for (String dateint : Arrays.asList("20161115", "20161116")) { + for (String hour : Arrays.asList("13", "14")) { + String key = OUTPUT_PREFIX + "/dateint=" + dateint + "/hour=" + hour + + "/" + UUID.randomUUID().toString() + ".parquet"; + SinglePendingCommit commit = new SinglePendingCommit(); + commit.setBucket(BUCKET); + commit.setDestinationKey(key); + commit.setUri("s3a://" + BUCKET + "/" + key); + commit.setUploadId(UUID.randomUUID().toString()); + commit.setEtags(new ArrayList<>()); + pending.add(commit); + } + } + return pending; + } + + @Override + protected void abortJobInternal(JobContext context, + boolean suppressExceptions) throws IOException { + this.aborted = true; + super.abortJobInternal(context, suppressExceptions); + } + } + + @Test + public void testDefaultFailAndAppend() throws Exception { + FileSystem mockS3 = getMockS3A(); + + // both fail and append don't check. fail is enforced at the task level. + for (String mode : Arrays.asList(null, CONFLICT_MODE_FAIL, + CONFLICT_MODE_APPEND)) { + if (mode != null) { + getJob().getConfiguration().set( + FS_S3A_COMMITTER_STAGING_CONFLICT_MODE, mode); + } else { + getJob().getConfiguration().unset( + FS_S3A_COMMITTER_STAGING_CONFLICT_MODE); + } + + PartitionedStagingCommitter committer = newJobCommitter(); + + // no directories exist + committer.commitJob(getJob()); + + // parent and peer directories exist + reset(mockS3); + pathsExist(mockS3, "dateint=20161116", + "dateint=20161116/hour=10"); + committer.commitJob(getJob()); + verifyCompletion(mockS3); + + // a leaf directory exists. 
+ // NOTE: this is not checked during job commit, the commit succeeds. + reset(mockS3); + pathsExist(mockS3, "dateint=20161115/hour=14"); + committer.commitJob(getJob()); + verifyCompletion(mockS3); + } + } + + @Test + public void testBadConflictMode() throws Throwable { + getJob().getConfiguration().set( + FS_S3A_COMMITTER_STAGING_CONFLICT_MODE, "merge"); + intercept(IllegalArgumentException.class, + "MERGE", "committer conflict", + this::newJobCommitter); + } + + @Test + public void testReplace() throws Exception { + S3AFileSystem mockS3 = getMockS3A(); + + getJob().getConfiguration().set( + FS_S3A_COMMITTER_STAGING_CONFLICT_MODE, CONFLICT_MODE_REPLACE); + + PartitionedStagingCommitter committer = newJobCommitter(); + + committer.commitJob(getJob()); + verifyReplaceCommitActions(mockS3); + verifyCompletion(mockS3); + + // parent and peer directories exist + reset(mockS3); + pathsExist(mockS3, "dateint=20161115", + "dateint=20161115/hour=12"); + + committer.commitJob(getJob()); + verifyReplaceCommitActions(mockS3); + verifyCompletion(mockS3); + + // partition directories exist and should be removed + reset(mockS3); + pathsExist(mockS3, "dateint=20161115/hour=12", + "dateint=20161115/hour=13"); + canDelete(mockS3, "dateint=20161115/hour=13"); + + committer.commitJob(getJob()); + verifyDeleted(mockS3, "dateint=20161115/hour=13"); + verifyReplaceCommitActions(mockS3); + verifyCompletion(mockS3); + + // partition directories exist and should be removed + reset(mockS3); + pathsExist(mockS3, "dateint=20161116/hour=13", + "dateint=20161116/hour=14"); + + canDelete(mockS3, "dateint=20161116/hour=13", + "dateint=20161116/hour=14"); + + committer.commitJob(getJob()); + verifyReplaceCommitActions(mockS3); + verifyDeleted(mockS3, "dateint=20161116/hour=13"); + verifyDeleted(mockS3, "dateint=20161116/hour=14"); + verifyCompletion(mockS3); + } + + + /** + * Verify the actions which replace mode performs: essentially, deleting + * the destination partition directories. 
+ * @param mockS3 s3 mock + */ + protected void verifyReplaceCommitActions(FileSystem mockS3) + throws IOException { + verifyDeleted(mockS3, "dateint=20161115/hour=13"); + verifyDeleted(mockS3, "dateint=20161115/hour=14"); + verifyDeleted(mockS3, "dateint=20161116/hour=13"); + verifyDeleted(mockS3, "dateint=20161116/hour=14"); + } + + @Test + public void testReplaceWithDeleteFailure() throws Exception { + FileSystem mockS3 = getMockS3A(); + + getJob().getConfiguration().set( + FS_S3A_COMMITTER_STAGING_CONFLICT_MODE, CONFLICT_MODE_REPLACE); + + final PartitionedStagingCommitter committer = newJobCommitter(); + + pathsExist(mockS3, "dateint=20161116/hour=14"); + when(mockS3 + .delete( + new Path(OUTPUT_PATH, "dateint=20161116/hour=14"), + true)) + .thenThrow(new PathCommitException("fake", + "Fake IOException for delete")); + + intercept(PathCommitException.class, "Fake IOException for delete", + "Should throw the fake IOException", + () -> committer.commitJob(getJob())); + + verifyReplaceCommitActions(mockS3); + verifyDeleted(mockS3, "dateint=20161116/hour=14"); + assertTrue("Should have aborted", + ((PartitionedStagingCommitterForTesting) committer).aborted); + verifyCompletion(mockS3); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingPartitionedTaskCommit.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingPartitionedTaskCommit.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingPartitionedTaskCommit.java new file mode 100644 index 0000000..ddcb56e --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingPartitionedTaskCommit.java @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3a.commit.staging; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Set; +import java.util.UUID; + +import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathExistsException; +import org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants; +import org.apache.hadoop.mapreduce.JobContext; + +import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; +import static org.mockito.Mockito.*; +import static org.apache.hadoop.fs.s3a.commit.staging.StagingTestBase.*; + +/** Mocking test of the partitioned committer. */ +public class TestStagingPartitionedTaskCommit + extends StagingTestBase.TaskCommitterTest<PartitionedStagingCommitter> { + + @Override + PartitionedStagingCommitter newJobCommitter() throws IOException { + return new PartitionedStagingCommitter(OUTPUT_PATH, + createTaskAttemptForJob()); + } + + @Override + PartitionedStagingCommitter newTaskCommitter() throws Exception { + return new PartitionedStagingCommitter(OUTPUT_PATH, getTAC()); + } + + // The set of files used by this test + private static List<String> relativeFiles = Lists.newArrayList(); + + @BeforeClass + public static void createRelativeFileList() { + for (String dateint : Arrays.asList("20161115", "20161116")) { + for (String hour : Arrays.asList("14", "15")) { + String relative = "dateint=" + dateint + "/hour=" + hour + + "/" + UUID.randomUUID().toString() + ".parquet"; + relativeFiles.add(relative); + } + } + } + + @Test + public void testBadConflictMode() throws Throwable { + getJob().getConfiguration().set( + FS_S3A_COMMITTER_STAGING_CONFLICT_MODE, "merge"); + intercept(IllegalArgumentException.class, + "MERGE", "committer conflict", this::newJobCommitter); + } + + @Test + public void testDefault() throws Exception { + FileSystem mockS3 = getMockS3A(); + + JobContext job = getJob(); + job.getConfiguration().unset( + FS_S3A_COMMITTER_STAGING_CONFLICT_MODE); + final PartitionedStagingCommitter committer = newTaskCommitter(); + + committer.setupTask(getTAC()); + assertConflictResolution(committer, job, ConflictResolution.FAIL); + createTestOutputFiles(relativeFiles, + committer.getTaskAttemptPath(getTAC()), getTAC().getConfiguration()); + + // test failure when one partition already exists + reset(mockS3); + pathExists(mockS3, new Path(OUTPUT_PATH, relativeFiles.get(0)).getParent()); + + intercept(PathExistsException.class, + InternalCommitterConstants.E_DEST_EXISTS, + "Expected a PathExistsException as a partition already exists", + () -> committer.commitTask(getTAC())); + + // test success + reset(mockS3); + + committer.commitTask(getTAC()); + Set<String> files = Sets.newHashSet(); + for (InitiateMultipartUploadRequest request : + getMockResults().getRequests().values()) { + assertEquals(BUCKET, request.getBucketName()); + files.add(request.getKey()); + } + assertEquals("Should have the right number of uploads", + relativeFiles.size(), files.size()); + + Set<String> expected = buildExpectedList(committer); + + assertEquals("Should have correct paths", expected, files); + } + + @Test + public void testFail() throws Exception { + FileSystem mockS3 = getMockS3A(); + + getTAC().getConfiguration() + 
.set(FS_S3A_COMMITTER_STAGING_CONFLICT_MODE, CONFLICT_MODE_FAIL); + + final PartitionedStagingCommitter committer = newTaskCommitter(); + + committer.setupTask(getTAC()); + createTestOutputFiles(relativeFiles, + committer.getTaskAttemptPath(getTAC()), getTAC().getConfiguration()); + + // test failure when one partition already exists + reset(mockS3); + pathExists(mockS3, new Path(OUTPUT_PATH, relativeFiles.get(1)).getParent()); + + intercept(PathExistsException.class, "", + "Should complain because a partition already exists", + () -> committer.commitTask(getTAC())); + + // test success + reset(mockS3); + + committer.commitTask(getTAC()); + Set<String> files = Sets.newHashSet(); + for (InitiateMultipartUploadRequest request : + getMockResults().getRequests().values()) { + assertEquals(BUCKET, request.getBucketName()); + files.add(request.getKey()); + } + assertEquals("Should have the right number of uploads", + relativeFiles.size(), files.size()); + + Set<String> expected = buildExpectedList(committer); + + assertEquals("Should have correct paths", expected, files); + } + + @Test + public void testAppend() throws Exception { + FileSystem mockS3 = getMockS3A(); + + getTAC().getConfiguration() + .set(FS_S3A_COMMITTER_STAGING_CONFLICT_MODE, CONFLICT_MODE_APPEND); + + PartitionedStagingCommitter committer = newTaskCommitter(); + + committer.setupTask(getTAC()); + createTestOutputFiles(relativeFiles, + committer.getTaskAttemptPath(getTAC()), getTAC().getConfiguration()); + + // test success when one partition already exists + reset(mockS3); + pathExists(mockS3, new Path(OUTPUT_PATH, relativeFiles.get(2)).getParent()); + + committer.commitTask(getTAC()); + Set<String> files = Sets.newHashSet(); + for (InitiateMultipartUploadRequest request : + getMockResults().getRequests().values()) { + assertEquals(BUCKET, request.getBucketName()); + files.add(request.getKey()); + } + assertEquals("Should have the right number of uploads", + relativeFiles.size(), files.size()); + + Set<String> expected = buildExpectedList(committer); + + assertEquals("Should have correct paths", expected, files); + } + + @Test + public void testReplace() throws Exception { + // TODO: this committer needs to delete the data that already exists + // This test should assert that the delete was done + FileSystem mockS3 = getMockS3A(); + + getTAC().getConfiguration() + .set(FS_S3A_COMMITTER_STAGING_CONFLICT_MODE, CONFLICT_MODE_REPLACE); + + PartitionedStagingCommitter committer = newTaskCommitter(); + + committer.setupTask(getTAC()); + createTestOutputFiles(relativeFiles, + committer.getTaskAttemptPath(getTAC()), getTAC().getConfiguration()); + + // test success when one partition already exists + reset(mockS3); + pathExists(mockS3, new Path(OUTPUT_PATH, relativeFiles.get(3)).getParent()); + + committer.commitTask(getTAC()); + Set<String> files = Sets.newHashSet(); + for (InitiateMultipartUploadRequest request : + getMockResults().getRequests().values()) { + assertEquals(BUCKET, request.getBucketName()); + files.add(request.getKey()); + } + assertEquals("Should have the right number of uploads", + relativeFiles.size(), files.size()); + + Set<String> expected = buildExpectedList(committer); + + assertEquals("Should have correct paths", expected, files); + } + + public Set<String> buildExpectedList(StagingCommitter committer) { + Set<String> expected = Sets.newHashSet(); + boolean unique = committer.useUniqueFilenames(); + for (String relative : relativeFiles) { + expected.add(OUTPUT_PREFIX + + "/" + + (unique ? 
Paths.addUUID(relative, committer.getUUID()) : relative)); + } + return expected; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITDirectoryCommitMRJob.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITDirectoryCommitMRJob.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITDirectoryCommitMRJob.java new file mode 100644 index 0000000..c10ebed --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITDirectoryCommitMRJob.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.commit.staging.integration; + +import org.apache.hadoop.fs.s3a.commit.AbstractITCommitMRJob; +import org.apache.hadoop.fs.s3a.commit.staging.DirectoryStagingCommitter; + +/** + * Full integration test for the directory committer. + */ +public class ITDirectoryCommitMRJob extends AbstractITCommitMRJob { + + @Override + protected String committerName() { + return DirectoryStagingCommitter.NAME; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITPartitionCommitMRJob.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITPartitionCommitMRJob.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITPartitionCommitMRJob.java new file mode 100644 index 0000000..1c19a95 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITPartitionCommitMRJob.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3a.commit.staging.integration; + +import org.apache.hadoop.fs.s3a.commit.AbstractITCommitMRJob; +import org.apache.hadoop.fs.s3a.commit.staging.PartitionedStagingCommitter; + +/** + * Full integration test for the partition committer. + */ +public class ITPartitionCommitMRJob extends AbstractITCommitMRJob { + + @Override + protected String committerName() { + return PartitionedStagingCommitter.NAME; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITStagingCommitMRJob.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITStagingCommitMRJob.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITStagingCommitMRJob.java new file mode 100644 index 0000000..76ad464 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITStagingCommitMRJob.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.commit.staging.integration; + +import org.junit.Test; + +import org.hamcrest.core.StringContains; +import org.hamcrest.core.StringEndsWith; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.commit.AbstractITCommitMRJob; +import org.apache.hadoop.fs.s3a.commit.CommitConstants; +import org.apache.hadoop.fs.s3a.commit.staging.StagingCommitter; +import org.apache.hadoop.fs.s3a.commit.staging.StagingCommitterConstants; +import org.apache.hadoop.security.UserGroupInformation; + +import static org.apache.hadoop.fs.s3a.commit.staging.Paths.getMultipartUploadCommitsDirectory; + +/** + * Full integration test for the staging committer. + */ +public class ITStagingCommitMRJob extends AbstractITCommitMRJob { + + @Override + protected String committerName() { + return StagingCommitter.NAME; + } + + /** + * Verify that staging commit dirs are made absolute under the user's + * home directory, so, in a secure cluster, private. 
+ */ + @Test + public void testStagingDirectory() throws Throwable { + FileSystem hdfs = getDFS(); + Configuration conf = hdfs.getConf(); + conf.set(CommitConstants.FS_S3A_COMMITTER_STAGING_TMP_PATH, + "private"); + Path dir = getMultipartUploadCommitsDirectory(conf, "UUID"); + assertThat(dir.toString(), StringEndsWith.endsWith( + "UUID/" + + StagingCommitterConstants.STAGING_UPLOADS)); + assertTrue("path unqualified", dir.isAbsolute()); + String self = UserGroupInformation.getCurrentUser().getShortUserName(); + assertThat(dir.toString(), + StringContains.containsString("/user/" + self + "/private")); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestDirectoryCommitProtocol.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestDirectoryCommitProtocol.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestDirectoryCommitProtocol.java new file mode 100644 index 0000000..ea28ecb --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestDirectoryCommitProtocol.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.commit.staging.integration; + +import java.io.IOException; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitter; +import org.apache.hadoop.fs.s3a.commit.CommitConstants; +import org.apache.hadoop.fs.s3a.commit.CommitterFaultInjection; +import org.apache.hadoop.fs.s3a.commit.CommitterFaultInjectionImpl; +import org.apache.hadoop.fs.s3a.commit.staging.DirectoryStagingCommitter; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.JobStatus; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +/** ITest of the low level protocol methods. 
*/ +public class ITestDirectoryCommitProtocol extends ITestStagingCommitProtocol { + + @Override + protected String suitename() { + return "ITestDirectoryCommitProtocol"; + } + + @Override + protected String getCommitterName() { + return CommitConstants.COMMITTER_NAME_DIRECTORY; + } + + @Override + protected AbstractS3ACommitter createCommitter( + Path outputPath, + TaskAttemptContext context) + throws IOException { + return new DirectoryStagingCommitter(outputPath, context); + } + + @Override + public AbstractS3ACommitter createFailingCommitter( + TaskAttemptContext tContext) throws IOException { + return new CommitterWithFailedThenSucceed(getOutDir(), tContext); + } + + /** + * This class provides an overridden implementation of commitJobInternal which + * causes the commit to fail the first time and then succeed. + */ + private static final class CommitterWithFailedThenSucceed extends + DirectoryStagingCommitter implements CommitterFaultInjection { + + private final CommitterFaultInjectionImpl injection; + + CommitterWithFailedThenSucceed(Path outputPath, + TaskAttemptContext context) throws IOException { + super(outputPath, context); + injection = new CommitterFaultInjectionImpl(outputPath, context, true); + } + + @Override + public void setupJob(JobContext context) throws IOException { + injection.setupJob(context); + super.setupJob(context); + } + + @Override + public void abortJob(JobContext context, JobStatus.State state) + throws IOException { + injection.abortJob(context, state); + super.abortJob(context, state); + } + + @Override + @SuppressWarnings("deprecation") + public void cleanupJob(JobContext context) throws IOException { + injection.cleanupJob(context); + super.cleanupJob(context); + } + + @Override + public void setupTask(TaskAttemptContext context) throws IOException { + injection.setupTask(context); + super.setupTask(context); + } + + @Override + public void commitTask(TaskAttemptContext context) throws IOException { + injection.commitTask(context); + super.commitTask(context); + } + + @Override + public void abortTask(TaskAttemptContext context) throws IOException { + injection.abortTask(context); + super.abortTask(context); + } + + @Override + public void commitJob(JobContext context) throws IOException { + injection.commitJob(context); + super.commitJob(context); + } + + @Override + public boolean needsTaskCommit(TaskAttemptContext context) + throws IOException { + injection.needsTaskCommit(context); + return super.needsTaskCommit(context); + } + + @Override + public void setFaults(CommitterFaultInjection.Faults... faults) { + injection.setFaults(faults); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestPartitionedCommitProtocol.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestPartitionedCommitProtocol.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestPartitionedCommitProtocol.java new file mode 100644 index 0000000..e3bc150 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestPartitionedCommitProtocol.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.commit.staging.integration; + +import java.io.IOException; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitter; +import org.apache.hadoop.fs.s3a.commit.CommitConstants; +import org.apache.hadoop.fs.s3a.commit.CommitterFaultInjection; +import org.apache.hadoop.fs.s3a.commit.CommitterFaultInjectionImpl; +import org.apache.hadoop.fs.s3a.commit.staging.DirectoryStagingCommitter; +import org.apache.hadoop.fs.s3a.commit.staging.PartitionedStagingCommitter; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.JobStatus; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import static org.apache.hadoop.fs.contract.ContractTestUtils.skip; + +/** ITest of the low level protocol methods. */ +public class ITestPartitionedCommitProtocol extends ITestStagingCommitProtocol { + + @Override + protected String suitename() { + return "ITestPartitionedCommitProtocol"; + } + + @Override + protected String getCommitterName() { + return CommitConstants.COMMITTER_NAME_PARTITIONED; + } + + @Override + protected AbstractS3ACommitter createCommitter( + Path outputPath, + TaskAttemptContext context) + throws IOException { + return new PartitionedStagingCommitter(outputPath, context); + } + + @Override + public AbstractS3ACommitter createFailingCommitter( + TaskAttemptContext tContext) throws IOException { + return new CommitterWithFailedThenSucceed(getOutDir(), tContext); + } + + @Override + public void testMapFileOutputCommitter() throws Exception { + skip("Partitioning committer is not suitable for Map Output"); + } + + /** + * This class provides an overridden implementation of commitJobInternal which + * causes the commit to fail the first time and then succeed.
+ */ + private static final class CommitterWithFailedThenSucceed extends + DirectoryStagingCommitter implements CommitterFaultInjection { + private final CommitterFaultInjectionImpl injection; + + CommitterWithFailedThenSucceed(Path outputPath, + TaskAttemptContext context) throws IOException { + super(outputPath, context); + injection = new CommitterFaultInjectionImpl(outputPath, context, true); + } + + @Override + public void setupJob(JobContext context) throws IOException { + injection.setupJob(context); + super.setupJob(context); + } + + @Override + public void abortJob(JobContext context, JobStatus.State state) + throws IOException { + injection.abortJob(context, state); + super.abortJob(context, state); + } + + @Override + @SuppressWarnings("deprecation") + public void cleanupJob(JobContext context) throws IOException { + injection.cleanupJob(context); + super.cleanupJob(context); + } + + @Override + public void setupTask(TaskAttemptContext context) throws IOException { + injection.setupTask(context); + super.setupTask(context); + } + + @Override + public void commitTask(TaskAttemptContext context) throws IOException { + injection.commitTask(context); + super.commitTask(context); + } + + @Override + public void abortTask(TaskAttemptContext context) throws IOException { + injection.abortTask(context); + super.abortTask(context); + } + + @Override + public void commitJob(JobContext context) throws IOException { + injection.commitJob(context); + super.commitJob(context); + } + + @Override + public boolean needsTaskCommit(TaskAttemptContext context) + throws IOException { + injection.needsTaskCommit(context); + return super.needsTaskCommit(context); + } + + @Override + public void setFaults(CommitterFaultInjection.Faults... faults) { + injection.setFaults(faults); + } + + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocol.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocol.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocol.java new file mode 100644 index 0000000..08c572e --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocol.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3a.commit.staging.integration; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.contract.ContractTestUtils; +import org.apache.hadoop.fs.s3a.InconsistentS3ClientFactory; +import org.apache.hadoop.fs.s3a.S3ClientFactory; +import org.apache.hadoop.fs.s3a.commit.AbstractITCommitProtocol; +import org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitter; +import org.apache.hadoop.fs.s3a.commit.CommitterFaultInjection; +import org.apache.hadoop.fs.s3a.commit.CommitterFaultInjectionImpl; +import org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants; +import org.apache.hadoop.fs.s3a.commit.staging.Paths; +import org.apache.hadoop.fs.s3a.commit.staging.StagingCommitter; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.JobStatus; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import static org.apache.hadoop.fs.s3a.Constants.S3_CLIENT_FACTORY_IMPL; +import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*; + +/** Test the staging committer's handling of the base protocol operations. */ +public class ITestStagingCommitProtocol extends AbstractITCommitProtocol { + + @Override + protected String suitename() { + return "ITestStagingCommitProtocol"; + } + + @Override + protected Configuration createConfiguration() { + Configuration conf = super.createConfiguration(); + conf.setInt(FS_S3A_COMMITTER_THREADS, 1); + // switch to the inconsistent filesystem + conf.setClass(S3_CLIENT_FACTORY_IMPL, InconsistentS3ClientFactory.class, + S3ClientFactory.class); + + // disable unique filenames so that the protocol tests of FileOutputFormat + // and this test generate consistent names. 
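+ // (with unique filenames enabled, the staging committer inserts the + // job UUID into each filename, so the names would differ on every run)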
+ conf.setBoolean(FS_S3A_COMMITTER_STAGING_UNIQUE_FILENAMES, false); + return conf; + } + + @Override + public void setup() throws Exception { + super.setup(); + + // identify the staging working dir and delete it + Configuration conf = getConfiguration(); + String uuid = StagingCommitter.getUploadUUID(conf, + getTaskAttempt0().getJobID()); + Path tempDir = Paths.getLocalTaskAttemptTempDir(conf, uuid, + getTaskAttempt0()); + rmdir(tempDir, conf); + } + + @Override + protected String getCommitterName() { + return InternalCommitterConstants.COMMITTER_NAME_STAGING; + } + + @Override + protected AbstractS3ACommitter createCommitter(Path outputPath, + TaskAttemptContext context) throws IOException { + return new StagingCommitter(outputPath, context); + } + + public AbstractS3ACommitter createFailingCommitter( + TaskAttemptContext tContext) throws IOException { + return new CommitterWithFailedThenSucceed(getOutDir(), tContext); + } + + @Override + protected boolean shouldExpectSuccessMarker() { + return false; + } + + @Override + protected void expectJobCommitToFail(JobContext jContext, + AbstractS3ACommitter committer) throws Exception { + expectJobCommitFailure(jContext, committer, + IOException.class); + } + + protected void validateTaskAttemptPathDuringWrite(Path p) throws IOException { + // this is expected to be local FS + ContractTestUtils.assertPathExists(getLocalFS(), "task attempt", p); + } + + protected void validateTaskAttemptPathAfterWrite(Path p) throws IOException { + // this is expected to be local FS + ContractTestUtils.assertPathExists(getLocalFS(), "task attempt", p); + } + + protected FileSystem getLocalFS() throws IOException { + return FileSystem.getLocal(getConfiguration()); + } + + /** + * This class provides an overridden implementation of commitJobInternal which + * causes the commit to fail the first time and then succeed.
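+ * Every overridden lifecycle method first calls into the + * CommitterFaultInjectionImpl delegate, then invokes the superclass + * implementation.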
+ */ + private static final class CommitterWithFailedThenSucceed extends + StagingCommitter implements CommitterFaultInjection { + + private final CommitterFaultInjectionImpl injection; + + CommitterWithFailedThenSucceed(Path outputPath, + TaskAttemptContext context) throws IOException { + super(outputPath, context); + injection = new CommitterFaultInjectionImpl(outputPath, context, true); + } + @Override + public void setupJob(JobContext context) throws IOException { + injection.setupJob(context); + super.setupJob(context); + } + + @Override + public void abortJob(JobContext context, JobStatus.State state) + throws IOException { + injection.abortJob(context, state); + super.abortJob(context, state); + } + + @Override + @SuppressWarnings("deprecation") + public void cleanupJob(JobContext context) throws IOException { + injection.cleanupJob(context); + super.cleanupJob(context); + } + + @Override + public void setupTask(TaskAttemptContext context) throws IOException { + injection.setupTask(context); + super.setupTask(context); + } + + @Override + public void commitTask(TaskAttemptContext context) throws IOException { + injection.commitTask(context); + super.commitTask(context); + } + + @Override + public void abortTask(TaskAttemptContext context) throws IOException { + injection.abortTask(context); + super.abortTask(context); + } + + @Override + public void commitJob(JobContext context) throws IOException { + injection.commitJob(context); + super.commitJob(context); + } + + @Override + public boolean needsTaskCommit(TaskAttemptContext context) + throws IOException { + injection.needsTaskCommit(context); + return super.needsTaskCommit(context); + } + + @Override + public void setFaults(Faults... faults) { + injection.setFaults(faults); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/AbstractS3GuardToolTestBase.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/AbstractS3GuardToolTestBase.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/AbstractS3GuardToolTestBase.java index a33c001..22a028a 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/AbstractS3GuardToolTestBase.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/AbstractS3GuardToolTestBase.java @@ -35,11 +35,14 @@ import org.apache.hadoop.fs.contract.ContractTestUtils; import org.apache.hadoop.fs.s3a.AbstractS3ATestBase; import org.apache.hadoop.fs.s3a.Constants; import org.apache.hadoop.fs.s3a.S3AFileStatus; +import org.apache.hadoop.fs.s3a.S3AFileSystem; import org.apache.hadoop.fs.s3a.S3ATestUtils; +import org.apache.hadoop.fs.s3a.commit.CommitConstants; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.util.StringUtils; +import static org.apache.hadoop.fs.s3a.s3guard.S3GuardTool.E_BAD_STATE; import static org.apache.hadoop.fs.s3a.s3guard.S3GuardTool.SUCCESS; import static org.apache.hadoop.test.LambdaTestUtils.intercept; @@ -235,6 +238,26 @@ public abstract class AbstractS3GuardToolTestBase extends AbstractS3ATestBase { }); } + @Test + public void testProbeForMagic() throws Throwable { + S3AFileSystem fs = getFileSystem(); + String name = fs.getUri().toString(); + S3GuardTool.BucketInfo cmd = new S3GuardTool.BucketInfo( + getConfiguration()); + if (fs.hasCapability( + 
CommitConstants.STORE_CAPABILITY_MAGIC_COMMITTER)) { + // if the FS is magic, expect this to work + exec(cmd, S3GuardTool.BucketInfo.MAGIC_FLAG, name); + } else { + // if the FS isn't magic, expect the probe to fail + ExitUtil.ExitException e = intercept(ExitUtil.ExitException.class, + () -> exec(cmd, S3GuardTool.BucketInfo.MAGIC_FLAG, name)); + if (e.getExitCode() != E_BAD_STATE) { + throw e; + } + } + } + /** * Get the test CSV file; assume() that it is not modified (i.e. we haven't * switched to a new storage infrastructure where the bucket is no longer http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestDynamoDBMetadataStore.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestDynamoDBMetadataStore.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestDynamoDBMetadataStore.java index 02eb7b8..5763b83 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestDynamoDBMetadataStore.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestDynamoDBMetadataStore.java @@ -537,7 +537,7 @@ public class TestDynamoDBMetadataStore extends MetadataStoreTestBase { } @Test - public void testDeleteTable() throws IOException { + public void testDeleteTable() throws Exception { final String tableName = "testDeleteTable"; final S3AFileSystem s3afs = getFileSystem(); final Configuration conf = s3afs.getConf(); @@ -553,7 +553,6 @@ public class TestDynamoDBMetadataStore extends MetadataStoreTestBase { // delete table once more; any ResourceNotFoundException is swallowed silently ddbms.destroy(); verifyTableNotExist(tableName); - try { // we can no longer list the destroyed table ddbms.listChildren(new Path(S3URI)); @@ -582,13 +581,9 @@ public class TestDynamoDBMetadataStore extends MetadataStoreTestBase { * * This should not rely on the {@link DynamoDBMetadataStore} implementation. */ - private static void verifyTableNotExist(String tableName) { - final Table table = dynamoDB.getTable(tableName); - try { - table.describe(); - fail("Expecting ResourceNotFoundException for table '" + tableName + "'"); - } catch (ResourceNotFoundException ignored) { - } + private static void verifyTableNotExist(String tableName) throws Exception { + intercept(ResourceNotFoundException.class, + () -> dynamoDB.getTable(tableName).describe()); } }
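The final hunk replaces a hand-rolled try/describe/fail sequence with LambdaTestUtils.intercept(), which evaluates a closure, returns the expected exception, rethrows any unexpected one, and raises an assertion failure if nothing is thrown at all. A minimal sketch of the same pattern, assuming an initialized DynamoDB document-API client; the helper class, field, and method names here are illustrative, not part of the patch:

import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException;

import static org.apache.hadoop.test.LambdaTestUtils.intercept;

public final class InterceptSketch {

  // assumption: a DynamoDB document-API client initialized elsewhere
  private static DynamoDB dynamoDB;

  /**
   * Expect describe() on a missing table to fail.
   * intercept() fails the test if the closure returns normally, rethrows
   * any exception that is not a ResourceNotFoundException, and otherwise
   * returns the caught exception for further assertions.
   */
  static ResourceNotFoundException expectTableMissing(String tableName)
      throws Exception {
    return intercept(ResourceNotFoundException.class,
        () -> dynamoDB.getTable(tableName).describe());
  }
}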