Repository: spark
Updated Branches:
  refs/heads/master 6c9e5ac9d -> 950e7374a
[SPARK-25913][SQL] Extend UnaryExecNode by unary SparkPlan nodes

## What changes were proposed in this pull request?

In this PR, I propose that unary physical nodes extend `UnaryExecNode` instead of `SparkPlan`, so each node declares a single `child` and inherits `children` from the base trait rather than re-implementing it (a simplified sketch of the pattern follows the diff).

Closes #22925 from MaxGekk/unary-exec-node.

Authored-by: Maxim Gekk <max.g...@gmail.com>
Signed-off-by: gatorsmile <gatorsm...@gmail.com>

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/950e7374
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/950e7374
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/950e7374

Branch: refs/heads/master
Commit: 950e7374a89cf45742a442afc08a74b6b4a7aa66
Parents: 6c9e5ac
Author: Maxim Gekk <max.g...@gmail.com>
Authored: Sun Nov 4 17:41:42 2018 -0800
Committer: gatorsmile <gatorsm...@gmail.com>
Committed: Sun Nov 4 17:41:42 2018 -0800

----------------------------------------------------------------------
 .../apache/spark/sql/execution/command/commands.scala  |  6 ++----
 .../datasources/v2/WriteToDataSourceV2Exec.scala       |  6 +++---
 .../spark/sql/execution/python/EvalPythonExec.scala    |  6 ++----
 .../streaming/continuous/ContinuousCoalesceExec.scala  | 12 +++---------
 .../continuous/WriteToContinuousDataSourceExec.scala   |  6 +++---
 5 files changed, 13 insertions(+), 23 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/950e7374/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
index 2cc0e38..ab40936 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}
-import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan}
+import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan, UnaryExecNode}
 import org.apache.spark.sql.execution.debug._
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.execution.streaming.{IncrementalExecution, OffsetSeqMetadata}
@@ -95,7 +95,7 @@ case class ExecutedCommandExec(cmd: RunnableCommand) extends LeafExecNode {
  * @param child the physical plan child ran by the `DataWritingCommand`.
  */
 case class DataWritingCommandExec(cmd: DataWritingCommand, child: SparkPlan)
-  extends SparkPlan {
+  extends UnaryExecNode {
 
   override lazy val metrics: Map[String, SQLMetric] = cmd.metrics
 
@@ -106,8 +106,6 @@ case class DataWritingCommandExec(cmd: DataWritingCommand, child: SparkPlan)
     rows.map(converter(_).asInstanceOf[InternalRow])
   }
 
-  override def children: Seq[SparkPlan] = child :: Nil
-
   override def output: Seq[Attribute] = cmd.output
 
   override def nodeName: String = "Execute " + cmd.nodeName

http://git-wip-us.apache.org/repos/asf/spark/blob/950e7374/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
index c3f7b69..9a1fe1e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
@@ -26,7 +26,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
 import org.apache.spark.sql.sources.v2.writer._
 import org.apache.spark.util.Utils
 
@@ -45,9 +45,9 @@ case class WriteToDataSourceV2(writeSupport: BatchWriteSupport, query: LogicalPl
  * The physical plan for writing data into data source v2.
  */
 case class WriteToDataSourceV2Exec(writeSupport: BatchWriteSupport, query: SparkPlan)
-  extends SparkPlan {
+  extends UnaryExecNode {
 
-  override def children: Seq[SparkPlan] = Seq(query)
+  override def child: SparkPlan = query
   override def output: Seq[Attribute] = Nil
 
   override protected def doExecute(): RDD[InternalRow] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/950e7374/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
index 942a6db..67dcdd3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
@@ -26,7 +26,7 @@ import org.apache.spark.api.python.ChainedPythonFunctions
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
 import org.apache.spark.sql.types.{DataType, StructField, StructType}
 import org.apache.spark.util.Utils
 
@@ -58,9 +58,7 @@ import org.apache.spark.util.Utils
  * RowQueue ALWAYS happened after pushing into it.
  */
 abstract class EvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], child: SparkPlan)
-  extends SparkPlan {
-
-  def children: Seq[SparkPlan] = child :: Nil
+  extends UnaryExecNode {
 
   override def producedAttributes: AttributeSet = AttributeSet(output.drop(child.output.length))

http://git-wip-us.apache.org/repos/asf/spark/blob/950e7374/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceExec.scala
index 5f60343..4c62189 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceExec.scala
@@ -17,26 +17,20 @@
 
 package org.apache.spark.sql.execution.streaming.continuous
 
-import java.util.UUID
-
-import org.apache.spark.{HashPartitioner, SparkEnv}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, SinglePartition}
-import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.execution.streaming.continuous.shuffle.{ContinuousShuffleReadPartition, ContinuousShuffleReadRDD}
+import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
 
 /**
  * Physical plan for coalescing a continuous processing plan.
  *
 * Currently, only coalesces to a single partition are supported. `numPartitions` must be 1.
  */
-case class ContinuousCoalesceExec(numPartitions: Int, child: SparkPlan) extends SparkPlan {
+case class ContinuousCoalesceExec(numPartitions: Int, child: SparkPlan) extends UnaryExecNode {
   override def output: Seq[Attribute] = child.output
 
-  override def children: Seq[SparkPlan] = child :: Nil
-
   override def outputPartitioning: Partitioning = SinglePartition
 
   override def doExecute(): RDD[InternalRow] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/950e7374/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala
index a797ac1..2178466 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala
@@ -24,7 +24,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
 import org.apache.spark.sql.execution.streaming.StreamExecution
 import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport
 
@@ -32,8 +32,8 @@ import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport
  * The physical plan for writing data into a continuous processing [[StreamingWriteSupport]].
  */
 case class WriteToContinuousDataSourceExec(writeSupport: StreamingWriteSupport, query: SparkPlan)
-  extends SparkPlan with Logging {
-  override def children: Seq[SparkPlan] = Seq(query)
+  extends UnaryExecNode with Logging {
+  override def child: SparkPlan = query
   override def output: Seq[Attribute] = Nil
 
   override protected def doExecute(): RDD[InternalRow] = {
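----------------------------------------------------------------------

For illustration, a minimal self-contained Scala sketch of the pattern this commit applies. `PlanNode` and `UnaryNode` below are simplified stand-ins invented for this sketch, not the real `SparkPlan` and `UnaryExecNode`, which additionally carry all of Spark's execution machinery:

// Simplified stand-in for SparkPlan: every node reports its children.
trait PlanNode {
  def children: Seq[PlanNode]
}

// Simplified stand-in for UnaryExecNode: a single-child node declares
// `child` once and derives `children` from it, so subclasses cannot
// forget the override or implement it inconsistently.
trait UnaryNode extends PlanNode {
  def child: PlanNode
  override final def children: Seq[PlanNode] = child :: Nil
}

// Before this commit: each unary node hand-rolled `children`.
case class CoalesceBefore(numPartitions: Int, child: PlanNode) extends PlanNode {
  override def children: Seq[PlanNode] = child :: Nil
}

// After this commit: extending the unary trait removes the boilerplate,
// mirroring the changes to DataWritingCommandExec, EvalPythonExec, etc.
case class CoalesceAfter(numPartitions: Int, child: PlanNode) extends UnaryNode

Beyond removing the hand-written overrides, this change presumably also lets planner code that pattern-matches on the `UnaryExecNode` type recognize these five operators as unary nodes.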