[GitHub] spark pull request #23127: [SPARK-26159] Codegen for LocalTableScanExec and ...

2018-11-27 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/23127


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23127: [SPARK-26159] Codegen for LocalTableScanExec and ...

2018-11-26 Thread juliuszsompolski
Github user juliuszsompolski commented on a diff in the pull request:

https://github.com/apache/spark/pull/23127#discussion_r236395764
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala ---
@@ -406,14 +415,62 @@ trait BlockingOperatorWithCodegen extends CodegenSupport {
   override def limitNotReachedChecks: Seq[String] = Nil
 }
 
+/**
+ * Leaf codegen node reading from a single RDD.
+ */
+trait InputRDDCodegen extends CodegenSupport {
+
+  def inputRDD: RDD[InternalRow]
+
+  // If the input is an RDD of InternalRow which are potentially not UnsafeRow,
+  // and there is no parent to consume it, it needs an UnsafeProjection.
+  protected val createUnsafeProjection: Boolean = (parent == null)
+
+  override def inputRDDs(): Seq[RDD[InternalRow]] = {
+    inputRDD :: Nil
+  }
+
+  override def doProduce(ctx: CodegenContext): String = {
--- End diff --

> This assumes that the parent operator would always result in some 
UnsafeProjection being eventually added, and hence the output of the 
WholeStageCodegen unit will be UnsafeRows.

I think it's quite a hack in my patch, and that there should be some nicer 
interface to tell the codegened operators whether they're dealing with 
UnsafeRows input, or InternalRows that may not be UnsafeRows...
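
One possible shape for such an interface, as a toy sketch only: every name below (`RowFormat`, `FormatAwareCodegen`, `needsUnsafeProjection`) is hypothetical and does not exist in Spark. The idea is that each operator declares what row format it emits, so the consumer decides about the projection from that declaration rather than from `parent == null`.

```scala
// Toy sketch, not Spark code: all names here are hypothetical.
object RowFormatSketch {
  // What kind of rows an operator hands to its consumer.
  sealed trait RowFormat
  case object UnsafeRows extends RowFormat      // guaranteed UnsafeRow
  case object AnyInternalRows extends RowFormat // may or may not be UnsafeRow

  // Hypothetical extension point: an operator states its output format.
  trait FormatAwareCodegen {
    def outputRowFormat: RowFormat
  }

  // The consumer derives the projection decision from the declared format.
  def needsUnsafeProjection(child: FormatAwareCodegen): Boolean =
    child.outputRowFormat match {
      case UnsafeRows      => false
      case AnyInternalRows => true
    }
}
```

With a declaration like this, the decision no longer depends on where the operator sits in the plan.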





[GitHub] spark pull request #23127: [SPARK-26159] Codegen for LocalTableScanExec and ...

2018-11-26 Thread juliuszsompolski
Github user juliuszsompolski commented on a diff in the pull request:

https://github.com/apache/spark/pull/23127#discussion_r236393985
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala ---
@@ -406,14 +415,62 @@ trait BlockingOperatorWithCodegen extends CodegenSupport {
   override def limitNotReachedChecks: Seq[String] = Nil
 }
 
+/**
+ * Leaf codegen node reading from a single RDD.
+ */
+trait InputRDDCodegen extends CodegenSupport {
+
+  def inputRDD: RDD[InternalRow]
+
+  // If the input is an RDD of InternalRow which are potentially not UnsafeRow,
+  // and there is no parent to consume it, it needs an UnsafeProjection.
+  protected val createUnsafeProjection: Boolean = (parent == null)
+
+  override def inputRDDs(): Seq[RDD[InternalRow]] = {
+    inputRDD :: Nil
+  }
+
+  override def doProduce(ctx: CodegenContext): String = {
--- End diff --

updated description.





[GitHub] spark pull request #23127: [SPARK-26159] Codegen for LocalTableScanExec and ...

2018-11-26 Thread juliuszsompolski
Github user juliuszsompolski commented on a diff in the pull request:

https://github.com/apache/spark/pull/23127#discussion_r236391673
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala ---
@@ -406,14 +415,62 @@ trait BlockingOperatorWithCodegen extends CodegenSupport {
   override def limitNotReachedChecks: Seq[String] = Nil
 }
 
+/**
+ * Leaf codegen node reading from a single RDD.
+ */
+trait InputRDDCodegen extends CodegenSupport {
+
+  def inputRDD: RDD[InternalRow]
+
+  // If the input is an RDD of InternalRow which are potentially not UnsafeRow,
+  // and there is no parent to consume it, it needs an UnsafeProjection.
+  protected val createUnsafeProjection: Boolean = (parent == null)
+
+  override def inputRDDs(): Seq[RDD[InternalRow]] = {
+    inputRDD :: Nil
+  }
+
+  override def doProduce(ctx: CodegenContext): String = {
--- End diff --

The new one should be the same as the previous 
`RowDataSourceScanExec.doProduce` and `RDDScanExec.doProduce` if 
`createUnsafeProjection == true`, and it should be the same as the previous 
`InputAdapter.doProduce` and `LocalTableScanExec.doProduce` when 
`createUnsafeProjection == false`.

From the fact that `InputAdapter` was not doing an explicit unsafe 
projection, even though its input could be InternalRows that are not 
UnsafeRows, I derived an assumption that it is safe not to do so as long as 
there is a parent operator. This assumes that the parent operator would always 
result in an UnsafeProjection being eventually added, and hence the output of 
the WholeStageCodegen will be in UnsafeRows.
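
To make the assumption concrete, here is a toy model in plain Scala. It is not Spark code: `GenericRow`, `UnsafeLikeRow`, `project`, `LeafScan` and `ProjectingParent` are hypothetical stand-ins for InternalRow, UnsafeRow, UnsafeProjection, an `InputRDDCodegen` leaf, and a parent operator. The leaf converts its rows only when it has no parent; otherwise it relies on the parent to project.

```scala
// Toy model, not Spark code: all names here are hypothetical stand-ins.
object UnsafeProjectionSketch {
  sealed trait Row
  final case class GenericRow(values: Seq[Any]) extends Row    // "not UnsafeRow"
  final case class UnsafeLikeRow(values: Seq[Any]) extends Row // "UnsafeRow"

  // Stand-in for an UnsafeProjection: converts any row to the unsafe format.
  def project(row: Row): UnsafeLikeRow = row match {
    case u: UnsafeLikeRow => u
    case GenericRow(vs)   => UnsafeLikeRow(vs)
  }

  // Leaf scan: projects only when there is no parent to consume its rows.
  final case class LeafScan(input: Seq[Row], hasParent: Boolean) {
    val createUnsafeProjection: Boolean = !hasParent
    def produce(): Seq[Row] =
      if (createUnsafeProjection) input.map(project) else input
  }

  // Parent operator that, per the assumption, always ends in a projection.
  final case class ProjectingParent(child: LeafScan) {
    def produce(): Seq[UnsafeLikeRow] = child.produce().map(project)
  }
}
```

The model also shows where the assumption is fragile: a parent that passed rows through without calling `project` would leak `GenericRow` values out of the stage.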





[GitHub] spark pull request #23127: [SPARK-26159] Codegen for LocalTableScanExec and ...

2018-11-26 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23127#discussion_r236333530
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala ---
@@ -406,14 +415,62 @@ trait BlockingOperatorWithCodegen extends CodegenSupport {
   override def limitNotReachedChecks: Seq[String] = Nil
 }
 
+/**
+ * Leaf codegen node reading from a single RDD.
+ */
+trait InputRDDCodegen extends CodegenSupport {
+
+  def inputRDD: RDD[InternalRow]
+
+  // If the input is an RDD of InternalRow which are potentially not UnsafeRow,
+  // and there is no parent to consume it, it needs an UnsafeProjection.
+  protected val createUnsafeProjection: Boolean = (parent == null)
+
+  override def inputRDDs(): Seq[RDD[InternalRow]] = {
+    inputRDD :: Nil
+  }
+
+  override def doProduce(ctx: CodegenContext): String = {
+    // Inline mutable state since an InputRDDCodegen is used once in a task for WholeStageCodegen
+    val input = ctx.addMutableState("scala.collection.Iterator", "input", v => s"$v = inputs[0];",
+      forceInline = true)
+    val row = ctx.freshName("row")
+
+    val outputVars = if (createUnsafeProjection) {
+      // creating the vars will make the parent consume add an unsafe projection.
+      ctx.INPUT_ROW = row
+      ctx.currentVars = null
+      output.zipWithIndex.map { case (a, i) =>
+        BoundReference(i, a.dataType, a.nullable).genCode(ctx)
+      }
+    } else {
+      null
+    }
+
+    val numOutputRowsCode = if (metrics.contains("numOutputRows")) {
--- End diff --

how about `updateNumOutputRowsMetrics` instead of `numOutputRowsCode`?





[GitHub] spark pull request #23127: [SPARK-26159] Codegen for LocalTableScanExec and ...

2018-11-26 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23127#discussion_r236332786
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala ---
@@ -406,14 +415,62 @@ trait BlockingOperatorWithCodegen extends CodegenSupport {
   override def limitNotReachedChecks: Seq[String] = Nil
 }
 
+/**
+ * Leaf codegen node reading from a single RDD.
+ */
+trait InputRDDCodegen extends CodegenSupport {
+
+  def inputRDD: RDD[InternalRow]
+
+  // If the input is an RDD of InternalRow which are potentially not UnsafeRow,
+  // and there is no parent to consume it, it needs an UnsafeProjection.
+  protected val createUnsafeProjection: Boolean = (parent == null)
+
+  override def inputRDDs(): Seq[RDD[InternalRow]] = {
+    inputRDD :: Nil
+  }
+
+  override def doProduce(ctx: CodegenContext): String = {
--- End diff --

Is the difference the extra `if (createUnsafeProjection)` check?





[GitHub] spark pull request #23127: [SPARK-26159] Codegen for LocalTableScanExec and ...

2018-11-26 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23127#discussion_r236332511
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala ---
@@ -406,14 +415,62 @@ trait BlockingOperatorWithCodegen extends CodegenSupport {
   override def limitNotReachedChecks: Seq[String] = Nil
 }
 
+/**
+ * Leaf codegen node reading from a single RDD.
+ */
+trait InputRDDCodegen extends CodegenSupport {
+
+  def inputRDD: RDD[InternalRow]
+
+  // If the input is an RDD of InternalRow which are potentially not UnsafeRow,
+  // and there is no parent to consume it, it needs an UnsafeProjection.
+  protected val createUnsafeProjection: Boolean = (parent == null)
+
+  override def inputRDDs(): Seq[RDD[InternalRow]] = {
+    inputRDD :: Nil
+  }
+
+  override def doProduce(ctx: CodegenContext): String = {
--- End diff --

can you highlight the difference between this one and the previous 
`RowDataSourceScanExec.doProduce`?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23127: [SPARK-26159] Codegen for LocalTableScanExec and ...

2018-11-26 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23127#discussion_r236151394
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala ---
@@ -199,4 +199,6 @@ case class RDDScanExec(
   override def simpleString: String = {
     s"$nodeName${truncatedString(output, "[", ",", "]")}"
   }
+
+  override def inputRDD: RDD[InternalRow] = rdd
--- End diff --

The test failure is real. `RDDScanExec.rdd` may not be an RDD of UnsafeRow. 
Maybe we should enforce that `InputRDDCodegen.inputRDD` is an `RDD[UnsafeRow]`.
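
A rough illustration of what enforcing this through the type could look like, as a toy model in plain Scala rather than Spark code. `InternalRowLike`, `GenericRowLike`, `UnsafeRowLike`, `TypedInputCodegen` and `ScanOverAnyRows` are hypothetical names, and `Seq` stands in for `RDD`. Once the member is narrowed to the unsafe row type, a scan over arbitrary rows no longer compiles unless it converts explicitly.

```scala
// Toy model, not Spark code: all names here are hypothetical stand-ins.
object TypedInputSketch {
  trait InternalRowLike { def values: Seq[Any] }
  final case class GenericRowLike(values: Seq[Any]) extends InternalRowLike
  final case class UnsafeRowLike(values: Seq[Any]) extends InternalRowLike

  // Hypothetical variant of the trait with the element type narrowed.
  trait TypedInputCodegen {
    def inputRows: Seq[UnsafeRowLike]
  }

  // A scan holding arbitrary rows must convert to satisfy the narrowed type;
  // returning `rows` directly would be a compile error.
  final case class ScanOverAnyRows(rows: Seq[InternalRowLike]) extends TypedInputCodegen {
    override def inputRows: Seq[UnsafeRowLike] = rows.map {
      case u: UnsafeRowLike => u
      case other            => UnsafeRowLike(other.values)
    }
  }
}
```

The trade-off is that the conversion cost moves into every implementer instead of being decided once inside `doProduce`.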





[GitHub] spark pull request #23127: [SPARK-26159] Codegen for LocalTableScanExec and ...

2018-11-25 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23127#discussion_r236118569
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala ---
@@ -350,6 +350,15 @@ trait CodegenSupport extends SparkPlan {
    */
   def needStopCheck: Boolean = parent.needStopCheck
 
+  /**
+   * Helper default should stop check code.
+   */
+  def shouldStopCheckCode: String = if (needStopCheck) {
--- End diff --

We can use it in more places. This can be done in a followup.
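
As a sketch of how such a helper could be reused, here is a toy model in plain Scala, not Spark code. The quoted diff is cut off after `if (needStopCheck) {`, so the two strings returned below are assumptions, and `CodegenLike`, `ScanLike` and `produceLoop` are hypothetical names.

```scala
// Toy model, not Spark code: names and emitted strings are assumptions.
object StopCheckSketch {
  trait CodegenLike {
    def needStopCheck: Boolean

    // Assumed body: the real helper is truncated in the quoted diff.
    def shouldStopCheckCode: String =
      if (needStopCheck) "if (shouldStop()) return;"
      else "// shouldStop check is eliminated"
  }

  // Any operator that generates a row loop can splice the helper in,
  // instead of repeating the needStopCheck branch itself.
  final case class ScanLike(needStopCheck: Boolean) extends CodegenLike {
    def produceLoop(input: String, consume: String): String =
      s"""while ($input.hasNext()) {
         |  $consume
         |  $shouldStopCheckCode
         |}""".stripMargin
  }
}
```

For example, `ScanLike(needStopCheck = true).produceLoop("input", "consume(row);")` yields a loop body that ends with the stop check, while `needStopCheck = false` leaves only a comment in its place.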





[GitHub] spark pull request #23127: [SPARK-26159] Codegen for LocalTableScanExec and ...

2018-11-23 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/23127#discussion_r236017398
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala ---
@@ -406,14 +415,39 @@ trait BlockingOperatorWithCodegen extends CodegenSupport {
   override def limitNotReachedChecks: Seq[String] = Nil
 }
 
+/**
+ * Leaf codegen node reading from a single RDD.
+ */
+trait InputRDDCodegen extends CodegenSupport {
--- End diff --

Should we reconcile this with the code gen for `RowDataSourceScanExec`?





[GitHub] spark pull request #23127: [SPARK-26159] Codegen for LocalTableScanExec and ...

2018-11-23 Thread juliuszsompolski
GitHub user juliuszsompolski opened a pull request:

https://github.com/apache/spark/pull/23127

[SPARK-26159] Codegen for LocalTableScanExec and ExistingRDDExec

## What changes were proposed in this pull request?

Implement codegen for LocalTableScanExec and ExistingRDDExec. Refactor to 
share code with InputAdapter.


## How was this patch tested?

Covered and used in existing tests.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/juliuszsompolski/apache-spark SPARK-26159

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/23127.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #23127


commit 23c2d9111f1cff9059746bb7b48bb8ef7ad7027b
Author: Juliusz Sompolski 
Date:   2018-11-13T09:19:09Z

localtablescanexec codegen



