advancedxy commented on a change in pull request #25863: [SPARK-28945][SPARK-29037][CORE][SQL] Fix the issue that spark gives duplicate result and support concurrent file source write operations write to different partitions in the same table. URL: https://github.com/apache/spark/pull/25863#discussion_r329335511
########## File path: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala ########## @@ -91,7 +96,62 @@ class HadoopMapReduceCommitProtocol( */ private def stagingDir = new Path(path, ".spark-staging-" + jobId) + /** + * Whether this is an InsertIntoHadoopFsRelation operation; the default is false. + */ + private def isInsertIntoHadoopFsRelation = + fileSourceWriteDesc.map(_.isInsertIntoHadoopFsRelation).getOrElse(false) + + /** + * Get the escaped static partition key and value pairs; the default is empty. + */ + private def escapedStaticPartitionKVs = + fileSourceWriteDesc.map(_.escapedStaticPartitionKVs).getOrElse(Seq.empty) + + /** + * The staging root directory for an InsertIntoHadoopFsRelation operation. + */ + @transient private var insertStagingDir: Path = null + + /** + * The staging output path for an InsertIntoHadoopFsRelation operation. + */ + @transient private var stagingOutputPath: Path = null + + /** + * Get the desired output path for the job. The output will be [[path]] when the current operation + * is not an InsertIntoHadoopFsRelation operation. Otherwise, we choose a sub path composed of + * [[escapedStaticPartitionKVs]] under [[insertStagingDir]] over [[path]] to mark this operation, + * and we can detect whether there is an operation conflict with the current one by checking the + * existence of the relative output path. + * + * @return Path the desired output path. 
+ */ + protected def getOutputPath(context: TaskAttemptContext): Path = { + if (isInsertIntoHadoopFsRelation) { + val insertStagingPath = ".spark-staging-" + escapedStaticPartitionKVs.size + insertStagingDir = new Path(path, insertStagingPath) + val appId = SparkEnv.get.conf.getAppId + val outputPath = new Path(path, Array(insertStagingPath, + getEscapedStaticPartitionPath(escapedStaticPartitionKVs), appId, jobId) + .mkString(File.separator)) + insertStagingDir.getFileSystem(context.getConfiguration).makeQualified(outputPath) + outputPath + } else { + new Path(path) + } + } + protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = { + if (isInsertIntoHadoopFsRelation) { + stagingOutputPath = getOutputPath(context) + context.getConfiguration.set(FileOutputFormat.OUTDIR, stagingOutputPath.toString) + logWarning("Set file output committer algorithm version to 2 implicitly," + " for that the task output would be committed to staging output path firstly," + " which is equivalent to algorithm 1.") Review comment: 15843 is a lot; however, it would not be that much inside one Spark application. One way to solve this is to use an object-level counter to log only the first warning (or first few warnings). But I am not sure if that's worth it. Also, the head of logs may get rotated and discarded... Alternatively, using logDebug is fine, but normally users won't set the log level to DEBUG. I am not sure which one is better. It's up to you then. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org