[GitHub] spark issue #23152: [SPARK-26181][SQL] the `hasMinMaxStats` method of `Colum...
Github user juliuszsompolski commented on the issue: https://github.com/apache/spark/pull/23152 LGTM, thanks for the change!
[GitHub] spark issue #23152: [SPARK-26181][SQL] the `hasMinMaxStats` method of `Colum...
Github user juliuszsompolski commented on the issue: https://github.com/apache/spark/pull/23152 jenkins retest this please
[GitHub] spark issue #23152: [SPARK-26181][SQL] the `hasMinMaxStats` method of `Colum...
Github user juliuszsompolski commented on the issue: https://github.com/apache/spark/pull/23152

This reproduces it:
```
sql("create table all_null (attr1 int, attr2 int)")
sql("insert into all_null values (null, null)")
sql("analyze table all_null compute statistics for columns attr1, attr2")
// check if the stats can be calculated without Cast exception.
sql("select * from all_null where attr1 < attr2").queryExecution.stringWithStats
```
Could we piggyback this fix here? Sorry for posting last minute after the LGTM.
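For reference, a minimal sketch of what the corrected guard could look like (the exact change is in the PR diff; `ColumnStatsMapSketch` and its shape are illustrative assumptions, not the PR's code):

```scala
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
import org.apache.spark.sql.catalyst.plans.logical.ColumnStat

// Illustrative sketch only: the guard must check that min/max are actually
// present, not merely that some stats exist for the attribute.
case class ColumnStatsMapSketch(originalMap: AttributeMap[ColumnStat]) {
  def hasMinMaxStats(a: Attribute): Boolean =
    originalMap.get(a).exists(cs => cs.min.isDefined && cs.max.isDefined)
}
```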
[GitHub] spark issue #23152: [SPARK-26181][SQL] the `hasMinMaxStats` method of `Colum...
Github user juliuszsompolski commented on the issue: https://github.com/apache/spark/pull/23152

While at it, could we kill one more potential bug? In `FilterEstimation.evaluateBinaryForTwoColumns` there is a
```
attrLeft.dataType match {
  case StringType | BinaryType =>
    // TODO: It is difficult to support other binary comparisons for String/Binary
    // type without min/max and advanced statistics like histogram.
    logDebug("[CBO] No range comparison statistics for String/Binary type " + attrLeft)
    return None
  case _ =>
}
```
Could we change the `case _ =>` branch to
```
case _ =>
  if (!colStatsMap.hasMinMaxStats(attrLeft)) {
    logDebug("[CBO] No min/max statistics " + attrLeft)
    return None
  }
  if (!colStatsMap.hasMinMaxStats(attrRight)) {
    logDebug("[CBO] No min/max statistics " + attrRight)
    return None
  }
```
This is one more place that later does
```
val statsIntervalLeft = ValueInterval(colStatLeft.min, colStatLeft.max, attrLeft.dataType)
  .asInstanceOf[NumericValueInterval]
...
val statsIntervalRight = ValueInterval(colStatRight.min, colStatRight.max, attrRight.dataType)
  .asInstanceOf[NumericValueInterval]
```
assuming that min/maxes are present, and could therefore also hit the ClassCastException.
[GitHub] spark issue #23152: [SPARK-26181][SQL] the `hasMinMaxStats` method of `Colum...
Github user juliuszsompolski commented on the issue: https://github.com/apache/spark/pull/23152 LGTM, thanks!
[GitHub] spark pull request #23152: [SPARK-26181][SQL] the `hasMinMaxStats` method of...
Github user juliuszsompolski commented on a diff in the pull request: https://github.com/apache/spark/pull/23152#discussion_r237789881

--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala ---
@@ -2276,4 +2276,16 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     }
   }
+
+  test("SPARK-26181 hasMinMaxStats method of ColumnStatsMap is not correct") {
+    withSQLConf(SQLConf.CBO_ENABLED.key -> "true") {
+      withTable("all_null") {
+        sql("create table all_null (attrInt int)")
+        sql("insert into all_null values (null)")
+        sql("analyze table all_null compute statistics for columns attrInt")
+        checkAnswer(sql("select * from all_null where attrInt < 1"), Nil)
--- End diff --

Normal query execution doesn't trigger it here, because it doesn't need the stats, so they never get lazily evaluated. Putting a join over it would probably trigger it without having to force it with .queryExecution.stringWithStats.
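A hedged sketch of that suggestion (an assumption about what would exercise the estimation path, not a tested snippet):

```scala
// A self-join should make the planner consult the lazily evaluated stats
// during join estimation, without forcing them via stringWithStats.
sql("create table all_null (attrInt int)")
sql("insert into all_null values (null)")
sql("analyze table all_null compute statistics for columns attrInt")
sql("select * from all_null t1 join all_null t2 on t1.attrInt = t2.attrInt").collect()
```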
[GitHub] spark issue #23127: [SPARK-26159] Codegen for LocalTableScanExec and RDDScan...
Github user juliuszsompolski commented on the issue: https://github.com/apache/spark/pull/23127 jenkins retest this please
[GitHub] spark issue #23127: [SPARK-26159] Codegen for LocalTableScanExec and RDDScan...
Github user juliuszsompolski commented on the issue: https://github.com/apache/spark/pull/23127 jenkins retest this
[GitHub] spark issue #23127: [SPARK-26159] Codegen for LocalTableScanExec and RDDScan...
Github user juliuszsompolski commented on the issue: https://github.com/apache/spark/pull/23127 Talked with @hvanhovell offline and set `LocalTableScanExec` and `InputAdapter` to not create an unsafe projection, and `RDDScanExec` and `RowDataSourceScanExec` to always do so, which replicates previous behaviour. A compact sketch of that split follows.
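The sketch below summarizes the behaviour described above (the trait and flag names are from the PR; the stand-in trait name and the per-operator summary comments are illustrative assumptions):

```scala
// LocalTableScanExec, InputAdapter: rows are already safe to pass through
// (or a parent operator will project them) => createUnsafeProjection = false.
// RDDScanExec, RowDataSourceScanExec: input may be arbitrary InternalRows and
// the operator may be alone in the stage => createUnsafeProjection = true.
trait InputRDDCodegenSketch {
  // whether doProduce must wrap each input row in an UnsafeProjection
  protected def createUnsafeProjection: Boolean
}
```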
[GitHub] spark issue #23127: [SPARK-26159] Codegen for LocalTableScanExec and RDDScan...
Github user juliuszsompolski commented on the issue: https://github.com/apache/spark/pull/23127 @cloud-fan Thanks. Actually, I had to revert the earlier updates, because the plan no longer changes for a LocalTableScanExec that is alone in a WholeStageCodegen.
[GitHub] spark pull request #23127: [SPARK-26159] Codegen for LocalTableScanExec and ...
Github user juliuszsompolski commented on a diff in the pull request: https://github.com/apache/spark/pull/23127#discussion_r236395764

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala ---
@@ -406,14 +415,62 @@ trait BlockingOperatorWithCodegen extends CodegenSupport {
   override def limitNotReachedChecks: Seq[String] = Nil
 }

+/**
+ * Leaf codegen node reading from a single RDD.
+ */
+trait InputRDDCodegen extends CodegenSupport {
+
+  def inputRDD: RDD[InternalRow]
+
+  // If the input is an RDD of InternalRow which are potentially not UnsafeRow,
+  // and there is no parent to consume it, it needs an UnsafeProjection.
+  protected val createUnsafeProjection: Boolean = (parent == null)
+
+  override def inputRDDs(): Seq[RDD[InternalRow]] = {
+    inputRDD :: Nil
+  }
+
+  override def doProduce(ctx: CodegenContext): String = {
--- End diff --

> This assumes that that parent operator would always result in some UnsafeProjection being eventually added, and hence the output of the WholeStageCodegen unit will be UnsafeRows.

I think it's quite a hack in my patch, and that there should be some nicer interface to tell the codegened operators whether they're dealing with UnsafeRows input, or InternalRows that may not be UnsafeRows...
[GitHub] spark pull request #23127: [SPARK-26159] Codegen for LocalTableScanExec and ...
Github user juliuszsompolski commented on a diff in the pull request: https://github.com/apache/spark/pull/23127#discussion_r236393985

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala ---
@@ -406,14 +415,62 @@ trait BlockingOperatorWithCodegen extends CodegenSupport {
   override def limitNotReachedChecks: Seq[String] = Nil
 }

+/**
+ * Leaf codegen node reading from a single RDD.
+ */
+trait InputRDDCodegen extends CodegenSupport {
+
+  def inputRDD: RDD[InternalRow]
+
+  // If the input is an RDD of InternalRow which are potentially not UnsafeRow,
+  // and there is no parent to consume it, it needs an UnsafeProjection.
+  protected val createUnsafeProjection: Boolean = (parent == null)
+
+  override def inputRDDs(): Seq[RDD[InternalRow]] = {
+    inputRDD :: Nil
+  }
+
+  override def doProduce(ctx: CodegenContext): String = {
--- End diff --

updated description.
[GitHub] spark pull request #23127: [SPARK-26159] Codegen for LocalTableScanExec and ...
Github user juliuszsompolski commented on a diff in the pull request: https://github.com/apache/spark/pull/23127#discussion_r236391673

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala ---
@@ -406,14 +415,62 @@ trait BlockingOperatorWithCodegen extends CodegenSupport {
   override def limitNotReachedChecks: Seq[String] = Nil
 }

+/**
+ * Leaf codegen node reading from a single RDD.
+ */
+trait InputRDDCodegen extends CodegenSupport {
+
+  def inputRDD: RDD[InternalRow]
+
+  // If the input is an RDD of InternalRow which are potentially not UnsafeRow,
+  // and there is no parent to consume it, it needs an UnsafeProjection.
+  protected val createUnsafeProjection: Boolean = (parent == null)
+
+  override def inputRDDs(): Seq[RDD[InternalRow]] = {
+    inputRDD :: Nil
+  }
+
+  override def doProduce(ctx: CodegenContext): String = {
--- End diff --

The new one should be the same as the previous `RowDataSourceScanExec.doProduce` and `RDDScanExec.doProduce` if createUnsafeProjection == true, and it should be the same as the previous `InputAdapter.doProduce` and `LocalTableScanExec.doProduce` when createUnsafeProjection == false. From the fact that `InputAdapter` was not doing an explicit unsafe projection, even though its input could be InternalRows that are not UnsafeRows, I derived the assumption that it is safe not to do so as long as there is a parent operator. This assumes that that parent operator would always result in an UnsafeProjection being eventually added, and hence the output of the WholeStageCodegen will be UnsafeRows.
[GitHub] spark issue #23127: [SPARK-26159] Codegen for LocalTableScanExec and RDDScan...
Github user juliuszsompolski commented on the issue: https://github.com/apache/spark/pull/23127 @cloud-fan @rednaxelafx Actually, the input to a codegen stage can be an InternalRow, so I can't make the inputRDD be `RDD[UnsafeRow]`, but the output needs to be UnsafeRow. Doing it like `InputAdapter` did actually made it just pass the InternalRow through as output. For `InputAdapter` there is always some parent operator to consume it, which creates an unsafe projection in whatever it does and then outputs UnsafeRows. But an `RDDScanExec` or `RowDataSourceScanExec` could be alone in a WholeStageCodegenExec, and then just doing `${consume(ctx, null, row)}` made it pass the InternalRow from the input straight through to the output. WDYT about how I patched it up?
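To illustrate the InternalRow-vs-UnsafeRow distinction discussed here, a small stand-alone sketch of the projection such a scan needs (illustrative usage of the catalyst API, not the PR's generated code):

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

val schema = StructType(Seq(StructField("id", IntegerType)))
val toUnsafe = UnsafeProjection.create(schema)
// A generic InternalRow, as an input RDD might produce...
val generic: InternalRow = new GenericInternalRow(Array[Any](42))
// ...copied into the UnsafeRow format that downstream operators expect.
val unsafe = toUnsafe(generic)
```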
[GitHub] spark issue #23127: [SPARK-26159] Codegen for LocalTableScanExec and Existin...
Github user juliuszsompolski commented on the issue: https://github.com/apache/spark/pull/23127 cc @hvanhovell @rednaxelafx
[GitHub] spark pull request #23127: [SPARK-26159] Codegen for LocalTableScanExec and ...
GitHub user juliuszsompolski opened a pull request: https://github.com/apache/spark/pull/23127

[SPARK-26159] Codegen for LocalTableScanExec and ExistingRDDExec

## What changes were proposed in this pull request?

Implement codegen for LocalTableScanExec and ExistingRDDExec. Refactor to share code with InputAdapter.

## How was this patch tested?

Covered and used in existing tests.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/juliuszsompolski/apache-spark SPARK-26159

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/23127.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #23127

commit 23c2d9111f1cff9059746bb7b48bb8ef7ad7027b
Author: Juliusz Sompolski
Date: 2018-11-13T09:19:09Z

    localtablescanexec codegen
[GitHub] spark issue #23022: [SPARK-26038] Decimal toScalaBigInt/toJavaBigInteger for...
Github user juliuszsompolski commented on the issue: https://github.com/apache/spark/pull/23022 ping @hvanhovell
[GitHub] spark issue #23022: [SPARK-26038] Decimal toScalaBigInt/toJavaBigInteger for...
Github user juliuszsompolski commented on the issue: https://github.com/apache/spark/pull/23022 cc @hvanhovell
[GitHub] spark pull request #23022: [SPARK-26038] Decimal toScalaBigInt/toJavaBigInte...
GitHub user juliuszsompolski opened a pull request: https://github.com/apache/spark/pull/23022

[SPARK-26038] Decimal toScalaBigInt/toJavaBigInteger for decimals not fitting in long

## What changes were proposed in this pull request?

Fix Decimal `toScalaBigInt` and `toJavaBigInteger`, which used to work only for decimals that fit in a long.

## How was this patch tested?

Added test to DecimalSuite.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/juliuszsompolski/apache-spark SPARK-26038

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/23022.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #23022

commit 9c52e09357531f081801c25a0c2533fff10aa1d8
Author: Juliusz Sompolski
Date: 2018-11-13T10:58:26Z

    SPARK-26038
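A small sketch of the behaviour this fix targets (assuming the fixed `Decimal`; the literal is an arbitrary example value larger than `Long.MaxValue`):

```scala
import org.apache.spark.sql.types.Decimal

// A decimal too large for a long: before the fix, these conversions went
// through toLong and returned a corrupted value; after it, they fall back
// to the internal BigDecimal representation.
val big = Decimal(BigDecimal("12345678901234567890123"))
assert(big.toScalaBigInt == BigInt("12345678901234567890123"))
assert(big.toJavaBigInteger == new java.math.BigInteger("12345678901234567890123"))
```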
[GitHub] spark issue #22029: [SPARK-24395][SQL] IN operator should return NULL when c...
Github user juliuszsompolski commented on the issue: https://github.com/apache/spark/pull/22029 IMHO if a new wrapper was justifiable for the IN-subquery in #21403, then it is also justifiable to add one here for the IN-literal-list case.
[GitHub] spark issue #22566: [SPARK-25458][SQL] Support FOR ALL COLUMNS in ANALYZE TA...
Github user juliuszsompolski commented on the issue: https://github.com/apache/spark/pull/22566 LGTM. Thanks!
[GitHub] spark issue #22566: [SPARK-25458][SQL] Support FOR ALL COLUMNS in ANALYZE TA...
Github user juliuszsompolski commented on the issue: https://github.com/apache/spark/pull/22566 jenkins retest this
[GitHub] spark pull request #22209: [SPARK-24415][Core] Fixed the aggregated stage me...
Github user juliuszsompolski commented on a diff in the pull request: https://github.com/apache/spark/pull/22209#discussion_r214381992

--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -350,11 +350,20 @@ private[spark] class AppStatusListener(
       val e = it.next()
       if (job.stageIds.contains(e.getKey()._1)) {
         val stage = e.getValue()
-        stage.status = v1.StageStatus.SKIPPED
-        job.skippedStages += stage.info.stageId
-        job.skippedTasks += stage.info.numTasks
-        it.remove()
-        update(stage, now)
+        if (v1.StageStatus.PENDING.equals(stage.status)) {
--- End diff --

In the previous behaviour, it would have marked the stages that were ACTIVE as SKIPPED, which now will not happen here anymore. It looks like the `stage.status = event.stageInfo.failureReason match {` code in `onStageCompleted` may handle that. Is that the case?
[GitHub] spark pull request #22209: [SPARK-24415][Core] Fixed the aggregated stage me...
Github user juliuszsompolski commented on a diff in the pull request: https://github.com/apache/spark/pull/22209#discussion_r214320305

--- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -350,11 +350,20 @@ private[spark] class AppStatusListener(
       val e = it.next()
       if (job.stageIds.contains(e.getKey()._1)) {
         val stage = e.getValue()
-        stage.status = v1.StageStatus.SKIPPED
-        job.skippedStages += stage.info.stageId
-        job.skippedTasks += stage.info.numTasks
-        it.remove()
-        update(stage, now)
+        if (v1.StageStatus.PENDING.equals(stage.status)) {
--- End diff --

Is there nothing to be done here if StageStatus is ACTIVE?
[GitHub] spark issue #22286: [SPARK-25284] Spark UI: make sure skipped stages are upd...
Github user juliuszsompolski commented on the issue: https://github.com/apache/spark/pull/22286 Thanks for pointing to #22209. Closing this one.
[GitHub] spark pull request #22286: [SPARK-25284] Spark UI: make sure skipped stages ...
Github user juliuszsompolski closed the pull request at: https://github.com/apache/spark/pull/22286
[GitHub] spark issue #22286: [SPARK-25284] Spark UI: make sure skipped stages are upd...
Github user juliuszsompolski commented on the issue: https://github.com/apache/spark/pull/22286 cc @gatorsmile fyi @dbkerkela
[GitHub] spark pull request #22286: [SPARK-25284] Spark UI: make sure skipped stages ...
GitHub user juliuszsompolski opened a pull request: https://github.com/apache/spark/pull/22286

[SPARK-25284] Spark UI: make sure skipped stages are updated onJobEnd

## What changes were proposed in this pull request?

Tiny bug in onJobEnd, not forcing the refresh of skipped stages it removes.

## How was this patch tested?

YOLO

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/juliuszsompolski/apache-spark SPARK-25284

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22286.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22286

commit 3be439c324f3d2f4fd304a97f39913940de98c56
Author: Juliusz Sompolski
Date: 2018-08-30T16:41:33Z

    SPARK-25284
[GitHub] spark issue #22268: [DOC] Fix comment on SparkPlanGraphEdge
Github user juliuszsompolski commented on the issue: https://github.com/apache/spark/pull/22268 cc @gatorsmile just a tiny nit...
[GitHub] spark pull request #22268: [DOC] Fix comment on SparkPlanGraphEdge
GitHub user juliuszsompolski opened a pull request: https://github.com/apache/spark/pull/22268

[DOC] Fix comment on SparkPlanGraphEdge

## What changes were proposed in this pull request?

`fromId` is the child, and `toId` is the parent, see line 127 in `buildSparkPlanGraphNode` above. The edges in Spark UI also go from child to parent.

## How was this patch tested?

Comment change only. Inspected the code above. Inspected how the edges in the Spark UI look.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/juliuszsompolski/apache-spark sparkplangraphedgedoc

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22268.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22268

commit e405f699e51058f8f61a2243862bcd8c6a6013c4
Author: Juliusz Sompolski
Date: 2018-08-29T12:19:10Z

    update doc
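For clarity, a sketch of the corrected comment against a minimal stand-in for the class (the real `SparkPlanGraphEdge` in SparkPlanGraph.scala has the same two fields; the `Sketch` suffix marks this as illustrative):

```scala
/**
 * Represents an edge in the SparkPlan tree. `fromId` is the id of the child
 * node and `toId` is the id of the parent node, so edges point from child to
 * parent, matching the direction of the arrows drawn in the Spark UI.
 */
case class SparkPlanGraphEdgeSketch(fromId: Long, toId: Long)
```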
[GitHub] spark issue #21403: [SPARK-24341][SQL] Support only IN subqueries with the s...
Github user juliuszsompolski commented on the issue: https://github.com/apache/spark/pull/21403 Looks good to me, though I'm not very familiar with the analyzer. @cloud-fan, @hvanhovell ?
[GitHub] spark issue #21403: [SPARK-24341][WIP][SQL] Support IN subqueries with struc...
Github user juliuszsompolski commented on the issue: https://github.com/apache/spark/pull/21403 @mgaido91 BTW: In SPARK-24395 I would consider the cases to still be valid, because I believe there is no other syntactic way to do a multi-column IN/NOT IN with a list of literals. The question is whether it should be treated as structs, or unpacked? If like structs, then the current behavior is correct, I think.
[GitHub] spark issue #21403: [SPARK-24341][WIP][SQL] Support IN subqueries with struc...
Github user juliuszsompolski commented on the issue: https://github.com/apache/spark/pull/21403 @mgaido91 This also works, +1. What about `a in (select (b, c) from ...)` when `a` is a struct? I guess we should allow it, but it is a potential gotcha during implementation.
[GitHub] spark issue #21403: [SPARK-24341][WIP][SQL] Support IN subqueries with struc...
Github user juliuszsompolski commented on the issue: https://github.com/apache/spark/pull/21403

I think that the way the columns are defined in the subquery should define the semantics. E.g.:
- `(a, b) IN (select c, d from ...)` - unpack (a, b) and treat it as a multi-column comparison, as in the current semantics.
- `(a, b) IN (select (c, d) from ..)` - keep it packed and treat it as a single-column IN.
- `(a, b, c) IN (select (d, e), f from ..)` or similar combinations - catch it in analysis as ambiguous.
- `(a, b, c) IN (select (d, e), f, g from ..)` - but this is valid as long as `a` matches the type of `(d, e)`.
[GitHub] spark pull request #21228: [SPARK-24171] Adding a note for non-deterministic...
Github user juliuszsompolski commented on a diff in the pull request: https://github.com/apache/spark/pull/21228#discussion_r185792840

--- Diff: R/pkg/R/functions.R ---
@@ -3184,6 +3191,7 @@ setMethod("create_map",
 #' collect(select(df2, collect_list(df2$gear)))
 #' collect(select(df2, collect_set(df2$gear)))}
 #' @note collect_list since 2.3.0
+#' @note the function is non-deterministic because its result depends on order of rows.
--- End diff --

for collect_list, collect_set maybe word it: "the function is non-deterministic, because the order of collected results depends on the order of rows, which may be non-deterministic after a shuffle"
[GitHub] spark pull request #21228: [SPARK-24171] Adding a note for non-deterministic...
Github user juliuszsompolski commented on a diff in the pull request: https://github.com/apache/spark/pull/21228#discussion_r185791719

--- Diff: R/pkg/R/functions.R ---
@@ -963,6 +964,7 @@ setMethod("kurtosis",
 #' last(df$c, TRUE)
 #' }
 #' @note last since 1.4.0
+#' @note the function is non-deterministic because its result depends on order of rows.
--- End diff --

for first/last maybe word it: "the function is non-deterministic, because its result depends on the order of rows, which may be non-deterministic after a shuffle"
[GitHub] spark issue #21133: [SPARK-24013][SQL] Remove unneeded compress in Approxima...
Github user juliuszsompolski commented on the issue: https://github.com/apache/spark/pull/21133 Maybe we could add the former test as a benchmark to `AggregateBenchmark`?
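A hypothetical shape for such a benchmark case, loosely following the style of that suite (the `Benchmark` constructor arguments, the case name, and the availability of `spark` in scope are all assumptions):

```scala
import org.apache.spark.util.Benchmark

val N = 1000
val benchmark = new Benchmark("approx_percentile on sorted input", N)
benchmark.addCase("single percentile over range") { _ =>
  // range() produces sorted input, the case this regression affected
  spark.sql(s"select approx_percentile(id, array(0.1)) from range($N)").collect()
}
benchmark.run()
```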
[GitHub] spark pull request #21133: [SPARK-24013][SQL] Remove unneeded compress in Ap...
Github user juliuszsompolski commented on a diff in the pull request: https://github.com/apache/spark/pull/21133#discussion_r184662359

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/ApproximatePercentileQuerySuite.scala ---
@@ -279,4 +282,11 @@ class ApproximatePercentileQuerySuite extends QueryTest with SharedSQLContext {
       checkAnswer(query, expected)
     }
   }
+
+  test("SPARK-24013: unneeded compress can cause performance issues with sorted input") {
+    failAfter(30 seconds) {
+      checkAnswer(sql("select approx_percentile(id, array(0.1)) from range(1000)"),
+        Row(Array(999160)))
--- End diff --

Ok. Yeah, looking at the other tests in this suite it's definitely fine :-).
[GitHub] spark pull request #21133: [SPARK-24013][SQL] Remove unneeded compress in Ap...
Github user juliuszsompolski commented on a diff in the pull request: https://github.com/apache/spark/pull/21133#discussion_r184656896

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/ApproximatePercentileQuerySuite.scala ---
@@ -279,4 +282,11 @@ class ApproximatePercentileQuerySuite extends QueryTest with SharedSQLContext {
       checkAnswer(query, expected)
     }
   }
+
+  test("SPARK-24013: unneeded compress can cause performance issues with sorted input") {
+    failAfter(30 seconds) {
+      checkAnswer(sql("select approx_percentile(id, array(0.1)) from range(1000)"),
+        Row(Array(999160)))
--- End diff --

nit: Given the approximate nature of the algorithm, couldn't the exact answer get flaky through some small changes in code or config? (e.g. the split of range into tasks, and then different merging of partial aggregates producing slightly different results) Maybe just asserting on collect().length == 1 would do?
[GitHub] spark pull request #21133: [SPARK-24013][SQL] Remove unneeded compress in Ap...
Github user juliuszsompolski commented on a diff in the pull request: https://github.com/apache/spark/pull/21133#discussion_r184654132

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproximatePercentile.scala ---
@@ -238,12 +238,6 @@ object ApproximatePercentile {
     summaries = summaries.insert(value)
     // The result of QuantileSummaries.insert is un-compressed
     isCompressed = false
-
-    // Currently, QuantileSummaries ignores the construction parameter compressThresHold,
-    // which may cause QuantileSummaries to occupy unbounded memory. We have to hack around here
-    // to make sure QuantileSummaries doesn't occupy infinite memory.
-    // TODO: Figure out why QuantileSummaries ignores construction parameter compressThresHold
-    if (summaries.sampled.length >= compressThresHoldBufferLength) compress()
--- End diff --

Sorry, it's my fault for not reading the description attentively :-).
[GitHub] spark issue #21171: [SPARK-24104] SQLAppStatusListener overwrites metrics on...
Github user juliuszsompolski commented on the issue: https://github.com/apache/spark/pull/21171 cc @gengliangwang @vanzin
[GitHub] spark pull request #21171: [SPARK-24104] SQLAppStatusListener overwrites met...
GitHub user juliuszsompolski opened a pull request: https://github.com/apache/spark/pull/21171

[SPARK-24104] SQLAppStatusListener overwrites metrics onDriverAccumUpdates instead of updating them

## What changes were proposed in this pull request?

Event `SparkListenerDriverAccumUpdates` may happen multiple times in a query - e.g. every `FileSourceScanExec` and `BroadcastExchangeExec` call `postDriverMetricUpdates`. In Spark 2.2 `SQLListener` updated the map with new values. `SQLAppStatusListener` overwrites it. Unless `update` preserved it in the KV store (dependent on `exec.lastWriteTime`), only the metrics from the last operator that does `postDriverMetricUpdates` are preserved.

## How was this patch tested?

Unit test added.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/juliuszsompolski/apache-spark SPARK-24104

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21171.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21171

commit 69e09cc770514bdb7964b8552456bf7a83df7588
Author: Juliusz Sompolski <julek@...>
Date: 2018-04-26T17:52:04Z

    onDriverAccumUpdates
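A minimal sketch of the overwrite-vs-update difference described above (the map shape and the accumulator ids are invented for illustration; they are not the listener's actual field names):

```scala
// Driver metrics arrive in several batches within one query execution.
var driverAccumUpdates: Map[Long, Long] = Map(1L -> 10L) // e.g. from FileSourceScanExec
val newUpdates = Seq(2L -> 20L)                          // e.g. from BroadcastExchangeExec

// Overwriting loses the earlier batch:
//   driverAccumUpdates = newUpdates.toMap        => Map(2 -> 20)
// Merging preserves metrics from all operators:
driverAccumUpdates = driverAccumUpdates ++ newUpdates // => Map(1 -> 10, 2 -> 20)
```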
[GitHub] spark pull request #21133: [SPARK-24013][SQL] Remove unneeded compress in Ap...
Github user juliuszsompolski commented on a diff in the pull request: https://github.com/apache/spark/pull/21133#discussion_r184343803

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/ApproximatePercentileQuerySuite.scala ---
@@ -279,4 +282,10 @@ class ApproximatePercentileQuerySuite extends QueryTest with SharedSQLContext {
       checkAnswer(query, expected)
     }
   }
+
+  test("SPARK-24013: unneeded compress can cause performance issues with sorted input") {
+    failAfter(20 seconds) {
+      assert(sql("select approx_percentile(id, array(0.1)) from range(1000)").count() == 1)
--- End diff --

When you do .count(), column pruning removes the approx_percentile from the query, so the test does not execute approx_percentile.
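To make that concrete, a hedged illustration (assuming the pruning behaviour described in the comment above):

```scala
// count() only needs the row count, so column pruning can drop the
// approx_percentile expression before execution...
sql("select approx_percentile(id, array(0.1)) from range(1000)").count()
// ...whereas collect() (or checkAnswer) must materialize the result,
// forcing the aggregate to actually run.
sql("select approx_percentile(id, array(0.1)) from range(1000)").collect()
```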
[GitHub] spark pull request #21133: [SPARK-24013][SQL] Remove unneeded compress in Ap...
Github user juliuszsompolski commented on a diff in the pull request: https://github.com/apache/spark/pull/21133#discussion_r184347998

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproximatePercentile.scala ---
@@ -238,12 +238,6 @@ object ApproximatePercentile {
     summaries = summaries.insert(value)
     // The result of QuantileSummaries.insert is un-compressed
     isCompressed = false
-
-    // Currently, QuantileSummaries ignores the construction parameter compressThresHold,
-    // which may cause QuantileSummaries to occupy unbounded memory. We have to hack around here
-    // to make sure QuantileSummaries doesn't occupy infinite memory.
-    // TODO: Figure out why QuantileSummaries ignores construction parameter compressThresHold
-    if (summaries.sampled.length >= compressThresHoldBufferLength) compress()
--- End diff --

I tested that this change doesn't cause `compress()` to never be called, and memory consumption to grow unbounded, but it appears to work fine - the mem usage through jmap -histo:live when running `sql("select approx_percentile(id, array(0.1)) from range(100L)").collect()` remains stable. `compress()` is being called from `QuantileSummaries.insert()`, so it seems that the above TODO got resolved at some point.
[GitHub] spark issue #20718: [SPARK-23514][FOLLOW-UP] Remove more places using sparkC...
Github user juliuszsompolski commented on the issue: https://github.com/apache/spark/pull/20718 jenkins retest this please
[GitHub] spark issue #20718: [SPARK-23514][FOLLOW-UP] Remove more places using sparkC...
Github user juliuszsompolski commented on the issue: https://github.com/apache/spark/pull/20718 jenkins retest this please
```
org.apache.spark.sql.FileBasedDataSourceSuite.(It is not a test it is a sbt.testing.SuiteSelector)
org.scalatest.exceptions.TestFailedDueToTimeoutException: The code passed to eventually never returned normally. Attempted 15 times over 10.01441841301 seconds. Last failure message: There are 1 possibly leaked file streams..
```
Could be a problem, but I don't see how it's related.
[GitHub] spark issue #20718: [SPARK-23514][FOLLOW-UP] Remove more places using sparkC...
Github user juliuszsompolski commented on the issue: https://github.com/apache/spark/pull/20718 jenkins retest this please `hudson.plugins.git.GitException: Failed to fetch from https://github.com/apache/spark.git`
[GitHub] spark issue #20718: [SPARK-23514][FOLLOW-UP] Remove more places using sparkC...
Github user juliuszsompolski commented on the issue: https://github.com/apache/spark/pull/20718 cc @gatorsmile
[GitHub] spark pull request #20718: [SPARK-23514][FOLLOW-UP] Remove more places using...
GitHub user juliuszsompolski opened a pull request: https://github.com/apache/spark/pull/20718

[SPARK-23514][FOLLOW-UP] Remove more places using sparkContext.hadoopConfiguration directly

## What changes were proposed in this pull request?

In https://github.com/apache/spark/pull/20679 I missed a few places in SQL tests. For hygiene, they should also use the sessionState interface where possible.

## How was this patch tested?

Modified existing tests.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/juliuszsompolski/apache-spark SPARK-23514-followup

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/20718.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #20718

commit 358a76ae781016586a892aa077d88f0c27876d76
Author: Juliusz Sompolski <julek@...>
Date: 2018-03-02T17:05:58Z

    [SPARK-23514][FOLLOW-UP] Remove more places using sparkContext.hadoopConfiguration directly
[GitHub] spark issue #20679: [SPARK-23514] Use SessionState.newHadoopConf() to propag...
Github user juliuszsompolski commented on the issue: https://github.com/apache/spark/pull/20679 jenkins retest this please
[GitHub] spark issue #20679: [SPARK-23514] Use SessionState.newHadoopConf() to propag...
Github user juliuszsompolski commented on the issue: https://github.com/apache/spark/pull/20679 jenkins retest this please Flaky: `sbt.ForkMain$ForkError: java.io.IOException: Failed to delete: /home/jenkins/workspace/SparkPullRequestBuilder/target/tmp/spark-c0f7c3f9-f48a-4bb8-91f7-e9c710e78b00`
[GitHub] spark pull request #20679: [SPARK-23514] Use SessionState.newHadoopConf() to...
Github user juliuszsompolski commented on a diff in the pull request: https://github.com/apache/spark/pull/20679#discussion_r170981204

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala ---
@@ -115,7 +115,9 @@ private[sql] class SessionState(
 private[sql] object SessionState {
   def newHadoopConf(hadoopConf: Configuration, sqlConf: SQLConf): Configuration = {
     val newHadoopConf = new Configuration(hadoopConf)
-    sqlConf.getAllConfs.foreach { case (k, v) => if (v ne null) newHadoopConf.set(k, v) }
+    sqlConf.getAllConfs.foreach { case (k, v) =>
+      if (v ne null) newHadoopConf.set(k, v.stripPrefix("spark.hadoop"))
--- End diff --

I just reverted this part, it's not really related to the rest.
[GitHub] spark issue #20679: [SPARK-23514] Use SessionState.newHadoopConf() to propag...
Github user juliuszsompolski commented on the issue: https://github.com/apache/spark/pull/20679 cc @dongjoon-hyun
[GitHub] spark issue #20679: [SPARK-23514] Use SessionState.newHadoopConf() to propag...
Github user juliuszsompolski commented on the issue: https://github.com/apache/spark/pull/20679 cc @gatorsmile @rxin We had a chat about whether to implement something that would catch direct misuses of sc.hadoopConfiguration in the sql module, but it seems that it's not very common, so maybe just fixing it where it happened is enough. @liancheng suggested stripping the "spark.hadoop" prefix to have more compatibility with users specifying or not specifying that prefix.
[GitHub] spark pull request #20679: [SPARK-23514] Use SessionState.newHadoopConf() to...
GitHub user juliuszsompolski opened a pull request: https://github.com/apache/spark/pull/20679

[SPARK-23514] Use SessionState.newHadoopConf() to propagate hadoop configs set in SQLConf.

## What changes were proposed in this pull request?

A few places in `spark-sql` were using `sc.hadoopConfiguration` directly. They should be using `sessionState.newHadoopConf()` to blend in configs that were set through `SQLConf`. Also, for better UX, for these configs blended in from `SQLConf`, we should consider removing the `spark.hadoop` prefix, so that the settings are recognized whether or not they were specified by the user.

## How was this patch tested?

Tested that AlterTableRecoverPartitions now correctly recognizes settings that are passed in to the FileSystem through SQLConf.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/juliuszsompolski/apache-spark SPARK-23514

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/20679.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #20679

commit 2c070fcd053acb47d8a8c3214d67e106b5683376
Author: Juliusz Sompolski <julek@...>
Date: 2018-02-26T15:13:23Z

    spark-23514
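The pattern the PR moves the code to, as a short hedged sketch (assumes code running inside the sql module, where `sessionState` is accessible):

```scala
// Session-scoped Hadoop conf: SQLConf entries are layered on top of the
// SparkContext-wide configuration, so per-session settings are honored.
val hadoopConf = spark.sessionState.newHadoopConf()
// rather than reading the shared, session-agnostic configuration directly:
// val hadoopConf = spark.sparkContext.hadoopConfiguration
```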
[GitHub] spark pull request #20624: [SPARK-23445] ColumnStat refactoring
Github user juliuszsompolski commented on a diff in the pull request: https://github.com/apache/spark/pull/20624#discussion_r170567874

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/StarSchemaDetection.scala ---
@@ -187,11 +187,11 @@ object StarSchemaDetection extends PredicateHelper {
       stats.rowCount match {
         case Some(rowCount) if rowCount >= 0 =>
           if (stats.attributeStats.nonEmpty && stats.attributeStats.contains(col)) {
-            val colStats = stats.attributeStats.get(col)
-            if (colStats.get.nullCount > 0) {
+            val colStats = stats.attributeStats.get(col).get
+            if (!colStats.hasCountStats || colStats.nullCount.get > 0) {
--- End diff --

`hasCountStats == distinctCount.isDefined && nullCount.isDefined`, so if evaluation reaches the second part of the `||`, then `hasCountStats` is true, which implies `nullCount.isDefined`.
[GitHub] spark pull request #20624: [SPARK-23445] ColumnStat refactoring
Github user juliuszsompolski commented on a diff in the pull request: https://github.com/apache/spark/pull/20624#discussion_r170465836

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala ---
@@ -387,6 +390,101 @@ case class CatalogStatistics(
   }
 }

+/**
+ * This class of statistics for a column is used in [[CatalogTable]] to interact with metastore.
+ */
+case class CatalogColumnStat(
+    distinctCount: Option[BigInt] = None,
+    min: Option[String] = None,
+    max: Option[String] = None,
+    nullCount: Option[BigInt] = None,
+    avgLen: Option[Long] = None,
+    maxLen: Option[Long] = None,
+    histogram: Option[Histogram] = None) {
+
+  /**
+   * Returns a map from string to string that can be used to serialize the column stats.
+   * The key is the name of the column and name of the field (e.g. "colName.distinctCount"),
+   * and the value is the string representation for the value.
+   * min/max values are stored as Strings. They can be deserialized using
+   * [[ColumnStat.fromExternalString]].
+   *
+   * As part of the protocol, the returned map always contains a key called "version".
+   * In the case min/max values are null (None), they won't appear in the map.
+   */
+  def toMap(colName: String): Map[String, String] = {
+    val map = new scala.collection.mutable.HashMap[String, String]
+    map.put(s"${colName}.${CatalogColumnStat.KEY_VERSION}", "1")
+    distinctCount.foreach { v =>
+      map.put(s"${colName}.${CatalogColumnStat.KEY_DISTINCT_COUNT}", v.toString)
+    }
+    nullCount.foreach { v =>
+      map.put(s"${colName}.${CatalogColumnStat.KEY_NULL_COUNT}", v.toString)
+    }
+    avgLen.foreach { v => map.put(s"${colName}.${CatalogColumnStat.KEY_AVG_LEN}", v.toString) }
+    maxLen.foreach { v => map.put(s"${colName}.${CatalogColumnStat.KEY_MAX_LEN}", v.toString) }
+    min.foreach { v => map.put(s"${colName}.${CatalogColumnStat.KEY_MIN_VALUE}", v) }
+    max.foreach { v => map.put(s"${colName}.${CatalogColumnStat.KEY_MAX_VALUE}", v) }
+    histogram.foreach { h =>
+      map.put(s"${colName}.${CatalogColumnStat.KEY_HISTOGRAM}", HistogramSerializer.serialize(h))
+    }
+    map.toMap
+  }
+
+  /** Convert [[CatalogColumnStat]] to [[ColumnStat]]. */
+  def toPlanStat(
+      colName: String,
+      dataType: DataType): ColumnStat =
+    ColumnStat(
+      distinctCount = distinctCount,
+      min = min.map(ColumnStat.fromExternalString(_, colName, dataType)),
+      max = max.map(ColumnStat.fromExternalString(_, colName, dataType)),
+      nullCount = nullCount,
+      avgLen = avgLen,
+      maxLen = maxLen,
+      histogram = histogram)
+}
+
+object CatalogColumnStat extends Logging {
+
+  // List of string keys used to serialize CatalogColumnStat
+  val KEY_VERSION = "version"
+  private val KEY_DISTINCT_COUNT = "distinctCount"
+  private val KEY_MIN_VALUE = "min"
+  private val KEY_MAX_VALUE = "max"
+  private val KEY_NULL_COUNT = "nullCount"
+  private val KEY_AVG_LEN = "avgLen"
+  private val KEY_MAX_LEN = "maxLen"
+  private val KEY_HISTOGRAM = "histogram"
+
+  /**
+   * Creates a [[CatalogColumnStat]] object from the given map.
+   * This is used to deserialize column stats from some external storage.
+   * The serialization side is defined in [[CatalogColumnStat.toMap]].
+   */
+  def fromMap(
+      table: String,
+      colName: String,
+      map: Map[String, String]): Option[CatalogColumnStat] = {
+
+    try {
+      Some(CatalogColumnStat(
+        distinctCount = map.get(s"${colName}.${KEY_DISTINCT_COUNT}").map(v => BigInt(v.toLong)),
--- End diff --

Added `"verify column stats can be deserialized from tblproperties"` test.
[GitHub] spark pull request #20624: [SPARK-23445] ColumnStat refactoring
Github user juliuszsompolski commented on a diff in the pull request: https://github.com/apache/spark/pull/20624#discussion_r170465726

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala ---
@@ -305,15 +260,15 @@ object ColumnStat extends Logging {
       percentiles: Option[ArrayData]): ColumnStat = {
     // The first 6 fields are basic column stats, the 7th is ndvs for histogram bins.
     val cs = ColumnStat(
-      distinctCount = BigInt(row.getLong(0)),
+      distinctCount = Option(BigInt(row.getLong(0))),
--- End diff --

I'd keep it an Option, just to be prepared for more flexibility and more optionality, unless you have a strong opinion. (note: this code has moved to AnalyzeColumnCommand)
[GitHub] spark pull request #20624: [SPARK-23445] ColumnStat refactoring
Github user juliuszsompolski commented on a diff in the pull request: https://github.com/apache/spark/pull/20624#discussion_r170465448

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala ---
@@ -387,6 +390,101 @@ case class CatalogStatistics(
   }
 }

+/**
+ * This class of statistics for a column is used in [[CatalogTable]] to interact with metastore.
+ */
+case class CatalogColumnStat(
+    distinctCount: Option[BigInt] = None,
+    min: Option[String] = None,
+    max: Option[String] = None,
+    nullCount: Option[BigInt] = None,
+    avgLen: Option[Long] = None,
+    maxLen: Option[Long] = None,
+    histogram: Option[Histogram] = None) {
+
+  /**
+   * Returns a map from string to string that can be used to serialize the column stats.
+   * The key is the name of the column and name of the field (e.g. "colName.distinctCount"),
+   * and the value is the string representation for the value.
+   * min/max values are stored as Strings. They can be deserialized using
+   * [[ColumnStat.fromExternalString]].
--- End diff --

I think that actually everything from the ColumnStat object should move:
- `fromExternalString` / `toExternalString` -> `CatalogColumnStat`
- `supportsDatatype` / `supportsHistogram` -> `AnalyzeColumnCommand`
- `statExprs` / `rowToColumnStat` -> `AnalyzeColumnCommand`, because they are tied to that specific method of stats collection.
[GitHub] spark pull request #20624: [SPARK-23445] ColumnStat refactoring
Github user juliuszsompolski commented on a diff in the pull request: https://github.com/apache/spark/pull/20624#discussion_r170463362

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala ---
@@ -387,6 +390,101 @@ case class CatalogStatistics(
   }
 }

+/**
+ * This class of statistics for a column is used in [[CatalogTable]] to interact with metastore.
+ */
+case class CatalogColumnStat(
+    distinctCount: Option[BigInt] = None,
+    min: Option[String] = None,
+    max: Option[String] = None,
+    nullCount: Option[BigInt] = None,
+    avgLen: Option[Long] = None,
+    maxLen: Option[Long] = None,
+    histogram: Option[Histogram] = None) {
+
+  /**
+   * Returns a map from string to string that can be used to serialize the column stats.
+   * The key is the name of the column and name of the field (e.g. "colName.distinctCount"),
+   * and the value is the string representation for the value.
+   * min/max values are stored as Strings. They can be deserialized using
+   * [[ColumnStat.fromExternalString]].
+   *
+   * As part of the protocol, the returned map always contains a key called "version".
+   * In the case min/max values are null (None), they won't appear in the map.
+   */
+  def toMap(colName: String): Map[String, String] = {
+    val map = new scala.collection.mutable.HashMap[String, String]
+    map.put(s"${colName}.${CatalogColumnStat.KEY_VERSION}", "1")
+    distinctCount.foreach { v =>
+      map.put(s"${colName}.${CatalogColumnStat.KEY_DISTINCT_COUNT}", v.toString)
+    }
+    nullCount.foreach { v =>
+      map.put(s"${colName}.${CatalogColumnStat.KEY_NULL_COUNT}", v.toString)
+    }
+    avgLen.foreach { v => map.put(s"${colName}.${CatalogColumnStat.KEY_AVG_LEN}", v.toString) }
+    maxLen.foreach { v => map.put(s"${colName}.${CatalogColumnStat.KEY_MAX_LEN}", v.toString) }
+    min.foreach { v => map.put(s"${colName}.${CatalogColumnStat.KEY_MIN_VALUE}", v) }
+    max.foreach { v => map.put(s"${colName}.${CatalogColumnStat.KEY_MAX_VALUE}", v) }
+    histogram.foreach { h =>
+      map.put(s"${colName}.${CatalogColumnStat.KEY_HISTOGRAM}", HistogramSerializer.serialize(h))
+    }
+    map.toMap
+  }
+
+  /** Convert [[CatalogColumnStat]] to [[ColumnStat]]. */
+  def toPlanStat(
+      colName: String,
+      dataType: DataType): ColumnStat =
+    ColumnStat(
+      distinctCount = distinctCount,
+      min = min.map(ColumnStat.fromExternalString(_, colName, dataType)),
+      max = max.map(ColumnStat.fromExternalString(_, colName, dataType)),
+      nullCount = nullCount,
+      avgLen = avgLen,
+      maxLen = maxLen,
+      histogram = histogram)
+}
+
+object CatalogColumnStat extends Logging {
+
+  // List of string keys used to serialize CatalogColumnStat
+  val KEY_VERSION = "version"
+  private val KEY_DISTINCT_COUNT = "distinctCount"
+  private val KEY_MIN_VALUE = "min"
+  private val KEY_MAX_VALUE = "max"
+  private val KEY_NULL_COUNT = "nullCount"
+  private val KEY_AVG_LEN = "avgLen"
+  private val KEY_MAX_LEN = "maxLen"
+  private val KEY_HISTOGRAM = "histogram"
+
+  /**
+   * Creates a [[CatalogColumnStat]] object from the given map.
+   * This is used to deserialize column stats from some external storage.
+   * The serialization side is defined in [[CatalogColumnStat.toMap]].
+   */
+  def fromMap(
+      table: String,
+      colName: String,
+      map: Map[String, String]): Option[CatalogColumnStat] = {
+
+    try {
+      Some(CatalogColumnStat(
+        distinctCount = map.get(s"${colName}.${KEY_DISTINCT_COUNT}").map(v => BigInt(v.toLong)),
--- End diff --

The format doesn't change. There is an existing test, `StatisticsSuite."verify serialized column stats after analyzing columns"`, that checks that the format of the serialized stats in the metastore doesn't change, by comparing it to a manual map of properties. I will add a test that verifies it the other way - adds the properties manually as TBLPROPERTIES, and verifies that they are successfully parsed.
[GitHub] spark pull request #20624: [SPARK-23445] ColumnStat refactoring
Github user juliuszsompolski commented on a diff in the pull request: https://github.com/apache/spark/pull/20624#discussion_r170462960

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala ---
@@ -387,6 +390,101 @@ case class CatalogStatistics(
   }
 }

+/**
+ * This class of statistics for a column is used in [[CatalogTable]] to interact with metastore.
+ */
+case class CatalogColumnStat(
+    distinctCount: Option[BigInt] = None,
+    min: Option[String] = None,
+    max: Option[String] = None,
+    nullCount: Option[BigInt] = None,
+    avgLen: Option[Long] = None,
+    maxLen: Option[Long] = None,
+    histogram: Option[Histogram] = None) {
+
+  /**
+   * Returns a map from string to string that can be used to serialize the column stats.
+   * The key is the name of the column and name of the field (e.g. "colName.distinctCount"),
+   * and the value is the string representation for the value.
+   * min/max values are stored as Strings. They can be deserialized using
+   * [[ColumnStat.fromExternalString]].
+   *
+   * As part of the protocol, the returned map always contains a key called "version".
+   * In the case min/max values are null (None), they won't appear in the map.
+   */
+  def toMap(colName: String): Map[String, String] = {
+    val map = new scala.collection.mutable.HashMap[String, String]
+    map.put(s"${colName}.${CatalogColumnStat.KEY_VERSION}", "1")
+    distinctCount.foreach { v =>
+      map.put(s"${colName}.${CatalogColumnStat.KEY_DISTINCT_COUNT}", v.toString)
+    }
+    nullCount.foreach { v =>
+      map.put(s"${colName}.${CatalogColumnStat.KEY_NULL_COUNT}", v.toString)
+    }
+    avgLen.foreach { v => map.put(s"${colName}.${CatalogColumnStat.KEY_AVG_LEN}", v.toString) }
+    maxLen.foreach { v => map.put(s"${colName}.${CatalogColumnStat.KEY_MAX_LEN}", v.toString) }
+    min.foreach { v => map.put(s"${colName}.${CatalogColumnStat.KEY_MIN_VALUE}", v) }
+    max.foreach { v => map.put(s"${colName}.${CatalogColumnStat.KEY_MAX_VALUE}", v) }
+    histogram.foreach { h =>
+      map.put(s"${colName}.${CatalogColumnStat.KEY_HISTOGRAM}", HistogramSerializer.serialize(h))
+    }
+    map.toMap
+  }
+
+  /** Convert [[CatalogColumnStat]] to [[ColumnStat]]. */
+  def toPlanStat(
--- End diff --

I intentionally made it the same. `CatalogStatistics.toPlanStat` converts it to `Statistics`. `CatalogColumnStat.toPlanStat` converts it to `ColumnStat`. The name signifies that it is used to convert both of these objects to their counterparts that are used in the query plan.
[GitHub] spark pull request #20624: [SPARK-23445] ColumnStat refactoring
Github user juliuszsompolski commented on a diff in the pull request: https://github.com/apache/spark/pull/20624#discussion_r170086812

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala ---
```
@@ -387,6 +390,101 @@ case class CatalogStatistics(
   }
 }

+/**
+ * This class of statistics for a column is used in [[CatalogTable]] to interact with metastore.
+ */
+case class CatalogColumnStat(
+    distinctCount: Option[BigInt] = None,
+    min: Option[String] = None,
+    max: Option[String] = None,
+    nullCount: Option[BigInt] = None,
+    avgLen: Option[Long] = None,
+    maxLen: Option[Long] = None,
+    histogram: Option[Histogram] = None) {
+
+  /**
+   * Returns a map from string to string that can be used to serialize the column stats.
+   * The key is the name of the column and name of the field (e.g. "colName.distinctCount"),
+   * and the value is the string representation for the value.
+   * min/max values are stored as Strings. They can be deserialized using
+   * [[ColumnStat.fromExternalString]].
+   *
+   * As part of the protocol, the returned map always contains a key called "version".
+   * In the case min/max values are null (None), they won't appear in the map.
+   */
+  def toMap(colName: String): Map[String, String] = {
+    val map = new scala.collection.mutable.HashMap[String, String]
+    map.put(s"${colName}.${CatalogColumnStat.KEY_VERSION}", "1")
+    distinctCount.foreach { v =>
+      map.put(s"${colName}.${CatalogColumnStat.KEY_DISTINCT_COUNT}", v.toString)
+    }
+    nullCount.foreach { v =>
+      map.put(s"${colName}.${CatalogColumnStat.KEY_NULL_COUNT}", v.toString)
+    }
+    avgLen.foreach { v => map.put(s"${colName}.${CatalogColumnStat.KEY_AVG_LEN}", v.toString) }
+    maxLen.foreach { v => map.put(s"${colName}.${CatalogColumnStat.KEY_MAX_LEN}", v.toString) }
+    min.foreach { v => map.put(s"${colName}.${CatalogColumnStat.KEY_MIN_VALUE}", v) }
+    max.foreach { v => map.put(s"${colName}.${CatalogColumnStat.KEY_MAX_VALUE}", v) }
+    histogram.foreach { h =>
+      map.put(s"${colName}.${CatalogColumnStat.KEY_HISTOGRAM}", HistogramSerializer.serialize(h))
+    }
+    map.toMap
+  }
+
+  /** Convert [[CatalogColumnStat]] to [[ColumnStat]]. */
+  def toPlanStat(
+      colName: String,
+      dataType: DataType): ColumnStat =
+    ColumnStat(
+      distinctCount = distinctCount,
+      min = min.map(ColumnStat.fromExternalString(_, colName, dataType)),
+      max = max.map(ColumnStat.fromExternalString(_, colName, dataType)),
+      nullCount = nullCount,
+      avgLen = avgLen,
+      maxLen = maxLen,
+      histogram = histogram)
+}
+
+object CatalogColumnStat extends Logging {
+
+  // List of string keys used to serialize CatalogColumnStat
+  val KEY_VERSION = "version"
+  private val KEY_DISTINCT_COUNT = "distinctCount"
+  private val KEY_MIN_VALUE = "min"
+  private val KEY_MAX_VALUE = "max"
+  private val KEY_NULL_COUNT = "nullCount"
+  private val KEY_AVG_LEN = "avgLen"
+  private val KEY_MAX_LEN = "maxLen"
+  private val KEY_HISTOGRAM = "histogram"
+
+  /**
+   * Creates a [[CatalogColumnStat]] object from the given map.
+   * This is used to deserialize column stats from some external storage.
+   * The serialization side is defined in [[CatalogColumnStat.toMap]].
+   */
+  def fromMap(
+      table: String,
+      colName: String,
+      map: Map[String, String]): Option[CatalogColumnStat] = {
+
+    try {
+      Some(CatalogColumnStat(
+        distinctCount = map.get(s"${colName}.${KEY_DISTINCT_COUNT}").map(v => BigInt(v.toLong)),
```
--- End diff --

The keys and format of the stats in the metastore didn't change. After this patch it remains backwards compatible with stats created before.

What changed here is that the `map` passed in used to contain the stats for just one column, stripped of the column-name prefix, whereas now I'm passing a `map` that holds the statistics for all columns, with keys prefixed by column name. It reduces complexity in `statsFromProperties`, see https://github.com/apache/spark/pull/20624/files#diff-159191585e10542f013cb3a714f26075R1057 - it used to create a filtered map for every column, stripping the prefix together with the column name; now it just passes the map of all columns' stat properties, and each column picks up what it needs. I'll add a bit of doc / comments about that.
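To make the serialization shape concrete, here is a small hedged sketch of the flat map layout described above (column and field names are illustrative; the real keys are produced by `toMap` and consumed by `fromMap`):

```scala
object StatsMapSketch extends App {
  // All columns' stats live in one flat map, keyed by "<colName>.<field>".
  val props = Map(
    "attr1.version"       -> "1",
    "attr1.distinctCount" -> "10",
    "attr1.min"           -> "1",
    "attr1.max"           -> "100",
    "attr2.version"       -> "1",
    "attr2.nullCount"     -> "5")

  // Each column picks out only its own keys; absent fields simply stay None.
  def statFor(colName: String, field: String): Option[String] =
    props.get(s"$colName.$field")

  assert(statFor("attr1", "distinctCount").contains("10"))
  assert(statFor("attr2", "min").isEmpty) // no min/max collected for attr2
}
```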
[GitHub] spark issue #20624: [SPARK-23445] ColumnStat refactoring
Github user juliuszsompolski commented on the issue: https://github.com/apache/spark/pull/20624

cc @gatorsmile @cloud-fan @marmbrus
[GitHub] spark pull request #20624: [SPARK-23445] ColumnStat refactoring
GitHub user juliuszsompolski opened a pull request: https://github.com/apache/spark/pull/20624

[SPARK-23445] ColumnStat refactoring

## What changes were proposed in this pull request?

Refactor ColumnStat to be more flexible.

* Split `ColumnStat` and `CatalogColumnStat`, just like `CatalogStatistics` is split from `Statistics`. This detaches how the statistics are stored from how they are processed in the query plan. `CatalogColumnStat` keeps `min` and `max` as `String`, making it not depend on dataType information.
* For `CatalogColumnStat`, parse column names from property names in the metastore (`KEY_VERSION` property), not from the metastore schema. This means that `CatalogColumnStat`s can be created for columns even if the schema itself is not stored in the metastore.
* Make all fields optional. `min`, `max` and `histogram` for columns were optional already. Having them all optional is more consistent, and gives flexibility to e.g. drop some of the fields through transformations if they are difficult / impossible to calculate.

The added flexibility will make it possible to have alternative implementations for stats, and separates stats collection from stats and estimation processing in plans.

## How was this patch tested?

Refactored existing tests to work with the refactored `ColumnStat` and `CatalogColumnStat`.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/juliuszsompolski/apache-spark SPARK-23445

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/20624.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #20624

commit cf3602075dcee35494c72975e361b739939079b4
Author: Juliusz Sompolski <julek@...>
Date: 2018-01-19T13:57:46Z

    column stat refactoring
[GitHub] spark pull request #20555: [SPARK-23366] Improve hot reading path in ReadAhe...
Github user juliuszsompolski commented on a diff in the pull request: https://github.com/apache/spark/pull/20555#discussion_r168048937

--- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java ---
```
@@ -78,9 +79,8 @@
   // whether there is a read ahead task running,
   private boolean isReading;

-  // If the remaining data size in the current buffer is below this threshold,
-  // we issue an async read from the underlying input stream.
-  private final int readAheadThresholdInBytes;
+  // whether there is a reader waiting for data.
+  private AtomicBoolean isWaiting = new AtomicBoolean(false);
```
--- End diff --

I'll leave it as is - it should compile to essentially the same code, and with `AtomicBoolean` the intent seems more readable to me.
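For readers weighing the trade-off mentioned above, a minimal Scala sketch of the two alternatives (illustrative only; the real field lives in the Java class ReadAheadInputStream):

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Both publish a flag across threads with the same visibility guarantees here;
// the AtomicBoolean spells out the cross-thread intent at every call site.
class FlagSketch {
  @volatile private var waitingVolatile: Boolean = false
  private val waitingAtomic = new AtomicBoolean(false)

  def markWaiting(): Unit = {
    waitingVolatile = true   // plain volatile write
    waitingAtomic.set(true)  // equivalent write through AtomicBoolean
  }
}
```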
[GitHub] spark pull request #20555: [SPARK-23366] Improve hot reading path in ReadAhe...
Github user juliuszsompolski commented on a diff in the pull request: https://github.com/apache/spark/pull/20555#discussion_r168048795

--- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java ---
```
@@ -230,24 +227,32 @@ private void signalAsyncReadComplete() {

   private void waitForAsyncReadComplete() throws IOException {
     stateChangeLock.lock();
+    isWaiting.set(true);
     try {
-      while (readInProgress) {
+      if (readInProgress) {
```
--- End diff --

Good catch, thanks!
[GitHub] spark issue #20555: [SPARK-23366] Improve hot reading path in ReadAheadInput...
Github user juliuszsompolski commented on the issue: https://github.com/apache/spark/pull/20555

@jiangxb1987 there is ReadAheadInputStreamSuite, which extends GenericFileInputStreamSuite. I updated it and added more combination testing with different buffer sizes that should exercise more interactions between the wrapped and outer buffers.
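A hedged sketch of that kind of combination testing (buffer sizes are made up, and BufferedInputStream stands in for ReadAheadInputStream, whose constructor arguments differ across versions):

```scala
import java.io.{BufferedInputStream, ByteArrayInputStream}

object BufferComboSketch extends App {
  val data = Array.tabulate[Byte](1 << 16)(i => i.toByte)
  // Mismatched wrapped/outer buffer sizes exercise the buffer-boundary paths.
  for (inner <- Seq(1, 64, 1024); outer <- Seq(1, 64, 1024)) {
    val in = new BufferedInputStream(
      new BufferedInputStream(new ByteArrayInputStream(data), inner), outer)
    var count = 0
    while (in.read() != -1) count += 1
    assert(count == data.length, s"lost bytes with inner=$inner outer=$outer")
  }
}
```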
[GitHub] spark pull request #20555: [SPARK-23366] Improve hot reading path in ReadAhe...
Github user juliuszsompolski commented on a diff in the pull request: https://github.com/apache/spark/pull/20555#discussion_r167971273

--- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java ---
```
@@ -258,54 +263,43 @@ public int read(byte[] b, int offset, int len) throws IOException {
     if (len == 0) {
       return 0;
     }
-    stateChangeLock.lock();
-    try {
-      return readInternal(b, offset, len);
-    } finally {
-      stateChangeLock.unlock();
-    }
-  }
-
-  /**
-   * flip the active and read ahead buffer
-   */
-  private void swapBuffers() {
-    ByteBuffer temp = activeBuffer;
-    activeBuffer = readAheadBuffer;
-    readAheadBuffer = temp;
-  }
-
-  /**
-   * Internal read function which should be called only from read() api. The assumption is that
-   * the stateChangeLock is already acquired in the caller before calling this function.
-   */
-  private int readInternal(byte[] b, int offset, int len) throws IOException {
-    assert (stateChangeLock.isLocked());
     if (!activeBuffer.hasRemaining()) {
-      waitForAsyncReadComplete();
-      if (readAheadBuffer.hasRemaining()) {
-        swapBuffers();
-      } else {
-        // The first read or activeBuffer is skipped.
-        readAsync();
+      // No remaining in active buffer - lock and switch to write ahead buffer.
+      stateChangeLock.lock();
+      try {
         waitForAsyncReadComplete();
-        if (isEndOfStream()) {
-          return -1;
+        if (!readAheadBuffer.hasRemaining()) {
+          // The first read or activeBuffer is skipped.
```
--- End diff --

Skipped using `skip()`. I moved the comment over from a few lines above, but looking at `skip()` now, I don't think it can happen - the skip would trigger a `readAsync` read in that case. I'll update the comment.
[GitHub] spark pull request #20555: [SPARK-23366] Improve hot reading path in ReadAhe...
Github user juliuszsompolski commented on a diff in the pull request: https://github.com/apache/spark/pull/20555#discussion_r167651037

--- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java ---
```
@@ -232,7 +229,9 @@ private void waitForAsyncReadComplete() throws IOException {
     stateChangeLock.lock();
     try {
       while (readInProgress) {
+        isWaiting.set(true);
         asyncReadComplete.await();
+        isWaiting.set(false);
```
--- End diff --

Good catch, I added `isWaiting.set(false)` to the finally branch. Actually, since the whole implementation assumes that there is only one reader, I removed the while() loop, since there is no other reader to race with us to trigger another read. In practice I think not updating `isWaiting` would have been benign: after the exception the query will be going down with an `InterruptedException`, or otherwise anyone upstream handling that exception would most probably declare the stream unusable afterwards anyway.
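A minimal sketch of the final shape being discussed (Scala for illustration; the real code is Java, and names mirror the fields above): the flag is set before awaiting and cleared in the finally block, so an interrupt cannot leave `isWaiting` stuck at true.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.ReentrantLock

object WaitSketch {
  private val stateChangeLock = new ReentrantLock()
  private val asyncReadComplete = stateChangeLock.newCondition()
  private val isWaiting = new AtomicBoolean(false)
  @volatile private var readInProgress = true

  def waitForAsyncReadComplete(): Unit = {
    stateChangeLock.lock()
    isWaiting.set(true)
    try {
      // Single-reader assumption: nobody else can re-trigger a read,
      // so a plain `if` suffices and the while-loop was dropped.
      if (readInProgress) asyncReadComplete.await()
    } finally {
      isWaiting.set(false) // cleared even if await() throws InterruptedException
      stateChangeLock.unlock()
    }
  }
}
```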
[GitHub] spark pull request #20555: [SPARK-23366] Improve hot reading path in ReadAhe...
Github user juliuszsompolski commented on a diff in the pull request: https://github.com/apache/spark/pull/20555#discussion_r167646954

--- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java ---
```
@@ -258,54 +262,43 @@ public int read(byte[] b, int offset, int len) throws IOException {
     if (len == 0) {
       return 0;
     }
-    stateChangeLock.lock();
-    try {
-      return readInternal(b, offset, len);
-    } finally {
-      stateChangeLock.unlock();
-    }
-  }
-
-  /**
-   * flip the active and read ahead buffer
-   */
-  private void swapBuffers() {
-    ByteBuffer temp = activeBuffer;
-    activeBuffer = readAheadBuffer;
-    readAheadBuffer = temp;
-  }
-
-  /**
-   * Internal read function which should be called only from read() api. The assumption is that
-   * the stateChangeLock is already acquired in the caller before calling this function.
-   */
-  private int readInternal(byte[] b, int offset, int len) throws IOException {
-    assert (stateChangeLock.isLocked());
     if (!activeBuffer.hasRemaining()) {
-      waitForAsyncReadComplete();
-      if (readAheadBuffer.hasRemaining()) {
-        swapBuffers();
-      } else {
-        // The first read or activeBuffer is skipped.
-        readAsync();
+      // No remaining in active buffer - lock and switch to write ahead buffer.
+      stateChangeLock.lock();
+      try {
         waitForAsyncReadComplete();
-        if (isEndOfStream()) {
-          return -1;
+        if (!readAheadBuffer.hasRemaining()) {
+          // The first read or activeBuffer is skipped.
+          readAsync();
+          waitForAsyncReadComplete();
+          if (isEndOfStream()) {
+            return -1;
+          }
         }
+        // Swap the newly read read ahead buffer in place of empty active buffer.
```
--- End diff --

Other existing places in comments in the file use `read ahead`.
[GitHub] spark issue #20555: [SPARK-23366] Improve hot reading path in ReadAheadInput...
Github user juliuszsompolski commented on the issue: https://github.com/apache/spark/pull/20555

cc @kiszk @sitalkedia @zsxwing
[GitHub] spark pull request #20555: [SPARK-23366] Improve hot reading path in ReadAhe...
GitHub user juliuszsompolski opened a pull request: https://github.com/apache/spark/pull/20555

[SPARK-23366] Improve hot reading path in ReadAheadInputStream

## What changes were proposed in this pull request?

`ReadAheadInputStream` was introduced in https://github.com/apache/spark/pull/18317/ to optimize reading spill files from disk. However, profiles show that the hot path of reading small amounts of data (like readInt) is inefficient - it involves taking locks and multiple checks.

* Optimize locking: a lock is not needed when simply accessing the active buffer. Only lock when needing to swap buffers or trigger async reading, or to get information about the async state.
* Optimize short-path single byte reads, which are used e.g. by the Java library's DataInputStream.readInt.

The asyncReader used to call "read" only once on the underlying stream, which never filled the underlying buffer when it was wrapping an LZ4BlockInputStream. If the buffer was returned unfilled, the async reader would be triggered to fill the read ahead buffer on each call, because the reader would see that the active buffer is below the refill threshold all the time. However, filling the full buffer all the time could introduce increased latency, so also add an `AtomicBoolean` flag for the async reader to return early if there is a reader waiting for data.

Remove `readAheadThresholdInBytes` and instead immediately trigger an async read when switching the buffers. It allows simplifying the code paths, especially the hot one that then only has to check if there is available data in the active buffer, without worrying about whether it needs to retrigger an async read. It seems to have a positive effect on perf.

## How was this patch tested?

It was noticed as a regression in some workloads after upgrading to Spark 2.3. It was particularly visible on TPCDS Q95 running on instances with fast disk (i3 AWS instances). Running with profiling:

* Spark 2.2 - 5.2-5.3 minutes, 9.5% in LZ4BlockInputStream.read
* Spark 2.3 - 6.4-6.6 minutes, 31.1% in ReadAheadInputStream.read
* Spark 2.3 + fix - 5.3-5.4 minutes, 13.3% in ReadAheadInputStream.read - very slightly slower, practically within noise.

We didn't see other regressions, and many workloads in general seem to be faster with Spark 2.3 (we did not investigate whether that is thanks to the async reader or unrelated).

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/juliuszsompolski/apache-spark SPARK-23366

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/20555.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #20555

commit 987f15ccb01b6c0351fbfdd49d6930b929c50a74
Author: Juliusz Sompolski <julek@...>
Date: 2018-01-30T20:54:47Z

    locking tweak

commit b26ffce6780078dbc38bff658e1ef7e9c56c3dd8
Author: Juliusz Sompolski <julek@...>
Date: 2018-02-01T14:27:09Z

    fill the read ahead buffer
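To illustrate the locking scheme described above, a hedged Scala sketch of the hot path (the real implementation is Java and has more state; names are simplified): the active buffer is consumed without taking the lock, and the lock is acquired only to wait, swap buffers, and retrigger the async read.

```scala
import java.nio.ByteBuffer
import java.util.concurrent.locks.ReentrantLock

class ReadAheadSketch {
  private val stateChangeLock = new ReentrantLock()
  private var activeBuffer: ByteBuffer = ByteBuffer.allocate(0)
  private var readAheadBuffer: ByteBuffer = ByteBuffer.allocate(0)

  def read(): Int = {
    // Hot path: no lock while the active buffer has data, because only the
    // single reader thread ever consumes from it.
    if (activeBuffer.hasRemaining) return activeBuffer.get() & 0xFF

    // Slow path: lock, wait for the async read, swap, immediately refill.
    stateChangeLock.lock()
    try {
      waitForAsyncReadComplete()
      if (!readAheadBuffer.hasRemaining) return -1 // end of stream (simplified)
      val tmp = activeBuffer; activeBuffer = readAheadBuffer; readAheadBuffer = tmp
      readAsync() // no threshold check anymore: always refill on swap
    } finally stateChangeLock.unlock()
    activeBuffer.get() & 0xFF
  }

  private def waitForAsyncReadComplete(): Unit = () // elided in this sketch
  private def readAsync(): Unit = ()                // elided in this sketch
}
```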
[GitHub] spark issue #20152: [SPARK-22957] ApproxQuantile breaks if the number of row...
Github user juliuszsompolski commented on the issue: https://github.com/apache/spark/pull/20152

If the change to the serialized form is a problem, that part can probably be reverted - it's far less likely that a single compressed stats chunk will overflow an Int. The bug I hit was in the global rank counter; I changed the other part just by reviewing the surrounding code for other places that could conceivably need a Long instead of an Int.
[GitHub] spark pull request #20152: [SPARK-22957] ApproxQuantile breaks if the number...
GitHub user juliuszsompolski opened a pull request: https://github.com/apache/spark/pull/20152

[SPARK-22957] ApproxQuantile breaks if the number of rows exceeds MaxInt

## What changes were proposed in this pull request?

A 32-bit Int was used for the row rank. That overflowed on a dataframe with more than 2B rows.

## How was this patch tested?

Added a test, but ignored it, as it takes 4 minutes.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/juliuszsompolski/apache-spark SPARK-22957

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/20152.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #20152

commit 324218b6065f1ad57479d5ee582694826c1309f9
Author: Juliusz Sompolski <julek@...>
Date: 2018-01-04T13:22:49Z

    SPARK-22957
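The failure mode is plain Int wraparound; a tiny self-contained illustration (not the patch itself, which changes the rank bookkeeping inside the quantile summaries):

```scala
object RankOverflowSketch extends App {
  // Counting row ranks in an Int silently wraps past 2^31 - 1 rows.
  var rankInt: Int = Int.MaxValue
  rankInt += 1 // wraps to -2147483648

  // A Long rank stays correct well past 2 billion rows.
  var rankLong: Long = Int.MaxValue.toLong
  rankLong += 1 // 2147483648, as expected

  println(s"Int rank: $rankInt, Long rank: $rankLong")
}
```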
[GitHub] spark pull request #20136: [SPARK-22938] Assert that SQLConf.get is accessed...
Github user juliuszsompolski commented on a diff in the pull request: https://github.com/apache/spark/pull/20136#discussion_r159408791

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
```
@@ -70,7 +72,7 @@ object SQLConf {
    * Default config. Only used when there is no active SparkSession for the thread.
    * See [[get]] for more information.
    */
-  private val fallbackConf = new ThreadLocal[SQLConf] {
+  private lazy val fallbackConf = new ThreadLocal[SQLConf] {
```
--- End diff --

When I checked (which was before I moved the assertion from here to the SQLConf constructor, but it shouldn't matter), not having it as lazy resulted in it being instantiated eagerly, as a static member of the SQLConf object, before SparkEnv was set - hitting the null SparkEnv.
[GitHub] spark issue #20136: [SPARK-22938] Assert that SQLConf.get is accessed only o...
Github user juliuszsompolski commented on the issue: https://github.com/apache/spark/pull/20136

cc @cloud-fan
[GitHub] spark pull request #20136: [SPARK-22938] Assert that SQLConf.get is accessed...
GitHub user juliuszsompolski opened a pull request: https://github.com/apache/spark/pull/20136

[SPARK-22938] Assert that SQLConf.get is accessed only on the driver.

## What changes were proposed in this pull request?

Assert if code tries to access SQLConf.get on an executor. This can lead to hard-to-detect bugs, where the executor reads fallbackConf, falling back to default config values and ignoring potentially changed non-default configs. If a config is to be used in executor code, it needs to be read on the driver and passed explicitly.

## How was this patch tested?

Check in existing tests.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/juliuszsompolski/apache-spark SPARK-22938

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/20136.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #20136

commit d2b3bc6374cde4f8d4ebeedd7612f51d18f13806
Author: Juliusz Sompolski <julek@...>
Date: 2017-12-14T17:50:23Z

    [SPARK-22938] Assert that SQLConf.get is accessed only on the driver.
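A hedged sketch of the kind of assertion proposed (the exact wiring into SQLConf differs; `SparkEnv.get` and `SparkContext.DRIVER_IDENTIFIER` are the standard ways to tell driver from executor):

```scala
import org.apache.spark.{SparkContext, SparkEnv}

object ConfGuardSketch {
  // Fail fast if driver-only config state is touched on an executor,
  // instead of silently reading fallback defaults.
  def assertOnDriver(): Unit = {
    val env = SparkEnv.get
    if (env != null) {
      assert(env.executorId == SparkContext.DRIVER_IDENTIFIER,
        "SQLConf should only be created and accessed on the driver.")
    }
  }
}
```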
[GitHub] spark issue #19923: [SPARK-22721] BytesToBytesMap peak memory not updated.
Github user juliuszsompolski commented on the issue: https://github.com/apache/spark/pull/19923

jenkins retest this please
[GitHub] spark issue #19923: [SPARK-22721] BytesToBytesMap peak memory not updated.
Github user juliuszsompolski commented on the issue: https://github.com/apache/spark/pull/19923

Sorry @hvanhovell for not getting it fully right the first time...
[GitHub] spark pull request #19923: [SPARK-22721] BytesToBytesMap peak memory not upd...
GitHub user juliuszsompolski opened a pull request: https://github.com/apache/spark/pull/19923

[SPARK-22721] BytesToBytesMap peak memory not updated.

## What changes were proposed in this pull request?

Follow-up to an earlier commit: the peak memory of BytesToBytesMap is also not updated in two more places - spill() and destructiveIterator().

## How was this patch tested?

Manually.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/juliuszsompolski/apache-spark SPARK-22721cd

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19923.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #19923

commit 12de708a3ba894b9568be12402f561b767355acc
Author: Juliusz Sompolski <ju...@databricks.com>
Date: 2017-12-07T21:27:56Z

    SPARK-22721 cd
[GitHub] spark issue #19915: [SPARK-22721] BytesToBytesMap peak memory usage not accu...
Github user juliuszsompolski commented on the issue: https://github.com/apache/spark/pull/19915

cc @cloud-fan
[GitHub] spark pull request #19915: [SPARK-22721] BytesToBytesMap peak memory usage n...
GitHub user juliuszsompolski opened a pull request: https://github.com/apache/spark/pull/19915

[SPARK-22721] BytesToBytesMap peak memory usage not accurate after reset()

## What changes were proposed in this pull request?

BytesToBytesMap doesn't update the peak memory usage before shrinking back to initial capacity in reset(), so after a disk spill one never knows what the size of the hash table was before spilling.

## How was this patch tested?

Checked manually.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/juliuszsompolski/apache-spark SPARK-22721

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19915.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #19915

commit eeb0f31da08e6bf609b8ca6cc6509b949dcbac6e
Author: Juliusz Sompolski <ju...@databricks.com>
Date: 2017-12-06T20:25:35Z

    SPARK-22721
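An illustrative Scala sketch of the bookkeeping pattern involved (the real BytesToBytesMap is Java and tracks allocated pages rather than a single counter): the high-water mark must be snapshotted before any operation that releases or shrinks the backing memory.

```scala
class PeakTrackingSketch {
  private var used: Long = 0L
  private var peak: Long = 0L

  private def updatePeak(): Unit = peak = math.max(peak, used)

  def grow(bytes: Long): Unit = { used += bytes; updatePeak() }

  // The bug shape: shrinking back to initial capacity without first recording
  // the peak loses the pre-spill high-water mark.
  def reset(): Unit = {
    updatePeak() // must run before we release the memory
    used = 0L
  }

  def getPeakMemoryUsedBytes: Long = { updatePeak(); peak }
}
```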
[GitHub] spark issue #19689: [SPARK-22462][SQL] Make rdd-based actions in Dataset tra...
Github user juliuszsompolski commented on the issue: https://github.com/apache/spark/pull/19689

Thanks for the fix @viirya! But I'm not a Spark committer, so I can't approve it :-).
[GitHub] spark issue #19656: [SPARK-22445][SQL] move CodegenContext.copyResult to Cod...
Github user juliuszsompolski commented on the issue: https://github.com/apache/spark/pull/19656

`isShouldStopRequired` is currently not respected by most operators (they emit `shouldStop()` code regardless of this setting). If you're refactoring this, maybe make sure that all places that emit `shouldStop()` respect it?
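For context, a hedged sketch of what the flag is meant to gate (simplified; the real CodegenContext emits far more): operators splice a `shouldStop()` check into their produce loops, and `isShouldStopRequired` is supposed to let them omit it when no consumer needs early exit.

```scala
object CodegenSketch {
  // Emit a row-producing loop; include the early-exit check only when required.
  def loopBody(isShouldStopRequired: Boolean): String = {
    val earlyExit = if (isShouldStopRequired) "if (shouldStop()) return;" else ""
    s"""while (inputIterator.hasNext()) {
       |  InternalRow row = (InternalRow) inputIterator.next();
       |  append(row);
       |  $earlyExit
       |}""".stripMargin
  }
}
```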
[GitHub] spark issue #19181: [SPARK-21907][CORE] oom during spill
Github user juliuszsompolski commented on the issue: https://github.com/apache/spark/pull/19181

Looks good to me. What do you think @hvanhovell ?
[GitHub] spark issue #19386: [SPARK-22161] [SQL] Add Impala-modified TPC-DS queries
Github user juliuszsompolski commented on the issue: https://github.com/apache/spark/pull/19386

LGTM
[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill
Github user juliuszsompolski commented on a diff in the pull request: https://github.com/apache/spark/pull/19181#discussion_r141843276

--- Diff: core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java ---
```
@@ -139,4 +139,44 @@ public int compare(
     }
     assertEquals(dataToSort.length, iterLength);
   }
+
+  @Test
+  public void freeAfterOOM() {
+    final TestMemoryManager testMemoryManager =
+        new TestMemoryManager(new SparkConf().set("spark.memory.offHeap.enabled", "false"));
+    final TaskMemoryManager memoryManager = new TaskMemoryManager(
+        testMemoryManager, 0);
+    final TestMemoryConsumer consumer = new TestMemoryConsumer(memoryManager);
+    final MemoryBlock dataPage = memoryManager.allocatePage(2048, consumer);
+    final Object baseObject = dataPage.getBaseObject();
+    // Write the records into the data page:
+    long position = dataPage.getBaseOffset();
+
+    final HashPartitioner hashPartitioner = new HashPartitioner(4);
+    // Use integer comparison for comparing prefixes (which are partition ids, in this case)
+    final PrefixComparator prefixComparator = PrefixComparators.LONG;
+    final RecordComparator recordComparator = new RecordComparator() {
+      @Override
+      public int compare(
+          Object leftBaseObject,
+          long leftBaseOffset,
+          Object rightBaseObject,
+          long rightBaseOffset) {
+        return 0;
+      }
+    };
+    UnsafeInMemorySorter sorter = new UnsafeInMemorySorter(consumer, memoryManager,
+        recordComparator, prefixComparator, 100, shouldUseRadixSort());
+
+    testMemoryManager.markExecutionAsOutOfMemoryOnce();
+    try {
+      sorter.reset();
+    } catch( OutOfMemoryError oom ) {
+      //as expected
+    }
+    // this currently fails on NPE at org.apache.spark.memory.MemoryConsumer.freeArray(MemoryConsumer.java:108)
+    sorter.free();
+    //simulate a 'back to back' free.
```
--- End diff --

nit: ws: `// simulate ...`
[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill
Github user juliuszsompolski commented on a diff in the pull request: https://github.com/apache/spark/pull/19181#discussion_r141843447

--- Diff: core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala ---
```
@@ -58,11 +58,15 @@ class TestMemoryManager(conf: SparkConf)

   override def maxOffHeapStorageMemory: Long = 0L

-  private var oomOnce = false
+  private var conseqOOM = 0
   private var available = Long.MaxValue

   def markExecutionAsOutOfMemoryOnce(): Unit = {
-    oomOnce = true
+    markConseqOOM(1)
+  }
+
+  def markConseqOOM( n : Int) : Unit = {
```
--- End diff --

nit: markConsequentOOM
[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill
Github user juliuszsompolski commented on a diff in the pull request: https://github.com/apache/spark/pull/19181#discussion_r141843414

--- Diff: core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala ---
```
@@ -58,11 +58,15 @@ class TestMemoryManager(conf: SparkConf)

   override def maxOffHeapStorageMemory: Long = 0L

-  private var oomOnce = false
+  private var conseqOOM = 0
```
--- End diff --

nit: conseq -> consequent
[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill
Github user juliuszsompolski commented on a diff in the pull request: https://github.com/apache/spark/pull/19181#discussion_r141843193

--- Diff: core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java ---
```
@@ -139,4 +139,44 @@ public int compare(
     }
     assertEquals(dataToSort.length, iterLength);
   }
+
+  @Test
+  public void freeAfterOOM() {
+    final TestMemoryManager testMemoryManager =
+        new TestMemoryManager(new SparkConf().set("spark.memory.offHeap.enabled", "false"));
+    final TaskMemoryManager memoryManager = new TaskMemoryManager(
+        testMemoryManager, 0);
+    final TestMemoryConsumer consumer = new TestMemoryConsumer(memoryManager);
+    final MemoryBlock dataPage = memoryManager.allocatePage(2048, consumer);
+    final Object baseObject = dataPage.getBaseObject();
+    // Write the records into the data page:
+    long position = dataPage.getBaseOffset();
+
+    final HashPartitioner hashPartitioner = new HashPartitioner(4);
+    // Use integer comparison for comparing prefixes (which are partition ids, in this case)
+    final PrefixComparator prefixComparator = PrefixComparators.LONG;
+    final RecordComparator recordComparator = new RecordComparator() {
+      @Override
+      public int compare(
+          Object leftBaseObject,
+          long leftBaseOffset,
+          Object rightBaseObject,
+          long rightBaseOffset) {
+        return 0;
+      }
+    };
+    UnsafeInMemorySorter sorter = new UnsafeInMemorySorter(consumer, memoryManager,
+        recordComparator, prefixComparator, 100, shouldUseRadixSort());
+
+    testMemoryManager.markExecutionAsOutOfMemoryOnce();
+    try {
+      sorter.reset();
+    } catch( OutOfMemoryError oom ) {
+      //as expected
+    }
+    // this currently fails on NPE at org.apache.spark.memory.MemoryConsumer.freeArray(MemoryConsumer.java:108)
```
--- End diff --

nit: tense: "this currently fails" -> "[SPARK-21907] this failed ..." At the point when anyone reads it, it will hopefully not fail :-)
[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill
Github user juliuszsompolski commented on a diff in the pull request: https://github.com/apache/spark/pull/19181#discussion_r141842424

--- Diff: core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java ---
```
@@ -85,7 +85,7 @@
   private final LinkedList spillWriters = new LinkedList<>();

   // These variables are reset after spilling:
-  @Nullable private volatile UnsafeInMemorySorter inMemSorter;
+  private @Nullable volatile UnsafeInMemorySorter inMemSorter;
```
--- End diff --

nit: unnecessary change.
[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill
Github user juliuszsompolski commented on a diff in the pull request: https://github.com/apache/spark/pull/19181#discussion_r141842918

--- Diff: core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java ---
```
@@ -503,6 +511,39 @@ public void testGetIterator() throws Exception {
     verifyIntIterator(sorter.getIterator(279), 279, 300);
   }
+
+  @Test
+  public void testOOMDuringSpill() throws Exception {
+    final UnsafeExternalSorter sorter = newSorter();
+    for (int i = 0; sorter.hasSpaceForAnotherRecord(); ++i) {
+      insertNumber(sorter, i);
+    }
+    // todo: this might actually not be zero if pageSize is somehow configured differently,
+    // so we actually have to compute the expected spill size according to the configured page size
+    assertEquals(0, sorter.getSpillSize());
```
--- End diff --

If this might actually not be zero, maybe don't test this assertion?
[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill
Github user juliuszsompolski commented on a diff in the pull request: https://github.com/apache/spark/pull/19181#discussion_r141842522

--- Diff: core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java ---
```
@@ -162,14 +162,25 @@ private int getUsableCapacity() {
    */
   public void free() {
     if (consumer != null) {
-      consumer.freeArray(array);
+      if (null != array) {
```
--- End diff --

nit: put the literal on the RHS (`array != null`)
[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill
Github user juliuszsompolski commented on a diff in the pull request: https://github.com/apache/spark/pull/19181#discussion_r141842730

--- Diff: core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java ---
```
@@ -162,14 +162,25 @@ private int getUsableCapacity() {
    */
   public void free() {
     if (consumer != null) {
-      consumer.freeArray(array);
+      if (null != array) {
+        consumer.freeArray(array);
+      }
       array = null;
     }
   }

   public void reset() {
     if (consumer != null) {
       consumer.freeArray(array);
+      // this is needed to prevent a 'nested' spill,
```
--- End diff --

nit: it doesn't prevent a nested spill, it only renders it harmless. Remove this line - the rest of the comment is true.
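For readers following along, a simplified Scala sketch of the hazard (names loosely follow UnsafeInMemorySorter; this is not the actual patch): the allocation inside reset() can trigger a spill, and the spill path calls free() on the same sorter, so the field must be nulled out first.

```scala
class SorterSketch(allocate: Long => Array[Long], release: Array[Long] => Unit) {
  private var array: Array[Long] = allocate(1024L)

  def free(): Unit = {
    if (array != null) release(array) // must tolerate a nested/second call
    array = null
  }

  def reset(): Unit = {
    release(array)
    // Null the field before allocating: the allocation below may spill, and a
    // nested free() must not see - and re-release - the stale array.
    array = null
    array = allocate(1024L)
  }
}
```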
[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill
Github user juliuszsompolski commented on a diff in the pull request: https://github.com/apache/spark/pull/19181#discussion_r141843914

--- Diff: core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java ---
```
@@ -19,10 +19,18 @@

 import java.io.File;
 import java.io.IOException;
+import java.io.PrintWriter;
 import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.UUID;

+import jodd.io.StringOutputStream;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.Matchers;
+import org.hamcrest.TypeSafeMatcher;
+import org.junit.Rule;
+import org.junit.rules.ExpectedException;
```
--- End diff --

nit: I think you don't use most of these imports anymore.
[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill
Github user juliuszsompolski commented on a diff in the pull request: https://github.com/apache/spark/pull/19181#discussion_r141843227

--- Diff: core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java ---
```
@@ -139,4 +139,44 @@ public int compare(
     }
     assertEquals(dataToSort.length, iterLength);
   }
+
+  @Test
+  public void freeAfterOOM() {
+    final TestMemoryManager testMemoryManager =
+        new TestMemoryManager(new SparkConf().set("spark.memory.offHeap.enabled", "false"));
+    final TaskMemoryManager memoryManager = new TaskMemoryManager(
+        testMemoryManager, 0);
+    final TestMemoryConsumer consumer = new TestMemoryConsumer(memoryManager);
+    final MemoryBlock dataPage = memoryManager.allocatePage(2048, consumer);
+    final Object baseObject = dataPage.getBaseObject();
+    // Write the records into the data page:
+    long position = dataPage.getBaseOffset();
+
+    final HashPartitioner hashPartitioner = new HashPartitioner(4);
+    // Use integer comparison for comparing prefixes (which are partition ids, in this case)
+    final PrefixComparator prefixComparator = PrefixComparators.LONG;
+    final RecordComparator recordComparator = new RecordComparator() {
+      @Override
+      public int compare(
+          Object leftBaseObject,
+          long leftBaseOffset,
+          Object rightBaseObject,
+          long rightBaseOffset) {
+        return 0;
+      }
+    };
+    UnsafeInMemorySorter sorter = new UnsafeInMemorySorter(consumer, memoryManager,
+        recordComparator, prefixComparator, 100, shouldUseRadixSort());
+
+    testMemoryManager.markExecutionAsOutOfMemoryOnce();
+    try {
+      sorter.reset();
+    } catch( OutOfMemoryError oom ) {
+      //as expected
```
--- End diff --

nit: ws: `// as expected`
[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill
Github user juliuszsompolski commented on a diff in the pull request: https://github.com/apache/spark/pull/19181#discussion_r141843046

--- Diff: core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java ---
```
@@ -503,6 +511,39 @@ public void testGetIterator() throws Exception {
     verifyIntIterator(sorter.getIterator(279), 279, 300);
   }
+
+  @Test
+  public void testOOMDuringSpill() throws Exception {
+    final UnsafeExternalSorter sorter = newSorter();
+    for (int i = 0; sorter.hasSpaceForAnotherRecord(); ++i) {
+      insertNumber(sorter, i);
+    }
+    // todo: this might actually not be zero if pageSize is somehow configured differently,
+    // so we actually have to compute the expected spill size according to the configured page size
+    assertEquals(0, sorter.getSpillSize());
+    // we expect the next insert to attempt growing the pointerssArray
+    // first allocation is expected to fail, then a spill is triggered which attempts another allocation
+    // which also fails and we expect to see this OOM here.
+    // the original code messed with a released array within the spill code
+    // and ended up with a failed assertion.
+    // we also expect the location of the OOM to be org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.reset
+    memoryManager.markConseqOOM(2);
+    OutOfMemoryError expectedOOM = null;
+    try {
+      insertNumber(sorter, 1024);
+    }
+    // we expect an OutOfMemoryError here, anything else (i.e the original NPE is a failure)
+    catch( OutOfMemoryError oom ){
+      expectedOOM = oom;
+    }
+
+    assertNotNull("expected OutOfMmoryError but it seems operation surprisingly succeeded"
+        ,expectedOOM);
+    String oomStackTrace = Utils.exceptionString(expectedOOM);
+    assertThat("expected OutOfMemoryError in org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.reset"
+        , oomStackTrace
+        , Matchers.containsString("org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.reset"));
```
--- End diff --

nit: move commas to end of line (3x)
[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill
Github user juliuszsompolski commented on a diff in the pull request: https://github.com/apache/spark/pull/19181#discussion_r141842948

--- Diff: core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java ---
```
@@ -503,6 +511,39 @@ public void testGetIterator() throws Exception {
     verifyIntIterator(sorter.getIterator(279), 279, 300);
   }
+
+  @Test
+  public void testOOMDuringSpill() throws Exception {
+    final UnsafeExternalSorter sorter = newSorter();
+    for (int i = 0; sorter.hasSpaceForAnotherRecord(); ++i) {
+      insertNumber(sorter, i);
+    }
+    // todo: this might actually not be zero if pageSize is somehow configured differently,
+    // so we actually have to compute the expected spill size according to the configured page size
+    assertEquals(0, sorter.getSpillSize());
+    // we expect the next insert to attempt growing the pointerssArray
+    // first allocation is expected to fail, then a spill is triggered which attempts another allocation
+    // which also fails and we expect to see this OOM here.
+    // the original code messed with a released array within the spill code
+    // and ended up with a failed assertion.
+    // we also expect the location of the OOM to be org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.reset
+    memoryManager.markConseqOOM(2);
+    OutOfMemoryError expectedOOM = null;
+    try {
+      insertNumber(sorter, 1024);
+    }
+    // we expect an OutOfMemoryError here, anything else (i.e the original NPE is a failure)
+    catch( OutOfMemoryError oom ){
```
--- End diff --

nit: ws: catch (OutOfMemoryError oom) {
[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill
Github user juliuszsompolski commented on a diff in the pull request: https://github.com/apache/spark/pull/19181#discussion_r141843494

--- Diff: core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala ---
```
@@ -58,11 +58,15 @@ class TestMemoryManager(conf: SparkConf)

   override def maxOffHeapStorageMemory: Long = 0L

-  private var oomOnce = false
+  private var conseqOOM = 0
   private var available = Long.MaxValue

   def markExecutionAsOutOfMemoryOnce(): Unit = {
-    oomOnce = true
+    markConseqOOM(1)
+  }
+
+  def markConseqOOM( n : Int) : Unit = {
```
--- End diff --

nit: ws: `(n: Int): Unit`
[GitHub] spark issue #19353: [SPARK-22103][FOLLOWUP] Rename addExtraCode to addInnerC...
Github user juliuszsompolski commented on the issue: https://github.com/apache/spark/pull/19353

jenkins retest this please