steveloughran commented on code in PR #6468:
URL: https://github.com/apache/hadoop/pull/6468#discussion_r1476480590


##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java:
##########
@@ -52,6 +52,7 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import javax.annotation.Nullable;
 
+import org.apache.hadoop.fs.s3a.commit.magic.InMemoryMagicCommitTracker;

Review Comment:
   Move this import into the same group as the rest of the Apache imports.



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/S3MagicCommitTracker.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit.magic;
+
+import org.apache.commons.lang3.StringUtils;

Review Comment:
   Fix the import ordering and grouping. It's best to configure the IDE with
   the project code style, but don't rearrange imports in existing classes, as
   that breaks cherry-picking.



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java:
##########
@@ -248,6 +236,80 @@ private PendingSet innerCommitTask(
     return pendingSet;
   }
 
+  /**
+   * Loads pending commits from either memory or from the remote store (S3) based on the config.
+   * @param context TaskAttemptContext
+   * @return All pending commit data for the given TaskAttemptContext
+   * @throws IOException
+   *           if there is an error trying to read the commit data
+   */
+  protected PendingSet loadPendingCommits(TaskAttemptContext context) throws IOException {
+    PendingSet pendingSet = new PendingSet();
+    if (isTrackMagicCommitsInMemoryEnabled(context.getConfiguration())) {
+      // load from memory
+      List<SinglePendingCommit> pendingCommits = loadPendingCommitsFromMemory(context);
+
+      for (SinglePendingCommit singleCommit : pendingCommits) {
+        // aggregate stats
+        pendingSet.getIOStatistics()
+            .aggregate(singleCommit.getIOStatistics());
+        // then clear so they aren't marshalled again.
+        singleCommit.getIOStatistics().clear();
+      }
+      pendingSet.setCommits(pendingCommits);
+    } else {
+      // Load from remote store
+      CommitOperations actions = getCommitOperations();
+      Path taskAttemptPath = getTaskAttemptPath(context);
+      try (CommitContext commitContext = initiateTaskOperation(context)) {
+        Pair<PendingSet, List<Pair<LocatedFileStatus, IOException>>> loaded =
+            actions.loadSinglePendingCommits(taskAttemptPath, true, commitContext);
+        pendingSet = loaded.getKey();
+        List<Pair<LocatedFileStatus, IOException>> failures = loaded.getValue();
+        if (!failures.isEmpty()) {
+          // At least one file failed to load
+          // revert all which did; report failure with first exception
+          LOG.error("At least one commit file could not be read: failing");
+          abortPendingUploads(commitContext, pendingSet.getCommits(), true);
+          throw failures.get(0).getValue();
+        }
+      }
+    }
+    return pendingSet;
+  }
+
+  private List<SinglePendingCommit> loadPendingCommitsFromMemory(TaskAttemptContext context)

Review Comment:
   nit: this method needs javadocs.



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java:
##########
@@ -71,6 +79,26 @@ public void setup() throws Exception {
     CommitUtils.verifyIsMagicCommitFS(getFileSystem());
   }
 
+  @Parameterized.Parameters(name = "track-commit-in-memory-{0}")
+  public static Collection<Object[]> params() {
+    return Arrays.asList(new Object[][]{
+        {false},
+        {true}
+    });
+  }
+
+  public ITestMagicCommitProtocol(boolean trackCommitsInMemory) {
+    this.trackCommitsInMemory = trackCommitsInMemory;
+  }
+
+  @Override
+  protected Configuration createConfiguration() {
+    Configuration conf = super.createConfiguration();
+    conf.setBoolean(FS_S3A_COMMITTER_MAGIC_TRACK_COMMITS_IN_MEMORY_ENABLED, trackCommitsInMemory);

Review Comment:
   Look for uses of `removeBaseAndBucketOverrides()` to see how to stop
   per-bucket settings from breaking your test.
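
   A minimal sketch of why this matters, assuming the S3A convention that an
   `fs.s3a.bucket.<bucket>.<option>` value overrides the base `fs.s3a.<option>`
   value. This is a toy model over a plain `Map`, not the Hadoop
   `Configuration` API: the class name, `resolve`, `clearBaseAndBucket`, and
   the `fs.s3a.example.option` key are all hypothetical stand-ins.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of per-bucket option resolution; NOT the Hadoop implementation. */
final class PerBucketOverrideSketch {

  static final String BASE_PREFIX = "fs.s3a.";
  static final String BUCKET_PREFIX = "fs.s3a.bucket.";

  /** Builds the per-bucket form of a base key (assumed naming convention). */
  static String bucketKey(String bucket, String baseKey) {
    return BUCKET_PREFIX + bucket + "." + baseKey.substring(BASE_PREFIX.length());
  }

  /** Resolves an option: a per-bucket value, if present, wins over the base value. */
  static String resolve(Map<String, String> conf, String bucket, String baseKey) {
    String perBucket = conf.get(bucketKey(bucket, baseKey));
    return perBucket != null ? perBucket : conf.get(baseKey);
  }

  /** Hypothetical helper: clears both the base and the per-bucket value. */
  static void clearBaseAndBucket(Map<String, String> conf, String bucket, String baseKey) {
    conf.remove(baseKey);
    conf.remove(bucketKey(bucket, baseKey));
  }

  public static void main(String[] args) {
    String key = "fs.s3a.example.option";   // hypothetical option name
    Map<String, String> conf = new HashMap<>();
    // A developer's site configuration sets a per-bucket value.
    conf.put(bucketKey("test-bucket", key), "false");

    // The test sets only the base option: the per-bucket value still wins.
    conf.put(key, "true");
    System.out.println(resolve(conf, "test-bucket", key));   // prints "false"

    // Clearing both first lets the test's own setting take effect.
    clearBaseAndBucket(conf, "test-bucket", key);
    conf.put(key, "true");
    System.out.println(resolve(conf, "test-bucket", key));   // prints "true"
  }
}
```

   In the parameterized test above, the same effect would mean the `{true}`
   and `{false}` runs could silently exercise the same code path on a machine
   with a per-bucket setting for that option.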



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java:
##########
@@ -3906,6 +3908,21 @@ public void access(final Path f, final FsAction mode)
   @Retries.RetryTranslated
   public FileStatus getFileStatus(final Path f) throws IOException {
     Path path = qualify(f);
+    if (isTrackMagicCommitsInMemoryEnabled(getConf()) && isMagicCommitPath(path)) {

Review Comment:
   This is a bit of a hack. I'm not saying that's bad, just wondering whether
   there is a more elegant solution.



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/InMemoryMagicCommitTracker.java:
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit.magic;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.WriteOperationHelper;
+import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
+import org.apache.hadoop.fs.s3a.statistics.PutTrackerStatistics;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
+import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
+import software.amazon.awssdk.services.s3.model.CompletedPart;

Review Comment:
   Review the import ordering and grouping.



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java:
##########
@@ -264,9 +326,14 @@ public void abortTask(TaskAttemptContext context) throws IOException {
     try (DurationInfo d = new DurationInfo(LOG,
         "Abort task %s", context.getTaskAttemptID());
         CommitContext commitContext = initiateTaskOperation(context)) {
-      getCommitOperations().abortAllSinglePendingCommits(attemptPath,
-          commitContext,
-          true);
+      if (isTrackMagicCommitsInMemoryEnabled(context.getConfiguration())) {
+        List<SinglePendingCommit> pendingCommits = loadPendingCommitsFromMemory(context);
+        for (SinglePendingCommit singleCommit : pendingCommits) {
+          commitContext.abortSingleCommit(singleCommit);
+        }
+      } else {
+        getCommitOperations().abortAllSinglePendingCommits(attemptPath, commitContext, true);

Review Comment:
   Hmm, this is a problem: abortTask may be called from the job process rather
   than the task attempt. For example, when a task attempt is considered
   failed, the job instance calls abort. We can do that today because a listing
   will find all pending uploads. With in-memory tracking, that ability is
   gone.
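
   A toy sketch of the visibility gap, under stated assumptions: each
   `SimulatedProcess` instance stands in for one JVM's private memory, and
   `SharedStore` stands in for state every process can list. None of these
   names exist in the PR or in Hadoop; they only illustrate the concern.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of cross-process visibility; NOT the committer implementation. */
final class AbortVisibilitySketch {

  /** Stand-in for remote state that any process can list. */
  static final class SharedStore {
    private final Map<String, List<String>> uploads = new HashMap<>();

    void record(String attemptId, String uploadId) {
      uploads.computeIfAbsent(attemptId, k -> new ArrayList<>()).add(uploadId);
    }

    List<String> list(String attemptId) {
      return uploads.getOrDefault(attemptId, Collections.emptyList());
    }
  }

  /** Stand-in for one process; its in-memory map is private to it. */
  static final class SimulatedProcess {
    private final Map<String, List<String>> inMemory = new HashMap<>();
    private final SharedStore store;

    SimulatedProcess(SharedStore store) {
      this.store = store;
    }

    /** Records an upload both in this process's memory and in the shared store. */
    void startUpload(String attemptId, String uploadId) {
      inMemory.computeIfAbsent(attemptId, k -> new ArrayList<>()).add(uploadId);
      store.record(attemptId, uploadId);
    }

    /** Only sees uploads started by this same process. */
    List<String> pendingFromMemory(String attemptId) {
      return inMemory.getOrDefault(attemptId, Collections.emptyList());
    }

    /** Sees uploads started by any process. */
    List<String> pendingFromListing(String attemptId) {
      return store.list(attemptId);
    }
  }

  public static void main(String[] args) {
    SharedStore store = new SharedStore();
    SimulatedProcess task = new SimulatedProcess(store);
    SimulatedProcess job = new SimulatedProcess(store);

    task.startUpload("attempt_1", "upload-A");

    // The job process has nothing in its own memory to abort...
    System.out.println(job.pendingFromMemory("attempt_1"));    // prints "[]"
    // ...but a listing of shared state still finds the upload.
    System.out.println(job.pendingFromListing("attempt_1"));   // prints "[upload-A]"
  }
}
```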



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

