[GitHub] [spark] advancedxy commented on a change in pull request #25863: [SPARK-28945][SPARK-29037][CORE][SQL] Fix the issue that spark gives duplicate result and support concurrent file source write

2019-09-28 Thread GitBox
advancedxy commented on a change in pull request #25863: 
[SPARK-28945][SPARK-29037][CORE][SQL] Fix the issue that spark gives duplicate 
result and support concurrent file source write operations write to different 
partitions in the same table.
URL: https://github.com/apache/spark/pull/25863#discussion_r329336071
 
 

 ##
 File path: sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
 ##
 @@ -156,4 +189,66 @@ class PartitionedWriteSuite extends QueryTest with SharedSparkSession {
       }
     }
   }
+
+  test("Output path should be a staging output dir, whose last level path name 
is jobId," +
+" when dynamicPartitionOverwrite is enabled") {
+withSQLConf(SQLConf.PARTITION_OVERWRITE_MODE.key -> 
PartitionOverwriteMode.DYNAMIC.toString) {
+  withTable("t") {
+withSQLConf(SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key ->
+  classOf[DetectCorrectOutputPathFileCommitProtocol].getName) {
+  Seq((1, 2)).toDF("a", "b")
+.write
+.partitionBy("b")
+.mode("overwrite")
+.saveAsTable("t")
+}
+  }
+}
+  }
+
+  test("Concurrent write to the same table with different partitions should be 
possible") {
+withSQLConf(SQLConf.PARTITION_OVERWRITE_MODE.key -> 
PartitionOverwriteMode.DYNAMIC.toString) {
+  withTable("t") {
+val sem = new Semaphore(0)
+Seq((1, 2)).toDF("a", "b")
+  .write
+  .partitionBy("b")
+  .mode("overwrite")
+  .saveAsTable("t")
+
+val df1 = spark.range(0, 10).map(x => (x, 1)).toDF("a", "b")
+val df2 = spark.range(0, 10).map(x => (x, 2)).toDF("a", "b")
+val dfs = Seq(df1, df2)
+
+var throwable: Option[Throwable] = None
+for (i <- 0 until 2) {
+  new Thread {
+override def run(): Unit = {
+  try {
+dfs(i)
+  .write
+  .mode("overwrite")
+  .insertInto("t")
+  } catch {
+case t: Throwable =>
+  throwable = Some(t)
+  } finally {
+sem.release()
+  }
+}
+  }.start()
+}
+// make sure writing table in two threads are executed.
+sem.acquire(2)
+throwable.foreach { t => throw improveStackTrace(t) }
+checkAnswer(spark.sql("select a, b from t where b = 1"), df1)
+checkAnswer(spark.sql("select a, b from t where b = 2"), df2)
+  }
+}
 
 Review comment:
   Ah, that's a limitation of `DataFrameWriter`; we may need to extend `DataFrameWriter` to support that.
   
   But for now, I think we can simply use the SQL syntax: going through `spark.sql` gives the same behaviour.
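   For example (a hypothetical sketch, not code from this PR), with table `t` partitioned by `b` as in the test above, the static-partition overwrite that `DataFrameWriter` cannot express can be written directly in SQL:
   
   ```scala
   // Hypothetical sketch: overwrite a single static partition of `t` via SQL.
   // `src` is a temp view standing in for the DataFrame we would have written.
   spark.range(0, 10).toDF("a").createOrReplaceTempView("src")
   spark.sql("INSERT OVERWRITE TABLE t PARTITION (b = 1) SELECT a FROM src")
   ```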





[GitHub] [spark] advancedxy commented on a change in pull request #25863: [SPARK-28945][SPARK-29037][CORE][SQL] Fix the issue that spark gives duplicate result and support concurrent file source write

2019-09-28 Thread GitBox
advancedxy commented on a change in pull request #25863: 
[SPARK-28945][SPARK-29037][CORE][SQL] Fix the issue that spark gives duplicate 
result and support concurrent file source write operations write to different 
partitions in the same table.
URL: https://github.com/apache/spark/pull/25863#discussion_r329335138
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/internal/io/FileSourceWriteDesc.scala
 ##
 @@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal.io
+
+/**
+ * A class describing the properties of a file source write operation.
+ *
+ * @param isInsertIntoHadoopFsRelation whether this is an InsertIntoHadoopFsRelation operation
+ * @param escapedStaticPartitionKVs static partition key/value pairs, which have already been
+ *                                  escaped
+ */
+case class FileSourceWriteDesc(
 
 Review comment:
   Since this might be used or subclassed by users, we may want to use a normal class instead of a case class.
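   A minimal sketch of that variant (hypothetical, with the field names taken from the diff above):
   
   ```scala
   // Hypothetical sketch: a plain class instead of a case class, so users can
   // subclass it and we don't freeze copy/apply/unapply into the public API.
   class FileSourceWriteDesc(
       val isInsertIntoHadoopFsRelation: Boolean,
       val escapedStaticPartitionKVs: Seq[(String, String)]) extends Serializable
   ```
   
   Dropping the case-class machinery also leaves room to add fields later without breaking compatibility.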





[GitHub] [spark] advancedxy commented on a change in pull request #25863: [SPARK-28945][SPARK-29037][CORE][SQL] Fix the issue that spark gives duplicate result and support concurrent file source write

2019-09-28 Thread GitBox
advancedxy commented on a change in pull request #25863: 
[SPARK-28945][SPARK-29037][CORE][SQL] Fix the issue that spark gives duplicate 
result and support concurrent file source write operations write to different 
partitions in the same table.
URL: https://github.com/apache/spark/pull/25863#discussion_r329335652
 
 

 ##
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
 ##
 @@ -103,103 +111,131 @@ case class InsertIntoHadoopFsRelationCommand(
     val dynamicPartitionOverwrite = enableDynamicOverwrite && mode == SaveMode.Overwrite &&
       staticPartitions.size < partitionColumns.length
 
+    val appId = SparkEnv.get.conf.getAppId
+    val jobId = java.util.UUID.randomUUID().toString
+
+    val escapedStaticPartitionKVs = partitionColumns
+      .filter(c => staticPartitions.contains(c.name))
+      .map { attr =>
+        val escapedKey = ExternalCatalogUtils.escapePathName(attr.name)
+        val escapedValue = ExternalCatalogUtils.escapePathName(staticPartitions.get(attr.name).get)
+        (escapedKey, escapedValue)
+      }
+
     val committer = FileCommitProtocol.instantiate(
       sparkSession.sessionState.conf.fileCommitProtocolClass,
-      jobId = java.util.UUID.randomUUID().toString,
+      jobId = jobId,
       outputPath = outputPath.toString,
-      dynamicPartitionOverwrite = dynamicPartitionOverwrite)
+      dynamicPartitionOverwrite = dynamicPartitionOverwrite,
+      fileSourceWriteDesc = Some(FileSourceWriteDesc(true, escapedStaticPartitionKVs)))
 
-    val doInsertion = if (mode == SaveMode.Append) {
-      true
-    } else {
-      val pathExists = fs.exists(qualifiedOutputPath)
-      (mode, pathExists) match {
-        case (SaveMode.ErrorIfExists, true) =>
-          throw new AnalysisException(s"path $qualifiedOutputPath already exists.")
-        case (SaveMode.Overwrite, true) =>
-          if (ifPartitionNotExists && matchingPartitions.nonEmpty) {
-            false
-          } else if (dynamicPartitionOverwrite) {
-            // For dynamic partition overwrite, do not delete partition directories ahead.
-            true
-          } else {
-            deleteMatchingPartitions(fs, qualifiedOutputPath, customPartitionLocations, committer)
+    try {
+      val doInsertion = if (mode == SaveMode.Append) {
 
 Review comment:
   Can this modification be simplified?
   
   We could keep the `val doInsertion = ...` block intact and issue `detectConflict` only when `doInsertion` is true.
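   Roughly this shape (a sketch only; `computeDoInsertion` stands in for the existing, unchanged logic and `detectConflict` for the new check):
   
   ```scala
   // Sketch of the suggested structure, not the PR's actual code.
   def computeDoInsertion(): Boolean = true  // placeholder: existing doInsertion logic, unchanged
   def detectConflict(): Unit = ()           // placeholder: the new conflict check
   
   val doInsertion = computeDoInsertion()    // existing code stays intact
   if (doInsertion) {
     detectConflict()                        // new check added in exactly one place
     // ... run the write job as before ...
   }
   ```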





[GitHub] [spark] advancedxy commented on a change in pull request #25863: [SPARK-28945][SPARK-29037][CORE][SQL] Fix the issue that spark gives duplicate result and support concurrent file source write

2019-09-28 Thread GitBox
advancedxy commented on a change in pull request #25863: 
[SPARK-28945][SPARK-29037][CORE][SQL] Fix the issue that spark gives duplicate 
result and support concurrent file source write operations write to different 
partitions in the same table.
URL: https://github.com/apache/spark/pull/25863#discussion_r329335511
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
 ##
 @@ -91,7 +96,62 @@ class HadoopMapReduceCommitProtocol(
    */
   private def stagingDir = new Path(path, ".spark-staging-" + jobId)
 
+  /**
+   * Whether this is an InsertIntoHadoopFsRelation operation; the default is false.
+   */
+  private def isInsertIntoHadoopFsRelation =
+    fileSourceWriteDesc.map(_.isInsertIntoHadoopFsRelation).getOrElse(false)
+
+  /**
+   * The escaped static partition key/value pairs; the default is empty.
+   */
+  private def escapedStaticPartitionKVs =
+    fileSourceWriteDesc.map(_.escapedStaticPartitionKVs).getOrElse(Seq.empty)
+
+  /**
+   * The staging root directory for an InsertIntoHadoopFsRelation operation.
+   */
+  @transient private var insertStagingDir: Path = null
+
+  /**
+   * The staging output path for an InsertIntoHadoopFsRelation operation.
+   */
+  @transient private var stagingOutputPath: Path = null
+
+  /**
+   * Get the desired output path for the job. The output will be [[path]] when the current
+   * operation is not an InsertIntoHadoopFsRelation operation. Otherwise, we choose a sub-path
+   * composed of [[escapedStaticPartitionKVs]] under [[insertStagingDir]] instead of [[path]] to
+   * mark this operation, so that we can detect whether another operation conflicts with the
+   * current one by checking the existence of the corresponding output path.
+   *
+   * @return the desired output path.
+   */
+  protected def getOutputPath(context: TaskAttemptContext): Path = {
+    if (isInsertIntoHadoopFsRelation) {
+      val insertStagingPath = ".spark-staging-" + escapedStaticPartitionKVs.size
+      insertStagingDir = new Path(path, insertStagingPath)
+      val appId = SparkEnv.get.conf.getAppId
+      val outputPath = new Path(path, Array(insertStagingPath,
+        getEscapedStaticPartitionPath(escapedStaticPartitionKVs), appId, jobId)
+        .mkString(File.separator))
+      insertStagingDir.getFileSystem(context.getConfiguration).makeQualified(outputPath)
+      outputPath
+    } else {
+      new Path(path)
+    }
+  }
+
   protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = {
+    if (isInsertIntoHadoopFsRelation) {
+      stagingOutputPath = getOutputPath(context)
+      context.getConfiguration.set(FileOutputFormat.OUTDIR, stagingOutputPath.toString)
+      logWarning("Set file output committer algorithm version to 2 implicitly," +
+        " because the task output will be committed to the staging output path first," +
+        " which is equivalent to algorithm 1.")
 
 Review comment:
   15843 is a lot, but within a single Spark application it would not be that many.
   One way to solve this is an object-level flag (or counter) so that only the first warning (or the first few) gets logged.
   I'm not sure that's worth it, though. Also, the head of the logs may get rotated and discarded...
   
   Using `logDebug` is fine too, but users normally won't set the log level to DEBUG.
   
   I'm not sure which one is better, so it's up to you.
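   For reference, the log-once idea could look like this (a hypothetical sketch, not part of the PR):
   
   ```scala
   import java.util.concurrent.atomic.AtomicBoolean
   
   // Hypothetical sketch: a JVM-wide flag so that concurrent jobs in the same
   // application emit the committer-algorithm warning at most once.
   object CommitProtocolWarnings {
     private val warned = new AtomicBoolean(false)
   
     def warnOnce(log: String => Unit, msg: String): Unit = {
       if (warned.compareAndSet(false, true)) {
         log(msg)
       }
     }
   }
   ```
   
   The caveat above still applies: if the single warning lands early in the driver log, rotation may discard it.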





[GitHub] [spark] advancedxy commented on a change in pull request #25863: [SPARK-28945][SPARK-29037][CORE][SQL] Fix the issue that spark gives duplicate result and support concurrent file source write

2019-09-28 Thread GitBox
advancedxy commented on a change in pull request #25863: 
[SPARK-28945][SPARK-29037][CORE][SQL] Fix the issue that spark gives duplicate 
result and support concurrent file source write operations write to different 
partitions in the same table.
URL: https://github.com/apache/spark/pull/25863#discussion_r329335222
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
 ##
 @@ -169,4 +171,84 @@ object FileCommitProtocol extends Logging {
         ctor.newInstance(jobId, outputPath)
     }
   }
+
+  /**
+   * Instantiates a FileCommitProtocol with a file source write description.
+   */
+  def instantiate(
+      className: String,
+      jobId: String,
+      outputPath: String,
+      dynamicPartitionOverwrite: Boolean,
+      fileSourceWriteDesc: Option[FileSourceWriteDesc]): FileCommitProtocol = {
+
+    logDebug(s"Creating committer $className; job $jobId; output=$outputPath;" +
+      s" dynamic=$dynamicPartitionOverwrite; fileSourceWriteDesc=$fileSourceWriteDesc")
+    val clazz = Utils.classForName[FileCommitProtocol](className)
+    // First try the constructor with arguments (jobId: String, outputPath: String,
+    // dynamicPartitionOverwrite: Boolean, fileSourceWriteDesc: Option[FileSourceWriteDesc]).
+    // If that doesn't exist, fall back to FileCommitProtocol.instantiate(className, jobId,
+    // outputPath, dynamicPartitionOverwrite).
+    try {
+      val ctor = clazz.getDeclaredConstructor(classOf[String], classOf[String], classOf[Boolean],
+        classOf[Option[FileSourceWriteDesc]])
+      logDebug("Using (String, String, Boolean, Option[FileSourceWriteDesc]) constructor")
+      ctor.newInstance(jobId, outputPath, dynamicPartitionOverwrite.asInstanceOf[java.lang.Boolean],
+        fileSourceWriteDesc)
+    } catch {
+      case _: NoSuchMethodException =>
+        logDebug("Falling back to instantiate(className, jobId, outputPath," +
+          " dynamicPartitionOverwrite)")
+        instantiate(className, jobId, outputPath, dynamicPartitionOverwrite)
+    }
+  }
 
 Review comment:
   Kind of, but I think we can also add `dynamicPartitionOverwrite` to `fileSourceWriteDesc`.
   
   For a user-defined `FileCommitProtocol` class, we can then extract `dynamicPartitionOverwrite` from `fileSourceWriteDesc` and pass it to the old instantiate method.
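   Sketched out (hypothetical, assuming `FileSourceWriteDesc` gains a `dynamicPartitionOverwrite` field), the fallback branch could become:
   
   ```scala
   // Hypothetical sketch: user-defined FileCommitProtocol classes that only
   // expose the old three-argument constructor keep working, because we pull
   // the flag out of the (assumed) extended FileSourceWriteDesc.
   } catch {
     case _: NoSuchMethodException =>
       val dynamicPartitionOverwrite = fileSourceWriteDesc.exists(_.dynamicPartitionOverwrite)
       instantiate(className, jobId, outputPath, dynamicPartitionOverwrite)
   }
   ```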

