[ 
https://issues.apache.org/jira/browse/MAPREDUCE-7341?focusedWorklogId=608340&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-608340
 ]

ASF GitHub Bot logged work on MAPREDUCE-7341:
---------------------------------------------

                Author: ASF GitHub Bot
            Created on: 08/Jun/21 08:13
            Start Date: 08/Jun/21 08:13
    Worklog Time Spent: 10m 
      Work Description: mukund-thakur commented on a change in pull request 
#2971:
URL: https://github.com/apache/hadoop/pull/2971#discussion_r646548513



##########
File path: 
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/AbortTaskStage.java
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output.committer.manifest;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.Path;
+
+import static 
org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_STAGE_TASK_ABORT_TASK;
+
+/**
+ * Abort a task.
+ *
+ * This done just by deleting the task directory.

Review comment:
       missing is ?

##########
File path: 
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/AbstractJobCommitStage.java
##########
@@ -0,0 +1,548 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output.committer.manifest;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathIOException;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
+import 
org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.AbstractManifestData;
+import 
org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest;
+import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hadoop.util.DurationInfo;
+import org.apache.hadoop.util.functional.RemoteIterators;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_CREATE;
+import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_DELETE;
+import static 
org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_GET_FILE_STATUS;
+import static 
org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_LIST_STATUS;
+import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_MKDIRS;
+import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_RENAME;
+import static 
org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration;
+import static 
org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfInvocation;
+import static 
org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.MANIFEST_SUFFIX;
+import static 
org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_LOAD_MANIFEST;
+import static 
org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_RENAME_FILE;
+
+/**
+ * A Stage in Task/Job Commit.
+ * A stage can be executed once only, creating the return value of the
+ * {@link #apply(Object)} method, and, potentially, updating the state of the
+ * store via {@link StoreOperations}.
+ * IOStatistics will also be updated.
+ * Stages are expected to be combined to form the commit protocol.
+ * @param <R> Type of arguments to the stage.

Review comment:
       nit: It would be better if we give name like I -> input and R-> return 
type. 

##########
File path: 
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/AbstractJobCommitStage.java
##########
@@ -0,0 +1,548 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output.committer.manifest;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathIOException;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
+import 
org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.AbstractManifestData;
+import 
org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest;
+import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hadoop.util.DurationInfo;
+import org.apache.hadoop.util.functional.RemoteIterators;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_CREATE;
+import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_DELETE;
+import static 
org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_GET_FILE_STATUS;
+import static 
org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_LIST_STATUS;
+import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_MKDIRS;
+import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_RENAME;
+import static 
org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration;
+import static 
org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfInvocation;
+import static 
org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.MANIFEST_SUFFIX;
+import static 
org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_LOAD_MANIFEST;
+import static 
org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_RENAME_FILE;
+
+/**
+ * A Stage in Task/Job Commit.
+ * A stage can be executed once only, creating the return value of the
+ * {@link #apply(Object)} method, and, potentially, updating the state of the
+ * store via {@link StoreOperations}.
+ * IOStatistics will also be updated.
+ * Stages are expected to be combined to form the commit protocol.
+ * @param <R> Type of arguments to the stage.

Review comment:
       Or maybe consistent with <T, R> of JobStage and FunctionRaisingIOE.

##########
File path: 
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/ManifestCommitter.java
##########
@@ -0,0 +1,712 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output.committer.manifest;
+
+import java.io.IOException;
+import java.util.Objects;
+import java.util.concurrent.ExecutorService;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.statistics.IOStatisticsSource;
+import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter;
+import 
org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.ManifestSuccessData;
+import 
org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest;
+import 
org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
+import 
org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+
+import static 
org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString;
+import static 
org.apache.hadoop.fs.statistics.IOStatisticsLogging.logIOStatisticsAtDebug;
+import static 
org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER;
+import static 
org.apache.hadoop.mapreduce.lib.output.committer.manifest.CleanupJobStage.optionsFromConfig;
+import static 
org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.DEFAULT_CREATE_SUCCESSFUL_JOB_DIR_MARKER;
+import static 
org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.MANIFEST_SUFFIX;
+import static 
org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_IO_PROCESSORS;
+import static 
org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_IO_PROCESSORS_DEFAULT;
+import static 
org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_VALIDATE_OUTPUT;
+import static 
org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_VALIDATE_OUTPUT_DEFAULT;
+import static 
org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.COMMITTER_TASKS_COMPLETED;
+import static 
org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterSupport.buildJobUUID;
+import static 
org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterSupport.createIOStatisticsStore;
+import static 
org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterSupport.getAppAttemptId;
+
+/**
+ * This is the Intermediate-Manifest committer.
+ */
+public class ManifestCommitter extends PathOutputCommitter implements
+    IOStatisticsSource {
+
+  public static final Logger LOG = LoggerFactory.getLogger(
+      ManifestCommitter.class);
+
+  public static final String TASK_COMMITTER = "task committer";
+
+  public static final String JOB_COMMITTER = "job committer";
+
+  /**
+   * Committer Configuration as extracted from
+   * the job/task context and set in the constructor.
+   *
+   */
+  private final ManifestCommitterConfig baseConfig;
+
+  /**
+   * Destination of the job.
+   */
+  private final Path destinationDir;
+
+  /**
+   * For tasks, the attempt directory.
+   * Null for jobs.
+   */
+  private final Path taskAttemptDir;
+
+  /**
+   * IOStatistics to update.
+   */
+  private final IOStatisticsStore iostatistics;
+
+  /**
+   *  The job Manifest Success data; only valid after a job successfully
+   *  commits.
+   */
+  private ManifestSuccessData jobSuccessData;
+
+  /**
+   * The task manifest of the task commit.
+   * Null unless this is a task attempt and the
+   * task has successfully been committed.
+   */
+  private TaskManifest taskAttemptCommittedManifest;
+
+  /**
+   * Create a committer.
+   * @param outputPath output path
+   * @param context job/task context
+   * @throws IOException failure.
+   */
+  ManifestCommitter(final Path outputPath,
+      final TaskAttemptContext context) throws IOException {
+    super(outputPath, context);
+    destinationDir = resolveDestinationDirectory(outputPath,
+        context.getConfiguration());
+    iostatistics = createIOStatisticsStore().build();
+    baseConfig = new ManifestCommitterConfig(destinationDir,
+        TASK_COMMITTER,
+        context,
+        iostatistics);
+    taskAttemptDir = baseConfig.getTaskAttemptDir();
+  }
+
+  /**
+   * Create a committer config from the passed in job/task context.
+   * @param isTask is this a task?
+   * @param context context
+   * @return committer config
+   */
+  private ManifestCommitterConfig createCommitterConfig(boolean isTask,
+      JobContext context) {
+    return new ManifestCommitterConfig(
+        getOutputPath(),
+        isTask ? TASK_COMMITTER : JOB_COMMITTER,
+        context,
+        iostatistics);
+  }
+
+  /**
+   * Set up a job through a {@link SetupJobStage}.
+   * @param jobContext Context of the job whose output is being written.
+   * @throws IOException IO Failure.
+   */
+  public void setupJob(final JobContext jobContext) throws IOException {
+    ManifestCommitterConfig committerConfig = createCommitterConfig(false,
+        jobContext);
+    StageConfig stageConfig =
+        committerConfig
+            .createJobStageConfig()
+            .withOperations(createStoreOperations())
+            .build();
+    // set up the job.
+    new SetupJobStage(stageConfig)
+        .apply(committerConfig.getCreateJobMarker());
+    logCommitterStatisticsAtDebug();
+  }
+
+  /**
+   * Set up a task through a {@link SetupTaskStage}.
+   *
+   * @param context task context.
+   * @throws IOException IO Failure.
+   */
+  public void setupTask(final TaskAttemptContext context)
+      throws IOException {
+    StageConfig stageConfig =
+        createCommitterConfig(true, context)
+            .createJobStageConfig()
+            .withOperations(createStoreOperations())
+            .build();
+    // create task attempt dir; delete if present. Or fail?
+    new SetupTaskStage(stageConfig).apply("");
+    logCommitterStatisticsAtDebug();
+  }
+
+  /**
+   * Always return true.
+   * @param context task context.
+   * @return true
+   * @throws IOException IO Failure.
+   */
+  public boolean needsTaskCommit(final TaskAttemptContext context)
+      throws IOException {
+    return true;
+  }
+
+  /**
+   * Failure during Job Commit is not recoverable from.
+   *
+   * @param jobContext
+   *          Context of the job whose output is being written.
+   * @return false, always
+   * @throws IOException never
+   */
+  @Override
+  public boolean isCommitJobRepeatable(final JobContext jobContext)
+      throws IOException {
+    return false;
+  }
+
+  /**
+   * Declare that task recovery is not supported.
+   * It would be, if someone added the code *and tests*.
+   * @param jobContext
+   *          Context of the job whose output is being written.
+   * @return false, always
+   * @throws IOException never
+   */
+  @Override
+  public boolean isRecoverySupported(final JobContext jobContext)
+      throws IOException {
+    return false;
+  }
+
+  /**
+   *
+   * @param taskContext Context of the task whose output is being recovered
+   * @throws IOException always
+   */
+  @Override
+  public void recoverTask(final TaskAttemptContext taskContext)
+      throws IOException {
+    LOG.warn("Rejecting recoverTask({}) call", taskContext.getTaskAttemptID());
+    throw new IOException("Cannot recover task "
+        + taskContext.getTaskAttemptID());
+  }
+
+  /**
+   * Commit the task.
+   * This is where the task attempt tree list takes place.
+   * @param context task context.
+   * @throws IOException IO Failure.
+   */
+  public void commitTask(final TaskAttemptContext context)
+      throws IOException {
+    ManifestCommitterConfig committerConfig = createCommitterConfig(true,
+        context);
+    StageConfig stageConfig = committerConfig.createJobStageConfig()
+        .withOperations(createStoreOperations())
+        .build();
+    taskAttemptCommittedManifest = new CommitTaskStage(stageConfig)
+        .apply("generate")
+        .getRight();
+    iostatistics.incrementCounter(COMMITTER_TASKS_COMPLETED, 1);
+    logCommitterStatisticsAtDebug();
+  }
+
+  /**
+   * Abort a task.
+   * @param context task context
+   * @throws IOException failure during the delete
+   */
+  public void abortTask(final TaskAttemptContext context)
+      throws IOException {
+    ManifestCommitterConfig committerConfig = createCommitterConfig(true,
+        context);
+    new AbortTaskStage(committerConfig.createJobStageConfig()
+        .withOperations(createStoreOperations())
+        .build())
+        .apply(false);
+    logCommitterStatisticsAtDebug();
+  }
+
+  /**
+   * This is the big job commit stage.
+   * Load the manifests, prepare the destination, rename
+   * the files then cleanup the job directory.
+   * @param jobContext Context of the job whose output is being written.
+   * @throws IOException failure.
+   */
+  @Override
+  public void commitJob(final JobContext jobContext) throws IOException {
+
+    ManifestCommitterConfig committerConfig = createCommitterConfig(false,
+        jobContext);
+
+    try (CloseableTaskSubmitter ioProcs =
+             committerConfig.createSubmitter()) {
+      // the stage config will be shared across all stages.
+      StageConfig stageConfig = committerConfig.createJobStageConfig()
+          .withOperations(createStoreOperations())
+          .withIOProcessors(ioProcs)

Review comment:
       Ioprocs are shared accross the whole CommitJobStage which includes 
LoadManifest and Rename. But in the docs it is written that, threadpool 
shouldn't be shared to avoid deadlocks. 

##########
File path: 
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/CommitJobStage.java
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output.committer.manifest;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
+import 
org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.ManifestSuccessData;
+import 
org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest;
+
+import static org.apache.commons.io.FileUtils.byteCountToDisplaySize;
+import static 
org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_JOB_COMMITTED_BYTES;
+import static 
org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_JOB_COMMITTED_FILES;
+import static 
org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_STAGE_JOB_COMMIT;
+
+/**
+ * Commit the Job.
+ * Arguments (save manifest, validate output)
+ */

Review comment:
       It will be easier to understand if we define the input and output 
arguments of each stage. For example, in this stage I understand that we are 
returing ManifestSuccessData but why List<path>?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 608340)
    Time Spent: 3h 40m  (was: 3.5h)

> Add a task-manifest output committer for Azure and GCS
> ------------------------------------------------------
>
>                 Key: MAPREDUCE-7341
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-7341
>             Project: Hadoop Map/Reduce
>          Issue Type: New Feature
>          Components: client
>    Affects Versions: 3.3.1
>            Reporter: Steve Loughran
>            Assignee: Steve Loughran
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 3h 40m
>  Remaining Estimate: 0h
>
> Add a task-manifest output committer for Azure and GCS
> The S3A committers are very popular in Spark on S3, as they are both correct 
> and fast.
> The classic FileOutputCommitter v1 and v2 algorithms are all that is 
> available for Azure ABFS and Google GCS, and they have limitations. 
> The v2 algorithm isn't safe in the presence of failed task attempt commits, 
> so we
> recommend the v1 algorithm for Azure. But that is slow because it 
> sequentially lists
> then renames files and directories, one-by-one. The latencies of list
> and rename make things slow.
> Google GCS lacks the atomic directory rename required for v1 correctness;
> v2 can be used (which doesn't have the job commit performance limitations),
> but it's not safe.
> Proposed
> * Add a new FileOutputFormat committer which uses an intermediate manifest to
>   pass the list of files created by a TA to the job committer.
> * Job committer to parallelise reading these task manifests and submit all the
>   rename operations into a pool of worker threads. (also: mkdir, directory 
> deletions on cleanup)
> * Use the committer plugin mechanism added for s3a to make this the default 
> committer for ABFS
>   (i.e. no need to make any changes to FileOutputCommitter)
> * Add lots of IOStatistics instrumentation + logging of operations in the 
> JobCommit
>   for visibility of where delays are occurring.
> * Reuse the S3A committer _SUCCESS JSON structure to publish IOStats & other 
> data
>   for testing/support.  
> This committer will be faster than the V1 algorithm because of the 
> parallelisation, and
> because a manifest written by create-and-rename will be exclusive to a single 
> task
> attempt, delivers the isolation which the v2 committer lacks.
> This is not an attempt to do an iceberg/hudi/delta-lake style manifest-only 
> format
> for describing the contents of a table; the final output is still a directory 
> tree
> which must be scanned during query planning.
> As such the format is still suboptimal for cloud storage -but at least we 
> will have
> faster job execution during the commit phases.
>   
> Note: this will also work on HDFS, where again, it should be faster than
> the v1 committer. However the target is very much Spark with ABFS and GCS; no 
> plans to worry about MR as that simplifies the challenge of dealing with job 
> restart (i.e. you don't have to)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: mapreduce-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: mapreduce-issues-h...@hadoop.apache.org

Reply via email to