[GitHub] spark pull request #23127: [SPARK-26159] Codegen for LocalTableScanExec and ...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/23127 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user juliuszsompolski commented on a diff in the pull request: https://github.com/apache/spark/pull/23127#discussion_r236395764

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala ---
@@ -406,14 +415,62 @@ trait BlockingOperatorWithCodegen extends CodegenSupport {
   override def limitNotReachedChecks: Seq[String] = Nil
 }
 
+/**
+ * Leaf codegen node reading from a single RDD.
+ */
+trait InputRDDCodegen extends CodegenSupport {
+
+  def inputRDD: RDD[InternalRow]
+
+  // If the input is an RDD of InternalRow which are potentially not UnsafeRow,
+  // and there is no parent to consume it, it needs an UnsafeProjection.
+  protected val createUnsafeProjection: Boolean = (parent == null)
+
+  override def inputRDDs(): Seq[RDD[InternalRow]] = {
+    inputRDD :: Nil
+  }
+
+  override def doProduce(ctx: CodegenContext): String = {
--- End diff --

> This assumes that the parent operator would always result in some UnsafeProjection being eventually added, and hence the output of the WholeStageCodegen unit will be UnsafeRows.

I think it's quite a hack in my patch, and that there should be some nicer interface to tell the codegened operators whether they're dealing with UnsafeRow input or with InternalRows that may not be UnsafeRows...
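One possible shape for the "nicer interface" mentioned above, sketched as a toy model rather than Spark code: each node declares the row format it emits, and the root of a fused stage adds a final conversion only when that format is not already guaranteed to be unsafe. `RowFormat`, `Node`, `Scan`, `PassThrough`, `Project` and `StageRoot` are all hypothetical names introduced for illustration.

```scala
// Hypothetical model (not Spark's API): rows are either already in the compact
// "unsafe" format or in some generic format.
sealed trait RowFormat
case object Unsafe extends RowFormat
case object Generic extends RowFormat

// Hypothetical operator interface: each node states the format of the rows it
// emits, instead of the leaf inferring it from whether a parent exists.
trait Node {
  def outputFormat: RowFormat
}

// A leaf that reads rows whose format is known up front.
final case class Scan(inputFormat: RowFormat) extends Node {
  def outputFormat: RowFormat = inputFormat
}

// An operator that forwards its child's rows unchanged keeps the child's format.
final case class PassThrough(child: Node) extends Node {
  def outputFormat: RowFormat = child.outputFormat
}

// An operator that always builds new rows through a projection emits unsafe rows.
final case class Project(child: Node) extends Node {
  def outputFormat: RowFormat = Unsafe
}

object StageRoot {
  // The root of a fused stage only needs a final conversion when the plan
  // does not already guarantee unsafe output.
  def needsFinalProjection(plan: Node): Boolean = plan.outputFormat != Unsafe
}
```

In this model a generic-row scan under a pass-through operator still gets a final projection at the root, whereas a rule based only on `parent == null` at the leaf would skip the projection in that case.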
Github user juliuszsompolski commented on a diff in the pull request: https://github.com/apache/spark/pull/23127#discussion_r236393985

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala ---
@@ -406,14 +415,62 @@ trait BlockingOperatorWithCodegen extends CodegenSupport {
   override def limitNotReachedChecks: Seq[String] = Nil
 }
 
+/**
+ * Leaf codegen node reading from a single RDD.
+ */
+trait InputRDDCodegen extends CodegenSupport {
+
+  def inputRDD: RDD[InternalRow]
+
+  // If the input is an RDD of InternalRow which are potentially not UnsafeRow,
+  // and there is no parent to consume it, it needs an UnsafeProjection.
+  protected val createUnsafeProjection: Boolean = (parent == null)
+
+  override def inputRDDs(): Seq[RDD[InternalRow]] = {
+    inputRDD :: Nil
+  }
+
+  override def doProduce(ctx: CodegenContext): String = {
--- End diff --

updated description.
Github user juliuszsompolski commented on a diff in the pull request: https://github.com/apache/spark/pull/23127#discussion_r236391673

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala ---
@@ -406,14 +415,62 @@ trait BlockingOperatorWithCodegen extends CodegenSupport {
   override def limitNotReachedChecks: Seq[String] = Nil
 }
 
+/**
+ * Leaf codegen node reading from a single RDD.
+ */
+trait InputRDDCodegen extends CodegenSupport {
+
+  def inputRDD: RDD[InternalRow]
+
+  // If the input is an RDD of InternalRow which are potentially not UnsafeRow,
+  // and there is no parent to consume it, it needs an UnsafeProjection.
+  protected val createUnsafeProjection: Boolean = (parent == null)
+
+  override def inputRDDs(): Seq[RDD[InternalRow]] = {
+    inputRDD :: Nil
+  }
+
+  override def doProduce(ctx: CodegenContext): String = {
--- End diff --

The new one should be the same as the previous `RowDataSourceScanExec.doProduce` and `RDDScanExec.doProduce` when createUnsafeProjection == true, and the same as the previous `InputAdapter.doProduce` and `LocalTableScanExec.doProduce` when createUnsafeProjection == false.
Because `InputAdapter` was not doing an explicit unsafe projection, even though its input could be InternalRows that are not UnsafeRows, I assumed that it is safe not to do so as long as there is a parent operator. This assumes that the parent operator would always result in an UnsafeProjection being eventually added, and hence the output of the WholeStageCodegen will be in UnsafeRows.
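The reasoning above can be modelled with a toy Scala sketch. The `Row` type and `InputRDDCodegenModel` object are hypothetical stand-ins, not Spark's `InternalRow`/`UnsafeRow`: the leaf applies the projection itself only when it has no parent, and otherwise forwards rows and relies on the parent's consume path to produce unsafe rows.

```scala
// Hypothetical toy row: values plus a flag for "already in the compact unsafe format".
final case class Row(values: List[Any], isUnsafe: Boolean)

object InputRDDCodegenModel {
  // Stand-in for an UnsafeProjection: converts a row into the compact format.
  def unsafeProjection(r: Row): Row = r.copy(isUnsafe = true)

  // Leaf produce step. `parent` stands in for the parent's consume();
  // None models `parent == null` in the diff.
  def produce(input: Iterator[Row], parent: Option[Row => Row]): List[Row] =
    parent match {
      // createUnsafeProjection == true: no parent, so the leaf projects itself.
      case None => input.map(unsafeProjection).toList
      // createUnsafeProjection == false: rows are forwarded and the parent decides.
      case Some(consume) => input.map(consume).toList
    }
}
```

If the parent projects, the output is unsafe either way; if a parent merely forwarded its input, generic rows would leave the stage, which is why the comment describes this as an assumption rather than a guarantee.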
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23127#discussion_r236333530

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala ---
@@ -406,14 +415,62 @@ trait BlockingOperatorWithCodegen extends CodegenSupport {
   override def limitNotReachedChecks: Seq[String] = Nil
 }
 
+/**
+ * Leaf codegen node reading from a single RDD.
+ */
+trait InputRDDCodegen extends CodegenSupport {
+
+  def inputRDD: RDD[InternalRow]
+
+  // If the input is an RDD of InternalRow which are potentially not UnsafeRow,
+  // and there is no parent to consume it, it needs an UnsafeProjection.
+  protected val createUnsafeProjection: Boolean = (parent == null)
+
+  override def inputRDDs(): Seq[RDD[InternalRow]] = {
+    inputRDD :: Nil
+  }
+
+  override def doProduce(ctx: CodegenContext): String = {
+    // Inline mutable state since an InputRDDCodegen is used once in a task for WholeStageCodegen
+    val input = ctx.addMutableState("scala.collection.Iterator", "input", v => s"$v = inputs[0];",
+      forceInline = true)
+    val row = ctx.freshName("row")
+
+    val outputVars = if (createUnsafeProjection) {
+      // creating the vars will make the parent consume add an unsafe projection.
+      ctx.INPUT_ROW = row
+      ctx.currentVars = null
+      output.zipWithIndex.map { case (a, i) =>
+        BoundReference(i, a.dataType, a.nullable).genCode(ctx)
+      }
+    } else {
+      null
+    }
+
+    val numOutputRowsCode = if (metrics.contains("numOutputRows")) {
--- End diff --

how about `updateNumOutputRowsMetrics` instead of `numOutputRowsCode`?
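A rough sketch of the kind of string template such a `doProduce` might assemble, using the `updateNumOutputRowsMetrics` name suggested above. This is a hypothetical standalone function, not Spark's `CodegenContext` API; the parameter names and the emitted Java-like text are assumptions for illustration only.

```scala
// Hypothetical sketch: assembles a Java-like loop as a string, the way a
// produce() step in whole-stage codegen emits source text. Not Spark's API.
object ProduceTemplate {
  def doProduce(
      input: String,                       // name of the iterator field, e.g. "input"
      row: String,                         // fresh variable name for the current row
      numOutputRowsMetric: Option[String], // metric field name, if the node has one
      consumeCode: String,                 // code emitted by the parent's consume()
      shouldStopCheckCode: String): String = {
    // Named after what the snippet does, as suggested in the review comment.
    val updateNumOutputRowsMetrics = numOutputRowsMetric match {
      case Some(metric) => s"$metric.add(1);"
      case None         => ""
    }
    s"""while ($input.hasNext()) {
       |  InternalRow $row = (InternalRow) $input.next();
       |  $updateNumOutputRowsMetrics
       |  $consumeCode
       |  $shouldStopCheckCode
       |}""".stripMargin
  }
}
```

The variable name describes the generated statement's effect (updating a metric) rather than the fact that it is code, which is the point of the rename.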
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23127#discussion_r236332786

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala ---
@@ -406,14 +415,62 @@ trait BlockingOperatorWithCodegen extends CodegenSupport {
   override def limitNotReachedChecks: Seq[String] = Nil
 }
 
+/**
+ * Leaf codegen node reading from a single RDD.
+ */
+trait InputRDDCodegen extends CodegenSupport {
+
+  def inputRDD: RDD[InternalRow]
+
+  // If the input is an RDD of InternalRow which are potentially not UnsafeRow,
+  // and there is no parent to consume it, it needs an UnsafeProjection.
+  protected val createUnsafeProjection: Boolean = (parent == null)
+
+  override def inputRDDs(): Seq[RDD[InternalRow]] = {
+    inputRDD :: Nil
+  }
+
+  override def doProduce(ctx: CodegenContext): String = {
--- End diff --

the extra `if createUnsafeProjection` check?
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23127#discussion_r236332511

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala ---
@@ -406,14 +415,62 @@ trait BlockingOperatorWithCodegen extends CodegenSupport {
   override def limitNotReachedChecks: Seq[String] = Nil
 }
 
+/**
+ * Leaf codegen node reading from a single RDD.
+ */
+trait InputRDDCodegen extends CodegenSupport {
+
+  def inputRDD: RDD[InternalRow]
+
+  // If the input is an RDD of InternalRow which are potentially not UnsafeRow,
+  // and there is no parent to consume it, it needs an UnsafeProjection.
+  protected val createUnsafeProjection: Boolean = (parent == null)
+
+  override def inputRDDs(): Seq[RDD[InternalRow]] = {
+    inputRDD :: Nil
+  }
+
+  override def doProduce(ctx: CodegenContext): String = {
--- End diff --

can you highlight the difference between this one and the previous `RowDataSourceScanExec.doProduce`?
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23127#discussion_r236151394

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala ---
@@ -199,4 +199,6 @@ case class RDDScanExec(
   override def simpleString: String = {
     s"$nodeName${truncatedString(output, "[", ",", "]")}"
   }
+
+  override def inputRDD: RDD[InternalRow] = rdd
--- End diff --

The test failure is real. `RDDScanExec.rdd` may not be an RDD of UnsafeRows. Maybe we should enforce that `InputRDDCodegen.inputRDD` is an `RDD[UnsafeRow]`.
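A toy sketch of what enforcing the element type would buy, using hypothetical `InternalRow`/`GenericRow`/`UnsafeRow` stand-ins (not Spark's classes) and plain iterators instead of RDDs: once the contract says `UnsafeRow`, a source holding rows of unknown format has to convert them before it can satisfy it, and the compiler checks this.

```scala
// Hypothetical toy types: UnsafeRow is a subtype of InternalRow, mirroring the
// relationship in Spark, but with no real memory layout behind it.
trait InternalRow { def values: Vector[Any] }
final case class GenericRow(values: Vector[Any]) extends InternalRow
final case class UnsafeRow(values: Vector[Any]) extends InternalRow

// Loose contract: any InternalRow is accepted, so a GenericRow can slip through.
trait LooseInput { def inputRows: Iterator[InternalRow] }

// Strict contract: the element type itself rules out non-unsafe rows at compile time.
trait StrictInput { def inputRows: Iterator[UnsafeRow] }

object ToUnsafe {
  // Stand-in for an UnsafeProjection: keeps unsafe rows, converts everything else.
  def apply(r: InternalRow): UnsafeRow = r match {
    case u: UnsafeRow => u
    case other        => UnsafeRow(other.values)
  }
}

// A scan over rows of unknown format meets the strict contract by converting up front.
final class ScanOverGenericRows(rows: Seq[InternalRow]) extends StrictInput {
  def inputRows: Iterator[UnsafeRow] = rows.iterator.map(ToUnsafe(_))
}
```

The trade-off in this model is that the conversion cost moves to the source, instead of depending on whether some downstream operator happens to add a projection.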
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23127#discussion_r236118569

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala ---
@@ -350,6 +350,15 @@ trait CodegenSupport extends SparkPlan {
    */
   def needStopCheck: Boolean = parent.needStopCheck
 
+  /**
+   * Helper default should stop check code.
+   */
+  def shouldStopCheckCode: String = if (needStopCheck) {
--- End diff --

We can use it in more places. This can be done in a follow-up.
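A minimal sketch of the kind of helper discussed here, assuming it returns either a `shouldStop()` check or a comment depending on `needStopCheck`. `StopCheckSupport` and `ScanLoop` are hypothetical names, and the exact emitted strings are assumptions rather than a copy of Spark's code.

```scala
// Hypothetical trait: centralizes the "should stop" snippet so that call sites
// do not each repeat the same if/else.
trait StopCheckSupport {
  // Whether the generated loop needs to check shouldStop() and return early.
  def needStopCheck: Boolean

  def shouldStopCheckCode: String =
    if (needStopCheck) "if (shouldStop()) return;"
    else "// shouldStop check is eliminated"
}

// A call site that appends the helper's output after the consume code.
final case class ScanLoop(needStopCheck: Boolean) extends StopCheckSupport {
  def loopBody(consumeCode: String): String =
    s"$consumeCode\n$shouldStopCheckCode"
}
```

Reusing the helper in more operators, as suggested, would mean each one calls `shouldStopCheckCode` instead of building the check string itself.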
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/23127#discussion_r236017398

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala ---
@@ -406,14 +415,39 @@ trait BlockingOperatorWithCodegen extends CodegenSupport {
   override def limitNotReachedChecks: Seq[String] = Nil
 }
 
+/**
+ * Leaf codegen node reading from a single RDD.
+ */
+trait InputRDDCodegen extends CodegenSupport {
--- End diff --

Should we reconcile this with the code gen for `RowDataSourceScanExec`?
GitHub user juliuszsompolski opened a pull request: https://github.com/apache/spark/pull/23127

[SPARK-26159] Codegen for LocalTableScanExec and ExistingRDDExec

## What changes were proposed in this pull request?

Implement codegen for LocalTableScanExec and ExistingRDDExec. Refactor to share code with InputAdapter.

## How was this patch tested?

Covered and used in existing tests.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/juliuszsompolski/apache-spark SPARK-26159

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/23127.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #23127

commit 23c2d9111f1cff9059746bb7b48bb8ef7ad7027b
Author: Juliusz Sompolski
Date: 2018-11-13T09:19:09Z

localtablescanexec codegen