[GitHub] spark pull request #20521: [SPARK-22977][SQL] fix web UI SQL tab for CTAS

2018-09-12 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/20521#discussion_r217255531
  
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala ---
@@ -56,34 +57,36 @@ case class CreateHiveTableAsSelectCommand(
       return Seq.empty
     }
 
-    sparkSession.sessionState.executePlan(
-      InsertIntoTable(
-        UnresolvedRelation(tableIdentifier),
-        Map(),
-        query,
-        overwrite = false,
-        ifPartitionNotExists = false)).toRdd
+    InsertIntoHiveTable(
--- End diff --

Ok. I see. Thanks.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20521: [SPARK-22977][SQL] fix web UI SQL tab for CTAS

2018-09-12 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20521#discussion_r217255313
  
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala ---
@@ -56,34 +57,36 @@ case class CreateHiveTableAsSelectCommand(
       return Seq.empty
     }
 
-    sparkSession.sessionState.executePlan(
-      InsertIntoTable(
-        UnresolvedRelation(tableIdentifier),
-        Map(),
-        query,
-        overwrite = false,
-        ifPartitionNotExists = false)).toRdd
+    InsertIntoHiveTable(
--- End diff --

ah good catch! I don't think we can revert here, as we need to execute the physical plan given as a parameter.

I think we should improve the Hive table conversion optimizer rule and handle CTAS as well.
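As a rough illustration of that direction, a relation-conversion optimizer rule could pattern-match CTAS nodes in addition to table inserts, so the conversion happens during optimization rather than during execution. The sketch below uses toy stand-in classes (`HiveTableRelation`, `OptimizedCreateTableAsSelect`, etc. are simplified placeholders, not the real Spark types):

```scala
// Toy plan ADT standing in for Spark's logical plans (names are illustrative).
sealed trait LogicalPlan
case class Query(sql: String) extends LogicalPlan
case class HiveTableRelation(table: String) extends LogicalPlan
case class ParquetRelation(table: String) extends LogicalPlan
case class InsertIntoTable(relation: LogicalPlan, query: LogicalPlan) extends LogicalPlan
case class CreateHiveTableAsSelect(table: String, query: LogicalPlan) extends LogicalPlan
case class OptimizedCreateTableAsSelect(table: String, query: LogicalPlan) extends LogicalPlan

object RelationConversions {
  // The rule already converts Hive relations under inserts; the suggestion is
  // to also match CTAS here instead of converting at execution time.
  def apply(plan: LogicalPlan): LogicalPlan = plan match {
    case InsertIntoTable(HiveTableRelation(t), q) => InsertIntoTable(ParquetRelation(t), q)
    case CreateHiveTableAsSelect(t, q)            => OptimizedCreateTableAsSelect(t, q)
    case other                                    => other
  }
}
```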


---




[GitHub] spark pull request #20521: [SPARK-22977][SQL] fix web UI SQL tab for CTAS

2018-09-12 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/20521#discussion_r217254812
  
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala ---
@@ -56,34 +57,36 @@ case class CreateHiveTableAsSelectCommand(
       return Seq.empty
     }
 
-    sparkSession.sessionState.executePlan(
-      InsertIntoTable(
-        UnresolvedRelation(tableIdentifier),
-        Map(),
-        query,
-        overwrite = false,
-        ifPartitionNotExists = false)).toRdd
+    InsertIntoHiveTable(
--- End diff --

Will it affect web UI SQL tab?


---




[GitHub] spark pull request #20521: [SPARK-22977][SQL] fix web UI SQL tab for CTAS

2018-09-12 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/20521#discussion_r217254430
  
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala ---
@@ -56,34 +57,36 @@ case class CreateHiveTableAsSelectCommand(
       return Seq.empty
     }
 
-    sparkSession.sessionState.executePlan(
-      InsertIntoTable(
-        UnresolvedRelation(tableIdentifier),
-        Map(),
-        query,
-        overwrite = false,
-        ifPartitionNotExists = false)).toRdd
+    InsertIntoHiveTable(
--- End diff --

@cloud-fan this change from `InsertIntoTable` to `InsertIntoHiveTable` 
introduces a regression 
[SPARK-25271](https://issues.apache.org/jira/browse/SPARK-25271), I'd like to 
revert it back to use `InsertIntoTable`, WDYT?


---




[GitHub] spark pull request #20521: [SPARK-22977][SQL] fix web UI SQL tab for CTAS

2018-02-12 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/20521


---




[GitHub] spark pull request #20521: [SPARK-22977][SQL] fix web UI SQL tab for CTAS

2018-02-08 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20521#discussion_r167140801
  
--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala ---
@@ -128,32 +128,6 @@ class HiveExplainSuite extends QueryTest with SQLTestUtils with TestHiveSingleton
           "src")
   }
 
-  test("SPARK-17409: The EXPLAIN output of CTAS only shows the analyzed plan") {
--- End diff --

This is kind of a "bad" test. The bug was that we optimized the CTAS input query twice, but here we are testing whether the EXPLAIN result of CTAS only contains the analyzed query, which is specific to how we fixed that bug at the time.


---




[GitHub] spark pull request #20521: [SPARK-22977][SQL] fix web UI SQL tab for CTAS

2018-02-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20521#discussion_r166517452
  
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala ---
@@ -72,13 +72,14 @@ case class CreateHiveTableAsSelectCommand(
         tableDesc.copy(schema = query.schema), ignoreIfExists = false)
 
       try {
-        sparkSession.sessionState.executePlan(
-          InsertIntoTable(
-            UnresolvedRelation(tableIdentifier),
-            Map(),
-            query,
-            overwrite = true,
-            ifPartitionNotExists = false)).toRdd
+        InsertIntoHiveTable(
+          // Read back the metadata of the table which was created just now.
+          sparkSession.sessionState.catalog.getTableMetadata(tableDesc.identifier),
+          Map.empty,
+          query,
+          overwrite = false,
--- End diff --

good catch!


---




[GitHub] spark pull request #20521: [SPARK-22977][SQL] fix web UI SQL tab for CTAS

2018-02-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20521#discussion_r166517178
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala ---
@@ -493,9 +510,23 @@ case class DataSource(
         dataSource.createRelation(
           sparkSession.sqlContext, mode, caseInsensitiveOptions, Dataset.ofRows(sparkSession, data))
       case format: FileFormat =>
-        sparkSession.sessionState.executePlan(planForWritingFileFormat(format, mode, data)).toRdd
+        val cmd = planForWritingFileFormat(format, mode, data)
+        val resolvedPartCols = cmd.partitionColumns.map { col =>
+          // The partition columns created in `planForWritingFileFormat` should always be
+          // `UnresolvedAttribute` with a single name part.
+          assert(col.isInstanceOf[UnresolvedAttribute])
+          val unresolved = col.asInstanceOf[UnresolvedAttribute]
+          assert(unresolved.nameParts.length == 1)
+          val name = unresolved.nameParts.head
+          outputColumns.find(a => equality(a.name, name)).getOrElse {
+            throw new AnalysisException(
+              s"Unable to resolve $name given [${data.output.map(_.name).mkString(", ")}]")
+          }
+        }
+        val resolved = cmd.copy(partitionColumns = resolvedPartCols, outputColumns = outputColumns)
--- End diff --

Because the CTAS node itself doesn't have a `QueryExecution`. It only has the physical plan that was produced by the planner. `QueryExecution` only exists in the places that drive the analyze/optimize/execute phases, e.g. `Dataset`.
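The shape of that ownership can be sketched with toy classes (illustrative names only, not the real Spark API): a `Dataset`-like driver holds a `QueryExecution` that lazily walks the plan through its phases, while a command like CTAS is handed only the final physical plan.

```scala
// Toy stand-in for a QueryExecution-like wrapper that lazily drives the phases.
case class Plan(desc: String)

class QueryExecution(logical: Plan) {
  lazy val analyzed: Plan  = Plan(s"analyzed(${logical.desc})")
  lazy val optimized: Plan = Plan(s"optimized(${analyzed.desc})")
  lazy val physical: Plan  = Plan(s"physical(${optimized.desc})")
}

val qe = new QueryExecution(Plan("SELECT 1"))
// A command receives only this value; it has no handle back to `qe`
// or to the analyzed/optimized phases.
val sparkPlan = qe.physical
```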


---




[GitHub] spark pull request #20521: [SPARK-22977][SQL] fix web UI SQL tab for CTAS

2018-02-06 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/20521#discussion_r166510668
  
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala ---
@@ -72,13 +72,14 @@ case class CreateHiveTableAsSelectCommand(
         tableDesc.copy(schema = query.schema), ignoreIfExists = false)
 
       try {
-        sparkSession.sessionState.executePlan(
-          InsertIntoTable(
-            UnresolvedRelation(tableIdentifier),
-            Map(),
-            query,
-            overwrite = true,
-            ifPartitionNotExists = false)).toRdd
+        InsertIntoHiveTable(
+          // Read back the metadata of the table which was created just now.
+          sparkSession.sessionState.catalog.getTableMetadata(tableDesc.identifier),
+          Map.empty,
+          query,
+          overwrite = false,
--- End diff --

Previously, `overwrite = true`.


---




[GitHub] spark pull request #20521: [SPARK-22977][SQL] fix web UI SQL tab for CTAS

2018-02-06 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/20521#discussion_r166508678
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala ---
@@ -438,7 +440,7 @@ case class DataSource(
    * Writes the given [[LogicalPlan]] out in this [[FileFormat]].
--- End diff --

nit: the comment seems inaccurate. It doesn't actually write the logical plan out.


---




[GitHub] spark pull request #20521: [SPARK-22977][SQL] fix web UI SQL tab for CTAS

2018-02-06 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20521#discussion_r166425787
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala ---
@@ -493,9 +510,23 @@ case class DataSource(
         dataSource.createRelation(
           sparkSession.sqlContext, mode, caseInsensitiveOptions, Dataset.ofRows(sparkSession, data))
       case format: FileFormat =>
-        sparkSession.sessionState.executePlan(planForWritingFileFormat(format, mode, data)).toRdd
+        val cmd = planForWritingFileFormat(format, mode, data)
+        val resolvedPartCols = cmd.partitionColumns.map { col =>
+          // The partition columns created in `planForWritingFileFormat` should always be
+          // `UnresolvedAttribute` with a single name part.
+          assert(col.isInstanceOf[UnresolvedAttribute])
+          val unresolved = col.asInstanceOf[UnresolvedAttribute]
+          assert(unresolved.nameParts.length == 1)
+          val name = unresolved.nameParts.head
+          outputColumns.find(a => equality(a.name, name)).getOrElse {
+            throw new AnalysisException(
+              s"Unable to resolve $name given [${data.output.map(_.name).mkString(", ")}]")
+          }
+        }
+        val resolved = cmd.copy(partitionColumns = resolvedPartCols, outputColumns = outputColumns)
--- End diff --

Ok, I think I get it now. Thanks for explaining.

Why not pass the `QueryExecution` so you have access to the resolved plan 
without copying resolution rules here? I'm just curious here, I get that this 
is intended as a quick fix for the release, so don't let my comments block you.


---




[GitHub] spark pull request #20521: [SPARK-22977][SQL] fix web UI SQL tab for CTAS

2018-02-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20521#discussion_r166421002
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala ---
@@ -493,9 +510,23 @@ case class DataSource(
         dataSource.createRelation(
           sparkSession.sqlContext, mode, caseInsensitiveOptions, Dataset.ofRows(sparkSession, data))
       case format: FileFormat =>
-        sparkSession.sessionState.executePlan(planForWritingFileFormat(format, mode, data)).toRdd
+        val cmd = planForWritingFileFormat(format, mode, data)
+        val resolvedPartCols = cmd.partitionColumns.map { col =>
+          // The partition columns created in `planForWritingFileFormat` should always be
+          // `UnresolvedAttribute` with a single name part.
+          assert(col.isInstanceOf[UnresolvedAttribute])
+          val unresolved = col.asInstanceOf[UnresolvedAttribute]
+          assert(unresolved.nameParts.length == 1)
+          val name = unresolved.nameParts.head
+          outputColumns.find(a => equality(a.name, name)).getOrElse {
+            throw new AnalysisException(
+              s"Unable to resolve $name given [${data.output.map(_.name).mkString(", ")}]")
+          }
+        }
+        val resolved = cmd.copy(partitionColumns = resolvedPartCols, outputColumns = outputColumns)
--- End diff --

`planForWritingFileFormat` returns an **unresolved** `InsertIntoHadoopFsRelationCommand`. The previous code ran `sparkSession.sessionState.executePlan` to analyze/optimize/execute the unresolved `InsertIntoHadoopFsRelationCommand`, which converts the logical plan to a physical plan and executes it.

Now, I want to run the given physical plan. Even though `sparkSession.sessionState.executePlan` can produce an exactly identical physical plan, they are different objects and the UI is not happy with that, so I chose to manually resolve the unresolved `InsertIntoHadoopFsRelationCommand` here and explicitly ask it to run my given physical plan.


---




[GitHub] spark pull request #20521: [SPARK-22977][SQL] fix web UI SQL tab for CTAS

2018-02-06 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20521#discussion_r166414290
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala ---
@@ -493,9 +510,23 @@ case class DataSource(
         dataSource.createRelation(
           sparkSession.sqlContext, mode, caseInsensitiveOptions, Dataset.ofRows(sparkSession, data))
       case format: FileFormat =>
-        sparkSession.sessionState.executePlan(planForWritingFileFormat(format, mode, data)).toRdd
+        val cmd = planForWritingFileFormat(format, mode, data)
+        val resolvedPartCols = cmd.partitionColumns.map { col =>
+          // The partition columns created in `planForWritingFileFormat` should always be
+          // `UnresolvedAttribute` with a single name part.
+          assert(col.isInstanceOf[UnresolvedAttribute])
+          val unresolved = col.asInstanceOf[UnresolvedAttribute]
+          assert(unresolved.nameParts.length == 1)
+          val name = unresolved.nameParts.head
+          outputColumns.find(a => equality(a.name, name)).getOrElse {
+            throw new AnalysisException(
+              s"Unable to resolve $name given [${data.output.map(_.name).mkString(", ")}]")
+          }
+        }
+        val resolved = cmd.copy(partitionColumns = resolvedPartCols, outputColumns = outputColumns)
--- End diff --

If it matches, then why is there column resolution happening?


---




[GitHub] spark pull request #20521: [SPARK-22977][SQL] fix web UI SQL tab for CTAS

2018-02-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20521#discussion_r166413015
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala ---
@@ -493,9 +510,23 @@ case class DataSource(
         dataSource.createRelation(
           sparkSession.sqlContext, mode, caseInsensitiveOptions, Dataset.ofRows(sparkSession, data))
       case format: FileFormat =>
-        sparkSession.sessionState.executePlan(planForWritingFileFormat(format, mode, data)).toRdd
+        val cmd = planForWritingFileFormat(format, mode, data)
+        val resolvedPartCols = cmd.partitionColumns.map { col =>
+          // The partition columns created in `planForWritingFileFormat` should always be
+          // `UnresolvedAttribute` with a single name part.
+          assert(col.isInstanceOf[UnresolvedAttribute])
+          val unresolved = col.asInstanceOf[UnresolvedAttribute]
+          assert(unresolved.nameParts.length == 1)
+          val name = unresolved.nameParts.head
+          outputColumns.find(a => equality(a.name, name)).getOrElse {
+            throw new AnalysisException(
+              s"Unable to resolve $name given [${data.output.map(_.name).mkString(", ")}]")
+          }
+        }
+        val resolved = cmd.copy(partitionColumns = resolvedPartCols, outputColumns = outputColumns)
--- End diff --

> Why does the physical plan not match the command that is produced

It matches! The only problem is that they are two different JVM objects. The UI keeps the physical plan object and displays it. An alternative solution is to swap the new physical plan into the UI part, but that's hard to do with the current UI framework.

If we run `sparkSession.sessionState.executePlan(planForWritingFileFormat(format, mode, data)).toRdd`, we are executing the new physical plan, so no metrics will be reported to the passed-in physical plan or shown in the UI.


---




[GitHub] spark pull request #20521: [SPARK-22977][SQL] fix web UI SQL tab for CTAS

2018-02-06 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20521#discussion_r166410690
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala ---
@@ -493,9 +510,23 @@ case class DataSource(
         dataSource.createRelation(
           sparkSession.sqlContext, mode, caseInsensitiveOptions, Dataset.ofRows(sparkSession, data))
       case format: FileFormat =>
-        sparkSession.sessionState.executePlan(planForWritingFileFormat(format, mode, data)).toRdd
+        val cmd = planForWritingFileFormat(format, mode, data)
+        val resolvedPartCols = cmd.partitionColumns.map { col =>
+          // The partition columns created in `planForWritingFileFormat` should always be
+          // `UnresolvedAttribute` with a single name part.
+          assert(col.isInstanceOf[UnresolvedAttribute])
+          val unresolved = col.asInstanceOf[UnresolvedAttribute]
+          assert(unresolved.nameParts.length == 1)
+          val name = unresolved.nameParts.head
+          outputColumns.find(a => equality(a.name, name)).getOrElse {
+            throw new AnalysisException(
+              s"Unable to resolve $name given [${data.output.map(_.name).mkString(", ")}]")
+          }
+        }
+        val resolved = cmd.copy(partitionColumns = resolvedPartCols, outputColumns = outputColumns)
--- End diff --

I don't think you're answering the question that I'm asking. I understand 
why the physical plan is passed in.

Why does the physical plan not match the command that is produced, or why 
doesn't the command here match the physical plan? I don't see why executePlan 
would produce something different than what is passed in.


---




[GitHub] spark pull request #20521: [SPARK-22977][SQL] fix web UI SQL tab for CTAS

2018-02-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20521#discussion_r166409729
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala ---
@@ -493,9 +510,23 @@ case class DataSource(
         dataSource.createRelation(
           sparkSession.sqlContext, mode, caseInsensitiveOptions, Dataset.ofRows(sparkSession, data))
       case format: FileFormat =>
-        sparkSession.sessionState.executePlan(planForWritingFileFormat(format, mode, data)).toRdd
+        val cmd = planForWritingFileFormat(format, mode, data)
+        val resolvedPartCols = cmd.partitionColumns.map { col =>
+          // The partition columns created in `planForWritingFileFormat` should always be
+          // `UnresolvedAttribute` with a single name part.
+          assert(col.isInstanceOf[UnresolvedAttribute])
+          val unresolved = col.asInstanceOf[UnresolvedAttribute]
+          assert(unresolved.nameParts.length == 1)
+          val name = unresolved.nameParts.head
+          outputColumns.find(a => equality(a.name, name)).getOrElse {
+            throw new AnalysisException(
+              s"Unable to resolve $name given [${data.output.map(_.name).mkString(", ")}]")
+          }
+        }
+        val resolved = cmd.copy(partitionColumns = resolvedPartCols, outputColumns = outputColumns)
--- End diff --

The given physical plan has been registered to the UI, and we can collect its metrics if we execute it. However, if we run `sparkSession.sessionState.executePlan`, we get a new physical plan that is semantically equal to the given one but is not the same object. This new physical plan is not registered to the UI, so we can't show the metrics correctly in the UI.
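The object-identity point can be sketched with toy classes (a hypothetical `uiMetrics` store, not Spark's actual UI accounting): the UI tracks the plan *instance*, so a re-planned copy that is `==` equal still carries none of the registered metrics.

```scala
import java.util.IdentityHashMap

// Case class: structural equality, so two plans can be == without
// being the same JVM object.
case class PhysicalPlan(name: String)

// The UI-side store is keyed by the plan *object* (reference identity).
val uiMetrics = new IdentityHashMap[PhysicalPlan, Long]()

val registeredPlan = PhysicalPlan("InsertIntoHadoopFsRelation")
uiMetrics.put(registeredPlan, 0L)

// Re-running the planner yields an equal but distinct plan object:
val replanned = PhysicalPlan("InsertIntoHadoopFsRelation")
```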


---




[GitHub] spark pull request #20521: [SPARK-22977][SQL] fix web UI SQL tab for CTAS

2018-02-06 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20521#discussion_r166407366
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala ---
@@ -493,9 +510,23 @@ case class DataSource(
         dataSource.createRelation(
           sparkSession.sqlContext, mode, caseInsensitiveOptions, Dataset.ofRows(sparkSession, data))
       case format: FileFormat =>
-        sparkSession.sessionState.executePlan(planForWritingFileFormat(format, mode, data)).toRdd
+        val cmd = planForWritingFileFormat(format, mode, data)
+        val resolvedPartCols = cmd.partitionColumns.map { col =>
+          // The partition columns created in `planForWritingFileFormat` should always be
+          // `UnresolvedAttribute` with a single name part.
+          assert(col.isInstanceOf[UnresolvedAttribute])
+          val unresolved = col.asInstanceOf[UnresolvedAttribute]
+          assert(unresolved.nameParts.length == 1)
+          val name = unresolved.nameParts.head
+          outputColumns.find(a => equality(a.name, name)).getOrElse {
+            throw new AnalysisException(
+              s"Unable to resolve $name given [${data.output.map(_.name).mkString(", ")}]")
+          }
+        }
+        val resolved = cmd.copy(partitionColumns = resolvedPartCols, outputColumns = outputColumns)
--- End diff --

I get the point of passing the physical plan, and I think that's a good 
idea. What I don't understand is why the command doesn't match the physical 
plan that is passed in. Is that physical plan based on a different logical 
plan? I would expect that the physical plan is created once and passed into 
run, but that it was created from the logical plan that is also passed in.


---




[GitHub] spark pull request #20521: [SPARK-22977][SQL] fix web UI SQL tab for CTAS

2018-02-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20521#discussion_r166403941
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala ---
@@ -493,9 +510,23 @@ case class DataSource(
         dataSource.createRelation(
           sparkSession.sqlContext, mode, caseInsensitiveOptions, Dataset.ofRows(sparkSession, data))
       case format: FileFormat =>
-        sparkSession.sessionState.executePlan(planForWritingFileFormat(format, mode, data)).toRdd
+        val cmd = planForWritingFileFormat(format, mode, data)
+        val resolvedPartCols = cmd.partitionColumns.map { col =>
+          // The partition columns created in `planForWritingFileFormat` should always be
+          // `UnresolvedAttribute` with a single name part.
+          assert(col.isInstanceOf[UnresolvedAttribute])
+          val unresolved = col.asInstanceOf[UnresolvedAttribute]
+          assert(unresolved.nameParts.length == 1)
+          val name = unresolved.nameParts.head
+          outputColumns.find(a => equality(a.name, name)).getOrElse {
+            throw new AnalysisException(
+              s"Unable to resolve $name given [${data.output.map(_.name).mkString(", ")}]")
+          }
+        }
+        val resolved = cmd.copy(partitionColumns = resolvedPartCols, outputColumns = outputColumns)
--- End diff --

The previous code calls `sparkSession.sessionState.executePlan` to analyze/optimize/plan/execute this temporary `InsertIntoHadoopFsRelationCommand`, which is pretty hacky because at this moment we are executing CTAS and we already have the final physical plan. Here we manually analyze the `InsertIntoHadoopFsRelationCommand` so that we only reuse the physical part.


---




[GitHub] spark pull request #20521: [SPARK-22977][SQL] fix web UI SQL tab for CTAS

2018-02-06 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20521#discussion_r166396780
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala ---
@@ -493,9 +510,23 @@ case class DataSource(
         dataSource.createRelation(
           sparkSession.sqlContext, mode, caseInsensitiveOptions, Dataset.ofRows(sparkSession, data))
       case format: FileFormat =>
-        sparkSession.sessionState.executePlan(planForWritingFileFormat(format, mode, data)).toRdd
+        val cmd = planForWritingFileFormat(format, mode, data)
+        val resolvedPartCols = cmd.partitionColumns.map { col =>
+          // The partition columns created in `planForWritingFileFormat` should always be
+          // `UnresolvedAttribute` with a single name part.
+          assert(col.isInstanceOf[UnresolvedAttribute])
+          val unresolved = col.asInstanceOf[UnresolvedAttribute]
+          assert(unresolved.nameParts.length == 1)
+          val name = unresolved.nameParts.head
+          outputColumns.find(a => equality(a.name, name)).getOrElse {
+            throw new AnalysisException(
+              s"Unable to resolve $name given [${data.output.map(_.name).mkString(", ")}]")
+          }
+        }
+        val resolved = cmd.copy(partitionColumns = resolvedPartCols, outputColumns = outputColumns)
--- End diff --

Is this ad-hoc column resolution just to ensure the names have the correct case after it is possibly dropped by the optimizer? Why does the command need to report these, and where are they used?


---




[GitHub] spark pull request #20521: [SPARK-22977][SQL] fix web UI SQL tab for CTAS

2018-02-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20521#discussion_r166386827
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala ---
@@ -482,9 +484,24 @@ case class DataSource(
   /**
    * Writes the given [[LogicalPlan]] out to this [[DataSource]] and returns a [[BaseRelation]] for
    * the following reading.
+   *
+   * @param mode The save mode for this writing.
+   * @param data The input query plan that produces the data to be written. Note that this plan
+   *             is analyzed and optimized.
+   * @param outputColumns The original output columns of the input query plan. The optimizer may not
+   *                      preserve the output column's names' case, so we need this parameter
+   *                      instead of `data.output`.
+   * @param physicalPlan The physical plan of the input query plan. We should run the writing
+   *                     command with this physical plan instead of creating a new physical plan,
+   *                     so that the metrics can be correctly linked to the given physical plan and
+   *                     shown in the web UI.
--- End diff --

Generally I think it's hacky to analyze/optimize/plan/execute a query during the execution of another query. Not only CTAS but other commands like `CreateView`, `CacheTable`, etc. also have this issue. This is a surgical fix for Spark 2.3, so I didn't change this part and left it for 2.4.


---




[GitHub] spark pull request #20521: [SPARK-22977][SQL] fix web UI SQL tab for CTAS

2018-02-06 Thread cloud-fan
GitHub user cloud-fan opened a pull request:

https://github.com/apache/spark/pull/20521

[SPARK-22977][SQL] fix web UI SQL tab for CTAS

## What changes were proposed in this pull request?

This is a regression in Spark 2.3.

In Spark 2.2, we had fragile UI support for SQL data writing commands. We only tracked the input query plan of `FileFormatWriter` and displayed its metrics. This was not ideal because we didn't know what triggered the writing (it could be a table insertion, CTAS, etc.), but it was still useful to see the metrics of the input query.

In Spark 2.3, we introduced a new mechanism, `DataWritingCommand`, to fix the UI issue entirely. Now these writing commands have real children, and we don't need to hack into `FileFormatWriter` for the UI. This also helps with `explain`: now `explain` can show the physical plan of the input query, while in 2.2 the physical writing plan was simply an `ExecutedCommandExec` with no child.

However, there is a regression for CTAS. CTAS commands don't extend `DataWritingCommand`, and we no longer have the UI hack in `FileFormatWriter`, so the UI for CTAS is just an empty node. See https://issues.apache.org/jira/browse/SPARK-22977 for more information about this UI issue.

To fix it, we should apply the `DataWritingCommand` mechanism to CTAS commands.

TODO: In the future, we should refactor this part, create some physical-layer building blocks for data writing, and reuse them in the different writing commands. We should have different logical nodes for different operators even if some of them share the same logic, e.g. CTAS, CREATE TABLE, INSERT TABLE. Internally we can share the same physical logic.
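The core of the `DataWritingCommand` idea can be sketched with simplified stand-in classes (not the real Spark traits): the writing command exposes its input query as a real child, so the planner and the UI see the full tree instead of an opaque leaf node.

```scala
// Simplified stand-ins for Spark's logical plan nodes.
sealed trait LogicalPlan { def children: Seq[LogicalPlan] }
case class Relation(name: String) extends LogicalPlan { def children = Nil }

// 2.2-style: the command hides its query in a field, so the tree has no child
// and the UI sees only an empty node.
case class OpaqueCommand(query: LogicalPlan) extends LogicalPlan { def children = Nil }

// 2.3-style: a DataWritingCommand-like node exposes the query as its child.
trait DataWritingCommand extends LogicalPlan {
  def query: LogicalPlan
  def children: Seq[LogicalPlan] = query :: Nil
}
case class CreateTableAsSelect(table: String, query: LogicalPlan) extends DataWritingCommand
```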

## How was this patch tested?

Manually tested.
For a data source table:
https://user-images.githubusercontent.com/3182036/35874155-bdffab28-0ba6-11e8-94a8-e32e106ba069.png
For a Hive table:
https://user-images.githubusercontent.com/3182036/35874161-c437e2a8-0ba6-11e8-98ed-7930f01432c5.png



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/cloud-fan/spark UI

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20521.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20521


commit b90c8f3297d11471c6393b91f7ed5c8e52735f7f
Author: Wenchen Fan 
Date:   2018-02-06T16:59:58Z

fix web UI SQL tab for CTAS




---
