[GitHub] spark pull request #20831: [SPARK-23614][SQL] Fix incorrect reuse exchange w...
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/20831

---

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20831#discussion_r176274536

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala ---
@@ -169,7 +174,10 @@ case class InMemoryTableScanExec(

   override def outputOrdering: Seq[SortOrder] =
     relation.child.outputOrdering.map(updateAttribute(_).asInstanceOf[SortOrder])

-  private def statsFor(a: Attribute) = relation.partitionStatistics.forAttribute(a)
+  // When we make canonicalized plan, we can't find a normalized attribute in this map.
+  // We return a `ColumnStatisticsSchema` for normalized attribute in this case.
--- End diff --

Yes. I think so.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20831#discussion_r176159195

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala ---
@@ -169,7 +174,10 @@ case class InMemoryTableScanExec(

   override def outputOrdering: Seq[SortOrder] =
     relation.child.outputOrdering.map(updateAttribute(_).asInstanceOf[SortOrder])

-  private def statsFor(a: Attribute) = relation.partitionStatistics.forAttribute(a)
+  // When we make canonicalized plan, we can't find a normalized attribute in this map.
+  // We return a `ColumnStatisticsSchema` for normalized attribute in this case.
--- End diff --

This can be solved if we add a `val stats = relation.partitionStatistics`, couldn't it?
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20831#discussion_r176006590

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala ---
@@ -169,7 +174,10 @@ case class InMemoryTableScanExec(

   override def outputOrdering: Seq[SortOrder] =
     relation.child.outputOrdering.map(updateAttribute(_).asInstanceOf[SortOrder])

-  private def statsFor(a: Attribute) = relation.partitionStatistics.forAttribute(a)
+  // When we make canonicalized plan, we can't find a normalized attribute in this map.
+  // We return a `ColumnStatisticsSchema` for normalized attribute in this case.
--- End diff --

And I think it isn't worth removing `@transient` from `relation` and `InMemoryRelation.partitionStatistics` just for this. So I leave it as is.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20831#discussion_r176002739

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala ---
@@ -169,7 +174,10 @@ case class InMemoryTableScanExec(

   override def outputOrdering: Seq[SortOrder] =
     relation.child.outputOrdering.map(updateAttribute(_).asInstanceOf[SortOrder])

-  private def statsFor(a: Attribute) = relation.partitionStatistics.forAttribute(a)
+  // When we make canonicalized plan, we can't find a normalized attribute in this map.
+  // We return a `ColumnStatisticsSchema` for normalized attribute in this case.
--- End diff --

Ah, sorry, I got it wrong. The reason it doesn't work is that `relation` is `@transient`. `partitionFilters` needs to be non-lazy; otherwise, when we access `relation` on an executor, we will get a `NullPointerException`.
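The `@transient` constraint can be illustrated with toy classes (a minimal sketch, not Spark's actual API: `Relation`, `ScanExec`, and `roundTrip` are made up here). A transient field is dropped during Java serialization and comes back as `null` on the "executor" side, so an eager val computed from it on the driver survives the trip, while a lazy val first forced after deserialization blows up:

```scala
import java.io._

// Serialization round trip, mimicking a plan node being shipped to an executor.
def roundTrip[T](obj: T): T = {
  val bytes = new ByteArrayOutputStream()
  val oos = new ObjectOutputStream(bytes)
  oos.writeObject(obj)
  oos.close()
  new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    .readObject().asInstanceOf[T]
}

class Relation(val rowCount: Long) extends Serializable

// `relation` is @transient, like InMemoryTableScanExec.relation: it is
// dropped during serialization and deserializes as null.
class ScanExec(@transient val relation: Relation) extends Serializable {
  // Eager val: computed on the driver, before serialization, so it survives.
  val partitionFilters: Long = relation.rowCount
  // Lazy val: first forced after deserialization, when `relation` is null.
  lazy val lazyFilters: Long = relation.rowCount
}

val shipped = roundTrip(new ScanExec(new Relation(42L)))
assert(shipped.partitionFilters == 42L)  // eagerly captured value survived
val lazyResult =
  try { shipped.lazyFilters.toString }
  catch { case _: NullPointerException => "NPE" }
assert(lazyResult == "NPE")              // transient field is gone
```

This is why the non-lazy `partitionFilters` was kept as is: laziness would trade the canonicalization annoyance for an executor-side `NullPointerException`.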
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20831#discussion_r175992410

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala ---
@@ -169,7 +174,10 @@ case class InMemoryTableScanExec(

   override def outputOrdering: Seq[SortOrder] =
     relation.child.outputOrdering.map(updateAttribute(_).asInstanceOf[SortOrder])

-  private def statsFor(a: Attribute) = relation.partitionStatistics.forAttribute(a)
+  // When we make canonicalized plan, we can't find a normalized attribute in this map.
+  // We return a `ColumnStatisticsSchema` for normalized attribute in this case.
--- End diff --

I don't get it. Regardless of how `copy` is implemented in Scala, ideally we can just mark `buildFilter` and `partitionFilters` as lazy, and in `doCanonicalize` create a new `InMemoryTableScanExec`, which won't materialize `partitionFilters` in either the current `InMemoryTableScanExec` or the new one. One problem I can think of is serializing a canonicalized `InMemoryTableScanExec`, but that should never happen.
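In this simplified form, at least, cloud-fan's observation holds: a lazy val in a case class body is not a constructor parameter, so neither construction nor `copy` forces it (the real `InMemoryTableScanExec` had the additional `@transient` constraints discussed elsewhere in this thread). A minimal sketch with a toy `ToyScan` class, not Spark's API:

```scala
// Count how often the "expensive" derivation actually runs.
var materialized = 0

case class ToyScan(attributes: Seq[String], normalized: Boolean = false) {
  // Lazy body member: not a constructor parameter, so `copy` never touches it.
  lazy val partitionFilters: Seq[String] = {
    materialized += 1
    attributes.map(_ + "_filter")
  }
  // Canonicalization via copy: builds a new instance without forcing the lazy val.
  def doCanonicalize: ToyScan = copy(normalized = true)
}

val scan = ToyScan(Seq("a", "b"))
val canon = scan.doCanonicalize
assert(materialized == 0)     // neither instance materialized partitionFilters
scan.partitionFilters         // first real access runs the derivation once
assert(materialized == 1)
```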
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20831#discussion_r175980451

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala ---
@@ -68,6 +69,15 @@ case class InMemoryRelation(

   override protected def innerChildren: Seq[SparkPlan] = Seq(child)

+  override def doCanonicalize(): logical.LogicalPlan =
+    copy(output = output.map(QueryPlan.normalizeExprId(_, child.output)),
+      storageLevel = StorageLevel.NONE,
--- End diff --

It is followed. I just omitted `useCompression` and `batchSize`, as they are primitives and don't need to be canonicalized here.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20831#discussion_r175980243

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala ---
@@ -169,7 +174,10 @@ case class InMemoryTableScanExec(

   override def outputOrdering: Seq[SortOrder] =
     relation.child.outputOrdering.map(updateAttribute(_).asInstanceOf[SortOrder])

-  private def statsFor(a: Attribute) = relation.partitionStatistics.forAttribute(a)
+  // When we make canonicalized plan, we can't find a normalized attribute in this map.
+  // We return a `ColumnStatisticsSchema` for normalized attribute in this case.
--- End diff --

I tried that at the beginning. However, `partitionFilters` uses `buildFilter`. Making `partitionFilters` a lazy val doesn't work, because when we do `copy`, the initialization of the new `InMemoryTableScanExec` will try to materialize `partitionFilters` to copy its value. Turning `partitionFilters` and `buildFilter` into methods is not enough either; we would also need to remove `@transient` from `relation` and `InMemoryRelation.partitionStatistics`. So I don't think it's worth it and leave it as is.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20831#discussion_r175951728

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala ---
@@ -169,7 +174,10 @@ case class InMemoryTableScanExec(

   override def outputOrdering: Seq[SortOrder] =
     relation.child.outputOrdering.map(updateAttribute(_).asInstanceOf[SortOrder])

-  private def statsFor(a: Attribute) = relation.partitionStatistics.forAttribute(a)
+  // When we make canonicalized plan, we can't find a normalized attribute in this map.
+  // We return a `ColumnStatisticsSchema` for normalized attribute in this case.
--- End diff --

It looks weird to call `statsFor` on a canonicalized `InMemoryTableScanExec`. Can we just make `buildFilter` a lazy val?
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20831#discussion_r175951487

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala ---
@@ -68,6 +69,15 @@ case class InMemoryRelation(

   override protected def innerChildren: Seq[SparkPlan] = Seq(child)

+  override def doCanonicalize(): logical.LogicalPlan =
+    copy(output = output.map(QueryPlan.normalizeExprId(_, child.output)),
+      storageLevel = StorageLevel.NONE,
--- End diff --

Can we follow the parameter order in the constructor?
Github user maropu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20831#discussion_r175030682

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala ---
@@ -68,6 +69,15 @@ case class InMemoryRelation(

   override protected def innerChildren: Seq[SparkPlan] = Seq(child)

+  override def doCanonicalize(): logical.LogicalPlan =
+    copy(output = output.map(QueryPlan.normalizeExprId(_, child.output)),
+      storageLevel = StorageLevel.NONE,
+      child = child.canonicalized,
+      tableName = None)(
+      _cachedColumnBuffers,
+      sizeInBytesStats,
+      statsOfPlanToCache)
--- End diff --

aha, ok. Thanks!
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20831#discussion_r174994822

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala ---
@@ -68,6 +69,15 @@ case class InMemoryRelation(

   override protected def innerChildren: Seq[SparkPlan] = Seq(child)

+  override def doCanonicalize(): logical.LogicalPlan =
+    copy(output = output.map(QueryPlan.normalizeExprId(_, child.output)),
+      storageLevel = StorageLevel.NONE,
+      child = child.canonicalized,
+      tableName = None)(
+      _cachedColumnBuffers,
+      sizeInBytesStats,
+      statsOfPlanToCache)
--- End diff --

`cachedColumnBuffers`, `sizeInBytesStats`, and `statsOfPlanToCache` won't be considered when comparing two `InMemoryRelation`s. So instead of creating empty instances of the statistics, I just use the original values.
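The reason those values can be passed through unchanged is a Scala language detail: for a case class with multiple parameter lists, the generated `equals`/`hashCode` cover only the first list, so second-list values (where `InMemoryRelation` keeps its cached buffers and statistics) never influence plan comparison. A toy sketch (the `ToyRel` class here is made up, not Spark's):

```scala
// Only the first parameter list participates in case-class equality;
// the second (curried) list needs explicit `val`s and is ignored by
// the generated equals/hashCode.
case class ToyRel(output: Seq[String], tableName: Option[String])(
    val cachedBuffers: AnyRef, val sizeInBytes: Long)

val r1 = ToyRel(Seq("a"), None)(new Object, 100L)
val r2 = ToyRel(Seq("a"), None)(new Object, 999L)  // different cached state
assert(r1 == r2)        // equal despite differing second-list values
assert(r1.## == r2.##)  // hashCode agrees as well
```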
Github user maropu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20831#discussion_r174969959

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala ---
@@ -68,6 +69,15 @@ case class InMemoryRelation(

   override protected def innerChildren: Seq[SparkPlan] = Seq(child)

+  override def doCanonicalize(): logical.LogicalPlan =
+    copy(output = output.map(QueryPlan.normalizeExprId(_, child.output)),
+      storageLevel = StorageLevel.NONE,
+      child = child.canonicalized,
+      tableName = None)(
+      _cachedColumnBuffers,
+      sizeInBytesStats,
+      statsOfPlanToCache)
--- End diff --

`statsOfPlanToCache` and `sizeInBytesStats`, too? For instance, `ResolveHint` drops hints in canonicalization:

    https://github.com/apache/spark/blob/3675af7247e841e9a689666dc20891ba55c612b3/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala#L44
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20831#discussion_r174768746

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala ---
@@ -68,6 +69,15 @@ case class InMemoryRelation(

   override protected def innerChildren: Seq[SparkPlan] = Seq(child)

+  override def doCanonicalize(): logical.LogicalPlan =
+    copy(output = output.map(QueryPlan.normalizeExprId(_, child.output)),
+      storageLevel = StorageLevel.NONE,
+      child = child.canonicalized,
+      tableName = None)(
+      _cachedColumnBuffers,
+      sizeInBytesStats,
+      statsOfPlanToCache)
--- End diff --

`_cachedColumnBuffers` can't be null. If it is null, `copy` will trigger `buildBuffers`.
Github user maropu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20831#discussion_r174764688

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala ---
@@ -68,6 +69,15 @@ case class InMemoryRelation(

   override protected def innerChildren: Seq[SparkPlan] = Seq(child)

+  override def doCanonicalize(): logical.LogicalPlan =
+    copy(output = output.map(QueryPlan.normalizeExprId(_, child.output)),
+      storageLevel = StorageLevel.NONE,
+      child = child.canonicalized,
+      tableName = None)(
+      _cachedColumnBuffers,
+      sizeInBytesStats,
+      statsOfPlanToCache)
--- End diff --

Do we need to copy this cached data?
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20831#discussion_r174715180

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala ---
@@ -68,6 +69,15 @@ case class InMemoryRelation(

   override protected def innerChildren: Seq[SparkPlan] = Seq(child)

+  override def doCanonicalize(): logical.LogicalPlan =
+    copy(output = output.map(QueryPlan.normalizeExprId(_, child.output)),
+      storageLevel = new StorageLevel(),
--- End diff --

Ok.
Github user maropu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20831#discussion_r174707266

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala ---
@@ -68,6 +69,15 @@ case class InMemoryRelation(

   override protected def innerChildren: Seq[SparkPlan] = Seq(child)

+  override def doCanonicalize(): logical.LogicalPlan =
+    copy(output = output.map(QueryPlan.normalizeExprId(_, child.output)),
+      storageLevel = new StorageLevel(),
--- End diff --

`StorageLevel.NONE`?
GitHub user viirya opened a pull request:

    https://github.com/apache/spark/pull/20831

[SPARK-23614][SQL] Fix incorrect reuse exchange when caching is used

## What changes were proposed in this pull request?

We should provide a customized canonicalized plan for `InMemoryRelation` and `InMemoryTableScanExec`. Otherwise, we can wrongly treat two different cached plans as producing the same result, which then causes an exchange to be wrongly reused.

## How was this patch tested?

Added unit test.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/viirya/spark-1 SPARK-23614

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/20831.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #20831

----

commit e1f28e2c6c9ba99c92fea339946323c9490062d4
Author: Liang-Chi Hsieh
Date:   2018-03-15T06:16:22Z

    Fix incorrect reuse exchange when caching is used.
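The idea behind the fix can be sketched with toy classes (a minimal model, not Spark's real `QueryPlan` API: `Attr`, `CachedScan`, and their methods are made up here). Canonicalization rewrites per-query expression IDs to positional ones, so two plans that differ only in IDs compare equal, while plans over genuinely different cached relations do not, which is exactly the property exchange reuse depends on:

```scala
case class Attr(name: String, exprId: Long)

case class CachedScan(output: Seq[Attr], table: String) {
  // Normalize expression IDs to their position in the output, so two
  // semantically identical scans canonicalize to the same value.
  def canonicalized: CachedScan =
    copy(output = output.zipWithIndex.map { case (a, i) => a.copy(exprId = i) })
  // Exchange reuse compares canonicalized plans for equality.
  def sameResult(other: CachedScan): Boolean =
    canonicalized == other.canonicalized
}

val s1 = CachedScan(Seq(Attr("x", 100L)), "t1")
val s2 = CachedScan(Seq(Attr("x", 207L)), "t1")  // same plan, fresh exprIds
val s3 = CachedScan(Seq(Attr("x", 100L)), "t2")  // different cached relation
assert(s1.sameResult(s2))   // safe to reuse the exchange
assert(!s1.sameResult(s3))  // must NOT be treated as the same result
```

Without a proper canonicalization override, a node can fall back to a default that accidentally makes distinct cached plans compare equal, which is the incorrect-reuse bug this PR addresses.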