[ 
https://issues.apache.org/jira/browse/MAPREDUCE-7341?focusedWorklogId=736785&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-736785
 ]

ASF GitHub Bot logged work on MAPREDUCE-7341:
---------------------------------------------

                Author: ASF GitHub Bot
            Created on: 04/Mar/22 16:57
            Start Date: 04/Mar/22 16:57
    Worklog Time Spent: 10m 
      Work Description: steveloughran commented on a change in pull request 
#2971:
URL: https://github.com/apache/hadoop/pull/2971#discussion_r819741347



##########
File path: 
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/impl/StoreOperations.java
##########
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl;
+
+import java.io.Closeable;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import 
org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.AbstractManifestData;
+import 
org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileOrDirEntry;
+import 
org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest;
+import org.apache.hadoop.util.JsonSerialization;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * FileSystem operations which are needed to generate the task manifest.
+ * The specific choice of which implementation to use is configurable.
+ *
+ * If a store provides a custom file commit operation then it must
+ * offer a subclass of this which returns true in
+ * {@link #storeSupportsResilientCommit()}
+ * and then implement
+ * {@link #commitFile(FileOrDirEntry)}.
+ *
+ */
+public abstract class StoreOperations implements Closeable {
+
+  /**
+   * Bind to the filesystem.
+   * This is called by the manifest committer after the operations
+   * have been instantiated.
+   * @param fileSystem target FS
+   * @param path actual path under FS.
+   * @throws IOException if there are binding problems.
+   */
+  public void bindToFileSystem(FileSystem fileSystem, Path path) throws 
IOException {
+
+  }
+
+  /**
+   * Forward to {@link FileSystem#getFileStatus(Path)}.
+   * @param path path
+   * @return status
+   * @throws IOException failure.
+   */
+  public abstract FileStatus getFileStatus(Path path) throws IOException;
+
+  /**
+   * Is a path a file? Used during directory creation.
+   * This is a copy & paste of FileSystem.isFile();
+   * {@code StoreOperationsThroughFileSystem} calls into
+   * the FS direct so that stores which optimize their probes
+   * can save on IO.
+   * @param path path to probe
+   * @return true if the path exists and resolves to a file
+   * @throws IOException failure other than FileNotFoundException
+   */
+  public boolean isFile(Path path) throws IOException {
+    try {
+      return getFileStatus(path).isFile();
+    } catch (FileNotFoundException e) {
+      return false;
+    }
+  }
+
+  /**
+   * Forward to {@link FileSystem#delete(Path, boolean)}.
+   * If it returns without an error: there is nothing at
+   * the end of the path.
+   * @param path path
+   * @param recursive recursive delete.
+   * @return true if the path was deleted.
+   * @throws IOException failure.
+   */
+  public abstract boolean delete(Path path, boolean recursive)
+      throws IOException;
+
+  /**
+   * Forward to {@link FileSystem#mkdirs(Path)}.
+   * Usual "what does 'false' mean" ambiguity.
+   * @param path path
+   * @return true if the directory was created.
+   * @throws IOException failure.
+   */
+  public abstract boolean mkdirs(Path path) throws IOException;
+
+  /**
+   * Forward to {@link FileSystem#rename(Path, Path)}.
+   * Usual "what does 'false' mean" ambiguity.
+   * @param source source file
+   * @param dest destination path -which must not exist.
+   * @return the return value of the rename
+   * @throws IOException failure.
+   */
+  public abstract boolean renameFile(Path source, Path dest)
+      throws IOException;
+
+  /**
+   * Rename a dir; defaults to invoking
+   * {@link #renameFile(Path, Path)}.
+   * Usual "what does 'false' mean?" ambiguity.
+   * @param source source dir
+   * @param dest destination path -which must not exist.
+   * @return the return value of the rename
+   * @throws IOException failure.
+   */
+  public boolean renameDir(Path source, Path dest)
+      throws IOException {
+    return renameFile(source, dest);
+  }
+
+  /**
+   * List the directory.
+   * @param path path to list.
+   * @return an iterator over the results.
+   * @throws IOException any immediate failure.
+   */
+  public abstract RemoteIterator<FileStatus> listStatusIterator(Path path)
+      throws IOException;
+
+  /**
+   * Load a task manifest from the store.
+   * With a real FS, this is done with
+   * {@link TaskManifest#load(JsonSerialization, FileSystem, Path, FileStatus)}
+   *
+   * @param serializer serializer.
+   * @param st status with the path and other data.
+   * @return the manifest
+   * @throws IOException failure to load/parse
+   */
+  public abstract TaskManifest loadTaskManifest(
+      JsonSerialization<TaskManifest> serializer,
+      FileStatus st) throws IOException;
+
+  /**
+   * Save a task manifest by create(); there's no attempt at
+   * renaming anything here.
+   * @param manifestData the manifest/success file
+   * @param path temp path for the initial save
+   * @param overwrite should create(overwrite=true) be used?
+   * @throws IOException failure to save the data
+   */
+  public abstract <T extends AbstractManifestData<T>> void save(
+      T manifestData,
+      Path path,
+      boolean overwrite) throws IOException;
+
+  /**
+   * Is trash enabled for the given store.
+   * @param path path to move, assumed to be _temporary
+   * @return true iff trash is enabled.
+   */
+  public abstract boolean isTrashEnabled(Path path);
+
+  /**
+   * Make an msync() call; swallow when unsupported.
+   * @param path path
+   * @throws IOException IO failure
+   */
+  public abstract void msync(Path path) throws IOException;
+
+  /**
+   * Extract an etag from a status if the conditions are met.
+   * If the conditions are not met, return null or ""; they will
+   * both be treated as "no etags available"
+   * <pre>
+   *   1. The status is of a type which the implementation recognizes
+   *   as containing an etag.
+   *   2. After casting the etag field can be retrieved
+   *   3. and that value is non-null/non-empty.
+   * </pre>
+   * @param status status, which may be null or any subclass of FileStatus.
+   * @return either a valid etag, or null or "".
+   */
+  public String getEtag(FileStatus status) {
+    return ManifestCommitterSupport.getEtag(status);
+  }
+
+  /**
+   * Does the store preserve etags through renames.
+   * If true, and if the source listing entry has an etag,
+   * it will be used to attempt to validate a failed rename.
+   * @param path path to probe.
+   * @return true if etag comparison is a valid strategy.
+   */
+  public boolean storePreservesEtagsThroughRenames(Path path) {
+    return false;
+  }
+
+  /**
+   * Does the store provide rename resilience through an
+   * implementation of {@link #commitFile(FileOrDirEntry)}?
+   * If true then that method will be invoked to commit work
+   * @return true if resilient commit support is available.
+   */
+  public boolean storeSupportsResilientCommit() {
+    return false;
+  }
+
+  /**
+   * Commit one file through any resilient API.
+   * This operation MUST rename source to destination,
+   * else raise an exception.
+   * The result indicates whether or not some
+   * form of recovery took place.
+   *
+   * If etags were collected during task commit, these will be
+   * in the entries passed in here.
+   * @param entry entry to commit
+   * @return the result of the commit
+   * @throws IOException failure.
+   * @throws UnsupportedOperationException if not available.
+   *
+   */
+  public CommitFileResult commitFile(FileOrDirEntry entry) throws IOException {
+    throw new UnsupportedOperationException("Resilient commit not supported");
+  }
+
+  /**
+   * Outcome from the operation {@link #commitFile(FileOrDirEntry)}.
+   * As a rename failure MUST raise an exception, this result
+   * only declares whether or not some form of recovery took place.
+   */
+  public static final class CommitFileResult {
+
+    /** Did recovery take place? */
+    private final boolean recovered;
+
+    /**
+     * Full commit result.
+     * @param recovered Did recovery take place?
+     */
+    public static CommitFileResult fromResilientCommit(final boolean 
recovered) {
+      return new CommitFileResult(recovered);
+    }
+
+    /**
+     * Full commit result.
+     * @param recovered Did recovery take place?
+     *
+     */
+    private CommitFileResult(
+        final boolean recovered) {
+      this.recovered = recovered;
+    }
+
+    /**
+     * Did some form of recovery take place?
+     * @return true if the commit succeeded through some form of (etag-based) recovery
+     */
+    public boolean recovered() {
+      return recovered;
+    }
+
+  }
+
+  /**
+   * Move a directory to trash, with the jobID as its name.
+   * IOExceptions are caught and included in the outcome.
+   * @param jobId job ID.
+   * @param path path to move, assumed to be _temporary
+   * @return the outcome.
+   */
+  public abstract MoveToTrashResult moveToTrash(String jobId, Path path);

Review comment:
       It's actually not directly part of the FS API. Bit of a mess, really.

##########
File path: 
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/impl/UnreliableStoreOperations.java
##########
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathIOException;
+import org.apache.hadoop.fs.RemoteIterator;
+import 
org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.AbstractManifestData;
+import 
org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest;
+import org.apache.hadoop.util.JsonSerialization;
+
+import static java.util.Objects.requireNonNull;
+import static 
org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.InternalConstants.OPERATION_TIMED_OUT;
+import static 
org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.StoreOperationsThroughFileSystem.E_TRASH_FALSE;
+
+/**
+ * Wrap an existing StoreOperations implementation and fail on
+ * specific paths.
+ * This is for testing. It could be implemented via
+ * Mockito 2 spy code but is not so that:
+ * 1. It can be backported to Hadoop versions using Mockito 1.x.
+ * 2. It can be extended to use in production. This is why it is in
+ * the production module -to allow for downstream tests to adopt it.

Review comment:
       made clear they mustn't




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 736785)
    Time Spent: 24.5h  (was: 24h 20m)

> Add a task-manifest output committer for Azure and GCS
> ------------------------------------------------------
>
>                 Key: MAPREDUCE-7341
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-7341
>             Project: Hadoop Map/Reduce
>          Issue Type: New Feature
>          Components: client
>    Affects Versions: 3.3.1
>            Reporter: Steve Loughran
>            Assignee: Steve Loughran
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 24.5h
>  Remaining Estimate: 0h
>
> Add a task-manifest output committer for Azure and GCS
> The S3A committers are very popular in Spark on S3, as they are both correct 
> and fast.
> The classic FileOutputCommitter v1 and v2 algorithms are all that is 
> available for Azure ABFS and Google GCS, and they have limitations. 
> The v2 algorithm isn't safe in the presence of failed task attempt commits, 
> so we
> recommend the v1 algorithm for Azure. But that is slow because it 
> sequentially lists
> then renames files and directories, one-by-one. The latencies of list
> and rename make things slow.
> Google GCS lacks the atomic directory rename required for v1 correctness;
> v2 can be used (which doesn't have the job commit performance limitations),
> but it's not safe.
> Proposed
> * Add a new FileOutputFormat committer which uses an intermediate manifest to
>   pass the list of files created by a TA to the job committer.
> * Job committer to parallelise reading these task manifests and submit all the
>   rename operations into a pool of worker threads. (also: mkdir, directory 
> deletions on cleanup)
> * Use the committer plugin mechanism added for s3a to make this the default 
> committer for ABFS
>   (i.e. no need to make any changes to FileOutputCommitter)
> * Add lots of IOStatistics instrumentation + logging of operations in the 
> JobCommit
>   for visibility of where delays are occurring.
> * Reuse the S3A committer _SUCCESS JSON structure to publish IOStats & other 
> data
>   for testing/support.  
> This committer will be faster than the V1 algorithm because of the 
> parallelisation, and
> because a manifest written by create-and-rename will be exclusive to a single 
> task
> attempt, delivers the isolation which the v2 committer lacks.
> This is not an attempt to do an iceberg/hudi/delta-lake style manifest-only 
> format
> for describing the contents of a table; the final output is still a directory 
> tree
> which must be scanned during query planning.
> As such the format is still suboptimal for cloud storage -but at least we 
> will have
> faster job execution during the commit phases.
>   
> Note: this will also work on HDFS, where again, it should be faster than
> the v1 committer. However the target is very much Spark with ABFS and GCS; no 
> plans to worry about MR as that simplifies the challenge of dealing with job 
> restart (i.e. you don't have to)



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

---------------------------------------------------------------------
To unsubscribe, e-mail: mapreduce-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: mapreduce-issues-h...@hadoop.apache.org

Reply via email to