Repository: spark
Updated Branches:
  refs/heads/master 6c9e5ac9d -> 950e7374a


[SPARK-25913][SQL] Extend UnaryExecNode by unary SparkPlan nodes

## What changes were proposed in this pull request?

In the PR, I propose that unary physical plan nodes extend `UnaryExecNode` instead of `SparkPlan` directly, so the single-child boilerplate (`override def children: Seq[SparkPlan] = child :: Nil`) is inherited from the base trait rather than repeated in each node.
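
For illustration, a minimal sketch of the resulting pattern (`MyUnaryExec` is a hypothetical node used only as an example, not part of this patch):

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}

// Hypothetical node: UnaryExecNode derives `children` from the single
// `child` member, so the node no longer hand-writes
// `override def children: Seq[SparkPlan] = child :: Nil`.
case class MyUnaryExec(child: SparkPlan) extends UnaryExecNode {
  override def output: Seq[Attribute] = child.output
  override protected def doExecute(): RDD[InternalRow] = child.execute()
}
```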

Closes #22925 from MaxGekk/unary-exec-node.

Authored-by: Maxim Gekk <max.g...@gmail.com>
Signed-off-by: gatorsmile <gatorsm...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/950e7374
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/950e7374
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/950e7374

Branch: refs/heads/master
Commit: 950e7374a89cf45742a442afc08a74b6b4a7aa66
Parents: 6c9e5ac
Author: Maxim Gekk <max.g...@gmail.com>
Authored: Sun Nov 4 17:41:42 2018 -0800
Committer: gatorsmile <gatorsm...@gmail.com>
Committed: Sun Nov 4 17:41:42 2018 -0800

----------------------------------------------------------------------
 .../apache/spark/sql/execution/command/commands.scala   |  6 ++----
 .../datasources/v2/WriteToDataSourceV2Exec.scala        |  6 +++---
 .../spark/sql/execution/python/EvalPythonExec.scala     |  6 ++----
 .../streaming/continuous/ContinuousCoalesceExec.scala   | 12 +++---------
 .../continuous/WriteToContinuousDataSourceExec.scala    |  6 +++---
 5 files changed, 13 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/950e7374/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
index 2cc0e38..ab40936 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}
-import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan}
+import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan, UnaryExecNode}
 import org.apache.spark.sql.execution.debug._
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.execution.streaming.{IncrementalExecution, OffsetSeqMetadata}
@@ -95,7 +95,7 @@ case class ExecutedCommandExec(cmd: RunnableCommand) extends LeafExecNode {
  * @param child the physical plan child ran by the `DataWritingCommand`.
  */
 case class DataWritingCommandExec(cmd: DataWritingCommand, child: SparkPlan)
-  extends SparkPlan {
+  extends UnaryExecNode {
 
   override lazy val metrics: Map[String, SQLMetric] = cmd.metrics
 
@@ -106,8 +106,6 @@ case class DataWritingCommandExec(cmd: DataWritingCommand, child: SparkPlan)
     rows.map(converter(_).asInstanceOf[InternalRow])
   }
 
-  override def children: Seq[SparkPlan] = child :: Nil
-
   override def output: Seq[Attribute] = cmd.output
 
   override def nodeName: String = "Execute " + cmd.nodeName

http://git-wip-us.apache.org/repos/asf/spark/blob/950e7374/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
index c3f7b69..9a1fe1e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
@@ -26,7 +26,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
 import org.apache.spark.sql.sources.v2.writer._
 import org.apache.spark.util.Utils
 
@@ -45,9 +45,9 @@ case class WriteToDataSourceV2(writeSupport: BatchWriteSupport, query: LogicalPl
  * The physical plan for writing data into data source v2.
  */
 case class WriteToDataSourceV2Exec(writeSupport: BatchWriteSupport, query: SparkPlan)
-  extends SparkPlan {
+  extends UnaryExecNode {
 
-  override def children: Seq[SparkPlan] = Seq(query)
+  override def child: SparkPlan = query
   override def output: Seq[Attribute] = Nil
 
   override protected def doExecute(): RDD[InternalRow] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/950e7374/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
index 942a6db..67dcdd3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
@@ -26,7 +26,7 @@ import org.apache.spark.api.python.ChainedPythonFunctions
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
 import org.apache.spark.sql.types.{DataType, StructField, StructType}
 import org.apache.spark.util.Utils
 
@@ -58,9 +58,7 @@ import org.apache.spark.util.Utils
  * RowQueue ALWAYS happened after pushing into it.
  */
 abstract class EvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], child: SparkPlan)
-  extends SparkPlan {
-
-  def children: Seq[SparkPlan] = child :: Nil
+  extends UnaryExecNode {
 
   override def producedAttributes: AttributeSet = AttributeSet(output.drop(child.output.length))
 

http://git-wip-us.apache.org/repos/asf/spark/blob/950e7374/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceExec.scala
index 5f60343..4c62189 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceExec.scala
@@ -17,26 +17,20 @@
 
 package org.apache.spark.sql.execution.streaming.continuous
 
-import java.util.UUID
-
-import org.apache.spark.{HashPartitioner, SparkEnv}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, SinglePartition}
-import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.execution.streaming.continuous.shuffle.{ContinuousShuffleReadPartition, ContinuousShuffleReadRDD}
+import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
 
 /**
  * Physical plan for coalescing a continuous processing plan.
  *
  * Currently, only coalesces to a single partition are supported. `numPartitions` must be 1.
  */
-case class ContinuousCoalesceExec(numPartitions: Int, child: SparkPlan) extends SparkPlan {
+case class ContinuousCoalesceExec(numPartitions: Int, child: SparkPlan) extends UnaryExecNode {
   override def output: Seq[Attribute] = child.output
 
-  override def children: Seq[SparkPlan] = child :: Nil
-
   override def outputPartitioning: Partitioning = SinglePartition
 
   override def doExecute(): RDD[InternalRow] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/950e7374/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala
index a797ac1..2178466 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala
@@ -24,7 +24,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
 import org.apache.spark.sql.execution.streaming.StreamExecution
 import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport
 
@@ -32,8 +32,8 @@ import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport
  * The physical plan for writing data into a continuous processing [[StreamingWriteSupport]].
  */
 case class WriteToContinuousDataSourceExec(writeSupport: StreamingWriteSupport, query: SparkPlan)
-    extends SparkPlan with Logging {
-  override def children: Seq[SparkPlan] = Seq(query)
+    extends UnaryExecNode with Logging {
+  override def child: SparkPlan = query
   override def output: Seq[Attribute] = Nil
 
   override protected def doExecute(): RDD[InternalRow] = {

