Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/21066#discussion_r184224651 --- Diff: hadoop-cloud/src/hadoop-3/main/scala/org/apache/spark/internal/io/cloud/PathOutputCommitProtocol.scala --- @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.internal.io.cloud + +import java.io.IOException + +import org.apache.hadoop.fs.Path +import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext} +import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter, PathOutputCommitter, PathOutputCommitterFactory} + +import org.apache.spark.internal.io.{FileCommitProtocol, HadoopMapReduceCommitProtocol} +import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage + +/** + * Spark Commit protocol for Path Output Committers. + * This committer will work with the `FileOutputCommitter` and subclasses. + * All implementations *must* be serializable. + * + * Rather than ask the `FileOutputFormat` for a committer, it uses the + * `org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory` factory + * API to create the committer. 
+ * This is what [[org.apache.hadoop.mapreduce.lib.output.FileOutputFormat]] does, + * but as [[HadoopMapReduceCommitProtocol]] still uses the original + * `org.apache.hadoop.mapred.FileOutputFormat` binding, + * its subclasses do not do this; this class overrides those subclasses to use the + * factory mechanism now supported in the base class. + * + * In `setupCommitter` the factory is bound to and the committer for + * the destination path is chosen. + * + * @constructor Instantiate. Dynamic partition overwrite is not supported, + * so that committers for stores which do not support rename + * will not get confused. + * @param jobId job + * @param destination destination + * @param dynamicPartitionOverwrite does the caller want support for dynamic + * partition overwrite. If so, it will be + * refused. + * @throws IOException when an unsupported dynamicPartitionOverwrite option is supplied. + */ +class PathOutputCommitProtocol( + jobId: String, + destination: String, + dynamicPartitionOverwrite: Boolean = false) + extends HadoopMapReduceCommitProtocol( + jobId, + destination, + false) with Serializable { + + @transient var committer: PathOutputCommitter = _ + + require(destination != null, "Null destination specified") + + val destPath = new Path(destination) + + logInfo(s"Instantiated committer with job ID=$jobId;" + + s" destination=$destPath;" + + s" dynamicPartitionOverwrite=$dynamicPartitionOverwrite") + + if (dynamicPartitionOverwrite) { + // until there are explicit extensions to the PathOutputCommitProtocols + // to support the spark mechanism, it's left to the individual committer + // choice to handle partitioning. + throw new IOException("PathOutputCommitProtocol does not support dynamicPartitionOverwrite") + } + + import PathOutputCommitProtocol._ + + /** + * Set up the committer. + * This creates it by talking directly to the Hadoop factories, instead + * of the V1 `mapred.FileOutputFormat` methods. + * @param context task attempt + * @return the committer to use. 
This will always be a subclass of + * [[PathOutputCommitter]]. + */ + override protected def setupCommitter( + context: TaskAttemptContext): PathOutputCommitter = { + + logInfo(s"Setting up committer for path $destination") + committer = PathOutputCommitterFactory.createCommitter(destPath, context) + + // Special feature to force out the FileOutputCommitter, so as to guarantee + // that the binding is working properly. + val rejectFileOutput = context.getConfiguration + .getBoolean(REJECT_FILE_OUTPUT, REJECT_FILE_OUTPUT_DEFVAL) + if (rejectFileOutput && committer.isInstanceOf[FileOutputCommitter]) { + // the output format returned a file output format committer, which + // is exactly what we do not want. So switch back to the factory. + val factory = PathOutputCommitterFactory.getCommitterFactory( + destPath, + context.getConfiguration) + logInfo(s"Using committer factory $factory") + committer = factory.createOutputCommitter(destPath, context) + } + + logInfo(s"Using committer ${committer.getClass}") + logInfo(s"Committer details: $committer") + if (committer.isInstanceOf[FileOutputCommitter]) { + require(!rejectFileOutput, + s"Committer created is the FileOutputCommitter $committer") + + if (committer.isCommitJobRepeatable(context)) { + // If FileOutputCommitter says its job commit is repeatable, it means + // it is using the v2 algorithm, which is not safe for task commit + // failures. Warn + logWarning(s"Committer $committer may not be tolerant of task commit failures") + } + } + committer + } + + /** + * Create a temporary file for a task. 
+ * + * @param taskContext task context + * @param dir optional subdirectory + * @param ext file extension + * @return a path as a string + */ + override def newTaskTempFile( + taskContext: TaskAttemptContext, + dir: Option[String], + ext: String): String = { + + val workDir = committer.getWorkPath + val parent = dir.map(d => new Path(workDir, d)).getOrElse(workDir) + val file = new Path(parent, buildFilename(taskContext, ext)) + logInfo(s"Creating task file $file for dir $dir and ext $ext") + file.toString + } + + /** + * Absolute files are still renamed into place with a warning. + * + * @param taskContext task + * @param absoluteDir destination dir + * @param ext extension + * @return an absolute path + */ + override def newTaskTempFileAbsPath( + taskContext: TaskAttemptContext, --- End diff -- double indent... you get the idea.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For additional commands, e-mail: reviews-help@spark.apache.org