steveloughran commented on code in PR #6468:
URL: https://github.com/apache/hadoop/pull/6468#discussion_r1476480590


##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java:
##########
@@ -52,6 +52,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import javax.annotation.Nullable;
+import org.apache.hadoop.fs.s3a.commit.magic.InMemoryMagicCommitTracker;

Review Comment:
   move to same group as rest of apache imports
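For illustration, a sketch of the grouping being suggested, assuming the usual hadoop-aws ordering of java/javax imports, then third-party, then org.apache; only the imports shown in the hunk above are taken from the file, the rest is placeholder:

```java
import java.util.concurrent.atomic.AtomicBoolean;

import javax.annotation.Nullable;

// ... third-party imports ...

// moved down so it sits with the other org.apache imports
import org.apache.hadoop.fs.s3a.commit.magic.InMemoryMagicCommitTracker;
// ... remaining org.apache.hadoop imports ...
```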
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/S3MagicCommitTracker.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit.magic;
+
+import org.apache.commons.lang3.StringUtils;

Review Comment:
   import ordering and grouping. best to set the IDE for the code style, but don't rearrange imports on existing classes as it ruins cherrypicking


##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java:
##########
@@ -248,6 +236,80 @@ private PendingSet innerCommitTask(
     return pendingSet;
   }
 
+  /**
+   * Loads pending commits from either memory or from the remote store (S3) based on the config.
+   * @param context TaskAttemptContext
+   * @return All pending commit data for the given TaskAttemptContext
+   * @throws IOException
+   * if there is an error trying to read the commit data
+   */
+  protected PendingSet loadPendingCommits(TaskAttemptContext context) throws IOException {
+    PendingSet pendingSet = new PendingSet();
+    if (isTrackMagicCommitsInMemoryEnabled(context.getConfiguration())) {
+      // load from memory
+      List<SinglePendingCommit> pendingCommits = loadPendingCommitsFromMemory(context);
+
+      for (SinglePendingCommit singleCommit : pendingCommits) {
+        // aggregate stats
+        pendingSet.getIOStatistics()
+            .aggregate(singleCommit.getIOStatistics());
+        // then clear so they aren't marshalled again.
+        singleCommit.getIOStatistics().clear();
+      }
+      pendingSet.setCommits(pendingCommits);
+    } else {
+      // Load from remote store
+      CommitOperations actions = getCommitOperations();
+      Path taskAttemptPath = getTaskAttemptPath(context);
+      try (CommitContext commitContext = initiateTaskOperation(context)) {
+        Pair<PendingSet, List<Pair<LocatedFileStatus, IOException>>> loaded =
+            actions.loadSinglePendingCommits(taskAttemptPath, true, commitContext);
+        pendingSet = loaded.getKey();
+        List<Pair<LocatedFileStatus, IOException>> failures = loaded.getValue();
+        if (!failures.isEmpty()) {
+          // At least one file failed to load
+          // revert all which did; report failure with first exception
+          LOG.error("At least one commit file could not be read: failing");
+          abortPendingUploads(commitContext, pendingSet.getCommits(), true);
+          throw failures.get(0).getValue();
+        }
+      }
+    }
+    return pendingSet;
+  }
+
+  private List<SinglePendingCommit> loadPendingCommitsFromMemory(TaskAttemptContext context)

Review Comment:
   nit, javadocs
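One possible shape for that javadoc, as a sketch only; the description of where the commits come from is an assumption based on the in-memory tracker, and the body is omitted:

```java
  /**
   * Load the pending commits which were tracked in memory for this
   * task attempt rather than written to the magic path in S3.
   * @param context task attempt context identifying the attempt
   * @return the pending commits recorded in this process
   */
  private List<SinglePendingCommit> loadPendingCommitsFromMemory(TaskAttemptContext context)
```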
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java:
##########
@@ -71,6 +79,26 @@ public void setup() throws Exception {
     CommitUtils.verifyIsMagicCommitFS(getFileSystem());
   }
 
+  @Parameterized.Parameters(name = "track-commit-in-memory-{0}")
+  public static Collection<Object[]> params() {
+    return Arrays.asList(new Object[][]{
+        {false},
+        {true}
+    });
+  }
+
+  public ITestMagicCommitProtocol(boolean trackCommitsInMemory) {
+    this.trackCommitsInMemory = trackCommitsInMemory;
+  }
+
+  @Override
+  protected Configuration createConfiguration() {
+    Configuration conf = super.createConfiguration();
+    conf.setBoolean(FS_S3A_COMMITTER_MAGIC_TRACK_COMMITS_IN_MEMORY_ENABLED, trackCommitsInMemory);

Review Comment:
   look for uses of `removeBaseAndBucketOverrides()` to see how to avoid per-bucket settings breaking your test.
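The pattern being pointed at usually looks like the sketch below: clear any base and per-bucket values of the option before setting the test's own value. This assumes the `(Configuration, String...)` overload of `S3ATestUtils.removeBaseAndBucketOverrides()` (statically imported) and the constructor-injected `trackCommitsInMemory` field from the hunk above:

```java
  @Override
  protected Configuration createConfiguration() {
    Configuration conf = super.createConfiguration();
    // strip any base or per-bucket value first so a bucket-specific setting
    // in the test configuration cannot override the parameterized value
    removeBaseAndBucketOverrides(conf,
        FS_S3A_COMMITTER_MAGIC_TRACK_COMMITS_IN_MEMORY_ENABLED);
    conf.setBoolean(FS_S3A_COMMITTER_MAGIC_TRACK_COMMITS_IN_MEMORY_ENABLED,
        trackCommitsInMemory);
    return conf;
  }
```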
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java:
##########
@@ -3906,6 +3908,21 @@ public void access(final Path f, final FsAction mode)
   @Retries.RetryTranslated
   public FileStatus getFileStatus(final Path f) throws IOException {
     Path path = qualify(f);
+    if (isTrackMagicCommitsInMemoryEnabled(getConf()) && isMagicCommitPath(path)) {

Review Comment:
   this is a bit of a hack. not saying that's bad, just wondering if there is a more elegant solution.


##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/InMemoryMagicCommitTracker.java:
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit.magic;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.WriteOperationHelper;
+import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
+import org.apache.hadoop.fs.s3a.statistics.PutTrackerStatistics;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
+import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
+import software.amazon.awssdk.services.s3.model.CompletedPart;

Review Comment:
   review import ordering and grouping.


##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java:
##########
@@ -264,9 +326,14 @@ public void abortTask(TaskAttemptContext context) throws IOException {
     try (DurationInfo d = new DurationInfo(LOG,
         "Abort task %s", context.getTaskAttemptID());
         CommitContext commitContext = initiateTaskOperation(context)) {
-      getCommitOperations().abortAllSinglePendingCommits(attemptPath,
-          commitContext,
-          true);
+      if (isTrackMagicCommitsInMemoryEnabled(context.getConfiguration())) {
+        List<SinglePendingCommit> pendingCommits = loadPendingCommitsFromMemory(context);
+        for (SinglePendingCommit singleCommit : pendingCommits) {
+          commitContext.abortSingleCommit(singleCommit);
+        }
+      } else {
+        getCommitOperations().abortAllSinglePendingCommits(attemptPath, commitContext, true);

Review Comment:
   hmmm. this is trouble here as abortTask may be called from the job process rather than the task attempt. For example, the TA is considered failed and the job instance calls abort.
   
   we can do that today because a listing will find all pending uploads; with in-memory tracking, there's no longer that ability
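To spell out the concern, a fragment in the context of abortTask(), using only the calls already visible in this diff; the comments describe the job-side abort scenario above rather than anything the code itself checks for:

```java
// store-backed tracking: the .pending files live under the task attempt
// path in S3, so a job-side abort can list and cancel uploads it never
// created itself
getCommitOperations().abortAllSinglePendingCommits(attemptPath, commitContext, true);

// in-memory tracking: nothing is written to the magic path, and
// loadPendingCommitsFromMemory() only sees commits recorded in the current
// JVM, so the same abort issued from the job process finds nothing to cancel
List<SinglePendingCommit> pendingCommits = loadPendingCommitsFromMemory(context);
for (SinglePendingCommit singleCommit : pendingCommits) {
  commitContext.abortSingleCommit(singleCommit);
}
```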