spark git commit: [SPARK-24867][SQL] Add AnalysisBarrier to DataFrameWriter
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 740606eb8 -> fa552c3c1


[SPARK-24867][SQL] Add AnalysisBarrier to DataFrameWriter

```Scala
val udf1 = udf({(x: Int, y: Int) => x + y})
val df = spark.range(0, 3).toDF("a")
  .withColumn("b", udf1($"a", udf1($"a", lit(10))))
df.cache()
df.write.saveAsTable("t")
```

The cache is not being used because the plans do not match the cached plan. This is a regression caused by the changes we made in AnalysisBarrier, since not all the Analyzer rules are idempotent.

Added a test.

Also found a bug in the DSV1 write path. This is not a regression, so a separate JIRA was opened: https://issues.apache.org/jira/browse/SPARK-24869

Author: Xiao Li

Closes #21821 from gatorsmile/testMaster22.

(cherry picked from commit d2e7deb59f641e93778b763d5396f73d38f9a785)
Signed-off-by: Xiao Li


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fa552c3c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fa552c3c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fa552c3c

Branch: refs/heads/branch-2.3
Commit: fa552c3c1102404fe98c72a5b83cffbc5ba41df3
Parents: 740606e
Author: Xiao Li
Authored: Wed Jul 25 17:22:37 2018 -0700
Committer: Xiao Li
Committed: Wed Jul 25 17:24:32 2018 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/DataFrameWriter.scala | 10 +++--
 .../spark/sql/execution/command/ddl.scala      |  7 ++--
 .../scala/org/apache/spark/sql/UDFSuite.scala  | 42 +++-
 3 files changed, 51 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/fa552c3c/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index ed7a910..6c9fb52 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -254,7 +254,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
       val writer = ws.createWriter(jobId, df.logicalPlan.schema, mode, options)
       if (writer.isPresent) {
         runCommand(df.sparkSession, "save") {
-          WriteToDataSourceV2(writer.get(), df.logicalPlan)
+          WriteToDataSourceV2(writer.get(), df.planWithBarrier)
         }
       }

@@ -275,7 +275,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
         sparkSession = df.sparkSession,
         className = source,
         partitionColumns = partitioningColumns.getOrElse(Nil),
-        options = extraOptions.toMap).planForWriting(mode, AnalysisBarrier(df.logicalPlan))
+        options = extraOptions.toMap).planForWriting(mode, df.planWithBarrier)
     }
   }

@@ -323,7 +323,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
       InsertIntoTable(
         table = UnresolvedRelation(tableIdent),
         partition = Map.empty[String, Option[String]],
-        query = df.logicalPlan,
+        query = df.planWithBarrier,
         overwrite = mode == SaveMode.Overwrite,
         ifPartitionNotExists = false)
     }

@@ -455,7 +455,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
       partitionColumnNames = partitioningColumns.getOrElse(Nil),
       bucketSpec = getBucketSpec)

-    runCommand(df.sparkSession, "saveAsTable")(CreateTable(tableDesc, mode, Some(df.logicalPlan)))
+    runCommand(df.sparkSession, "saveAsTable") {
+      CreateTable(tableDesc, mode, Some(df.planWithBarrier))
+    }
   }

   /**
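The change above consistently swaps df.logicalPlan for df.planWithBarrier in every plan the writer builds. In Spark 2.3, Dataset.planWithBarrier is essentially the analyzed plan wrapped in an AnalysisBarrier. A minimal sketch of that relationship follows; DatasetSketch is a hypothetical stand-in for the real Dataset, and only the two plan fields are shown:

```Scala
import org.apache.spark.sql.catalyst.plans.logical.{AnalysisBarrier, LogicalPlan}

// Hypothetical stand-in for Dataset (illustrative, not the real source).
class DatasetSketch(val logicalPlan: LogicalPlan) {
  // The barrier marks the subtree as already analyzed, so re-running the
  // Analyzer (as DataFrameWriter.runCommand does) cannot re-apply
  // non-idempotent rules and reshape the plan, which would make it no
  // longer equal to the plan registered with the cache manager.
  val planWithBarrier: LogicalPlan = AnalysisBarrier(logicalPlan)
}
```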
http://git-wip-us.apache.org/repos/asf/spark/blob/fa552c3c/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 0f4831b..28313f2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -29,7 +29,7 @@ import org.apache.hadoop.mapred.{FileInputFormat, JobConf}

 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, Resolver}
+import org.apache.spark.sql.catalyst.analysis.{EliminateBarriers, NoSuchTableException, Resolver}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
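The symptom described in the commit message can be checked from the read side: after df.cache() plus an action, a query whose analyzed plan still matches the cached plan executes as an in-memory scan. Below is a hypothetical snippet, not part of the patch, assuming a local SparkSession; it looks for the cache-scan node in the physical plan. Before this fix, the plan re-analyzed by the writer for saveAsTable no longer matched the cached plan, so the write recomputed the UDFs instead of reading from the cache:

```Scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import org.apache.spark.sql.functions.{lit, udf}

object CacheReuseSymptom {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").getOrCreate()
    import spark.implicits._

    val udf1 = udf { (x: Int, y: Int) => x + y }
    val df = spark.range(0, 3).toDF("a")
      .withColumn("b", udf1($"a", udf1($"a", lit(10))))
    df.cache()
    df.count() // materialize the cached data

    // If the analyzed plan still matches the cached plan, the physical
    // plan scans the in-memory cache instead of recomputing the UDFs.
    val usesCache = df.queryExecution.executedPlan.find {
      case _: InMemoryTableScanExec => true
      case _ => false
    }.isDefined
    println(s"cache used: $usesCache")

    spark.stop()
  }
}
```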
spark git commit: [SPARK-24867][SQL] Add AnalysisBarrier to DataFrameWriter
Repository: spark
Updated Branches:
  refs/heads/master 17f469bc8 -> d2e7deb59


[SPARK-24867][SQL] Add AnalysisBarrier to DataFrameWriter

## What changes were proposed in this pull request?

```Scala
val udf1 = udf({(x: Int, y: Int) => x + y})
val df = spark.range(0, 3).toDF("a")
  .withColumn("b", udf1($"a", udf1($"a", lit(10))))
df.cache()
df.write.saveAsTable("t")
```

The cache is not being used because the plans do not match the cached plan. This is a regression caused by the changes we made in AnalysisBarrier, since not all the Analyzer rules are idempotent.

## How was this patch tested?

Added a test.

Also found a bug in the DSV1 write path. This is not a regression, so a separate JIRA was opened: https://issues.apache.org/jira/browse/SPARK-24869

Author: Xiao Li

Closes #21821 from gatorsmile/testMaster22.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d2e7deb5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d2e7deb5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d2e7deb5

Branch: refs/heads/master
Commit: d2e7deb59f641e93778b763d5396f73d38f9a785
Parents: 17f469b
Author: Xiao Li
Authored: Wed Jul 25 17:22:37 2018 -0700
Committer: Xiao Li
Committed: Wed Jul 25 17:22:37 2018 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/DataFrameWriter.scala | 10 +++--
 .../spark/sql/execution/command/ddl.scala      |  7 ++--
 .../scala/org/apache/spark/sql/UDFSuite.scala  | 42 +++-
 3 files changed, 51 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d2e7deb5/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index b9fa43f..39c0e10 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -254,7 +254,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
       val writer = ws.createWriter(jobId, df.logicalPlan.schema, mode, options)
       if (writer.isPresent) {
         runCommand(df.sparkSession, "save") {
-          WriteToDataSourceV2(writer.get(), df.logicalPlan)
+          WriteToDataSourceV2(writer.get(), df.planWithBarrier)
         }
       }

@@ -275,7 +275,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
         sparkSession = df.sparkSession,
         className = source,
         partitionColumns = partitioningColumns.getOrElse(Nil),
-        options = extraOptions.toMap).planForWriting(mode, AnalysisBarrier(df.logicalPlan))
+        options = extraOptions.toMap).planForWriting(mode, df.planWithBarrier)
     }
   }

@@ -323,7 +323,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
       InsertIntoTable(
         table = UnresolvedRelation(tableIdent),
         partition = Map.empty[String, Option[String]],
-        query = df.logicalPlan,
+        query = df.planWithBarrier,
         overwrite = mode == SaveMode.Overwrite,
         ifPartitionNotExists = false)
     }

@@ -459,7 +459,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
       partitionColumnNames = partitioningColumns.getOrElse(Nil),
       bucketSpec = getBucketSpec)

-    runCommand(df.sparkSession, "saveAsTable")(CreateTable(tableDesc, mode, Some(df.logicalPlan)))
+    runCommand(df.sparkSession, "saveAsTable") {
+      CreateTable(tableDesc, mode, Some(df.planWithBarrier))
+    }
   }

   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/d2e7deb5/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 04bf8c6..c7f7e4d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -29,7 +29,7 @@ import org.apache.hadoop.mapred.{FileInputFormat, JobConf}

 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, Resolver}
+import org.apache.spark.sql.catalyst.analysis.{EliminateBarriers, NoSuchTableException, Resolver}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
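For context on the EliminateBarriers import added to ddl.scala in both commits: AnalysisBarrier hides an already-analyzed subtree from the Analyzer, and EliminateBarriers strips the wrappers once analysis is done, so code that pattern-matches on logical plans sees plain plans again. The following is a simplified, self-contained sketch of that pattern, illustrative only and not the actual Spark implementation:

```Scala
// Toy model of the barrier pattern (illustrative only).
sealed trait Plan { def children: Seq[Plan] }
case class Relation(name: String) extends Plan { def children = Nil }
case class Project(exprs: Seq[String], child: Plan) extends Plan {
  def children = Seq(child)
}

// The barrier reports no children, so rule application that recurses via
// `children` never revisits (and never re-rewrites) the analyzed subtree.
case class AnalysisBarrier(hidden: Plan) extends Plan { def children = Nil }

object EliminateBarriers {
  // Once analysis is finished, drop every barrier so later phases
  // (optimizer, cache lookup, command execution) compare plain plans.
  def apply(plan: Plan): Plan = plan match {
    case AnalysisBarrier(hidden) => apply(hidden)
    case r: Relation             => r
    case p @ Project(_, child)   => p.copy(child = apply(child))
  }
}
```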