[GitHub] spark pull request #15814: [SPARK-18185] Fix all forms of INSERT / OVERWRITE...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/15814
[GitHub] spark pull request #15814: [SPARK-18185] Fix all forms of INSERT / OVERWRITE...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/15814#discussion_r87315912

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala ---
@@ -104,4 +113,42 @@ case class InsertIntoHadoopFsRelationCommand(
     Seq.empty[Row]
   }

+
+  /**
+   * Deletes all partition files that match the specified static prefix. Partitions with custom
+   * locations are also cleared based on the custom locations map given to this class.
+   */
+  private def deleteMatchingPartitions(fs: FileSystem, qualifiedOutputPath: Path): Unit = {
+    val staticPartitionPrefix = if (staticPartitionKeys.nonEmpty) {
+      "/" + partitionColumns.flatMap { p =>
+        staticPartitionKeys.get(p.name) match {
+          case Some(value) =>
+            Some(
+              PartitioningUtils.escapePathName(p.name) + "=" +
+                PartitioningUtils.escapePathName(value))
+          case None =>
+            None
+        }
+      }.mkString("/")
+    } else {
+      ""
+    }
+    // first clear the path determined by the static partition keys (e.g. /table/foo=1)
+    val staticPrefixPath = qualifiedOutputPath.suffix(staticPartitionPrefix)
+    if (fs.exists(staticPrefixPath) && !fs.delete(staticPrefixPath, true /* recursively */)) {
--- End diff --

OK for me if we document it.
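To make the prefix construction in the snippet above concrete, here is a minimal, self-contained sketch; it is not the PR's code, and escape() is only a simplified stand-in for PartitioningUtils.escapePathName. It shows how a prefix such as "/foo=1" is derived from the static keys of an INSERT OVERWRITE ... PARTITION (foo=1, bar) statement before the matching paths are cleared.

    // Illustrative sketch only: derive the static partition prefix from the
    // partition columns and the static keys of the PARTITION clause.
    object StaticPrefixSketch {
      // crude stand-in for PartitioningUtils.escapePathName
      private def escape(s: String): String = s.replace("/", "%2F").replace("=", "%3D")

      def staticPrefix(partitionColumns: Seq[String], staticKeys: Map[String, String]): String = {
        val parts = partitionColumns.flatMap { col =>
          staticKeys.get(col).map(v => escape(col) + "=" + escape(v))
        }
        if (parts.isEmpty) "" else "/" + parts.mkString("/")
      }

      def main(args: Array[String]): Unit = {
        // only foo is static; bar is dynamic and therefore not part of the prefix
        println(staticPrefix(Seq("foo", "bar"), Map("foo" -> "1")))  // prints: /foo=1
      }
    }

With an empty static-key map the prefix is empty, so the whole table path is cleared, which matches the full-table overwrite case discussed in the diff.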
[GitHub] spark pull request #15814: [SPARK-18185] Fix all forms of INSERT / OVERWRITE...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/15814#discussion_r87315800

--- Diff: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala ---
@@ -87,25 +120,40 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)
   override def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit = {
     committer.commitJob(jobContext)
+    val filesToMove = taskCommits.map(_.obj.asInstanceOf[Map[String, String]]).reduce(_ ++ _)
+    logDebug(s"Committing files staged for absolute locations $filesToMove")
+    val fs = absPathStagingDir.getFileSystem(jobContext.getConfiguration)
+    for ((src, dst) <- filesToMove) {
+      fs.rename(new Path(src), new Path(dst))
+    }
+    fs.delete(absPathStagingDir, true)
   }

   override def abortJob(jobContext: JobContext): Unit = {
     committer.abortJob(jobContext, JobStatus.State.FAILED)
+    val fs = absPathStagingDir.getFileSystem(jobContext.getConfiguration)
+    fs.delete(absPathStagingDir, true)
   }

   override def setupTask(taskContext: TaskAttemptContext): Unit = {
     committer = setupCommitter(taskContext)
     committer.setupTask(taskContext)
+    addedAbsPathFiles = mutable.Map[String, String]()
   }

   override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
     val attemptId = taskContext.getTaskAttemptID
     SparkHadoopMapRedUtil.commitTask(
       committer, taskContext, attemptId.getJobID.getId, attemptId.getTaskID.getId)
-    EmptyTaskCommitMessage
+    new TaskCommitMessage(addedAbsPathFiles.toMap)
--- End diff --

I'd prefer renaming in task commit.
[GitHub] spark pull request #15814: [SPARK-18185] Fix all forms of INSERT / OVERWRITE...
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/15814#discussion_r87311124

--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala ---
@@ -186,4 +186,106 @@ class PartitionProviderCompatibilitySuite
       }
     }
   }
+
+  test("insert into and overwrite new datasource tables with partial specs and custom locs") {
--- End diff --

Done
[GitHub] spark pull request #15814: [SPARK-18185] Fix all forms of INSERT / OVERWRITE...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/15814#discussion_r87138232

--- Diff: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala ---
@@ -87,25 +120,40 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)
   override def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit = {
     committer.commitJob(jobContext)
+    val filesToMove = taskCommits.map(_.obj.asInstanceOf[Map[String, String]]).reduce(_ ++ _)
+    logDebug(s"Committing files staged for absolute locations $filesToMove")
+    val fs = absPathStagingDir.getFileSystem(jobContext.getConfiguration)
+    for ((src, dst) <- filesToMove) {
+      fs.rename(new Path(src), new Path(dst))
+    }
+    fs.delete(absPathStagingDir, true)
   }

   override def abortJob(jobContext: JobContext): Unit = {
     committer.abortJob(jobContext, JobStatus.State.FAILED)
+    val fs = absPathStagingDir.getFileSystem(jobContext.getConfiguration)
+    fs.delete(absPathStagingDir, true)
   }

   override def setupTask(taskContext: TaskAttemptContext): Unit = {
     committer = setupCommitter(taskContext)
     committer.setupTask(taskContext)
+    addedAbsPathFiles = mutable.Map[String, String]()
   }

   override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
     val attemptId = taskContext.getTaskAttemptID
     SparkHadoopMapRedUtil.commitTask(
       committer, taskContext, attemptId.getJobID.getId, attemptId.getTaskID.getId)
-    EmptyTaskCommitMessage
+    new TaskCommitMessage(addedAbsPathFiles.toMap)
--- End diff --

Yea it can go either way. Unclear which one is better. Renaming on job commit gives higher chance of corrupting data, whereas renaming in task commit is slightly more performant.
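To make the trade-off being weighed here concrete, below is a neutral sketch of the two placements. It only assumes the standard Hadoop FileSystem API; the method names are illustrative, not Spark's, and it is not the committed implementation.

    // Sketch of the two alternatives: move staged files when each task commits,
    // or collect the (staged -> final) pairs and move everything on job commit.
    import org.apache.hadoop.fs.{FileSystem, Path}

    object RenameTimingSketch {
      // Option A: each task moves its own staged files into place at task commit.
      def renameOnTaskCommit(fs: FileSystem, stagedByThisTask: Map[String, String]): Unit = {
        for ((src, dst) <- stagedByThisTask) fs.rename(new Path(src), new Path(dst))
      }

      // Option B (what the diff above does): tasks only report their staged files,
      // and the driver performs all renames once every task has committed.
      def renameOnJobCommit(fs: FileSystem, reportedByTasks: Seq[Map[String, String]]): Unit = {
        val filesToMove = reportedByTasks.foldLeft(Map.empty[String, String])(_ ++ _)
        for ((src, dst) <- filesToMove) fs.rename(new Path(src), new Path(dst))
      }
    }

In option B the per-file destinations have to travel back to the driver, which is what the TaskCommitMessage map in the diff is for; in option A nothing needs to be reported, but the files are already at their final locations before the job as a whole has committed.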
[GitHub] spark pull request #15814: [SPARK-18185] Fix all forms of INSERT / OVERWRITE...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/15814#discussion_r87138139

--- Diff: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala ---
@@ -87,25 +120,40 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)
   override def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit = {
     committer.commitJob(jobContext)
+    val filesToMove = taskCommits.map(_.obj.asInstanceOf[Map[String, String]]).reduce(_ ++ _)
+    logDebug(s"Committing files staged for absolute locations $filesToMove")
+    val fs = absPathStagingDir.getFileSystem(jobContext.getConfiguration)
+    for ((src, dst) <- filesToMove) {
+      fs.rename(new Path(src), new Path(dst))
+    }
+    fs.delete(absPathStagingDir, true)
   }

   override def abortJob(jobContext: JobContext): Unit = {
     committer.abortJob(jobContext, JobStatus.State.FAILED)
+    val fs = absPathStagingDir.getFileSystem(jobContext.getConfiguration)
+    fs.delete(absPathStagingDir, true)
   }

   override def setupTask(taskContext: TaskAttemptContext): Unit = {
     committer = setupCommitter(taskContext)
     committer.setupTask(taskContext)
+    addedAbsPathFiles = mutable.Map[String, String]()
   }

   override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
     val attemptId = taskContext.getTaskAttemptID
     SparkHadoopMapRedUtil.commitTask(
       committer, taskContext, attemptId.getJobID.getId, attemptId.getTaskID.getId)
-    EmptyTaskCommitMessage
+    new TaskCommitMessage(addedAbsPathFiles.toMap)
--- End diff --

Why don't we just rename temp files to dest files in commitTask?
[GitHub] spark pull request #15814: [SPARK-18185] Fix all forms of INSERT / OVERWRITE...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/15814#discussion_r87135950

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala ---
@@ -226,6 +238,34 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
       insertCmd
   }

+  /**
+   * Given a set of input partitions, returns those that have locations that differ from the
+   * Hive default (e.g. /k1=v1/k2=v2). These partitions were manually assigned locations by
+   * the user.
+   *
+   * @return a mapping from partition specs to their custom locations
+   */
+  private def getCustomPartitionLocations(
+      spark: SparkSession,
+      table: CatalogTable,
+      basePath: Path,
+      partitions: Seq[CatalogTablePartition]): Map[TablePartitionSpec, String] = {
+    val hadoopConf = spark.sessionState.newHadoopConf
+    val fs = basePath.getFileSystem(hadoopConf)
+    val qualifiedBasePath = basePath.makeQualified(fs.getUri, fs.getWorkingDirectory)
+    partitions.flatMap { p =>
+      val defaultLocation = qualifiedBasePath.suffix(
+        "/" + PartitioningUtils.getPathFragment(p.spec, table.partitionSchema)).toString
+      val catalogLocation = new Path(p.storage.locationUri.get).makeQualified(
+        fs.getUri, fs.getWorkingDirectory).toString
+      if (catalogLocation != defaultLocation) {
--- End diff --

Why do we distinguish partition locations that equal the default location? Partitions always have locations (custom-specified or generated by default); do we really need to care about who set them?
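For context on the default-vs-custom distinction being questioned here, a simplified sketch of the check the quoted code performs; path escaping and URI qualification are omitted, and the names are illustrative rather than Spark's.

    // A partition is treated as "custom" only when its catalog location differs
    // from the Hive-style default path <tableLocation>/k1=v1/k2=v2.
    object CustomLocationSketch {
      def defaultLocation(tableLocation: String, spec: Seq[(String, String)]): String =
        tableLocation + spec.map { case (k, v) => s"/$k=$v" }.mkString

      def isCustomLocation(
          tableLocation: String,
          spec: Seq[(String, String)],
          catalogLocation: String): Boolean =
        catalogLocation != defaultLocation(tableLocation, spec)

      def main(args: Array[String]): Unit = {
        val table = "hdfs://nn/warehouse/t"
        println(isCustomLocation(table, Seq("k1" -> "v1"), "hdfs://nn/warehouse/t/k1=v1"))  // false
        println(isCustomLocation(table, Seq("k1" -> "v1"), "hdfs://nn/elsewhere/k1"))       // true
      }
    }

Only the partitions for which this check is true need to be carried along to the write path; default locations can always be re-derived from the table location and the partition spec.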
[GitHub] spark pull request #15814: [SPARK-18185] Fix all forms of INSERT / OVERWRITE...
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/15814#discussion_r87109861

--- Diff: core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala ---
@@ -86,6 +86,16 @@ abstract class FileCommitProtocol {
   def newTaskTempFile(taskContext: TaskAttemptContext, dir: Option[String], ext: String): String

   /**
+   * Similar to newTaskTempFile(), but allows files to committed to an absolute output location.
+   * Depending on the implementation, there may be weaker guarantees around adding files this way.
+   */
+  def newTaskTempFileAbsPath(
--- End diff --

Done
[GitHub] spark pull request #15814: [SPARK-18185] Fix all forms of INSERT / OVERWRITE...
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/15814#discussion_r87111922

--- Diff: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala ---
@@ -42,17 +44,21 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)
   /** OutputCommitter from Hadoop is not serializable so marking it transient. */
   @transient private var committer: OutputCommitter = _

+  /**
+   * Tracks files staged by this task for absolute output paths. These outputs are not managed by
+   * the Hadoop OutputCommitter, so we must move these to their final locations on job commit.
+   */
+  @transient private var addedAbsPathFiles: mutable.Map[String, String] = null
+
+  private def absPathStagingDir: Path = new Path(path, "_temporary-" + jobId)
--- End diff --

Done
[GitHub] spark pull request #15814: [SPARK-18185] Fix all forms of INSERT / OVERWRITE...
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/15814#discussion_r87112095

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala ---
@@ -350,13 +350,15 @@ case class BroadcastHint(child: LogicalPlan) extends UnaryNode {
  * Options for writing new data into a table.
  *
  * @param enabled whether to overwrite existing data in the table.
- * @param specificPartition only data in the specified partition will be overwritten.
+ * @param staticPartitionKeys if non-empty, specifies that we only want to overwrite partitions
--- End diff --

Yep it is in the next sentence.
[GitHub] spark pull request #15814: [SPARK-18185] Fix all forms of INSERT / OVERWRITE...
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/15814#discussion_r87113460

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala ---
@@ -182,41 +182,53 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
          "Cannot overwrite a path that is also being read from.")
      }

-      val overwritingSinglePartition =
-        overwrite.specificPartition.isDefined &&
+      val partitionSchema = query.resolve(
+        t.partitionSchema, t.sparkSession.sessionState.analyzer.resolver)
+      val partitionsTrackedByCatalog = t.sparkSession.sessionState.conf.manageFilesourcePartitions &&
+        l.catalogTable.isDefined && l.catalogTable.get.partitionColumnNames.nonEmpty &&
        l.catalogTable.get.tracksPartitionsInCatalog

-      val effectiveOutputPath = if (overwritingSinglePartition) {
-        val partition = t.sparkSession.sessionState.catalog.getPartition(
-          l.catalogTable.get.identifier, overwrite.specificPartition.get)
-        new Path(partition.storage.locationUri.get)
-      } else {
-        outputPath
-      }
-
-      val effectivePartitionSchema = if (overwritingSinglePartition) {
-        Nil
-      } else {
-        query.resolve(t.partitionSchema, t.sparkSession.sessionState.analyzer.resolver)
+      var initialMatchingPartitions: Seq[TablePartitionSpec] = Nil
+      var customPartitionLocations: Map[TablePartitionSpec, String] = Map.empty
+
+      // When partitions are tracked by the catalog, compute all custom partition locations that
+      // may be relevant to the insertion job.
+      if (partitionsTrackedByCatalog) {
+        val matchingPartitions = t.sparkSession.sessionState.catalog.listPartitions(
--- End diff --

You also need the set of matching partitions (including those with default locations) in order to determine which ones to delete at the end of an overwrite call. This makes the optimization quite messy, so I'd rather not push it to the catalog for now.
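A condensed sketch, under assumptions, of the point made here: after an INSERT OVERWRITE with a partial static spec, any partition that matched the spec beforehand but was not rewritten by the job still has to be dropped, so the full matching set must be known up front. The types and names below are illustrative only.

    // Partitions to drop at the end of an overwrite = those that matched the
    // static spec before the job minus those the job actually (re)wrote.
    object OverwriteCleanupSketch {
      type Spec = Map[String, String]

      def partitionsToDrop(initialMatching: Set[Spec], writtenByJob: Set[Spec]): Set[Spec] =
        initialMatching -- writtenByJob

      def main(args: Array[String]): Unit = {
        val before  = Set(Map("p" -> "1"), Map("p" -> "2"))  // matched the static spec
        val written = Set(Map("p" -> "1"))                   // only p=1 received new rows
        println(partitionsToDrop(before, written))           // Set(Map(p -> 2))
      }
    }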
[GitHub] spark pull request #15814: [SPARK-18185] Fix all forms of INSERT / OVERWRITE...
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/15814#discussion_r87129853

--- Diff: core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala ---
@@ -86,6 +86,16 @@ abstract class FileCommitProtocol {
   def newTaskTempFile(taskContext: TaskAttemptContext, dir: Option[String], ext: String): String

   /**
+   * Similar to newTaskTempFile(), but allows files to committed to an absolute output location.
+   * Depending on the implementation, there may be weaker guarantees around adding files this way.
+   */
+  def newTaskTempFileAbsPath(
--- End diff --

I thought about combining it but I think the method semantics become too subtle then.
[GitHub] spark pull request #15814: [SPARK-18185] Fix all forms of INSERT / OVERWRITE...
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/15814#discussion_r87112188

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala ---
@@ -418,6 +418,8 @@ case class DataSource(
         val plan = InsertIntoHadoopFsRelationCommand(
           outputPath,
+          Map.empty,
--- End diff --

Done
[GitHub] spark pull request #15814: [SPARK-18185] Fix all forms of INSERT / OVERWRITE...
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/15814#discussion_r87111758

--- Diff: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala ---
@@ -42,17 +44,21 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)
   /** OutputCommitter from Hadoop is not serializable so marking it transient. */
   @transient private var committer: OutputCommitter = _

+  /**
+   * Tracks files staged by this task for absolute output paths. These outputs are not managed by
+   * the Hadoop OutputCommitter, so we must move these to their final locations on job commit.
+   */
+  @transient private var addedAbsPathFiles: mutable.Map[String, String] = null
--- End diff --

They are files. We need to track the unique output location of each file here in order to know where to place it. We could use directories, but they would end up with one file each anyways.
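A minimal sketch of the per-file bookkeeping described in this exchange; the object and method names are illustrative, not the commit protocol's API. Each file written for a custom-location partition is staged under a job-scoped temporary directory, and the map records where that file must end up when the job commits.

    // One map entry per staged file: staging path -> final absolute path.
    import java.util.UUID
    import scala.collection.mutable

    object AbsPathBookkeepingSketch {
      private val addedAbsPathFiles = mutable.Map[String, String]()

      def newStagedFile(stagingDir: String, finalDir: String, ext: String): String = {
        val filename = s"part-${UUID.randomUUID()}$ext"
        val stagingPath = s"$stagingDir/$filename"
        addedAbsPathFiles(stagingPath) = s"$finalDir/$filename"  // remember the final destination
        stagingPath
      }

      def main(args: Array[String]): Unit = {
        newStagedFile("/warehouse/t/_temporary-job_1", "/custom/location/k=1", ".parquet")
        addedAbsPathFiles.foreach { case (src, dst) => println(s"$src -> $dst") }
      }
    }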
[GitHub] spark pull request #15814: [SPARK-18185] Fix all forms of INSERT / OVERWRITE...
Github user ericl commented on a diff in the pull request: https://github.com/apache/spark/pull/15814#discussion_r87112037

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala ---
@@ -178,18 +178,13 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
         "partitions with value: " + dynamicPartitionKeys.keys.mkString("[", ",", "]"), ctx)
     }
     val overwrite = ctx.OVERWRITE != null
-    val overwritePartition =
-      if (overwrite && partitionKeys.nonEmpty && dynamicPartitionKeys.isEmpty) {
-        Some(partitionKeys.map(t => (t._1, t._2.get)))
-      } else {
-        None
-      }
+    val staticPartitionKeys = partitionKeys.filter(_._2.nonEmpty).map(t => (t._1, t._2.get))
--- End diff --

Done
[GitHub] spark pull request #15814: [SPARK-18185] Fix all forms of INSERT / OVERWRITE...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/15814#discussion_r87115876

--- Diff: core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala ---
@@ -86,6 +86,16 @@ abstract class FileCommitProtocol {
   def newTaskTempFile(taskContext: TaskAttemptContext, dir: Option[String], ext: String): String

   /**
+   * Similar to newTaskTempFile(), but allows files to committed to an absolute output location.
+   * Depending on the implementation, there may be weaker guarantees around adding files this way.
+   */
+  def newTaskTempFileAbsPath(
--- End diff --

Can you unify this and `newTaskTempFile`, if we treat the default partition location like a custom location?