spark git commit: [SPARK-24867][SQL] Add AnalysisBarrier to DataFrameWriter

Repository: spark
Updated Branches:
  refs/heads/branch-2.3 740606eb8 -> fa552c3c1


[SPARK-24867][SQL] Add AnalysisBarrier to DataFrameWriter

## What changes were proposed in this pull request?
```Scala
  val udf1 = udf({(x: Int, y: Int) => x + y})
  val df = spark.range(0, 3).toDF("a")
    .withColumn("b", udf1($"a", udf1($"a", lit(10))))
  df.cache()
  df.write.saveAsTable("t")
```
The cache is not used because the write query's plan does not match the cached 
plan. This is a regression caused by the AnalysisBarrier changes: the 
already-analyzed plan is re-analyzed on the write path, and since not all 
Analyzer rules are idempotent, re-analysis rewrites the plan so it no longer 
matches the cache entry.
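As a sanity check, here is a minimal sketch of how cache reuse can be verified 
(illustrative only, not the exact test added in UDFSuite; it assumes a running 
`SparkSession` named `spark` with `spark.implicits._` in scope):

```Scala
// Illustrative sketch, not the verbatim UDFSuite test.
import org.apache.spark.sql.execution.columnar.InMemoryRelation
import org.apache.spark.sql.functions.{lit, udf}

val udf1 = udf({ (x: Int, y: Int) => x + y })
val df = spark.range(0, 3).toDF("a")
  .withColumn("b", udf1($"a", udf1($"a", lit(10))))
df.cache()
df.count() // materialize the cache entry

// Once the plan handed to the writer is no longer re-analyzed, it matches
// the cache entry, and withCachedData swaps in the InMemoryRelation.
val usesCache = df.queryExecution.withCachedData.collect {
  case r: InMemoryRelation => r
}.nonEmpty
assert(usesCache, "expected the cached plan to be reused")
```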

## How was this patch tested?
Added a test.

Also found a bug in the DSV1 write path. Since it is not a regression, it is 
tracked in a separate JIRA: https://issues.apache.org/jira/browse/SPARK-24869

Author: Xiao Li 

Closes #21821 from gatorsmile/testMaster22.

(cherry picked from commit d2e7deb59f641e93778b763d5396f73d38f9a785)
Signed-off-by: Xiao Li 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fa552c3c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fa552c3c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fa552c3c

Branch: refs/heads/branch-2.3
Commit: fa552c3c1102404fe98c72a5b83cffbc5ba41df3
Parents: 740606e
Author: Xiao Li 
Authored: Wed Jul 25 17:22:37 2018 -0700
Committer: Xiao Li 
Committed: Wed Jul 25 17:24:32 2018 -0700

--
 .../org/apache/spark/sql/DataFrameWriter.scala  | 10 +++--
 .../spark/sql/execution/command/ddl.scala   |  7 ++--
 .../scala/org/apache/spark/sql/UDFSuite.scala   | 42 +++-
 3 files changed, 51 insertions(+), 8 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/fa552c3c/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index ed7a910..6c9fb52 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -254,7 +254,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   val writer = ws.createWriter(jobId, df.logicalPlan.schema, mode, options)
   if (writer.isPresent) {
 runCommand(df.sparkSession, "save") {
-  WriteToDataSourceV2(writer.get(), df.logicalPlan)
+  WriteToDataSourceV2(writer.get(), df.planWithBarrier)
 }
   }
 
@@ -275,7 +275,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
 sparkSession = df.sparkSession,
 className = source,
 partitionColumns = partitioningColumns.getOrElse(Nil),
-options = extraOptions.toMap).planForWriting(mode, AnalysisBarrier(df.logicalPlan))
+options = extraOptions.toMap).planForWriting(mode, df.planWithBarrier)
 }
   }
 
@@ -323,7 +323,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   InsertIntoTable(
 table = UnresolvedRelation(tableIdent),
 partition = Map.empty[String, Option[String]],
-query = df.logicalPlan,
+query = df.planWithBarrier,
 overwrite = mode == SaveMode.Overwrite,
 ifPartitionNotExists = false)
 }
@@ -455,7 +455,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   partitionColumnNames = partitioningColumns.getOrElse(Nil),
   bucketSpec = getBucketSpec)
 
-runCommand(df.sparkSession, "saveAsTable")(CreateTable(tableDesc, mode, Some(df.logicalPlan)))
+runCommand(df.sparkSession, "saveAsTable") {
+  CreateTable(tableDesc, mode, Some(df.planWithBarrier))
+}
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/fa552c3c/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 0f4831b..28313f2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -29,7 +29,7 @@ import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
 
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, Resolver}
+import org.apache.spark.sql.catalyst.analysis.{EliminateBarriers, NoSuchTableException, Resolver}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.

spark git commit: [SPARK-24867][SQL] Add AnalysisBarrier to DataFrameWriter

Repository: spark
Updated Branches:
  refs/heads/master 17f469bc8 -> d2e7deb59


[SPARK-24867][SQL] Add AnalysisBarrier to DataFrameWriter

## What changes were proposed in this pull request?
```Scala
  val udf1 = udf({(x: Int, y: Int) => x + y})
  val df = spark.range(0, 3).toDF("a")
    .withColumn("b", udf1($"a", udf1($"a", lit(10))))
  df.cache()
  df.write.saveAsTable("t")
```
The cache is not used because the write query's plan does not match the cached 
plan. This is a regression caused by the AnalysisBarrier changes: the 
already-analyzed plan is re-analyzed on the write path, and since not all 
Analyzer rules are idempotent, re-analysis rewrites the plan so it no longer 
matches the cache entry.
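For reference, the diff below swaps `df.logicalPlan` for `df.planWithBarrier` 
at every DataFrameWriter call site. A condensed sketch of the mechanism 
(`DatasetSketch` is a hypothetical stand-in for `Dataset`; only 
`AnalysisBarrier` is the real catalyst node):

```Scala
// Condensed sketch of the mechanism, not the verbatim Spark 2.3 source.
import org.apache.spark.sql.catalyst.plans.logical.{AnalysisBarrier, LogicalPlan}

class DatasetSketch(val logicalPlan: LogicalPlan) {
  // AnalysisBarrier wraps the already-analyzed plan so the Analyzer treats
  // it as a leaf and does not re-run (potentially non-idempotent) rules
  // over it; the plan underneath stays exactly what CacheManager stored.
  val planWithBarrier: LogicalPlan = AnalysisBarrier(logicalPlan)
}
```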

## How was this patch tested?
Added a test.

Also found a bug in the DSV1 write path. Since it is not a regression, it is 
tracked in a separate JIRA: https://issues.apache.org/jira/browse/SPARK-24869

Author: Xiao Li 

Closes #21821 from gatorsmile/testMaster22.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d2e7deb5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d2e7deb5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d2e7deb5

Branch: refs/heads/master
Commit: d2e7deb59f641e93778b763d5396f73d38f9a785
Parents: 17f469b
Author: Xiao Li 
Authored: Wed Jul 25 17:22:37 2018 -0700
Committer: Xiao Li 
Committed: Wed Jul 25 17:22:37 2018 -0700

--
 .../org/apache/spark/sql/DataFrameWriter.scala  | 10 +++--
 .../spark/sql/execution/command/ddl.scala   |  7 ++--
 .../scala/org/apache/spark/sql/UDFSuite.scala   | 42 +++-
 3 files changed, 51 insertions(+), 8 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d2e7deb5/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index b9fa43f..39c0e10 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -254,7 +254,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   val writer = ws.createWriter(jobId, df.logicalPlan.schema, mode, options)
   if (writer.isPresent) {
 runCommand(df.sparkSession, "save") {
-  WriteToDataSourceV2(writer.get(), df.logicalPlan)
+  WriteToDataSourceV2(writer.get(), df.planWithBarrier)
 }
   }
 
@@ -275,7 +275,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
 sparkSession = df.sparkSession,
 className = source,
 partitionColumns = partitioningColumns.getOrElse(Nil),
-options = extraOptions.toMap).planForWriting(mode, AnalysisBarrier(df.logicalPlan))
+options = extraOptions.toMap).planForWriting(mode, df.planWithBarrier)
 }
   }
 
@@ -323,7 +323,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   InsertIntoTable(
 table = UnresolvedRelation(tableIdent),
 partition = Map.empty[String, Option[String]],
-query = df.logicalPlan,
+query = df.planWithBarrier,
 overwrite = mode == SaveMode.Overwrite,
 ifPartitionNotExists = false)
 }
@@ -459,7 +459,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   partitionColumnNames = partitioningColumns.getOrElse(Nil),
   bucketSpec = getBucketSpec)
 
-runCommand(df.sparkSession, "saveAsTable")(CreateTable(tableDesc, mode, Some(df.logicalPlan)))
+runCommand(df.sparkSession, "saveAsTable") {
+  CreateTable(tableDesc, mode, Some(df.planWithBarrier))
+}
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/d2e7deb5/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 04bf8c6..c7f7e4d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -29,7 +29,7 @@ import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
 
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, Resolver}
+import org.apache.spark.sql.catalyst.analysis.{EliminateBarriers, NoSuchTableException, Resolver}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Attribu