[GitHub] spark pull request #20831: [SPARK-23614][SQL] Fix incorrect reuse exchange w...

2018-03-22 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/20831


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20831: [SPARK-23614][SQL] Fix incorrect reuse exchange w...

2018-03-21 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/20831#discussion_r176274536
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala ---
@@ -169,7 +174,10 @@ case class InMemoryTableScanExec(
   override def outputOrdering: Seq[SortOrder] =
     relation.child.outputOrdering.map(updateAttribute(_).asInstanceOf[SortOrder])
 
-  private def statsFor(a: Attribute) = relation.partitionStatistics.forAttribute(a)
+  // When we make canonicalized plan, we can't find a normalized attribute in this map.
+  // We return a `ColumnStatisticsSchema` for normalized attribute in this case.
--- End diff --

Yes. I think so.





[GitHub] spark pull request #20831: [SPARK-23614][SQL] Fix incorrect reuse exchange w...

2018-03-21 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20831#discussion_r176159195
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala ---
@@ -169,7 +174,10 @@ case class InMemoryTableScanExec(
   override def outputOrdering: Seq[SortOrder] =
     relation.child.outputOrdering.map(updateAttribute(_).asInstanceOf[SortOrder])
 
-  private def statsFor(a: Attribute) = relation.partitionStatistics.forAttribute(a)
+  // When we make canonicalized plan, we can't find a normalized attribute in this map.
+  // We return a `ColumnStatisticsSchema` for normalized attribute in this case.
--- End diff --

This can be solved if we add a `val stats = relation.partitionStatistics`, 
can't it?





[GitHub] spark pull request #20831: [SPARK-23614][SQL] Fix incorrect reuse exchange w...

2018-03-21 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/20831#discussion_r176006590
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala ---
@@ -169,7 +174,10 @@ case class InMemoryTableScanExec(
   override def outputOrdering: Seq[SortOrder] =
     relation.child.outputOrdering.map(updateAttribute(_).asInstanceOf[SortOrder])
 
-  private def statsFor(a: Attribute) = relation.partitionStatistics.forAttribute(a)
+  // When we make canonicalized plan, we can't find a normalized attribute in this map.
+  // We return a `ColumnStatisticsSchema` for normalized attribute in this case.
--- End diff --

And I think it isn't worth removing `@transient` from `relation` and 
`InMemoryRelation.partitionStatistics` just for this, so I'll leave it as is.





[GitHub] spark pull request #20831: [SPARK-23614][SQL] Fix incorrect reuse exchange w...

2018-03-21 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/20831#discussion_r176002739
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala ---
@@ -169,7 +174,10 @@ case class InMemoryTableScanExec(
   override def outputOrdering: Seq[SortOrder] =
     relation.child.outputOrdering.map(updateAttribute(_).asInstanceOf[SortOrder])
 
-  private def statsFor(a: Attribute) = relation.partitionStatistics.forAttribute(a)
+  // When we make canonicalized plan, we can't find a normalized attribute in this map.
+  // We return a `ColumnStatisticsSchema` for normalized attribute in this case.
--- End diff --

Ah, sorry, I got it wrong. The reason it doesn't work is that `relation` 
is `@transient`. `partitionFilters` needs to be non-lazy; otherwise, when we 
need to access `relation` on an executor, we will get a 
`NullPointerException`.
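
A minimal sketch of the failure mode described above, assuming a plain Java-serialization round trip as a stand-in for shipping the plan to an executor. `Relation`, `Scan`, `eagerFilters`, and `lazyFilters` are hypothetical names used for illustration; they are not Spark's classes.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for `InMemoryRelation`; deliberately not Serializable.
class Relation(val stats: Seq[String])

// Hypothetical stand-in for `InMemoryTableScanExec`.
class Scan(@transient val relation: Relation) extends Serializable {
  // Eager: computed at construction time (on the "driver") and serialized with the object.
  val eagerFilters: Seq[String] = relation.stats.map(_ + " > 0")
  // Lazy: computed on first access; after deserialization `relation` is null.
  lazy val lazyFilters: Seq[String] = relation.stats.map(_ + " > 0")
}

object TransientLazyDemo {
  // Serialize and deserialize a value, imitating the trip to an executor.
  def roundTrip[T <: AnyRef](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val onExecutor = roundTrip(new Scan(new Relation(Seq("a", "b"))))
    println(onExecutor.eagerFilters)     // the eager value survives the round trip
    println(onExecutor.relation == null) // the transient field does not
    // Forcing the lazy val now dereferences the null `relation`.
    println(scala.util.Try(onExecutor.lazyFilters).isFailure)
  }
}
```

The eager `val` captures everything it needs before serialization, which is presumably why keeping `partitionFilters` non-lazy avoids the problem without touching `@transient`.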







[GitHub] spark pull request #20831: [SPARK-23614][SQL] Fix incorrect reuse exchange w...

2018-03-21 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20831#discussion_r175992410
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala ---
@@ -169,7 +174,10 @@ case class InMemoryTableScanExec(
   override def outputOrdering: Seq[SortOrder] =
     relation.child.outputOrdering.map(updateAttribute(_).asInstanceOf[SortOrder])
 
-  private def statsFor(a: Attribute) = relation.partitionStatistics.forAttribute(a)
+  // When we make canonicalized plan, we can't find a normalized attribute in this map.
+  // We return a `ColumnStatisticsSchema` for normalized attribute in this case.
--- End diff --

I don't get it. Regardless of how `copy` is implemented in Scala, ideally we 
can just mark `buildFilter` and `partitionFilters` as lazy and, in 
`doCanonicalize`, create a new `InMemoryTableScanExec`, which won't materialize 
`partitionFilters` in either the current `InMemoryTableScanExec` or the new 
`InMemoryTableScanExec`.

One problem I can think of is serializing a canonicalized 
`InMemoryTableScanExec`, but that should never happen.
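
A small sketch of the general Scala behavior this argument relies on: a `lazy val` in a case class body is evaluated only on first access, so `copy` forces it in neither the original nor the new instance. `Scan` and the `builds` counter are hypothetical and only illustrate the language semantics, not Spark's implementation.

```scala
object LazyCopyDemo {
  // Counts how often the (hypothetical) filter construction actually runs.
  var builds = 0

  // Hypothetical stand-in for `InMemoryTableScanExec`.
  case class Scan(predicates: Seq[String]) {
    lazy val partitionFilters: Seq[String] = {
      builds += 1
      predicates.map(p => s"stats($p)")
    }
  }

  def main(args: Array[String]): Unit = {
    val original = Scan(Seq("a > 1"))
    val canonical = original.copy(predicates = Seq("none#0 > 1"))
    println(builds)               // nothing has forced the lazy val yet
    original.partitionFilters     // first access evaluates it for `original` only
    println(builds)
    println(canonical.predicates) // the copy never touched `partitionFilters`
  }
}
```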







[GitHub] spark pull request #20831: [SPARK-23614][SQL] Fix incorrect reuse exchange w...

2018-03-20 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/20831#discussion_r175980451
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala ---
@@ -68,6 +69,15 @@ case class InMemoryRelation(
 
   override protected def innerChildren: Seq[SparkPlan] = Seq(child)
 
+  override def doCanonicalize(): logical.LogicalPlan =
+    copy(output = output.map(QueryPlan.normalizeExprId(_, child.output)),
+      storageLevel = StorageLevel.NONE,
--- End diff --

It is followed. I just left out `useCompression` and `batchSize`, as they 
are primitives and don't need to be canonicalized here.





[GitHub] spark pull request #20831: [SPARK-23614][SQL] Fix incorrect reuse exchange w...

2018-03-20 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/20831#discussion_r175980243
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala ---
@@ -169,7 +174,10 @@ case class InMemoryTableScanExec(
   override def outputOrdering: Seq[SortOrder] =
     relation.child.outputOrdering.map(updateAttribute(_).asInstanceOf[SortOrder])
 
-  private def statsFor(a: Attribute) = relation.partitionStatistics.forAttribute(a)
+  // When we make canonicalized plan, we can't find a normalized attribute in this map.
+  // We return a `ColumnStatisticsSchema` for normalized attribute in this case.
--- End diff --

I tried that at the beginning. However, `partitionFilters` uses 
`buildFilter`. Making `partitionFilters` a lazy val doesn't work because, when 
we call `copy`, the initialization of `InMemoryTableScanExec` will try to 
materialize `partitionFilters` to copy its value.

Making `partitionFilters` and `buildFilter` methods is not enough either; we 
also need to remove `@transient` from `relation` and 
`InMemoryRelation.partitionStatistics`. So I think it isn't worth it and I'll 
leave it as is.





[GitHub] spark pull request #20831: [SPARK-23614][SQL] Fix incorrect reuse exchange w...

2018-03-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20831#discussion_r175951728
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala ---
@@ -169,7 +174,10 @@ case class InMemoryTableScanExec(
   override def outputOrdering: Seq[SortOrder] =
     relation.child.outputOrdering.map(updateAttribute(_).asInstanceOf[SortOrder])
 
-  private def statsFor(a: Attribute) = relation.partitionStatistics.forAttribute(a)
+  // When we make canonicalized plan, we can't find a normalized attribute in this map.
+  // We return a `ColumnStatisticsSchema` for normalized attribute in this case.
--- End diff --

It looks weird to call `statsFor` on a canonicalized 
`InMemoryTableScanExec`. Can we just make `buildFilter` a lazy val?





[GitHub] spark pull request #20831: [SPARK-23614][SQL] Fix incorrect reuse exchange w...

2018-03-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20831#discussion_r175951487
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala ---
@@ -68,6 +69,15 @@ case class InMemoryRelation(
 
   override protected def innerChildren: Seq[SparkPlan] = Seq(child)
 
+  override def doCanonicalize(): logical.LogicalPlan =
+    copy(output = output.map(QueryPlan.normalizeExprId(_, child.output)),
+      storageLevel = StorageLevel.NONE,
--- End diff --

can we follow the parameter order in the constructor?





[GitHub] spark pull request #20831: [SPARK-23614][SQL] Fix incorrect reuse exchange w...

2018-03-16 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20831#discussion_r175030682
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala ---
@@ -68,6 +69,15 @@ case class InMemoryRelation(
 
   override protected def innerChildren: Seq[SparkPlan] = Seq(child)
 
+  override def doCanonicalize(): logical.LogicalPlan =
+    copy(output = output.map(QueryPlan.normalizeExprId(_, child.output)),
+      storageLevel = StorageLevel.NONE,
+      child = child.canonicalized,
+      tableName = None)(
+      _cachedColumnBuffers,
+      sizeInBytesStats,
+      statsOfPlanToCache)
--- End diff --

aha, ok. Thanks!





[GitHub] spark pull request #20831: [SPARK-23614][SQL] Fix incorrect reuse exchange w...

2018-03-15 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/20831#discussion_r174994822
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala ---
@@ -68,6 +69,15 @@ case class InMemoryRelation(
 
   override protected def innerChildren: Seq[SparkPlan] = Seq(child)
 
+  override def doCanonicalize(): logical.LogicalPlan =
+    copy(output = output.map(QueryPlan.normalizeExprId(_, child.output)),
+      storageLevel = StorageLevel.NONE,
+      child = child.canonicalized,
+      tableName = None)(
+      _cachedColumnBuffers,
+      sizeInBytesStats,
+      statsOfPlanToCache)
--- End diff --

`cachedColumnBuffers`, `sizeInBytesStats`, and `statsOfPlanToCache` won't be 
considered when comparing two `InMemoryRelation`s. So instead of creating empty 
instances of statistics, I just use the original values.
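
As a sketch of why those values can be passed through unchanged: for a Scala case class, the generated `equals` and `hashCode` use only the first parameter list, and `copy` takes any later list explicitly. The `Relation` class below is a hypothetical miniature, assuming (as the diff's call shape suggests) that the cached values live in a second parameter list.

```scala
object CurriedEqualityDemo {
  // Hypothetical stand-in for `InMemoryRelation`: the cached data sits in a second parameter list.
  case class Relation(output: Seq[String], tableName: Option[String])(
      val cachedBuffers: Seq[Int])

  def main(args: Array[String]): Unit = {
    val a = Relation(Seq("id"), None)(Seq(1, 2, 3))
    val b = Relation(Seq("id"), None)(Seq.empty)
    println(a == b)                   // true: only the first list takes part in equality
    println(a.hashCode == b.hashCode) // true, for the same reason
    // `copy` requires the second list explicitly, so the existing values are simply handed over.
    val c = a.copy(tableName = Some("t"))(a.cachedBuffers)
    println(c == a)                   // false: `tableName` is in the first list
  }
}
```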





[GitHub] spark pull request #20831: [SPARK-23614][SQL] Fix incorrect reuse exchange w...

2018-03-15 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20831#discussion_r174969959
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala ---
@@ -68,6 +69,15 @@ case class InMemoryRelation(
 
   override protected def innerChildren: Seq[SparkPlan] = Seq(child)
 
+  override def doCanonicalize(): logical.LogicalPlan =
+    copy(output = output.map(QueryPlan.normalizeExprId(_, child.output)),
+      storageLevel = StorageLevel.NONE,
+      child = child.canonicalized,
+      tableName = None)(
+      _cachedColumnBuffers,
+      sizeInBytesStats,
+      statsOfPlanToCache)
--- End diff --

`statsOfPlanToCache` and `sizeInBytesStats`, too? For instance, 
`ResolvedHint` drops hints in canonicalization: 
https://github.com/apache/spark/blob/3675af7247e841e9a689666dc20891ba55c612b3/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala#L44





[GitHub] spark pull request #20831: [SPARK-23614][SQL] Fix incorrect reuse exchange w...

2018-03-15 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/20831#discussion_r174768746
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala ---
@@ -68,6 +69,15 @@ case class InMemoryRelation(
 
   override protected def innerChildren: Seq[SparkPlan] = Seq(child)
 
+  override def doCanonicalize(): logical.LogicalPlan =
+    copy(output = output.map(QueryPlan.normalizeExprId(_, child.output)),
+      storageLevel = StorageLevel.NONE,
+      child = child.canonicalized,
+      tableName = None)(
+      _cachedColumnBuffers,
+      sizeInBytesStats,
+      statsOfPlanToCache)
--- End diff --

`_cachedColumnBuffers` can't be null. If it is null, `copy` will trigger 
`buildBuffers`.





[GitHub] spark pull request #20831: [SPARK-23614][SQL] Fix incorrect reuse exchange w...

2018-03-15 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20831#discussion_r174764688
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala ---
@@ -68,6 +69,15 @@ case class InMemoryRelation(
 
   override protected def innerChildren: Seq[SparkPlan] = Seq(child)
 
+  override def doCanonicalize(): logical.LogicalPlan =
+    copy(output = output.map(QueryPlan.normalizeExprId(_, child.output)),
+      storageLevel = StorageLevel.NONE,
+      child = child.canonicalized,
+      tableName = None)(
+      _cachedColumnBuffers,
+      sizeInBytesStats,
+      statsOfPlanToCache)
--- End diff --

Do we need to copy this cached data?





[GitHub] spark pull request #20831: [SPARK-23614][SQL] Fix incorrect reuse exchange w...

2018-03-15 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/20831#discussion_r174715180
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala ---
@@ -68,6 +69,15 @@ case class InMemoryRelation(
 
   override protected def innerChildren: Seq[SparkPlan] = Seq(child)
 
+  override def doCanonicalize(): logical.LogicalPlan =
+    copy(output = output.map(QueryPlan.normalizeExprId(_, child.output)),
+      storageLevel = new StorageLevel(),
--- End diff --

Ok.





[GitHub] spark pull request #20831: [SPARK-23614][SQL] Fix incorrect reuse exchange w...

2018-03-15 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20831#discussion_r174707266
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala ---
@@ -68,6 +69,15 @@ case class InMemoryRelation(
 
   override protected def innerChildren: Seq[SparkPlan] = Seq(child)
 
+  override def doCanonicalize(): logical.LogicalPlan =
+    copy(output = output.map(QueryPlan.normalizeExprId(_, child.output)),
+      storageLevel = new StorageLevel(),
--- End diff --

`StorageLevel.NONE`?





[GitHub] spark pull request #20831: [SPARK-23614][SQL] Fix incorrect reuse exchange w...

2018-03-15 Thread viirya
GitHub user viirya opened a pull request:

https://github.com/apache/spark/pull/20831

[SPARK-23614][SQL] Fix incorrect reuse exchange when caching is used

## What changes were proposed in this pull request?

We should provide a customized canonicalized plan for `InMemoryRelation` and 
`InMemoryTableScanExec`. Otherwise, we can wrongly treat two different cached 
plans as producing the same result, which in turn causes an exchange to be 
reused incorrectly.
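
A toy illustration of the idea, not Spark's implementation: plans are compared through a canonical form that erases cosmetic details (expression ids, table names) but keeps what determines the result (here, the canonicalized child). All types and the `canonicalize` and `sameResult` helpers below are hypothetical.

```scala
object CanonicalizeDemo {
  // Hypothetical miniature of a query plan; these are not Spark's classes.
  case class Attr(name: String, exprId: Long)
  sealed trait Plan { def output: Seq[Attr] }
  case class Scan(table: String, output: Seq[Attr]) extends Plan
  case class Cached(child: Plan, output: Seq[Attr], tableName: Option[String]) extends Plan

  // Replace each expression id by the attribute's position and drop cosmetic fields.
  def canonicalize(plan: Plan): Plan = plan match {
    case Scan(table, out) =>
      Scan(table, out.zipWithIndex.map { case (_, i) => Attr("none", i.toLong) })
    case Cached(child, out, _) =>
      val ids = child.output.map(_.exprId)
      val normalized = out.map(a => Attr("none", ids.indexOf(a.exprId).toLong))
      Cached(canonicalize(child), normalized, None)
  }

  // Two plans are taken to produce the same result when their canonical forms are equal.
  def sameResult(a: Plan, b: Plan): Boolean = canonicalize(a) == canonicalize(b)

  def main(args: Array[String]): Unit = {
    val left = Cached(Scan("t1", Seq(Attr("id", 10))), Seq(Attr("id", 10)), Some("left"))
    val leftAgain = Cached(Scan("t1", Seq(Attr("id", 77))), Seq(Attr("id", 77)), Some("copy"))
    val right = Cached(Scan("t2", Seq(Attr("id", 20))), Seq(Attr("id", 20)), Some("right"))
    println(sameResult(left, leftAgain)) // true: only ids and the table name differ
    println(sameResult(left, right))     // false: the cached children differ
  }
}
```

If the canonical form of `Cached` ignored its child, `left` and `right` would compare equal, which is the kind of mix-up that can lead to an exchange being reused for the wrong plan.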

## How was this patch tested?

Added unit test.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/viirya/spark-1 SPARK-23614

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20831.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20831


commit e1f28e2c6c9ba99c92fea339946323c9490062d4
Author: Liang-Chi Hsieh 
Date:   2018-03-15T06:16:22Z

Fix incorrect reuse exchange when caching is used.



