[ 
https://issues.apache.org/jira/browse/HADOOP-19047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17816206#comment-17816206
 ] 

ASF GitHub Bot commented on HADOOP-19047:
-----------------------------------------

steveloughran commented on code in PR #6468:
URL: https://github.com/apache/hadoop/pull/6468#discussion_r1484762085


##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java:
##########
@@ -242,6 +242,13 @@ private CommitConstants() {
    */
   public static final int DEFAULT_COMMITTER_THREADS = 32;
 
+
+  public static final String 
FS_S3A_COMMITTER_MAGIC_TRACK_COMMITS_IN_MEMORY_ENABLED =

Review Comment:
   javadocs here and below, use {@value} for value insertion



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java:
##########
@@ -3906,6 +3908,21 @@ public void access(final Path f, final FsAction mode)
   @Retries.RetryTranslated
   public FileStatus getFileStatus(final Path f) throws IOException {
     Path path = qualify(f);
+    if (isTrackMagicCommitsInMemoryEnabled(getConf()) && 
isMagicCommitPath(path)) {
+      // Some downstream apps might call getFileStatus for a magic path to get 
the file size.
+      // when commit data is stored in memory construct the dummy 
S3AFileStatus with correct
+      // file size fetched from the memory.
+      if 
(InMemoryMagicCommitTracker.getTaskAttemptIdToBytesWritten().containsKey(path)) 
{
+        long len = 
InMemoryMagicCommitTracker.getTaskAttemptIdToBytesWritten().get(path);
+        return new S3AFileStatus(len,

Review Comment:
   how about we add a special etag here, like "pending", declared in 
CommitterConstants. That way toString() on the status hints that it is pending 
so there's more diagnostics in list operations etc. And it could be used in 
tests. 



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/InMemoryMagicCommitTracker.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit.magic;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import software.amazon.awssdk.services.s3.model.CompletedPart;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.WriteOperationHelper;
+import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
+import org.apache.hadoop.fs.s3a.statistics.PutTrackerStatistics;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
+import org.apache.hadoop.util.Preconditions;
+
+import static 
org.apache.hadoop.fs.s3a.commit.magic.MagicCommitTrackerUtils.extractTaskAttemptIdFromPath;
+
+/**
+ * InMemoryMagicCommitTracker stores the commit data in memory.
+ * The commit data and related data stores are flushed out from
+ * the memory when the task is committed or aborted.
+ */
+public class InMemoryMagicCommitTracker extends MagicCommitTracker {
+
+  // stores taskAttemptId to commit data mapping

Review Comment:
   make javadocs



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/InMemoryMagicCommitTracker.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit.magic;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import software.amazon.awssdk.services.s3.model.CompletedPart;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.WriteOperationHelper;
+import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
+import org.apache.hadoop.fs.s3a.statistics.PutTrackerStatistics;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
+import org.apache.hadoop.util.Preconditions;
+
+import static 
org.apache.hadoop.fs.s3a.commit.magic.MagicCommitTrackerUtils.extractTaskAttemptIdFromPath;
+
+/**
+ * InMemoryMagicCommitTracker stores the commit data in memory.
+ * The commit data and related data stores are flushed out from
+ * the memory when the task is committed or aborted.
+ */
+public class InMemoryMagicCommitTracker extends MagicCommitTracker {

Review Comment:
   can you add a .toString() which gives a quick summary on the #of files being 
tracked this could be useful on logging



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTracker.java:
##########
@@ -118,76 +103,21 @@ public boolean outputImmediatelyVisible() {
 
   /**
    * Complete operation: generate the final commit data, put it.
-   * @param uploadId Upload ID
-   * @param parts list of parts
+   *
+   * @param uploadId     Upload ID
+   * @param parts        list of parts
    * @param bytesWritten bytes written
    * @param iostatistics nullable IO statistics
    * @return false, indicating that the commit must fail.
-   * @throws IOException any IO problem.
+   * @throws IOException              any IO problem.

Review Comment:
   revert



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/MagicCommitIntegration.java:
##########
@@ -20,17 +20,19 @@
 
 import java.util.List;
 
+import org.apache.hadoop.fs.s3a.commit.magic.InMemoryMagicCommitTracker;

Review Comment:
   goes into the hadoop block



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/InMemoryMagicCommitTracker.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit.magic;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import software.amazon.awssdk.services.s3.model.CompletedPart;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.WriteOperationHelper;
+import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
+import org.apache.hadoop.fs.s3a.statistics.PutTrackerStatistics;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
+import org.apache.hadoop.util.Preconditions;
+
+import static 
org.apache.hadoop.fs.s3a.commit.magic.MagicCommitTrackerUtils.extractTaskAttemptIdFromPath;
+
+/**
+ * InMemoryMagicCommitTracker stores the commit data in memory.
+ * The commit data and related data stores are flushed out from
+ * the memory when the task is committed or aborted.
+ */
+public class InMemoryMagicCommitTracker extends MagicCommitTracker {
+
+  // stores taskAttemptId to commit data mapping
+  private static Map<String, List<SinglePendingCommit>>

Review Comment:
   and make these all final. I do think they should use weak/soft references, 
somehow



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTrackerUtils.java:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit.magic;
+
+import org.apache.hadoop.conf.Configuration;

Review Comment:
   ordering





> Support InMemory Tracking Of S3A Magic Commits
> ----------------------------------------------
>
>                 Key: HADOOP-19047
>                 URL: https://issues.apache.org/jira/browse/HADOOP-19047
>             Project: Hadoop Common
>          Issue Type: Improvement
>          Components: fs/s3
>            Reporter: Syed Shameerur Rahman
>            Assignee: Syed Shameerur Rahman
>            Priority: Major
>              Labels: pull-request-available
>
> The following are the operations which happens within a Task when it uses S3A 
> Magic Committer. 
> *During closing of stream*
> 1. A 0-byte file with a same name of the original file is uploaded to S3 
> using PUT operation. Refer 
> [here|https://github.com/apache/hadoop/blob/trunk/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTracker.java#L152]
>  for more information. This is done so that the downstream application like 
> Spark could get the size of the file which is being written.
> 2. MultiPartUpload(MPU) metadata is uploaded to S3. Refer 
> [here|https://github.com/apache/hadoop/blob/trunk/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTracker.java#L176]
>  for more information.
> *During TaskCommit*
> 1. All the MPU metadata which the task wrote to S3 (There will be 'x' number 
> of metadata file in S3 if a single task writes to 'x' files) are read and 
> rewritten to S3 as a single metadata file. Refer 
> [here|https://github.com/apache/hadoop/blob/trunk/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java#L201]
>  for more information
> Since these operations happens with the Task JVM, We could optimize as well 
> as save cost by storing these information in memory when Task memory usage is 
> not a constraint. Hence the proposal here is to introduce a new MagicCommit 
> Tracker called "InMemoryMagicCommitTracker" which will store the 
> 1. Metadata of MPU in memory till the Task is committed
> 2. Store the size of the file which can be used by the downstream application 
> to get the file size before it is committed/visible to the output path.
> This optimization will save 2 PUT S3 calls, 1 LIST S3 call, and 1 GET S3 call 
> given a Task writes only 1 file.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-issues-h...@hadoop.apache.org

Reply via email to