[ https://issues.apache.org/jira/browse/MAPREDUCE-7341?focusedWorklogId=742269&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-742269 ]
ASF GitHub Bot logged work on MAPREDUCE-7341: --------------------------------------------- Author: ASF GitHub Bot Created on: 16/Mar/22 12:35 Start Date: 16/Mar/22 12:35 Worklog Time Spent: 10m Work Description: steveloughran commented on a change in pull request #2971: URL: https://github.com/apache/hadoop/pull/2971#discussion_r827957630 ########## File path: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/AbstractJobCommitStage.java ########## @@ -0,0 +1,942 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.time.Duration; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.fs.FileAlreadyExistsException; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathIOException; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.fs.statistics.DurationTracker; +import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore; +import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.AbstractManifestData; +import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileEntry; +import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest; +import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestStoreOperations; +import org.apache.hadoop.util.OperationDuration; +import org.apache.hadoop.util.Preconditions; +import org.apache.hadoop.util.functional.CallableRaisingIOE; +import org.apache.hadoop.util.functional.RemoteIterators; +import org.apache.hadoop.util.functional.TaskPool; + +import static java.util.Objects.requireNonNull; +import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_DELETE; +import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_GET_FILE_STATUS; +import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_IS_FILE; +import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_LIST_STATUS; +import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OP_MKDIRS; +import static org.apache.hadoop.fs.statistics.StoreStatisticNames.STORE_IO_RATE_LIMITED; +import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.createTracker; +import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration; +import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfInvocation; +import static 
org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.MANIFEST_SUFFIX; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_COMMIT_FILE_RENAME; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_COMMIT_FILE_RENAME_RECOVERED; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_LOAD_MANIFEST; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_MSYNC; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_RENAME_FILE; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_SAVE_TASK_MANIFEST; +import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.AuditingIntegration.enterStageWorker; + +/** + * A Stage in Task/Job Commit. + * A stage can be executed once only, creating the return value of the + * {@link #apply(Object)} method, and, potentially, updating the state of the + * store via {@link ManifestStoreOperations}. + * IOStatistics will also be updated. + * Stages are expected to be combined to form the commit protocol. + * @param <IN> Type of arguments to the stage. + * @param <OUT> Type of result. + */ +public abstract class AbstractJobCommitStage<IN, OUT> + implements JobStage<IN, OUT> { + + private static final Logger LOG = LoggerFactory.getLogger( + AbstractJobCommitStage.class); + + /** + * Error text on rename failure: {@value}. + */ + public static final String FAILED_TO_RENAME_PREFIX = "Failed to "; + + /** + * Is this a task stage? If so, toString() includes task + * info.. + */ + private final boolean isTaskStage; + + /** + * Configuration of all the stages in the ongoing committer + * operation. + */ + private final StageConfig stageConfig; + + /** + * Name of the stage for statistics and logging. + */ + private final String stageStatisticName; + + /** + * Callbacks to update store. + * This is not made visible to the stages; they must + * go through the wrapper classes in this class, which + * add statistics and logging. + */ + private final ManifestStoreOperations operations; + + /** + * Submitter for doing IO against the store. + */ + private final TaskPool.Submitter ioProcessors; + + /** + * Used to stop any re-entrancy of the rename. + * This is an execute-once operation. + */ + private final AtomicBoolean executed = new AtomicBoolean(false); + + /** + * Tracker of the duration of the execution of the stage. + * set after {@link #executeStage(Object)} completes. + */ + private DurationTracker stageExecutionTracker; + + /** + * Name for logging. + */ + private final String name; + + /** + * Constructor. + * @param isTaskStage Is this a task stage? + * @param stageConfig stage-independent configuration. + * @param stageStatisticName name of the stage for statistics/logging + * @param requireIOProcessors are the IO processors required? 
+ */ + protected AbstractJobCommitStage( + final boolean isTaskStage, + final StageConfig stageConfig, + final String stageStatisticName, + final boolean requireIOProcessors) { + this.isTaskStage = isTaskStage; + this.stageStatisticName = stageStatisticName; + this.stageConfig = stageConfig; + requireNonNull(stageConfig.getDestinationDir(), "Destination Directory"); + requireNonNull(stageConfig.getJobId(), "Job ID"); + requireNonNull(stageConfig.getJobAttemptDir(), "Job attempt directory"); + this.operations = requireNonNull(stageConfig.getOperations(), + "Operations callbacks"); + // and the processors of work if required. + this.ioProcessors = bindProcessor( + requireIOProcessors, + stageConfig.getIoProcessors()); + String stageName; + if (isTaskStage) { + // force fast failure. + getRequiredTaskId(); + getRequiredTaskAttemptId(); + getRequiredTaskAttemptDir(); + stageName = String.format("[Task-Attempt %s]", getRequiredTaskAttemptId()); + } else { + stageName = String.format("[Job-Attempt %s/%02d]", + stageConfig.getJobId(), + stageConfig.getJobAttemptNumber()); + } + name = stageName; + } + + /** + * Bind to the processor if it is required. + * @param required is the processor required? + * @param processor processor + * @return the processor binding + * @throws NullPointerException if required == true and processor is null. + */ + private TaskPool.Submitter bindProcessor( + final boolean required, + final TaskPool.Submitter processor) { + return required + ? requireNonNull(processor, "required IO processor is null") + : null; + } + + /** + * Stage entry point. + * Verifies that this is the first and only time the stage is invoked, + * then calls {@link #executeStage(Object)} for the subclass + * to perform its part of the commit protocol. + * The duration of the stage is collected as a statistic, and its + * entry/exit logged at INFO. + * @param arguments arguments to the function. + * @return the result. + * @throws IOException failures. + */ + @Override + public final OUT apply(final IN arguments) throws IOException { + executeOnlyOnce(); + progress(); + String stageName = getStageName(arguments); + getStageConfig().enterStage(stageName); + String statisticName = getStageStatisticName(arguments); + // The tracker here + LOG.info("{}: Executing Stage {}", getName(), stageName); + stageExecutionTracker = createTracker(getIOStatistics(), statisticName); + try { + // exec the input function and return its value + final OUT out = executeStage(arguments); + LOG.info("{}: Stage {} completed after {}", + getName(), + stageName, + OperationDuration.humanTime( + stageExecutionTracker.asDuration().toMillis())); + return out; + } catch (IOException | RuntimeException e) { + LOG.error("{}: Stage {} failed: after {}: {}", + getName(), + stageName, + OperationDuration.humanTime( + stageExecutionTracker.asDuration().toMillis()), + e.toString()); + LOG.debug("{}: Stage failure:", getName(), e); + // input function failed: note it + stageExecutionTracker.failed(); + // and rethrow + throw e; + } finally { + // update the tracker. + // this is called after the catch() call would have + // set the failed flag. + stageExecutionTracker.close(); + progress(); + getStageConfig().exitStage(stageName); + } + } + + /** + * The work of a stage. + * Executed exactly once. + * @param arguments arguments to the function. + * @return the result. + * @throws IOException failures. 
+ */ + protected abstract OUT executeStage(IN arguments) throws IOException; + + /** + * Check that the operation has not been invoked twice. + * This is an atomic check. + * @throws IllegalStateException on a second invocation. + */ + private void executeOnlyOnce() { + Preconditions.checkState( + !executed.getAndSet(true), + "Stage attempted twice"); + } + + /** + * The stage statistic name. + * @param arguments args to the invocation. + * @return stage name. + */ + protected String getStageStatisticName(IN arguments) { + return stageStatisticName; + } + + /** + * Stage name for reporting; defaults to + * call {@link #getStageStatisticName(IN)}. + * @param arguments args to the invocation. + * @return name used in updating reports. + */ + protected String getStageName(IN arguments) { + return getStageStatisticName(arguments); + } + + /** + * Get the execution tracker; non-null + * after stage execution. + * @return a tracker or null. + */ + public DurationTracker getStageExecutionTracker() { + return stageExecutionTracker; + } + + /** + * Adds the duration of the job to an IOStatistics store + * (such as the manifest to be saved). + * @param iostats store + * @param statistic statistic name. + */ + public void addExecutionDurationToStatistics(IOStatisticsStore iostats, + String statistic) { + iostats.addTimedOperation( + statistic, + getStageExecutionTracker().asDuration()); + } + + /** + * Note any rate limiting to the given timing statistic. + * If the wait was 0, no statistics are updated. + * @param statistic statistic key. + * @param wait wait duration. + */ + private void noteAnyRateLimiting(String statistic, Duration wait) { + if (!wait.isZero()) { + // rate limiting took place + getIOStatistics().addTimedOperation( + statistic, + wait.toMillis()); + } + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder( + "AbstractJobCommitStage{"); + sb.append(isTaskStage ? "Task Stage" : "Job Stage"); + sb.append(" name='").append(name).append('\''); + sb.append(" stage='").append(stageStatisticName).append('\''); + sb.append('}'); + return sb.toString(); + } + + /** + * The stage configuration. + * @return the stage configuration used by this stage. + */ + protected StageConfig getStageConfig() { + return stageConfig; + } + + /** + * Update the thread context with the stage name and + * job ID. + * This MUST be invoked at the start of methods invoked in helper threads, + * to ensure that they are all annotated with job and stage. + * @param stage stage name. + */ + protected void updateAuditContext(final String stage) { + enterStageWorker(stageConfig.getJobId(), stage); + } + + /** + * The IOStatistics are shared across all uses of the + * StageConfig. + * @return the (possibly shared) IOStatistics. + */ + @Override + public final IOStatisticsStore getIOStatistics() { + return stageConfig.getIOStatistics(); + } + + /** + * Call progress() on any Progressable passed in. + */ + protected final void progress() { + if (stageConfig.getProgressable() != null) { + stageConfig.getProgressable().progress(); + } + } + + /** + * Get a file status value or, if the path doesn't exist, return null. + * @param path path + * @return status or null + * @throws IOException IO Failure. + */ + protected final FileStatus getFileStatusOrNull( + final Path path) + throws IOException { + try { + return getFileStatus(path); + } catch (FileNotFoundException e) { + return null; + } + } + + /** + * Get a file status value or, if the path doesn't exist, return null. 
+ * @param path path + * @return status or null + * @throws IOException IO Failure. + */ + protected final FileStatus getFileStatus( + final Path path) + throws IOException { + LOG.trace("{}: getFileStatus('{}')", getName(), path); + requireNonNull(path, + () -> String.format("%s: Null path for getFileStatus() call", getName())); + return trackDuration(getIOStatistics(), OP_GET_FILE_STATUS, () -> + operations.getFileStatus(path)); + } + + /** + * Get a file status value or, if the path doesn't exist, return null. + * @param path path + * @return true if the path resolves to a file + * @throws IOException IO Failure. + */ + protected final boolean isFile( + final Path path) + throws IOException { + LOG.trace("{}: isFile('{}')", getName(), path); + return trackDuration(getIOStatistics(), OP_IS_FILE, () -> { + return operations.isFile(path); + }); + } + + /** + * Delete a path. + * @param path path + * @param recursive recursive delete. + * @return status or null + * @throws IOException IO Failure. + */ + protected final boolean delete( + final Path path, + final boolean recursive) + throws IOException { + LOG.trace("{}: delete('{}, {}')", getName(), path, recursive); + return delete(path, recursive, OP_DELETE); + } + + /** + * Delete a path. + * @param path path + * @param recursive recursive delete. + * @param statistic statistic to update + * @return status or null + * @throws IOException IO Failure. + */ + protected Boolean delete( + final Path path, + final boolean recursive, + final String statistic) + throws IOException { + return trackDuration(getIOStatistics(), statistic, () -> { + return operations.delete(path, recursive); + }); + } + + /** + * Create a directory. + * @param path path + * @param escalateFailure escalate "false" to PathIOE + * @return true if the directory was created/exists. + * @throws IOException IO Failure. + */ + protected final boolean mkdirs( + final Path path, + final boolean escalateFailure) + throws IOException { + LOG.trace("{}: mkdirs('{}')", getName(), path); + return trackDuration(getIOStatistics(), OP_MKDIRS, () -> { + boolean success = operations.mkdirs(path); + if (!success && escalateFailure) { + throw new PathIOException(path.toUri().toString(), + stageStatisticName + ": mkdirs() returned false"); + } + return success; + }); + + } + + /** + * List all directly files under a path. + * Async implementations may under-report their durations. + * @param path path + * @return iterator over the results. + * @throws IOException IO Failure. + */ + protected final RemoteIterator<FileStatus> listStatusIterator( + final Path path) + throws IOException { + LOG.trace("{}: listStatusIterator('{}')", getName(), path); + return trackDuration(getIOStatistics(), OP_LIST_STATUS, () -> + operations.listStatusIterator(path)); + } + + /** + * Load a manifest file. + * @param status source. + * @return the manifest. + * @throws IOException IO Failure. + */ + protected final TaskManifest loadManifest( + final FileStatus status) + throws IOException { + LOG.trace("{}: loadManifest('{}')", getName(), status); + return trackDuration(getIOStatistics(), OP_LOAD_MANIFEST, () -> + operations.loadTaskManifest( + stageConfig.currentManifestSerializer(), + status)); + } + + /** + * List all the manifests in the task manifest dir. + * @return a iterator of manifests. + * @throws IOException IO Failure. 
+ */ + protected final RemoteIterator<FileStatus> listManifests() + throws IOException { + return RemoteIterators.filteringRemoteIterator( + listStatusIterator(getTaskManifestDir()), + st -> st.getPath().toUri().toString().endsWith(MANIFEST_SUFFIX)); + } + + /** + * Make an msync() call; swallow when unsupported. + * @param path path + * @throws IOException IO failure + */ + protected final void msync(Path path) throws IOException { + LOG.trace("{}: msync('{}')", getName(), path); + trackDurationOfInvocation(getIOStatistics(), OP_MSYNC, () -> + operations.msync(path)); + } + + /** + * Create a directory -failing if it exists or if + * mkdirs() failed. + * @param operation operation for error reporting. + * @param path path path to create. + * @return the path. + * @throws IOException failure + * @throws PathIOException mkdirs failed. + * @throws FileAlreadyExistsException destination exists. + */ + protected final Path createNewDirectory( + final String operation, + final Path path) throws IOException { + LOG.trace("{}: {} createNewDirectory('{}')", getName(), operation, path); + requireNonNull(path, + () -> String.format("%s: Null path for operation %s", getName(), operation)); + // check for dir existence before trying to create. + try { + final FileStatus status = getFileStatus(path); + // no exception, so the path exists. + throw new FileAlreadyExistsException(operation + + ": path " + path + + " already exists and has status " + status); + } catch (FileNotFoundException e) { + // the path does not exist, so create it. + mkdirs(path, true); + return path; + } + } + + /** + * Assert that a path is a directory which must exist. + * @param operation operation for error reporting. + * @param path path path to create. + * @return the path + * @throws IOException failure + * @throws PathIOException mkdirs failed. + * @throws FileAlreadyExistsException destination exists. + */ + protected final Path directoryMustExist( + final String operation, + final Path path) throws IOException { + final FileStatus status = getFileStatus(path); + if (!status.isDirectory()) { + throw new PathIOException(path.toString(), + operation + + ": Path is not a directory; its status is :" + status); + } + return path; + } + + /** + * Save a task manifest or summary. This will be done by + * writing to a temp path and then renaming. + * If the destination path exists: Delete it. + * @param manifestData the manifest/success file + * @param tempPath temp path for the initial save + * @param finalPath final path for rename. + * @throws IOException failure to load/parse + */ + @SuppressWarnings("unchecked") + protected final <T extends AbstractManifestData> void save(T manifestData, + final Path tempPath, + final Path finalPath) throws IOException { + LOG.trace("{}: save('{}, {}, {}')", getName(), manifestData, tempPath, finalPath); + trackDurationOfInvocation(getIOStatistics(), OP_SAVE_TASK_MANIFEST, () -> + operations.save(manifestData, tempPath, true)); + renameFile(tempPath, finalPath); + } + + /** + * Get an etag from a FileStatus which MUST BE + * an implementation of EtagSource and + * whose etag MUST NOT BE null/empty. + * @param status the status; may be null. + * @return the etag or null if not provided + */ + public String getEtag(FileStatus status) { + return operations.getEtag(status); + } + + /** + * Rename a file from source to dest; if the underlying FS API call + * returned false that's escalated to an IOE. + * @param source source file. 
+   * @param dest dest file
+   * @throws IOException failure
+   * @throws PathIOException if the rename() call returned false.
+   */
+  protected final void renameFile(final Path source, final Path dest)
+      throws IOException {
+    maybeDeleteDest(true, dest);
+    executeRenamingOperation("renameFile", source, dest,
+        OP_RENAME_FILE, () ->
+            operations.renameFile(source, dest));
+  }
+
+  /**
+   * Rename a directory from source to dest; if the underlying FS API call
+   * returned false that's escalated to an IOE.
+   * @param source source directory.
+   * @param dest dest directory
+   * @throws IOException failure
+   * @throws PathIOException if the rename() call returned false.
+   */
+  protected final void renameDir(final Path source, final Path dest)
+      throws IOException {
+
+    maybeDeleteDest(true, dest);
+    executeRenamingOperation("renameDir", source, dest,
+        OP_RENAME_FILE, () ->
+            operations.renameDir(source, dest)
+    );
+  }
+
+  /**
+   * Commit a file from the manifest using rename or, if available, resilient renaming.
+   * @param entry entry from manifest
+   * @param deleteDest should any existing destination be deleted first?
+   * @return the outcome of the commit.
+   * @throws PathIOException if the rename() call returned false and was escalated.
+   * @throws IOException failure
+   */
+  protected final CommitOutcome commitFile(FileEntry entry,
+      boolean deleteDest)
+      throws IOException {
+
+    final Path source = entry.getSourcePath();
+    final Path dest = entry.getDestPath();
+
+    maybeDeleteDest(deleteDest, dest);
+    if (storeSupportsResilientCommit()) {
+      // commit through the store's resilient rename, which may recover
+      // from failures and may rate-limit the request.
+      final ManifestStoreOperations.CommitFileResult result = trackDuration(getIOStatistics(),
+          OP_COMMIT_FILE_RENAME, () ->
+              operations.commitFile(entry));
+      if (result.recovered()) {
+        // recovery took place.
+        getIOStatistics().incrementCounter(OP_COMMIT_FILE_RENAME_RECOVERED);
+      }
+      if (result.getWaitTime() != null) {
+        // note any delay which took place
+        noteAnyRateLimiting(STORE_IO_RATE_LIMITED, result.getWaitTime());
+      }
+    } else {
+      // commit with a simple rename; failures will be escalated.
+      executeRenamingOperation("renameFile", source, dest,
+          OP_COMMIT_FILE_RENAME, () ->
+              operations.renameFile(source, dest));
+    }
+    return new CommitOutcome();

Review comment:
       leave open for others

Issue Time Tracking
-------------------

    Worklog Id:     (was: 742269)
    Time Spent: 37.5h  (was: 37h 20m)

> Add a task-manifest output committer for Azure and GCS
> ------------------------------------------------------
>
>                 Key: MAPREDUCE-7341
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-7341
>             Project: Hadoop Map/Reduce
>          Issue Type: New Feature
>          Components: client
>    Affects Versions: 3.3.1
>            Reporter: Steve Loughran
>            Assignee: Steve Loughran
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 37.5h
>  Remaining Estimate: 0h
>
> Add a task-manifest output committer for Azure and GCS
> The S3A committers are very popular in Spark on S3, as they are both correct
> and fast.
> The classic FileOutputCommitter v1 and v2 algorithms are all that is
> available for Azure ABFS and Google GCS, and they have limitations.
> The v2 algorithm isn't safe in the presence of failed task attempt commits,
> so we recommend the v1 algorithm for Azure. But that is slow because it
> sequentially lists then renames files and directories, one-by-one.
> The latencies of list and rename make things slow.
> Google GCS lacks the atomic directory rename required for v1 correctness;
> v2 can be used (which doesn't have the job commit performance limitations),
> but it's not safe.
> Proposed:
> * Add a new FileOutputFormat committer which uses an intermediate manifest to
>   pass the list of files created by a task attempt to the job committer.
> * Job committer to parallelise reading these task manifests and submit all the
>   rename operations into a pool of worker threads (also: mkdir, directory
>   deletions on cleanup); a rough sketch of this step follows at the end of
>   this message.
> * Use the committer plugin mechanism added for s3a to make this the default
>   committer for ABFS (i.e. no need to make any changes to FileOutputCommitter).
> * Add lots of IOStatistics instrumentation + logging of operations in the
>   job commit for visibility of where delays are occurring.
> * Reuse the S3A committer _SUCCESS JSON structure to publish IOStats & other
>   data for testing/support.
> This committer will be faster than the v1 algorithm because of the
> parallelisation, and because a manifest written by create-and-rename will be
> exclusive to a single task attempt, it delivers the isolation which the v2
> committer lacks.
> This is not an attempt to do an Iceberg/Hudi/Delta Lake-style manifest-only
> format for describing the contents of a table; the final output is still a
> directory tree which must be scanned during query planning.
> As such, the format is still suboptimal for cloud storage - but at least we
> will have faster job execution during the commit phases.
>
> Note: this will also work on HDFS, where again, it should be faster than
> the v1 committer. However, the target is very much Spark with ABFS and GCS;
> no plans to worry about MR, as that simplifies the challenge of dealing with
> job restart (i.e. you don't have to).
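A minimal sketch of the "pool of worker threads" rename step described in the
proposal above, assuming a list of (source, destination) pairs has already been
read from the task manifests. This is not the PR's code: ManifestEntry,
renameAll() and the pool sizing are hypothetical stand-ins; only the
FileSystem/Path calls are standard Hadoop APIs. The real committer layers
IOStatistics tracking, rate limiting and resilient (recoverable) rename on top
of this shape, via the ManifestStoreOperations wrappers shown in the diff.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/** Hypothetical sketch: rename every manifest entry in a fixed thread pool. */
public final class ParallelRenameSketch {

  /** Stand-in for one (source, destination) pair from a task manifest. */
  static final class ManifestEntry {
    final Path source;
    final Path dest;
    ManifestEntry(Path source, Path dest) {
      this.source = source;
      this.dest = dest;
    }
  }

  /**
   * Rename all files in parallel so that the per-file rename latency of
   * ABFS/GCS overlaps rather than accumulating serially, as it does in the
   * v1 committer.
   */
  static void renameAll(FileSystem fs, List<ManifestEntry> entries, int threads)
      throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    try {
      List<Future<?>> pending = new ArrayList<>();
      for (ManifestEntry entry : entries) {
        pending.add(pool.submit(() -> {
          // make sure the destination directory exists, then rename;
          // a false return from rename() is escalated to an exception.
          fs.mkdirs(entry.dest.getParent());
          if (!fs.rename(entry.source, entry.dest)) {
            throw new IOException("Failed to rename " + entry.source
                + " to " + entry.dest);
          }
          return null;
        }));
      }
      for (Future<?> f : pending) {
        f.get();    // propagate the first failure to the job committer
      }
    } finally {
      pool.shutdownNow();
    }
  }
}

In the actual ManifestCommitter the per-entry work goes through commitFile()
above, so each rename is timed, recovered where the store supports resilient
commit, and optionally rate limited; the thread-pool structure is what gives
the speedup over the sequential v1 job commit.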