[GitHub] [spark] cloud-fan commented on issue #26899: [SPARK-28332][SQL] Reserve init value -1 only when do min max statistics in SQLMetrics
cloud-fan commented on issue #26899: [SPARK-28332][SQL] Reserve init value -1 only when do min max statistics in SQLMetrics URL: https://github.com/apache/spark/pull/26899#issuecomment-568393504 @WangGuangxin can you leave a comment in the JIRA so that I can assign it to you? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] yaooqinn commented on issue #26905: [SPARK-30266][SQL] Avoid match error and int overflow in ApproximatePercentile and Percentile
yaooqinn commented on issue #26905: [SPARK-30266][SQL] Avoid match error and int overflow in ApproximatePercentile and Percentile URL: https://github.com/apache/spark/pull/26905#issuecomment-568392603 retest this please
[GitHub] [spark] manuzhang commented on a change in pull request #26968: [SPARK-29906][SQL][FOLLOWUP] Update the final plan in UI for AQE
manuzhang commented on a change in pull request #26968: [SPARK-29906][SQL][FOLLOWUP] Update the final plan in UI for AQE URL: https://github.com/apache/spark/pull/26968#discussion_r360797024 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala ## @@ -205,6 +205,7 @@ case class AdaptiveSparkPlanExec( // Run the final plan when there's no more unfinished stages. currentPhysicalPlan = applyPhysicalRules(result.newPlan, queryStageOptimizerRules) + executionId.foreach(onUpdatePlan) Review comment: Does it matter for what's displayed on the UI, or for anyone interested in the SQL update event?
[GitHub] [spark] cloud-fan commented on a change in pull request #26980: [SPARK-27348][Core] HeartbeatReceiver should remove lost executors from CoarseGrainedSchedulerBackend
cloud-fan commented on a change in pull request #26980: [SPARK-27348][Core] HeartbeatReceiver should remove lost executors from CoarseGrainedSchedulerBackend URL: https://github.com/apache/spark/pull/26980#discussion_r360796532 ## File path: core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala ## @@ -199,14 +201,20 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock) if (now - lastSeenMs > executorTimeoutMs) { logWarning(s"Removing executor $executorId with no recent heartbeats: " + s"${now - lastSeenMs} ms exceeds timeout $executorTimeoutMs ms") -scheduler.executorLost(executorId, SlaveLost("Executor heartbeat " + - s"timed out after ${now - lastSeenMs} ms")) - // Asynchronously kill the executor to avoid blocking the current thread +// Asynchronously kill the executor to avoid blocking the current thread killExecutorThread.submit(new Runnable { override def run(): Unit = Utils.tryLogNonFatalError { // Note: we want to get an executor back after expiring this one, // so do not simply call `sc.killExecutor` here (SPARK-8119) sc.killAndReplaceExecutor(executorId) +// In case of the executors which are not gracefully shut down, we should remove Review comment: do you mean only `CoarseGrainedSchedulerBackend` can't handle non-graceful executor shutdown well?
[GitHub] [spark] cloud-fan commented on a change in pull request #26946: [SPARK-30036][SQL] Fix: REPARTITION hint does not work with order by
cloud-fan commented on a change in pull request #26946: [SPARK-30036][SQL] Fix: REPARTITION hint does not work with order by URL: https://github.com/apache/spark/pull/26946#discussion_r360795988 ## File path: sql/core/src/test/scala/org/apache/spark/sql/ConfigBehaviorSuite.scala ## @@ -39,9 +39,8 @@ class ConfigBehaviorSuite extends QueryTest with SharedSparkSession { def computeChiSquareTest(): Double = { val n = 1 // Trigger a sort - // Range has range partitioning in its output now. To have a range shuffle, we - // need to run a repartition first. - val data = spark.range(0, n, 1, 1).repartition(10).sort($"id".desc) + // Range has range partitioning in its output now. Review comment: shall we remove this comment now? It's not useful anymore: since we do add a shuffle, the range output partitioning doesn't matter.
[GitHub] [spark] Ngone51 commented on a change in pull request #26980: [SPARK-27348][Core] HeartbeatReceiver should remove lost executors from CoarseGrainedSchedulerBackend
Ngone51 commented on a change in pull request #26980: [SPARK-27348][Core] HeartbeatReceiver should remove lost executors from CoarseGrainedSchedulerBackend URL: https://github.com/apache/spark/pull/26980#discussion_r360796114 ## File path: core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala ## @@ -199,14 +201,20 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock) if (now - lastSeenMs > executorTimeoutMs) { logWarning(s"Removing executor $executorId with no recent heartbeats: " + s"${now - lastSeenMs} ms exceeds timeout $executorTimeoutMs ms") -scheduler.executorLost(executorId, SlaveLost("Executor heartbeat " + - s"timed out after ${now - lastSeenMs} ms")) - // Asynchronously kill the executor to avoid blocking the current thread +// Asynchronously kill the executor to avoid blocking the current thread killExecutorThread.submit(new Runnable { override def run(): Unit = Utils.tryLogNonFatalError { // Note: we want to get an executor back after expiring this one, // so do not simply call `sc.killExecutor` here (SPARK-8119) sc.killAndReplaceExecutor(executorId) +// In case of the executors which are not gracefully shut down, we should remove Review comment: No, no. Please pay attention to the comment above and note that, here, we're not saying that this issue happens because `CoarseGrainedSchedulerBackend` may shut down executors non-gracefully. We're saying that it may happen because the executor itself is not gracefully shut down. It's possible that `CoarseGrainedSchedulerBackend` asks an executor to shut down and the executor shuts down non-gracefully. It's also possible that the host where the executor is located suddenly crashes, which makes the executor shut down non-gracefully. A non-graceful shutdown here only means that an executor shuts down but `CoarseGrainedSchedulerBackend` doesn't receive a disconnect event from it. It really doesn't matter what causes the non-graceful shutdown.
[GitHub] [spark] cloud-fan commented on a change in pull request #26946: [SPARK-30036][SQL] Fix: REPARTITION hint does not work with order by
cloud-fan commented on a change in pull request #26946: [SPARK-30036][SQL] Fix: REPARTITION hint does not work with order by URL: https://github.com/apache/spark/pull/26946#discussion_r360796170 ## File path: sql/core/src/test/scala/org/apache/spark/sql/ConfigBehaviorSuite.scala ## @@ -55,12 +54,12 @@ class ConfigBehaviorSuite extends QueryTest with SharedSparkSession { withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> numPartitions.toString) { // The default chi-sq value should be low - assert(computeChiSquareTest() < 100) + assert(computeChiSquareTest() < 10) Review comment: the physical plan is the same as before, so what caused this change?
[GitHub] [spark] cloud-fan commented on a change in pull request #26946: [SPARK-30036][SQL] Fix: REPARTITION hint does not work with order by
cloud-fan commented on a change in pull request #26946: [SPARK-30036][SQL] Fix: REPARTITION hint does not work with order by URL: https://github.com/apache/spark/pull/26946#discussion_r360795637 ## File path: sql/core/src/test/scala/org/apache/spark/sql/ConfigBehaviorSuite.scala ## @@ -39,9 +39,8 @@ class ConfigBehaviorSuite extends QueryTest with SharedSparkSession { def computeChiSquareTest(): Double = { val n = 1 // Trigger a sort - // Range has range partitioning in its output now. To have a range shuffle, we - // need to run a repartition first. - val data = spark.range(0, n, 1, 1).repartition(10).sort($"id".desc) + // Range has range partitioning in its output now. + val data = spark.range(0, n, 1, 10).sort($"id".desc) Review comment: In the current master ``` scala> spark.range(0, 1, 1, 10).sort("id").explain(true) == Parsed Logical Plan == 'Sort ['id ASC NULLS FIRST], true +- Range (0, 1, step=1, splits=Some(10)) == Analyzed Logical Plan == id: bigint Sort [id#8L ASC NULLS FIRST], true +- Range (0, 1, step=1, splits=Some(10)) == Optimized Logical Plan == Range (0, 1, step=1, splits=Some(10)) == Physical Plan == *(1) Range (0, 1, step=1, splits=10) ``` If we add a shuffle now, then it's a regression and we should fix it.
[GitHub] [spark] HeartSaVioR commented on issue #26979: [SPARK-28144][SPARK-29294][SS][FOLLOWUP] Use SystemTime defined in Kafka Time interface
HeartSaVioR commented on issue #26979: [SPARK-28144][SPARK-29294][SS][FOLLOWUP] Use SystemTime defined in Kafka Time interface URL: https://github.com/apache/spark/pull/26979#issuecomment-568390232 Looks like there's some issue on the AMPLab Jenkins server. Given the weekend and the Christmas/year-end week, should we expect the situation to take longer to resolve?
[GitHub] [spark] cloud-fan commented on issue #26913: [SPARK-29219][SQL] Introduce SupportsCatalogOptions for TableProvider
cloud-fan commented on issue #26913: [SPARK-29219][SQL] Introduce SupportsCatalogOptions for TableProvider URL: https://github.com/apache/spark/pull/26913#issuecomment-568389494 It's good to see that we are looking at the big picture, to support user-specified schema, catalog in options, and schema/partition in the builtin metastore. For "1 TableProvider", agree with the expectation here, and I'd say `getTable(properties)` is the simplest way to satisfy the expectation. For "2 SupportsExternalMetadata", I think it can itself be used as a marker and we don't need `SupportsUserSpecifiedSchema`: ``` - spark.table(...): Metastore schema + partitioning info + properties is passed in to create Table - spark.read.load(...): Call inferSchema + inferPartitioning then pass in inferred schema + partitioning + df options to create Table - spark.read.schema().load(...): Call inferPartitioning, pass in schema + inferred partitioning + df options to create Table ``` For data sources, I don't think they care where the schema/partitioning comes from. It can be inferred by the source itself, specified by end users, or come from Spark's builtin catalog. In `getTable(schema, partitioning, properties)`, they need to validate the schema/partitioning anyway. For "3 SupportsCatalogOptions", sounds good to me, but it looks better if we can fail when a user-specified schema is not supported, like `TableProvider` does.
[GitHub] [spark] wenfang6 commented on issue #26965: [SQL]Support single quotes json parsing for get_json_object and json_tuple
wenfang6 commented on issue #26965: [SQL]Support single quotes json parsing for get_json_object and json_tuple URL: https://github.com/apache/spark/pull/26965#issuecomment-568387161 > @wenfang6 can you follow @maropu's advice at [#26965 (comment)](https://github.com/apache/spark/pull/26965#issuecomment-567915347)? OK, Thank you
[GitHub] [spark] AngersZhuuuu commented on issue #25028: [SPARK-28227][SQL] Support TRANSFORM with aggregation.
AngersZh commented on issue #25028: [SPARK-28227][SQL] Support TRANSFORM with aggregation. URL: https://github.com/apache/spark/pull/25028#issuecomment-568386669 > @AngersZh, I didn't read all fully but can we make https://issues.apache.org/jira/browse/SPARK-15694 done first as @wangyum's advice? Seems @alfozan is doing that part?
[GitHub] [spark] zhengruifeng opened a new pull request #26982: [SPARK-30329][ML] add iterator/foreach methods for Vectors
zhengruifeng opened a new pull request #26982: [SPARK-30329][ML] add iterator/foreach methods for Vectors URL: https://github.com/apache/spark/pull/26982 ### What changes were proposed in this pull request? 1, add new foreach-like methods: foreach/foreachNonZero 2, add iterators: iterator/activeIterator/nonZeroIterator ### Why are the changes needed? see the [ticket](https://issues.apache.org/jira/browse/SPARK-30329) for details foreach/foreachNonZero: for both convenience and performance (SparseVector.foreach should be faster than the current traversal method) iterator/activeIterator/nonZeroIterator: add the three iterators, so that we can later add/change some impls based on those iterators on both the ml and mllib sides, to avoid vector conversions. ### Does this PR introduce any user-facing change? Yes, new methods are added ### How was this patch tested? added test suites
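To make the proposed methods concrete, here is a rough sketch of what a `foreachNonZero` and `nonZeroIterator` could look like for a sparse vector. This is not the PR's actual code: `SparseVecSketch` is a hypothetical stand-in for MLlib's `SparseVector` internals (size plus parallel `indices`/`values` arrays), used only to illustrate why visiting stored entries directly beats a full-length traversal.

```scala
// Hypothetical stand-in for a CSR-style sparse vector: `indices` and
// `values` are parallel arrays of the actively stored entries.
final case class SparseVecSketch(size: Int, indices: Array[Int], values: Array[Double]) {

  // Visit only stored entries whose value is non-zero, skipping any
  // explicit zeros kept in active storage. Cost is O(nnz), not O(size).
  def foreachNonZero(f: (Int, Double) => Unit): Unit = {
    var i = 0
    while (i < values.length) {
      if (values(i) != 0.0) f(indices(i), values(i))
      i += 1
    }
  }

  // Iterator over (index, value) pairs of non-zero entries.
  def nonZeroIterator: Iterator[(Int, Double)] =
    indices.iterator.zip(values.iterator).filter(_._2 != 0.0)
}

// A vector of logical length 6 with three stored entries, one of which
// is an explicit zero at index 2.
val v = SparseVecSketch(6, Array(0, 2, 4), Array(1.0, 0.0, 3.0))

var sum = 0.0
var count = 0
v.foreachNonZero { (_, value) => sum += value; count += 1 }
// sum == 4.0 and count == 2: the explicit zero is skipped.

val nonZeros = v.nonZeroIterator.toSeq
// nonZeros == Seq((0, 1.0), (4, 3.0))
```

This mirrors the performance argument in the PR description: for a sparse vector, iterating the active storage touches only the stored entries, while a generic traversal via `apply(i)` over all `size` positions pays a lookup per logical index.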
[GitHub] [spark] cloud-fan commented on a change in pull request #26980: [SPARK-27348][Core] HeartbeatReceiver should remove lost executors from CoarseGrainedSchedulerBackend
cloud-fan commented on a change in pull request #26980: [SPARK-27348][Core] HeartbeatReceiver should remove lost executors from CoarseGrainedSchedulerBackend URL: https://github.com/apache/spark/pull/26980#discussion_r360791173 ## File path: core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala ## @@ -199,14 +201,20 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock) if (now - lastSeenMs > executorTimeoutMs) { logWarning(s"Removing executor $executorId with no recent heartbeats: " + s"${now - lastSeenMs} ms exceeds timeout $executorTimeoutMs ms") -scheduler.executorLost(executorId, SlaveLost("Executor heartbeat " + - s"timed out after ${now - lastSeenMs} ms")) - // Asynchronously kill the executor to avoid blocking the current thread +// Asynchronously kill the executor to avoid blocking the current thread killExecutorThread.submit(new Runnable { override def run(): Unit = Utils.tryLogNonFatalError { // Note: we want to get an executor back after expiring this one, // so do not simply call `sc.killExecutor` here (SPARK-8119) sc.killAndReplaceExecutor(executorId) +// In case of the executors which are not gracefully shut down, we should remove Review comment: `SchedulerBackend` is private so we don't need to worry about plugins. Let me ask the question again: is `CoarseGrainedSchedulerBackend` the only one that may shut down executors non-gracefully? According to what you said, it covers all the scheduler backends, so the answer is yes?
[GitHub] [spark] HyukjinKwon commented on issue #26979: [SPARK-28144][SPARK-29294][SS][FOLLOWUP] Use SystemTime defined in Kafka Time interface
HyukjinKwon commented on issue #26979: [SPARK-28144][SPARK-29294][SS][FOLLOWUP] Use SystemTime defined in Kafka Time interface URL: https://github.com/apache/spark/pull/26979#issuecomment-568385294 retest this please
[GitHub] [spark] cloud-fan commented on a change in pull request #26968: [SPARK-29906][SQL][FOLLOWUP] Update the final plan in UI for AQE
cloud-fan commented on a change in pull request #26968: [SPARK-29906][SQL][FOLLOWUP] Update the final plan in UI for AQE URL: https://github.com/apache/spark/pull/26968#discussion_r360790661 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala ## @@ -205,6 +205,7 @@ case class AdaptiveSparkPlanExec( // Run the final plan when there's no more unfinished stages. currentPhysicalPlan = applyPhysicalRules(result.newPlan, queryStageOptimizerRules) + executionId.foreach(onUpdatePlan) Review comment: It doesn't matter. `onUpdatePlan` only looks at `currentPhysicalPlan`, so we must call `onUpdatePlan` after setting `currentPhysicalPlan`.
[GitHub] [spark] HyukjinKwon commented on issue #25028: [SPARK-28227][SQL] Support TRANSFORM with aggregation.
HyukjinKwon commented on issue #25028: [SPARK-28227][SQL] Support TRANSFORM with aggregation. URL: https://github.com/apache/spark/pull/25028#issuecomment-568385096 @AngersZh, I didn't read all fully but can we make https://issues.apache.org/jira/browse/SPARK-15694 done first as @wangyum's advice?
[GitHub] [spark] manuzhang commented on a change in pull request #26968: [SPARK-29906][SQL][FOLLOWUP] Update the final plan in UI for AQE
manuzhang commented on a change in pull request #26968: [SPARK-29906][SQL][FOLLOWUP] Update the final plan in UI for AQE URL: https://github.com/apache/spark/pull/26968#discussion_r360790195 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala ## @@ -205,6 +205,7 @@ case class AdaptiveSparkPlanExec( // Run the final plan when there's no more unfinished stages. currentPhysicalPlan = applyPhysicalRules(result.newPlan, queryStageOptimizerRules) + executionId.foreach(onUpdatePlan) Review comment: Why is the plan sent out before `isFinalPlan = true`? Isn't it the final plan?
[GitHub] [spark] Ngone51 commented on a change in pull request #26980: [SPARK-27348][Core] HeartbeatReceiver should remove lost executors from CoarseGrainedSchedulerBackend
Ngone51 commented on a change in pull request #26980: [SPARK-27348][Core] HeartbeatReceiver should remove lost executors from CoarseGrainedSchedulerBackend URL: https://github.com/apache/spark/pull/26980#discussion_r360790014 ## File path: core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala ## @@ -199,14 +201,20 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock) if (now - lastSeenMs > executorTimeoutMs) { logWarning(s"Removing executor $executorId with no recent heartbeats: " + s"${now - lastSeenMs} ms exceeds timeout $executorTimeoutMs ms") -scheduler.executorLost(executorId, SlaveLost("Executor heartbeat " + - s"timed out after ${now - lastSeenMs} ms")) - // Asynchronously kill the executor to avoid blocking the current thread +// Asynchronously kill the executor to avoid blocking the current thread killExecutorThread.submit(new Runnable { override def run(): Unit = Utils.tryLogNonFatalError { // Note: we want to get an executor back after expiring this one, // so do not simply call `sc.killExecutor` here (SPARK-8119) sc.killAndReplaceExecutor(executorId) +// In case of the executors which are not gracefully shut down, we should remove Review comment: Matching `CoarseGrainedSchedulerBackend` covers all scheduler backends (Standalone, YARN, K8S, Mesos) that are officially supported by Spark, so we can guarantee that this issue is resolved internally. But for other plugged-in scheduler backends, Spark seems to have nothing it can do through the current `SchedulerBackend` interface, at least for this issue.
[GitHub] [spark] brkyvz commented on issue #26913: [SPARK-29219][SQL] Introduce SupportsCatalogOptions for TableProvider
brkyvz commented on issue #26913: [SPARK-29219][SQL] Introduce SupportsCatalogOptions for TableProvider URL: https://github.com/apache/spark/pull/26913#issuecomment-568384325 @cloud-fan In an offline conversation I had with @rdblue, we discussed all the APIs (including the proposal in #26868) and all the use-cases we have today. We discussed the following use cases: ``` 1. Just TableProvider - spark.table(...): Only table properties from metastore is passed in to create Table - spark.read.load(...): Only dataFrameOptions is passed in to create Table - spark.read.schema().load(...): Throws exception for not supporting `SupportsUserSpecifiedSchema` 2. SupportsExternalMetadata 2a. SupportsUserSpecifiedSchema (extends SupportsExternalMetadata) - spark.table(...): Metastore schema + partitioning info + properties is passed in to create Table - spark.read.load(...): Call inferSchema + inferPartitioning then pass in inferred schema + partitioning + df options to create Table - spark.read.schema().load(...): Call inferPartitioning, pass in schema + inferred partitioning + df options to create Table 2b. Just SupportsExternalMetadata - spark.table(...): Metastore schema + partitioning info + properties is passed in to create Table - spark.read.load(...): Call inferSchema + inferPartitioning then pass in inferred schema + partitioning + df options to create Table - spark.read.schema().load(...): Throws exception for not supporting `SupportsUserSpecifiedSchema` 3. SupportsCatalogOptions - spark.table(...): Metastore schema + partitioning info + properties is passed in to create Table - spark.read.load(...): extractCatalog provides a catalog to provide schema + partitioning + properties - spark.read.schema().load(...): Use catalog, ignore userSpecifiedSchema ``` We noticed that we could cover all use cases with 2 and 3. Then the question was, do we even need the `getTable(properties)` method in #26868, and it didn't seem required. 
So, the flow would be: - If `SupportsCatalogOptions` is extended, and we're trying to resolve a table through data source options, always delegate to the catalog implementation blessed by the data source - If not, then the methods in `SupportsExternalMetadata` can be used to create the table instance. - We also need a marker `SupportsUserSpecifiedSchema` to handle the case of users providing schemas where they shouldn't. (Maybe `SupportsUserSpecifiedSchema` has the method `withSchema` to recreate a table with the user-provided schema) Let me know what you think
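The dispatch order being discussed can be sketched in plain Scala. Note this is a hypothetical model, not the real DSv2 interfaces: the trait names below (`TableProviderSketch`, etc.) are stand-ins for the proposed `TableProvider`, `SupportsExternalMetadata`, `SupportsUserSpecifiedSchema`, and `SupportsCatalogOptions`, and the string results stand in for the actual table-creation paths.

```scala
// Stand-in marker traits modeling the proposed interface hierarchy.
trait TableProviderSketch
trait SupportsExternalMetadataSketch extends TableProviderSketch
trait SupportsUserSpecifiedSchemaSketch extends SupportsExternalMetadataSketch
trait SupportsCatalogOptionsSketch extends TableProviderSketch

// Which resolution path is taken for spark.read[.schema(...)].load(...)?
// Matches are ordered most-specific first, since SupportsUserSpecifiedSchema
// extends SupportsExternalMetadata.
def resolve(provider: TableProviderSketch, userSchema: Option[String]): String =
  provider match {
    case _: SupportsCatalogOptionsSketch =>
      // Always delegate to the catalog blessed by the source;
      // a user-specified schema is ignored (or rejected).
      "catalog"
    case _: SupportsUserSpecifiedSchemaSketch =>
      // User schema wins when given, otherwise fall back to inference.
      if (userSchema.isDefined) "user-schema" else "inferred"
    case _: SupportsExternalMetadataSketch =>
      // Inference only; a user schema is not supported here.
      if (userSchema.isDefined) {
        throw new IllegalArgumentException("user-specified schema not supported")
      } else "inferred"
    case _ =>
      // Plain provider: only data frame options are passed in.
      if (userSchema.isDefined) {
        throw new IllegalArgumentException("user-specified schema not supported")
      } else "options-only"
  }

val viaCatalog = resolve(new SupportsCatalogOptionsSketch {}, Some("id INT"))
val viaUser = resolve(new SupportsUserSpecifiedSchemaSketch {}, Some("id INT"))
val viaInfer = resolve(new SupportsExternalMetadataSketch {}, None)
```

The point of the sketch is the precedence: catalog delegation first, then the external-metadata path with or without a user schema, and a clear failure for sources that cannot accept a user-specified schema.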
[GitHub] [spark] HyukjinKwon commented on a change in pull request #26965: [SQL] Support single quotes json parsing for get_json_object and json_tuple
HyukjinKwon commented on a change in pull request #26965: [SQL] Support single quotes json parsing for get_json_object and json_tuple URL: https://github.com/apache/spark/pull/26965#discussion_r360789679 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala ## @@ -103,6 +103,7 @@ private[this] object SharedFactory { val jsonFactory = new JsonFactoryBuilder() Review comment: I am not sure whether we should match them for now, considering the original intention of both functions.
[GitHub] [spark] HyukjinKwon commented on issue #26965: [SQL] Support single quotes json parsing for get_json_object and json_tuple
HyukjinKwon commented on issue #26965: [SQL] Support single quotes json parsing for get_json_object and json_tuple URL: https://github.com/apache/spark/pull/26965#issuecomment-568384080 ok to test
[GitHub] [spark] HyukjinKwon commented on issue #26965: [SQL] Support single quotes json parsing for get_json_object and json_tuple
HyukjinKwon commented on issue #26965: [SQL] Support single quotes json parsing for get_json_object and json_tuple URL: https://github.com/apache/spark/pull/26965#issuecomment-568384204 @wenfang6 can you follow @maropu's advice at https://github.com/apache/spark/pull/26965#issuecomment-567915347?
[GitHub] [spark] AmplabJenkins removed a comment on issue #26965: [SQL] Support single quotes json parsing for get_json_object and json_tuple
AmplabJenkins removed a comment on issue #26965: [SQL] Support single quotes json parsing for get_json_object and json_tuple URL: https://github.com/apache/spark/pull/26965#issuecomment-567869702 Can one of the admins verify this patch?
[GitHub] [spark] HyukjinKwon commented on a change in pull request #26965: [SQL] Support single quotes json parsing for get_json_object and json_tuple
HyukjinKwon commented on a change in pull request #26965: [SQL] Support single quotes json parsing for get_json_object and json_tuple URL: https://github.com/apache/spark/pull/26965#discussion_r360789470 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala ## @@ -103,6 +103,7 @@ private[this] object SharedFactory { val jsonFactory = new JsonFactoryBuilder() Review comment: I think both functions were added mainly to support Hive compatibility, and I think that's why.
[GitHub] [spark] stczwd edited a comment on issue #26946: [SPARK-30036][SQL] Fix: REPARTITION hint does not work with order by
stczwd edited a comment on issue #26946: [SPARK-30036][SQL] Fix: REPARTITION hint does not work with order by URL: https://github.com/apache/spark/pull/26946#issuecomment-568383598 > We can add an end-to-end test, check the physical plan of a query, and count shuffles. Sure, I will add some tests for these cases.
[GitHub] [spark] HeartSaVioR commented on a change in pull request #26416: [SPARK-29779][CORE] Compact old event log files and cleanup
HeartSaVioR commented on a change in pull request #26416: [SPARK-29779][CORE] Compact old event log files and cleanup URL: https://github.com/apache/spark/pull/26416#discussion_r360788851 ## File path: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ## @@ -663,13 +670,49 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } } + private[spark] def getOrUpdateCompactible(reader: EventLogFileReader): Option[Boolean] = { +try { + val info = listing.read(classOf[LogInfo], reader.rootPath.toString) + val compactible = checkEligibilityForCompaction(info, reader) + if (info.compactible != compactible) { +listing.write(info.copy(compactible = compactible)) + } + compactible +} catch { + case _: NoSuchElementException => None +} + } + + protected def checkEligibilityForCompaction( + info: LogInfo, + reader: EventLogFileReader): Option[Boolean] = { +info.compactible.orElse { + // This is not applied to single event log file. + if (reader.lastIndex.isEmpty) { +Some(false) + } else { +if (reader.listEventLogFiles.length > 1) { + // We have at least one 'complete' file to check whether the event log is eligible to + // compact further. + val rate = eventFilterRateCalculator.calculate( Review comment: Changed to update EventFilterBuilder incrementally; the result should be the same as reading from the start, as EventFilterBuilder just processes the events flowing through the replay listener bus. If we memorize how far we replayed, we can continue replaying from (latest replayed + 1). It will be done per log file, as the unit of compaction is a file.
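The incremental-replay idea above (remember how far each log file was replayed, resume from the next event) can be sketched with hypothetical names; this is not the FsHistoryProvider implementation, just the bookkeeping it describes:

```java
// Hypothetical sketch: per-file incremental replay bookkeeping.
// Remember the last replayed event index for each log file and
// resume from (last + 1) instead of re-reading from the start.
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class IncrementalReplayer {
    private final Map<String, Integer> lastReplayed = new HashMap<>();

    // Replays only events not seen before; returns how many were new.
    int replay(String file, List<String> events) {
        int start = lastReplayed.getOrDefault(file, -1) + 1;
        int replayed = 0;
        for (int i = start; i < events.size(); i++) {
            // here the event would be fed into the EventFilterBuilder
            replayed++;
        }
        if (replayed > 0) {
            lastReplayed.put(file, events.size() - 1);
        }
        return replayed;
    }
}
```

Because the builder only consumes events in order, replaying the suffix yields the same final state as replaying the whole file, which is the equivalence the comment relies on.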
[GitHub] [spark] stczwd commented on a change in pull request #26946: [SPARK-30036][SQL] Fix: REPARTITION hint does not work with order by
stczwd commented on a change in pull request #26946: [SPARK-30036][SQL] Fix: REPARTITION hint does not work with order by URL: https://github.com/apache/spark/pull/26946#discussion_r360788805 ## File path: sql/core/src/test/scala/org/apache/spark/sql/ConfigBehaviorSuite.scala ## @@ -39,9 +39,8 @@ class ConfigBehaviorSuite extends QueryTest with SharedSparkSession { def computeChiSquareTest(): Double = { val n = 1 // Trigger a sort - // Range has range partitioning in its output now. To have a range shuffle, we - // need to run a repartition first. - val data = spark.range(0, n, 1, 1).repartition(10).sort($"id".desc) + // Range has range partitioning in its output now. + val data = spark.range(0, n, 1, 10).sort($"id".desc) Review comment: ``` == Optimized Logical Plan == Sort [id#8L DESC NULLS LAST], true +- Range (0, 100, step=1, splits=Some(10)) == Physical Plan == *(2) Sort [id#8L DESC NULLS LAST], true, 0 +- Exchange rangepartitioning(id#8L DESC NULLS LAST, 4), true, [id=#33] +- *(1) Range (0, 100, step=1, splits=10) ``` The sort API sets global=true in SortExec, similar to orderBy, which causes SortExec's requiredChildDistribution to be resolved to OrderedDistribution. EnsureRequirements will then add a ShuffleExchangeExec with rangePartitioning to satisfy the OrderedDistribution.
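The EnsureRequirements behavior described above can be modelled with a toy rule (hypothetical node and rule names, not Spark's planner): a global sort requires an ordered distribution of its input, and a range-partitioning exchange is inserted when the child does not already provide one.

```java
// Toy model of the "insert a shuffle to satisfy OrderedDistribution" rule.
enum Partitioning { UNKNOWN, RANGE }

class PlanNode {
    final String name;
    final Partitioning outputPartitioning;
    final PlanNode child;
    PlanNode(String name, Partitioning p, PlanNode child) {
        this.name = name;
        this.outputPartitioning = p;
        this.child = child;
    }
}

class EnsureRequirementsSketch {
    // A global Sort requires range partitioning of its input; insert an
    // exchange below the sort when the child does not satisfy it.
    static PlanNode apply(PlanNode sort) {
        if (sort.child.outputPartitioning != Partitioning.RANGE) {
            PlanNode exchange =
                new PlanNode("Exchange rangepartitioning", Partitioning.RANGE, sort.child);
            return new PlanNode(sort.name, Partitioning.RANGE, exchange);
        }
        return sort;
    }
}
```

This mirrors why the plan shown above has an `Exchange rangepartitioning` between `Sort` and `Range`.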
[GitHub] [spark] HyukjinKwon edited a comment on issue #26973: [SPARK-30323][SQL] Support filters pushdown in CSV datasource
HyukjinKwon edited a comment on issue #26973: [SPARK-30323][SQL] Support filters pushdown in CSV datasource URL: https://github.com/apache/spark/pull/26973#issuecomment-568382829 @MaxGekk, sorry, I am rushing, but would you mind summarising the idea of this PR in the PR description? If the pushed filters are converted back to Spark's expressions, I think there's no point in doing it (as Spark's optimizer should do that instead).
[GitHub] [spark] cloud-fan commented on a change in pull request #26913: [SPARK-29219][SQL] Introduce SupportsCatalogOptions for TableProvider
cloud-fan commented on a change in pull request #26913: [SPARK-29219][SQL] Introduce SupportsCatalogOptions for TableProvider URL: https://github.com/apache/spark/pull/26913#discussion_r360788192 ## File path: sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala ## @@ -215,9 +215,19 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { val finalOptions = sessionOptions ++ extraOptions.toMap ++ pathsOption val dsOptions = new CaseInsensitiveStringMap(finalOptions.asJava) - val table = userSpecifiedSchema match { -case Some(schema) => provider.getTable(dsOptions, schema) -case _ => provider.getTable(dsOptions) + val table = provider match { +case hasCatalog: SupportsCatalogOptions => + val ident = hasCatalog.extractIdentifier(dsOptions) + val catalog = CatalogV2Util.getTableProviderCatalog( +hasCatalog, +sparkSession.sessionState.catalogManager, +dsOptions) + catalog.loadTable(ident) Review comment: Then shall we fail or log a warning if a schema is specified by users? BTW, `SupportsCatalogOptions` does not seem to be a mixin, as it doesn't need anything from `TableProvider`.
[GitHub] [spark] brkyvz commented on a change in pull request #26913: [SPARK-29219][SQL] Introduce SupportsCatalogOptions for TableProvider
brkyvz commented on a change in pull request #26913: [SPARK-29219][SQL] Introduce SupportsCatalogOptions for TableProvider URL: https://github.com/apache/spark/pull/26913#discussion_r360787476 ## File path: sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala ## @@ -215,9 +215,19 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { val finalOptions = sessionOptions ++ extraOptions.toMap ++ pathsOption val dsOptions = new CaseInsensitiveStringMap(finalOptions.asJava) - val table = userSpecifiedSchema match { -case Some(schema) => provider.getTable(dsOptions, schema) -case _ => provider.getTable(dsOptions) + val table = provider match { +case hasCatalog: SupportsCatalogOptions => + val ident = hasCatalog.extractIdentifier(dsOptions) + val catalog = CatalogV2Util.getTableProviderCatalog( +hasCatalog, +sparkSession.sessionState.catalogManager, +dsOptions) + catalog.loadTable(ident) Review comment: We don't; we simply ignore it. If the `TableProvider` implements `SupportsCatalogOptions`, then we will *always* load the table through the catalog, so we don't need user options or partitioning info.
[GitHub] [spark] cloud-fan commented on a change in pull request #26980: [SPARK-27348][Core] HeartbeatReceiver should remove lost executors from CoarseGrainedSchedulerBackend
cloud-fan commented on a change in pull request #26980: [SPARK-27348][Core] HeartbeatReceiver should remove lost executors from CoarseGrainedSchedulerBackend URL: https://github.com/apache/spark/pull/26980#discussion_r360787039 ## File path: core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala ## @@ -199,14 +201,20 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock) if (now - lastSeenMs > executorTimeoutMs) { logWarning(s"Removing executor $executorId with no recent heartbeats: " + s"${now - lastSeenMs} ms exceeds timeout $executorTimeoutMs ms") -scheduler.executorLost(executorId, SlaveLost("Executor heartbeat " + - s"timed out after ${now - lastSeenMs} ms")) - // Asynchronously kill the executor to avoid blocking the current thread +// Asynchronously kill the executor to avoid blocking the current thread killExecutorThread.submit(new Runnable { override def run(): Unit = Utils.tryLogNonFatalError { // Note: we want to get an executor back after expiring this one, // so do not simply call `sc.killExecutor` here (SPARK-8119) sc.killAndReplaceExecutor(executorId) +// In case of the executors which are not gracefully shut down, we should remove Review comment: then why do we only match `CoarseGrainedSchedulerBackend` here?
[GitHub] [spark] zhengruifeng commented on issue #26735: [SPARK-30102][WIP][ML][PYSPARK] GMM supports instance weighting
zhengruifeng commented on issue #26735: [SPARK-30102][WIP][ML][PYSPARK] GMM supports instance weighting URL: https://github.com/apache/spark/pull/26735#issuecomment-568379972 retest this please
[GitHub] [spark] cloud-fan commented on a change in pull request #26913: [SPARK-29219][SQL] Introduce SupportsCatalogOptions for TableProvider
cloud-fan commented on a change in pull request #26913: [SPARK-29219][SQL] Introduce SupportsCatalogOptions for TableProvider URL: https://github.com/apache/spark/pull/26913#discussion_r360786007 ## File path: sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala ## @@ -215,9 +215,19 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { val finalOptions = sessionOptions ++ extraOptions.toMap ++ pathsOption val dsOptions = new CaseInsensitiveStringMap(finalOptions.asJava) - val table = userSpecifiedSchema match { -case Some(schema) => provider.getTable(dsOptions, schema) -case _ => provider.getTable(dsOptions) + val table = provider match { +case hasCatalog: SupportsCatalogOptions => + val ident = hasCatalog.extractIdentifier(dsOptions) + val catalog = CatalogV2Util.getTableProviderCatalog( +hasCatalog, +sparkSession.sessionState.catalogManager, +dsOptions) + catalog.loadTable(ident) Review comment: if we call `loadTable`, how do we handle user-specified schema?
[GitHub] [spark] cloud-fan commented on a change in pull request #26913: [SPARK-29219][SQL] Introduce SupportsCatalogOptions for TableProvider
cloud-fan commented on a change in pull request #26913: [SPARK-29219][SQL] Introduce SupportsCatalogOptions for TableProvider URL: https://github.com/apache/spark/pull/26913#discussion_r360785902 ## File path: sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala ## @@ -215,9 +215,19 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { val finalOptions = sessionOptions ++ extraOptions.toMap ++ pathsOption val dsOptions = new CaseInsensitiveStringMap(finalOptions.asJava) - val table = userSpecifiedSchema match { -case Some(schema) => provider.getTable(dsOptions, schema) -case _ => provider.getTable(dsOptions) + val table = provider match { +case hasCatalog: SupportsCatalogOptions => + val ident = hasCatalog.extractIdentifier(dsOptions) + val catalog = CatalogV2Util.getTableProviderCatalog( +hasCatalog, +sparkSession.sessionState.catalogManager, +dsOptions) + catalog.loadTable(ident) Review comment: shall we always call `TableProvider.getTable`?
[GitHub] [spark] WangGuangxin closed pull request #26726: [SPARK-30088][SQL]Adaptive execution should convert SortMergeJoin to BroadcastJoin when plan generates empty result
WangGuangxin closed pull request #26726: [SPARK-30088][SQL] Adaptive execution should convert SortMergeJoin to BroadcastJoin when plan generates empty result URL: https://github.com/apache/spark/pull/26726
[GitHub] [spark] cloud-fan commented on a change in pull request #26930: [SPARK-30290][Core] Count for merged block when fetching continuous blocks in batch
cloud-fan commented on a change in pull request #26930: [SPARK-30290][Core] Count for merged block when fetching continuous blocks in batch URL: https://github.com/apache/spark/pull/26930#discussion_r360783984 ## File path: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala ## @@ -337,13 +337,17 @@ final class ShuffleBlockFetcherIterator( assertPositiveBlockSize(blockId, size) curBlocks += FetchBlockInfo(blockId, size, mapIndex) curRequestSize += size - if (curRequestSize >= targetRemoteRequestSize || - curBlocks.size >= maxBlocksInFlightPerAddress) { + // For batch fetch, the actual block in flight should count for merged block. + val readyForCollectingBlocks = !doBatchFetch && +curBlocks.size >= maxBlocksInFlightPerAddress + if (curRequestSize >= targetRemoteRequestSize || readyForCollectingBlocks) { // Add this FetchRequest val mergedBlocks = mergeContinuousShuffleBlockIdsIfNeeded(curBlocks) -collectedRemoteRequests += new FetchRequest(address, mergedBlocks) -logDebug(s"Creating fetch request of $curRequestSize at $address " - + s"with ${mergedBlocks.size} blocks") +mergedBlocks.grouped(maxBlocksInFlightPerAddress).foreach { mergedBlock => Review comment: for the last group, it may not exceed either the `targetRemoteRequestSize` or `maxBlocksInFlightPerAddress`. Shall we put them back to the `curBlocks`?
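The reviewer's suggestion above can be sketched as pure grouping logic (simplified, with hypothetical names, standing in for the Scala `grouped` call in the PR): split merged blocks into requests of at most `maxBlocksPerAddress`, but carry an under-sized final group back into the pending buffer instead of emitting it as its own request.

```java
// Sketch of "put the under-sized last group back into curBlocks".
import java.util.ArrayList;
import java.util.List;

class FetchGrouper {
    // Splits `merged` into full-sized requests; a trailing group smaller
    // than maxBlocksPerAddress is appended to `pending` instead.
    static List<List<Integer>> group(List<Integer> merged, int maxBlocksPerAddress,
                                     List<Integer> pending) {
        List<List<Integer>> requests = new ArrayList<>();
        for (int i = 0; i < merged.size(); i += maxBlocksPerAddress) {
            List<Integer> g = new ArrayList<>(
                merged.subList(i, Math.min(i + maxBlocksPerAddress, merged.size())));
            boolean isLast = i + maxBlocksPerAddress >= merged.size();
            if (isLast && g.size() < maxBlocksPerAddress) {
                pending.addAll(g);  // leftover goes back to the pending buffer
            } else {
                requests.add(g);
            }
        }
        return requests;
    }
}
```

With 7 merged blocks and a limit of 3, this yields two full requests and returns the seventh block to the pending buffer, where later blocks for the same address can fill it up.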
[GitHub] [spark] Ngone51 commented on a change in pull request #26980: [SPARK-27348][Core] HeartbeatReceiver should remove lost executors from CoarseGrainedSchedulerBackend
Ngone51 commented on a change in pull request #26980: [SPARK-27348][Core] HeartbeatReceiver should remove lost executors from CoarseGrainedSchedulerBackend URL: https://github.com/apache/spark/pull/26980#discussion_r360782760 ## File path: core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala ## @@ -199,14 +201,20 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock) if (now - lastSeenMs > executorTimeoutMs) { logWarning(s"Removing executor $executorId with no recent heartbeats: " + s"${now - lastSeenMs} ms exceeds timeout $executorTimeoutMs ms") -scheduler.executorLost(executorId, SlaveLost("Executor heartbeat " + - s"timed out after ${now - lastSeenMs} ms")) Review comment: Hmm... this may be a problem...
[GitHub] [spark] itskals commented on a change in pull request #26975: [SPARK-26975][CORE] Stage retry and executor crash cause app hung up forever
itskals commented on a change in pull request #26975: [SPARK-26975][CORE] Stage retry and executor crash cause app hung up forever URL: https://github.com/apache/spark/pull/26975#discussion_r360782455 ## File path: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ## @@ -100,6 +100,11 @@ private[spark] class TaskSetManager( // should not resubmit while executor lost. private val killedByOtherAttempt = new HashSet[Long] + // Add the tid of task into this HashSet when the task is killed by other stage retries. + // For example, if stage failed and retry, when the task in the origin stage finish, it will + // kill the new stage task running the same partition data + private val killedByOtherStageRetries = new HashSet[Long] Review comment: Also, the part of the code that you marked as problematic in `executorLost` - could it not have been moved to `handleFailedTask`? I feel the code would have looked clearer there, and then the rest of the changes might not have been needed.
[GitHub] [spark] Ngone51 commented on a change in pull request #26980: [SPARK-27348][Core] HeartbeatReceiver should remove lost executors from CoarseGrainedSchedulerBackend
Ngone51 commented on a change in pull request #26980: [SPARK-27348][Core] HeartbeatReceiver should remove lost executors from CoarseGrainedSchedulerBackend URL: https://github.com/apache/spark/pull/26980#discussion_r360782486 ## File path: core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala ## @@ -199,14 +201,20 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock) if (now - lastSeenMs > executorTimeoutMs) { logWarning(s"Removing executor $executorId with no recent heartbeats: " + s"${now - lastSeenMs} ms exceeds timeout $executorTimeoutMs ms") -scheduler.executorLost(executorId, SlaveLost("Executor heartbeat " + - s"timed out after ${now - lastSeenMs} ms")) - // Asynchronously kill the executor to avoid blocking the current thread +// Asynchronously kill the executor to avoid blocking the current thread killExecutorThread.submit(new Runnable { override def run(): Unit = Utils.tryLogNonFatalError { // Note: we want to get an executor back after expiring this one, // so do not simply call `sc.killExecutor` here (SPARK-8119) sc.killAndReplaceExecutor(executorId) +// In case of the executors which are not gracefully shut down, we should remove Review comment: > Do you mean CoarseGrainedSchedulerBackend is the only one that may shut down executors non-gracefully? No. IIUC, it's the executor's own behavior and not related to the scheduler backend. An executor can appear lost (no heartbeat) but still be alive, e.g. due to a network problem. > And if it shuts down gracefully, is it OK to have 2 RemoveExecutor events? Yes, we have protection for this case.
[GitHub] [spark] cloud-fan commented on a change in pull request #26930: [SPARK-30290][Core] Count for merged block when fetching continuous blocks in batch
cloud-fan commented on a change in pull request #26930: [SPARK-30290][Core] Count for merged block when fetching continuous blocks in batch URL: https://github.com/apache/spark/pull/26930#discussion_r360782001 ## File path: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala ## @@ -337,13 +337,17 @@ final class ShuffleBlockFetcherIterator( assertPositiveBlockSize(blockId, size) curBlocks += FetchBlockInfo(blockId, size, mapIndex) curRequestSize += size - if (curRequestSize >= targetRemoteRequestSize || - curBlocks.size >= maxBlocksInFlightPerAddress) { + // For batch fetch, the actual block in flight should count for merged block. + val readyForCollectingBlocks = !doBatchFetch && Review comment: nit: `exceedsMaxBlocksInFlightPerAddress`
[GitHub] [spark] itskals commented on a change in pull request #26975: [SPARK-26975][CORE] Stage retry and executor crash cause app hung up forever
itskals commented on a change in pull request #26975: [SPARK-26975][CORE] Stage retry and executor crash cause app hung up forever URL: https://github.com/apache/spark/pull/26975#discussion_r360781112 ## File path: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ## @@ -100,6 +100,11 @@ private[spark] class TaskSetManager( // should not resubmit while executor lost. private val killedByOtherAttempt = new HashSet[Long] + // Add the tid of task into this HashSet when the task is killed by other stage retries. + // For example, if stage failed and retry, when the task in the origin stage finish, it will + // kill the new stage task running the same partition data + private val killedByOtherStageRetries = new HashSet[Long] Review comment: Why is this hashset not looked up when handling `executorLost`?
[GitHub] [spark] cloud-fan commented on issue #25024: [SPARK-27296][SQL] User Defined Aggregators that do not ser/de on each input row
cloud-fan commented on issue #25024: [SPARK-27296][SQL] User Defined Aggregators that do not ser/de on each input row URL: https://github.com/apache/spark/pull/25024#issuecomment-568371219 retest this please
[GitHub] [spark] cloud-fan commented on a change in pull request #26980: [SPARK-27348][Core] HeartbeatReceiver should remove lost executors from CoarseGrainedSchedulerBackend
cloud-fan commented on a change in pull request #26980: [SPARK-27348][Core] HeartbeatReceiver should remove lost executors from CoarseGrainedSchedulerBackend URL: https://github.com/apache/spark/pull/26980#discussion_r360778246 ## File path: core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala ## @@ -199,14 +201,20 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock) if (now - lastSeenMs > executorTimeoutMs) { logWarning(s"Removing executor $executorId with no recent heartbeats: " + s"${now - lastSeenMs} ms exceeds timeout $executorTimeoutMs ms") -scheduler.executorLost(executorId, SlaveLost("Executor heartbeat " + - s"timed out after ${now - lastSeenMs} ms")) - // Asynchronously kill the executor to avoid blocking the current thread +// Asynchronously kill the executor to avoid blocking the current thread killExecutorThread.submit(new Runnable { override def run(): Unit = Utils.tryLogNonFatalError { // Note: we want to get an executor back after expiring this one, // so do not simply call `sc.killExecutor` here (SPARK-8119) sc.killAndReplaceExecutor(executorId) +// In case of the executors which are not gracefully shut down, we should remove Review comment: > In case of the executors which are not gracefully shut down Do you mean `CoarseGrainedSchedulerBackend` is the only one that may shut down executors non-gracefully? And if it shuts down gracefully, is it OK to have 2 `RemoveExecutor` events? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a change in pull request #26980: [SPARK-27348][Core] HeartbeatReceiver should remove lost executors from CoarseGrainedSchedulerBackend
cloud-fan commented on a change in pull request #26980: [SPARK-27348][Core] HeartbeatReceiver should remove lost executors from CoarseGrainedSchedulerBackend URL: https://github.com/apache/spark/pull/26980#discussion_r360777925 ## File path: core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala ## @@ -199,14 +201,20 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock) if (now - lastSeenMs > executorTimeoutMs) { logWarning(s"Removing executor $executorId with no recent heartbeats: " + s"${now - lastSeenMs} ms exceeds timeout $executorTimeoutMs ms") -scheduler.executorLost(executorId, SlaveLost("Executor heartbeat " + - s"timed out after ${now - lastSeenMs} ms")) Review comment: what about other `SchedulerBackend`s? We only send the `RemoveExecutor` event for `CoarseGrainedSchedulerBackend`
[GitHub] [spark] cloud-fan edited a comment on issue #26977: [SPARK-30326][SQL] Raise exception if analyzer exceed max iterations
cloud-fan edited a comment on issue #26977: [SPARK-30326][SQL] Raise exception if analyzer exceed max iterations URL: https://github.com/apache/spark/pull/26977#issuecomment-568366428 If the analyzer hits the max iteration count and the plan is resolved, shall we log a warning or fail?
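The warn-vs-fail question for a fixed-point executor that exhausts its iteration budget can be made concrete with a toy sketch. This is illustrative only — `Plan`, `execute`, and the rule signature are hypothetical stand-ins, not Spark's actual `Analyzer`/`RuleExecutor` API:

```scala
// Toy fixed-point rule executor. When the iteration budget is exhausted
// without converging, one possible policy (sketched here) is: fail if the
// plan is still unresolved, otherwise only log a warning.
case class Plan(value: Int, resolved: Boolean)

def execute(initial: Plan, rule: Plan => Plan, maxIterations: Int): Plan = {
  var current = initial
  var iterations = 0
  var fixedPoint = false
  while (!fixedPoint && iterations < maxIterations) {
    val next = rule(current)
    if (next == current) fixedPoint = true // rule no longer changes the plan
    else { current = next; iterations += 1 }
  }
  if (!fixedPoint) {
    val msg = s"max iterations ($maxIterations) reached without converging"
    if (current.resolved) println(s"WARN: $msg, but the plan is resolved")
    else throw new IllegalStateException(s"unresolved plan: $msg")
  }
  current
}
```

Failing only on unresolved plans keeps previously-working (if wasteful) queries running, while failing unconditionally would surface non-converging rules earlier — which is exactly the trade-off the comment asks about.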
[GitHub] [spark] cloud-fan commented on a change in pull request #26946: [SPARK-30036][SQL] Fix: REPARTITION hint does not work with order by
cloud-fan commented on a change in pull request #26946: [SPARK-30036][SQL] Fix: REPARTITION hint does not work with order by URL: https://github.com/apache/spark/pull/26946#discussion_r360770543 ## File path: sql/core/src/test/scala/org/apache/spark/sql/ConfigBehaviorSuite.scala ## @@ -39,9 +39,8 @@ class ConfigBehaviorSuite extends QueryTest with SharedSparkSession { def computeChiSquareTest(): Double = { val n = 1 // Trigger a sort - // Range has range partitioning in its output now. To have a range shuffle, we - // need to run a repartition first. - val data = spark.range(0, n, 1, 1).repartition(10).sort($"id".desc) + // Range has range partitioning in its output now. + val data = spark.range(0, n, 1, 10).sort($"id".desc) Review comment: I'm a bit confused. `spark.range` reports `RangePartitioning`, so there shouldn't be any shuffles. What gets changed?
[GitHub] [spark] cloud-fan commented on issue #26946: [SPARK-30036][SQL] Fix: REPARTITION hint does not work with order by
cloud-fan commented on issue #26946: [SPARK-30036][SQL] Fix: REPARTITION hint does not work with order by URL: https://github.com/apache/spark/pull/26946#issuecomment-568360846 We can add an end-to-end test, check the physical plan of a query, and count shuffles.
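The shuffle-counting check suggested here boils down to a collect-and-count over the plan tree — in a real test one would match `ShuffleExchangeExec` nodes inside `df.queryExecution.executedPlan`. The node classes below are hypothetical stand-ins so the sketch runs without Spark:

```scala
// Toy plan tree mimicking the pattern of Spark's TreeNode.collect; Exchange,
// Sort and Scan are illustrative stand-ins, not Spark's actual classes.
sealed trait PlanNode {
  def children: Seq[PlanNode]
  // Pre-order collect: this node first, then all descendants.
  def collect[A](pf: PartialFunction[PlanNode, A]): Seq[A] =
    pf.lift(this).toSeq ++ children.flatMap(_.collect(pf))
}
case class Exchange(child: PlanNode) extends PlanNode { def children: Seq[PlanNode] = Seq(child) }
case class Sort(child: PlanNode) extends PlanNode { def children: Seq[PlanNode] = Seq(child) }
case object Scan extends PlanNode { def children: Seq[PlanNode] = Nil }

// A plan shaped like sort-over-repartition: one shuffle from the repartition.
val plan: PlanNode = Sort(Exchange(Scan))
val numShuffles: Int = plan.collect { case e: Exchange => e }.size
```

An end-to-end test would then assert that the count matches the expected number of exchanges for the query under test.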
[GitHub] [spark] cloud-fan commented on a change in pull request #26905: [SPARK-30266][SQL] Avoid match error and int overflow in ApproximatePercentile and Percentile
cloud-fan commented on a change in pull request #26905: [SPARK-30266][SQL] Avoid match error and int overflow in ApproximatePercentile and Percentile URL: https://github.com/apache/spark/pull/26905#discussion_r360769926 ## File path: sql/core/src/test/scala/org/apache/spark/sql/ApproximatePercentileQuerySuite.scala ## @@ -149,7 +149,7 @@ class ApproximatePercentileQuerySuite extends QueryTest with SharedSparkSession withTempView(table) { (1 to 1000).toDF("col").createOrReplaceTempView(table) checkAnswer( -spark.sql(s"SELECT percentile_approx(col, array(0.25 + 0.25D), 200 + 800D) FROM $table"), +spark.sql(s"SELECT percentile_approx(col, array(0.25 + 0.25D), 200 + 800) FROM $table"), Review comment: ah sorry, disallowed.
[GitHub] [spark] yaooqinn commented on a change in pull request #26905: [SPARK-30266][SQL] Avoid match error and int overflow in ApproximatePercentile and Percentile
yaooqinn commented on a change in pull request #26905: [SPARK-30266][SQL] Avoid match error and int overflow in ApproximatePercentile and Percentile URL: https://github.com/apache/spark/pull/26905#discussion_r360769653 ## File path: sql/core/src/test/scala/org/apache/spark/sql/ApproximatePercentileQuerySuite.scala ## @@ -149,7 +149,7 @@ class ApproximatePercentileQuerySuite extends QueryTest with SharedSparkSession withTempView(table) { (1 to 1000).toDF("col").createOrReplaceTempView(table) checkAnswer( -spark.sql(s"SELECT percentile_approx(col, array(0.25 + 0.25D), 200 + 800D) FROM $table"), +spark.sql(s"SELECT percentile_approx(col, array(0.25 + 0.25D), 200 + 800) FROM $table"), Review comment: Yup, but do you mean disallowed?
[GitHub] [spark] cloud-fan commented on a change in pull request #26905: [SPARK-30266][SQL] Avoid match error and int overflow in ApproximatePercentile and Percentile
cloud-fan commented on a change in pull request #26905: [SPARK-30266][SQL] Avoid match error and int overflow in ApproximatePercentile and Percentile URL: https://github.com/apache/spark/pull/26905#discussion_r360769422 ## File path: sql/core/src/test/scala/org/apache/spark/sql/ApproximatePercentileQuerySuite.scala ## @@ -149,7 +149,7 @@ class ApproximatePercentileQuerySuite extends QueryTest with SharedSparkSession withTempView(table) { (1 to 1000).toDF("col").createOrReplaceTempView(table) checkAnswer( -spark.sql(s"SELECT percentile_approx(col, array(0.25 + 0.25D), 200 + 800D) FROM $table"), +spark.sql(s"SELECT percentile_approx(col, array(0.25 + 0.25D), 200 + 800) FROM $table"), Review comment: shall we add a migration guide? i.e. float/double is allowed as accuracy.
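On the int-overflow half of the PR title: JVM `Int` arithmetic wraps silently, which is why an accuracy expression narrowed to `Int` needs an explicit range check. A standalone illustration — the `checkedToInt` helper is a hypothetical defensive pattern, not Spark's actual validation code:

```scala
// Int arithmetic wraps on overflow with no error, so Int.MaxValue + 1
// silently becomes Int.MinValue instead of 2147483648.
val wrapped: Int = Int.MaxValue + 1
val intended: Long = Int.MaxValue.toLong + 1L

// Hypothetical defensive narrowing: validate the Long before converting to Int.
def checkedToInt(v: Long): Int = {
  require(v >= Int.MinValue && v <= Int.MaxValue, s"value $v overflows Int")
  v.toInt
}
```

The JDK ships the same idea as `java.lang.Math.toIntExact`, which throws `ArithmeticException` on a lossy narrowing.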
[GitHub] [spark] cloud-fan commented on issue #26981: [SPARK-26389][SS][FOLLOW-UP]Format config name to follow the other boolean conf naming convention
cloud-fan commented on issue #26981: [SPARK-26389][SS][FOLLOW-UP]Format config name to follow the other boolean conf naming convention URL: https://github.com/apache/spark/pull/26981#issuecomment-568358023 retest this please
[GitHub] [spark] cloud-fan closed pull request #26899: [SPARK-28332][SQL] Reserve init value -1 only when do min max statistics in SQLMetrics
cloud-fan closed pull request #26899: [SPARK-28332][SQL] Reserve init value -1 only when do min max statistics in SQLMetrics URL: https://github.com/apache/spark/pull/26899
[GitHub] [spark] cloud-fan commented on issue #26868: [SPARK-29665][SQL] refine the TableProvider interface
cloud-fan commented on issue #26868: [SPARK-29665][SQL] refine the TableProvider interface URL: https://github.com/apache/spark/pull/26868#issuecomment-568357827 I've added a note about how `TableProvider` should work with the builtin generic catalog in the `Why are the changes needed?` section.
[GitHub] [spark] zhengruifeng commented on a change in pull request #26948: [SPARK-30120][ML] LSH approxNearestNeighbors should use BoundedPriorityQueue when numNearestNeighbors is small
zhengruifeng commented on a change in pull request #26948: [SPARK-30120][ML] LSH approxNearestNeighbors should use BoundedPriorityQueue when numNearestNeighbors is small URL: https://github.com/apache/spark/pull/26948#discussion_r360768214 ## File path: mllib/src/main/scala/org/apache/spark/ml/feature/LSH.scala ## @@ -138,24 +138,59 @@ private[ml] abstract class LSHModel[T <: LSHModel[T]] // Limit the use of hashDist since it's controversial val hashDistUDF = udf((x: Seq[Vector]) => hashDistance(x, keyHash), DataTypes.DoubleType) val hashDistCol = hashDistUDF(col($(outputCol))) - - // Compute threshold to get around k elements. - // To guarantee to have enough neighbors in one pass, we need (p - err) * N >= M - // so we pick quantile p = M / N + err - // M: the number of nearest neighbors; N: the number of elements in dataset - val relativeError = 0.05 - val approxQuantile = numNearestNeighbors.toDouble / count + relativeError val modelDatasetWithDist = modelDataset.withColumn(distCol, hashDistCol) - if (approxQuantile >= 1) { -modelDatasetWithDist + + val spark = dataset.sparkSession + import spark.implicits._ + + if (numNearestNeighbors < 1000) { +val r = Random.nextInt +val distColIdx = modelDatasetWithDist.schema.fieldNames.indexOf(distCol) +val rows = modelDatasetWithDist + .rdd + .map { row => +val dist = row.getDouble(distColIdx) +(r, (dist, row)) + }.aggregateByKey(new BoundedPriorityQueue[(Double, Row)](numNearestNeighbors)( +Ordering.by[(Double, Row), Double](_._1).reverse))( +seqOp = (c, v) => c += v, +combOp = (c1, c2) => c1 ++= c2 + ).flatMap { case (_, c) => c.iterator.map(_._2) } +spark.createDataFrame(rows, modelDatasetWithDist.schema) Review comment: There is only one test for singleProbe=false: numNearestNeighbors=100, the threshold computed by previous `sort` or current `approxQuantile` is 0.0, filtered with threshold=0.0, there are 231 items. This is an automated message from the Apache Git Service. 
[GitHub] [spark] cloud-fan commented on issue #26899: [SPARK-28332][SQL] Reserve init value -1 only when do min max statistics in SQLMetrics
cloud-fan commented on issue #26899: [SPARK-28332][SQL] Reserve init value -1 only when do min max statistics in SQLMetrics URL: https://github.com/apache/spark/pull/26899#issuecomment-568357952 thanks, merging to master!
[GitHub] [spark] zhengruifeng commented on issue #26803: [SPARK-30178][ML] RobustScaler support large numFeatures
zhengruifeng commented on issue #26803: [SPARK-30178][ML] RobustScaler support large numFeatures URL: https://github.com/apache/spark/pull/26803#issuecomment-568357363 @srowen If you do not object, I will merge this in one or two days. I will try to optimize the performance by including new ops for PairRDD.
[GitHub] [spark] cloud-fan commented on issue #26868: [SPARK-29665][SQL] refine the TableProvider interface
cloud-fan commented on issue #26868: [SPARK-29665][SQL] refine the TableProvider interface URL: https://github.com/apache/spark/pull/26868#issuecomment-568356522 > what is your rationale for saying "For [sources like Kafka], a simple getTable(properties) is the best."? We can tell it by looking at the code, but let me explain it here as well. ``` class KafkaProvider implements TableProvider { Table getTable(properties) { return new KafkaTable(properties) } } class KafkaTable implements Table { StructType schema() { return the_fixed_schema; } Transform[] partitioning() { return new Transform[0]; } ScanBuilder ... WriteBuilder ... } ``` This is simpler than the one below, as we don't need to worry about whether the passed-in schema and partitioning are wrong. ``` class KafkaProvider implements TableProvider { StructType inferSchema() { return the_fixed_schema; } Transform[] inferPartitioning() { return new Transform[0]; } Table getTable(schema, partitioning, properties) { assert(schema == the_fixed_schema) assert(partitioning.isEmpty) return new KafkaTable(schema, properties) } } class KafkaTable(schema) implements Table { StructType schema() { return this.schema; } Transform[] partitioning() { return new Transform[0]; } ScanBuilder ... WriteBuilder ... } ``` > If I want to store a Kafka stream in the built-in generic catalog, we agree that catalog should pass the schema and partitioning to TableProvider.getTable (Your point 2.). That means that both getTable(properties) and getTable(schema, partitioning, properties) must be implemented. In the last sync, I think we agreed that we should have a "flag" to let Spark not store the schema/partitioning in the built-in generic catalog. `SupportsExternalMetadata` is the flag. If a source doesn't implement `SupportsExternalMetadata`, then Spark won't store the schema/partitioning in the builtin catalog. When we scan this table, Spark just calls `getTable(properties)` and asks the source to report schema/partitioning.
[GitHub] [spark] Ngone51 commented on issue #26981: [SPARK-26389][SS][FOLLOW-UP]Format config name to follow the other boolean conf naming convention
Ngone51 commented on issue #26981: [SPARK-26389][SS][FOLLOW-UP]Format config name to follow the other boolean conf naming convention URL: https://github.com/apache/spark/pull/26981#issuecomment-568354910 > @Ngone51 can you update the PR description and explicitly mention that this config is newly added in 3.0? updated, thanks!
[GitHub] [spark] zhengruifeng commented on a change in pull request #26948: [SPARK-30120][ML] LSH approxNearestNeighbors should use BoundedPriorityQueue when numNearestNeighbors is small
zhengruifeng commented on a change in pull request #26948: [SPARK-30120][ML] LSH approxNearestNeighbors should use BoundedPriorityQueue when numNearestNeighbors is small URL: https://github.com/apache/spark/pull/26948#discussion_r360766075 ## File path: mllib/src/main/scala/org/apache/spark/ml/feature/LSH.scala ## @@ -138,24 +138,59 @@ private[ml] abstract class LSHModel[T <: LSHModel[T]] // Limit the use of hashDist since it's controversial val hashDistUDF = udf((x: Seq[Vector]) => hashDistance(x, keyHash), DataTypes.DoubleType) val hashDistCol = hashDistUDF(col($(outputCol))) - - // Compute threshold to get around k elements. - // To guarantee to have enough neighbors in one pass, we need (p - err) * N >= M - // so we pick quantile p = M / N + err - // M: the number of nearest neighbors; N: the number of elements in dataset - val relativeError = 0.05 - val approxQuantile = numNearestNeighbors.toDouble / count + relativeError val modelDatasetWithDist = modelDataset.withColumn(distCol, hashDistCol) - if (approxQuantile >= 1) { -modelDatasetWithDist + + val spark = dataset.sparkSession + import spark.implicits._ + + if (numNearestNeighbors < 1000) { +val r = Random.nextInt +val distColIdx = modelDatasetWithDist.schema.fieldNames.indexOf(distCol) +val rows = modelDatasetWithDist + .rdd + .map { row => +val dist = row.getDouble(distColIdx) +(r, (dist, row)) + }.aggregateByKey(new BoundedPriorityQueue[(Double, Row)](numNearestNeighbors)( +Ordering.by[(Double, Row), Double](_._1).reverse))( +seqOp = (c, v) => c += v, +combOp = (c1, c2) => c1 ++= c2 + ).flatMap { case (_, c) => c.iterator.map(_._2) } +spark.createDataFrame(rows, modelDatasetWithDist.schema) Review comment: The log of both methods in testsuites: `approxQuantile/QuantileSummaries`: numNearestNeighbors=100, modelSubset.size=231, threshold=0.0 topK numNearestNeighbors=100, modelSubset.size=100, threshold=0.0 This is an automated message from the Apache Git Service. 
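The `aggregateByKey` with `BoundedPriorityQueue` in the diff above is a per-partition top-K selection by distance. A standalone sketch of the same idea — Spark's `BoundedPriorityQueue` is `private[spark]`, so this uses `scala.collection.mutable.PriorityQueue`; the max-heap evicts its largest retained distance whenever a smaller one arrives:

```scala
import scala.collection.mutable

// Keep the k smallest distances seen so far: a max-heap of size <= k whose
// head is the largest retained element. This mirrors BoundedPriorityQueue
// with a reversed ordering, as in the diff.
def topKSmallest(distances: Iterator[Double], k: Int): Seq[Double] = {
  val heap = mutable.PriorityQueue.empty[Double] // max-heap: head is the largest kept
  distances.foreach { d =>
    if (heap.size < k) heap.enqueue(d)
    else if (d < heap.head) { heap.dequeue(); heap.enqueue(d) }
  }
  heap.dequeueAll.reverse // ascending by distance
}
```

In the diff, `seqOp` plays the role of the `foreach` here and `combOp` merges two such queues, so at most `numNearestNeighbors` rows survive each combine step instead of all candidate rows.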
[GitHub] [spark] cloud-fan commented on issue #26678: [SPARK-30226][SQL] Remove withXXX functions in WriteBuilder
cloud-fan commented on issue #26678: [SPARK-30226][SQL] Remove withXXX functions in WriteBuilder URL: https://github.com/apache/spark/pull/26678#issuecomment-568354546 I think the argument here is whether it's marginally better or much better. I think it's much better. Another question is: when should we stop making breaking changes to V2? We have made so many API changes for DS v2 in Spark 3.0 and I'm confused why we can't make API changes now. If there is a point at which we should stop making breaking changes, it should be the branch cut. Before that, I think it's worth making API changes even if the result is only slightly better. We won't have a chance to fix these small API problems after the branch cut. cc @srowen @dongjoon-hyun @dbtsai
[GitHub] [spark] zhengruifeng commented on issue #26948: [SPARK-30120][ML] LSH approxNearestNeighbors should use BoundedPriorityQueue when numNearestNeighbors is small
zhengruifeng commented on issue #26948: [SPARK-30120][ML] LSH approxNearestNeighbors should use BoundedPriorityQueue when numNearestNeighbors is small URL: https://github.com/apache/spark/pull/26948#issuecomment-568354256 The overall logic is similar to the `recall` & `ranking` procedures in many classification scenarios. recall: In the computation of `modelSubset`, more candidates than NN are selected. Even though the comment said before 3.0.0 `Compute threshold to get **exact** k elements.` and in current master says `Compute threshold to get around k elements.`, obtaining exactly K elements was never implemented, since a threshold-based method will select at least K elements. ranking: Then to get the final top-K items, the candidates filtered by `hashDist` above are ranked by `keyDist`. I guess in the first part more candidates than NN are needed, no matter which selection method is used.
[GitHub] [spark] beliefer commented on issue #26656: [SPARK-27986][SQL] Support ANSI SQL filter clause for aggregate expression
beliefer commented on issue #26656: [SPARK-27986][SQL] Support ANSI SQL filter clause for aggregate expression URL: https://github.com/apache/spark/pull/26656#issuecomment-568353975 Retest this please
[GitHub] [spark] iRakson edited a comment on issue #26863: [SPARK-30234]ADD FILE cannot add directories from sql CLI
iRakson edited a comment on issue #26863: [SPARK-30234]ADD FILE cannot add directories from sql CLI URL: https://github.com/apache/spark/pull/26863#issuecomment-567886469 @maropu @cloud-fan @HyukjinKwon Are any other changes needed for this one?
[GitHub] [spark] zhengruifeng edited a comment on issue #26948: [SPARK-30120][ML] LSH approxNearestNeighbors should use BoundedPriorityQueue when numNearestNeighbors is small
zhengruifeng edited a comment on issue #26948: [SPARK-30120][ML] LSH approxNearestNeighbors should use BoundedPriorityQueue when numNearestNeighbors is small URL: https://github.com/apache/spark/pull/26948#issuecomment-568335734 @srowen @huaxingao I rethought this, and I found another solution: 1, the current impl needs a threshold, so we no longer need `BoundedPriorityQueue`. The current impl computes the count and the quantile in two passes; we can compute them together in a single pass. So the advantage that `BoundedPriorityQueue` does not need an extra pass disappears; `approxQuantile` is enough, but we need to use `QuantileSummaries` directly; 2, still, for top-K when nn is small (maybe < 1000 at first?), why not use `BoundedPriorityQueue` to directly collect the top-K **rows** (instead of collecting distCol for a threshold)? 3, for the final `modelSubsetWithDistCol.sort(distCol).limit(numNearestNeighbors)`, when nn is small, use `BoundedPriorityQueue` to avoid a global sort.
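Point 1's "compute them together in one pass" can be sketched standalone; here an exact in-memory buffer stands in for Spark's mergeable, approximate `QuantileSummaries`, and the function name is illustrative:

```scala
import scala.collection.mutable.ArrayBuffer

// Count and quantile computed in a single pass over the data: count while
// buffering, then read the p-quantile off the sorted buffer. QuantileSummaries
// achieves the same in bounded memory by keeping a compressed summary instead
// of every value. Assumes a non-empty input.
def countAndQuantile(xs: Iterator[Double], p: Double): (Long, Double) = {
  val buf = ArrayBuffer.empty[Double]
  var count = 0L
  xs.foreach { x => buf += x; count += 1 } // the single pass
  val sorted = buf.sorted
  val rank = math.min((p * count).toLong, count - 1).toInt
  (count, sorted(rank))
}
```

The point of fusing the two is that the earlier impl paid one pass for `count()` and another for the quantile; a combined aggregation pays only one.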
[GitHub] [spark] cloud-fan commented on a change in pull request #26231: [SPARK-29572][SQL] add v1 read fallback API in DS v2
cloud-fan commented on a change in pull request #26231: [SPARK-29572][SQL] add v1 read fallback API in DS v2 URL: https://github.com/apache/spark/pull/26231#discussion_r360763960 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala ## @@ -19,38 +19,75 @@ package org.apache.spark.sql.execution.datasources.v2 import scala.collection.JavaConverters._ -import org.apache.spark.sql.{AnalysisException, Strategy} -import org.apache.spark.sql.catalyst.expressions.{And, PredicateHelper, SubqueryExpression} +import org.apache.spark.sql.{AnalysisException, SparkSession, Strategy} +import org.apache.spark.sql.catalyst.expressions.{And, Expression, NamedExpression, PredicateHelper, SubqueryExpression} import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical.{AlterNamespaceSetProperties, AlterTable, AppendData, CreateNamespace, CreateTableAsSelect, CreateV2Table, DeleteFromTable, DescribeNamespace, DescribeTable, DropNamespace, DropTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, RefreshTable, RenameTable, Repartition, ReplaceTable, ReplaceTableAsSelect, SetCatalogAndNamespace, ShowCurrentNamespace, ShowNamespaces, ShowTableProperties, ShowTables} import org.apache.spark.sql.connector.catalog.{StagingTableCatalog, TableCapability} +import org.apache.spark.sql.connector.read.V1Scan import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBatchStream} -import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan} +import org.apache.spark.sql.execution.{FilterExec, LeafExecNode, ProjectExec, RowDataSourceScanExec, SparkPlan} import org.apache.spark.sql.execution.datasources.DataSourceStrategy import org.apache.spark.sql.execution.streaming.continuous.{ContinuousCoalesceExec, WriteToContinuousDataSource, WriteToContinuousDataSourceExec} +import org.apache.spark.sql.sources.TableScan import 
org.apache.spark.sql.util.CaseInsensitiveStringMap -object DataSourceV2Strategy extends Strategy with PredicateHelper { +class DataSourceV2Strategy(session: SparkSession) extends Strategy with PredicateHelper { import DataSourceV2Implicits._ + private def withProjectAndFilter( + project: Seq[NamedExpression], + filters: Seq[Expression], + scan: LeafExecNode, + needsUnsafeConversion: Boolean): SparkPlan = { +val filterCondition = filters.reduceLeftOption(And) +val withFilter = filterCondition.map(FilterExec(_, scan)).getOrElse(scan) + +if (withFilter.output != project || needsUnsafeConversion) { + ProjectExec(project, withFilter) +} else { + withFilter +} + } + override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { +case PhysicalOperation(project, filters, +relation @ DataSourceV2ScanRelation(_, v1Scan: V1Scan, output)) => + val pushedFilters = relation.getTagValue(V2ScanRelationPushDown.PUSHED_FILTERS_TAG) +.getOrElse(Seq.empty) + val v1Relation = v1Scan.toV1TableScan(session.sqlContext) + if (v1Relation.schema != v1Scan.readSchema()) { +throw new IllegalArgumentException( + "The fallback v1 relation reports inconsistent schema:\n" + +"Schema of v2 scan: " + v1Scan.readSchema() + "\n" + +"Schema of v1 relation: " + v1Relation.schema) + } + val rdd = v1Relation match { +case s: TableScan => s.buildScan() +case _ => + throw new IllegalArgumentException( +"`V1Scan.toV1Relation` must return a `TableScan` instance.") + } + val unsafeRowRDD = DataSourceStrategy.toCatalystRDD(v1Relation, output, rdd) + val originalOutputNames = relation.table.schema().map(_.name) + val requiredColumnsIndex = output.map(_.name).map(originalOutputNames.indexOf) + val dsScan = RowDataSourceScanExec( Review comment: This may be a good idea, but this is what we do for ds v1 (see the v1 rule `DataSourceStrategy`). I'd like to avoid changing the existing design choice we made before. This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
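For readers following the diff above, the control flow of the new `withProjectAndFilter` helper can be sketched in a few lines of Python (the plan nodes here are toy stand-ins, not Spark's actual classes):

```python
from dataclasses import dataclass
from functools import reduce

@dataclass
class Scan:
    output: list  # column names the scan produces

@dataclass
class Filter:
    condition: str
    child: object

    @property
    def output(self):
        return self.child.output

@dataclass
class Project:
    columns: list
    child: object

def with_project_and_filter(project, filters, scan, needs_unsafe_conversion):
    # Fold the pushed-down filters into one conjunction, mirroring
    # filters.reduceLeftOption(And) in the Scala code.
    with_filter = scan
    if filters:
        condition = reduce(lambda a, b: f"({a}) AND ({b})", filters)
        with_filter = Filter(condition, scan)
    # Add a projection only when the output columns change or an
    # unsafe-row conversion is required; otherwise return the filter/scan.
    if with_filter.output != project or needs_unsafe_conversion:
        return Project(project, with_filter)
    return with_filter
```

Calling it with a pruned projection yields a `Project` over a `Filter`; with the identity projection and no filters, the scan comes back unchanged.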
[GitHub] [spark] cloud-fan commented on a change in pull request #26231: [SPARK-29572][SQL] add v1 read fallback API in DS v2
cloud-fan commented on a change in pull request #26231: [SPARK-29572][SQL] add v1 read fallback API in DS v2 URL: https://github.com/apache/spark/pull/26231#discussion_r360763763 ## File path: sql/core/src/main/scala/org/apache/spark/sql/connector/read/V1Scan.scala ## @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.read + +import org.apache.spark.annotation.{Experimental, Unstable} +import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.sources.BaseRelation + +/** + * A trait that should be implemented by V1 DataSources that would like to leverage the DataSource + * V2 read code paths. + * + * This interface is designed to provide Spark DataSources time to migrate to DataSource V2 and + * will be removed in a future Spark release. + * + * @since 3.0.0 + */ +@Experimental +@Unstable +trait V1Scan extends Scan { + + /** + * Creates an `BaseRelation` that can scan data from DataSource v1 to RDD[Row]. The returned + * relation must be a `TableScan` instance. Review comment: I write it in Scala to follow `V1WriteBuilder`. It's another topic if we should write these fallback traits in Java or Scala, I follow the existing choice to be consistent. 
[GitHub] [spark] zhengruifeng commented on a change in pull request #26948: [SPARK-30120][ML] LSH approxNearestNeighbors should use BoundedPriorityQueue when numNearestNeighbors is small
zhengruifeng commented on a change in pull request #26948: [SPARK-30120][ML] LSH approxNearestNeighbors should use BoundedPriorityQueue when numNearestNeighbors is small URL: https://github.com/apache/spark/pull/26948#discussion_r360763714 ## File path: mllib/src/main/scala/org/apache/spark/ml/feature/LSH.scala ## @@ -138,24 +138,59 @@ private[ml] abstract class LSHModel[T <: LSHModel[T]] // Limit the use of hashDist since it's controversial val hashDistUDF = udf((x: Seq[Vector]) => hashDistance(x, keyHash), DataTypes.DoubleType) val hashDistCol = hashDistUDF(col($(outputCol))) - - // Compute threshold to get around k elements. - // To guarantee to have enough neighbors in one pass, we need (p - err) * N >= M - // so we pick quantile p = M / N + err - // M: the number of nearest neighbors; N: the number of elements in dataset - val relativeError = 0.05 - val approxQuantile = numNearestNeighbors.toDouble / count + relativeError val modelDatasetWithDist = modelDataset.withColumn(distCol, hashDistCol) - if (approxQuantile >= 1) { -modelDatasetWithDist + + val spark = dataset.sparkSession + import spark.implicits._ + + if (numNearestNeighbors < 1000) { +val r = Random.nextInt +val distColIdx = modelDatasetWithDist.schema.fieldNames.indexOf(distCol) +val rows = modelDatasetWithDist + .rdd + .map { row => +val dist = row.getDouble(distColIdx) +(r, (dist, row)) + }.aggregateByKey(new BoundedPriorityQueue[(Double, Row)](numNearestNeighbors)( +Ordering.by[(Double, Row), Double](_._1).reverse))( +seqOp = (c, v) => c += v, +combOp = (c1, c2) => c1 ++= c2 + ).flatMap { case (_, c) => c.iterator.map(_._2) } +spark.createDataFrame(rows, modelDatasetWithDist.schema) + } else { -val hashThreshold = modelDatasetWithDist.stat - .approxQuantile(distCol, Array(approxQuantile), relativeError) -// Filter the dataset where the hash value is less than the threshold. 
-modelDatasetWithDist.filter(hashDistCol <= hashThreshold(0)) +val relativeError = 0.05 +val (summaries, count) = modelDatasetWithDist Review comment: Here we compute the QuantileSummaries and the count together, which should be faster than the existing implementation.
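The Top-K path discussed in this diff — `aggregateByKey` over a `BoundedPriorityQueue` — amounts to keeping only the k closest rows per partition and then merging the per-partition queues. A small Python sketch of that idea, using a heap as the bounded queue (plain tuples stand in for rows; this is an illustration, not Spark's implementation):

```python
import heapq

def bounded_push(heap, dist, row, k):
    # Keep the k smallest distances; the heap is a max-heap on distance
    # via negation, so heap[0] is always the current worst candidate.
    if len(heap) < k:
        heapq.heappush(heap, (-dist, row))
    elif -heap[0][0] > dist:
        heapq.heapreplace(heap, (-dist, row))

def top_k_nearest(partitions, k):
    # seqOp: fold each partition's (distance, row) pairs into a bounded queue.
    per_partition = []
    for part in partitions:
        heap = []
        for dist, row in part:
            bounded_push(heap, dist, row, k)
        per_partition.append(heap)
    # combOp: merge the per-partition queues into one bounded queue.
    merged = []
    for heap in per_partition:
        for neg_dist, row in heap:
            bounded_push(merged, -neg_dist, row, k)
    return sorted((-neg_dist, row) for neg_dist, row in merged)
```

Unlike the approxQuantile-based threshold, this returns exactly k rows in a single pass over the data.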
[GitHub] [spark] cloud-fan commented on a change in pull request #26231: [SPARK-29572][SQL] add v1 read fallback API in DS v2
cloud-fan commented on a change in pull request #26231: [SPARK-29572][SQL] add v1 read fallback API in DS v2 URL: https://github.com/apache/spark/pull/26231#discussion_r360763577 ## File path: sql/core/src/main/scala/org/apache/spark/sql/connector/read/V1Scan.scala ## @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.read + +import org.apache.spark.annotation.{Experimental, Unstable} +import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.sources.BaseRelation + +/** + * A trait that should be implemented by V1 DataSources that would like to leverage the DataSource + * V2 read code paths. + * + * This interface is designed to provide Spark DataSources time to migrate to DataSource V2 and + * will be removed in a future Spark release. + * + * @since 3.0.0 + */ +@Experimental +@Unstable +trait V1Scan extends Scan { + + /** + * Creates an `BaseRelation` that can scan data from DataSource v1 to RDD[Row]. The returned + * relation must be a `TableScan` instance. Review comment: `TableScan` is a mixin trait, what we need is `BaseRelation with TableScan`, but that doesn't work well for java. This is an automated message from the Apache Git Service. 
[GitHub] [spark] cloud-fan commented on issue #26847: [SPARK-30214][SQL] Support COMMENT ON syntax
cloud-fan commented on issue #26847: [SPARK-30214][SQL] Support COMMENT ON syntax URL: https://github.com/apache/spark/pull/26847#issuecomment-568350488 @rdblue @brkyvz do you have any comments about the idea itself? We can change this PR to focus only on the relation resolution refactor, or open a new PR, but I'd like to know your opinions on the proposal first.
[GitHub] [spark] zhengruifeng commented on a change in pull request #26948: [SPARK-30120][ML] LSH approxNearestNeighbors should use BoundedPriorityQueue when numNearestNeighbors is small
zhengruifeng commented on a change in pull request #26948: [SPARK-30120][ML] LSH approxNearestNeighbors should use BoundedPriorityQueue when numNearestNeighbors is small URL: https://github.com/apache/spark/pull/26948#discussion_r360763270 ## File path: mllib/src/main/scala/org/apache/spark/ml/feature/LSH.scala ## @@ -138,24 +138,59 @@ private[ml] abstract class LSHModel[T <: LSHModel[T]] // Limit the use of hashDist since it's controversial val hashDistUDF = udf((x: Seq[Vector]) => hashDistance(x, keyHash), DataTypes.DoubleType) val hashDistCol = hashDistUDF(col($(outputCol))) - - // Compute threshold to get around k elements. - // To guarantee to have enough neighbors in one pass, we need (p - err) * N >= M - // so we pick quantile p = M / N + err - // M: the number of nearest neighbors; N: the number of elements in dataset - val relativeError = 0.05 - val approxQuantile = numNearestNeighbors.toDouble / count + relativeError val modelDatasetWithDist = modelDataset.withColumn(distCol, hashDistCol) - if (approxQuantile >= 1) { -modelDatasetWithDist + + val spark = dataset.sparkSession + import spark.implicits._ + + if (numNearestNeighbors < 1000) { +val r = Random.nextInt +val distColIdx = modelDatasetWithDist.schema.fieldNames.indexOf(distCol) +val rows = modelDatasetWithDist + .rdd + .map { row => +val dist = row.getDouble(distColIdx) +(r, (dist, row)) + }.aggregateByKey(new BoundedPriorityQueue[(Double, Row)](numNearestNeighbors)( +Ordering.by[(Double, Row), Double](_._1).reverse))( +seqOp = (c, v) => c += v, +combOp = (c1, c2) => c1 ++= c2 + ).flatMap { case (_, c) => c.iterator.map(_._2) } +spark.createDataFrame(rows, modelDatasetWithDist.schema) Review comment: This method extract numNearestNeighbors rows, however, it fails in `BucketedRandomProjectionLSHSuite`. I checked the datasets, and found that existing method based on `approxQuantile`/`QuantileSummaries` will generate much more candidate items in the step: numNearestNeighbors=100, modelSubset.size=231. 
The method in the current commit will only extract 100 candidates even when there are repeated values, while the existing method in master extracts 231 candidates. The new method then fails in the test suite because its final precision is not good enough. I wonder whether we need more candidates here to generate a more accurate final result. Should I just remove the Top-K based method, or use a scaled threshold (numNearestNeighbors*3) here?
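The precision gap described above comes from tie handling: a distance threshold keeps every row at or below the cutoff, while an exact Top-K truncates ties at the k-th position. A toy Python illustration (an exact quantile stands in for the QuantileSummaries approximation; the 0.05 relative error matches the code above):

```python
def candidates_by_threshold(dists, k, relative_error=0.05):
    # Existing approach: pick quantile p = k/N + err, then keep every row
    # whose distance is at or below that threshold -- ties all survive.
    n = len(dists)
    p = min(1.0, k / n + relative_error)
    threshold = sorted(dists)[max(0, int(p * n) - 1)]
    return [d for d in dists if d <= threshold]

def candidates_by_top_k(dists, k):
    # Top-K approach: exactly k rows, even when the k-th distance is tied.
    return sorted(dists)[:k]
```

With repeated hash distances, the threshold method can hand many more candidates to the exact re-ranking step, which is how it reached 231 candidates for k = 100 while the Top-K method stopped at exactly 100.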
[GitHub] [spark] cloud-fan commented on issue #26981: [SPARK-26389][SS][FOLLOW-UP]Format config name to follow the other boolean conf naming convention
cloud-fan commented on issue #26981: [SPARK-26389][SS][FOLLOW-UP]Format config name to follow the other boolean conf naming convention URL: https://github.com/apache/spark/pull/26981#issuecomment-568348511 @Ngone51 can you update the PR description and explicitly mention that this config is newly added in 3.0?
[GitHub] [spark] cloud-fan commented on issue #26981: [SPARK-26389][SS][FOLLOW-UP]Format config name to follow the other boolean conf naming convention
cloud-fan commented on issue #26981: [SPARK-26389][SS][FOLLOW-UP]Format config name to follow the other boolean conf naming convention URL: https://github.com/apache/spark/pull/26981#issuecomment-568348407 ok to test
[GitHub] [spark] beliefer commented on a change in pull request #26656: [SPARK-27986][SQL] Support ANSI SQL filter clause for aggregate expression
beliefer commented on a change in pull request #26656: [SPARK-27986][SQL] Support ANSI SQL filter clause for aggregate expression URL: https://github.com/apache/spark/pull/26656#discussion_r360748210 ## File path: sql/core/src/test/resources/sql-tests/inputs/group-by-filter.sql ## @@ -0,0 +1,156 @@ +-- Test filter clause for aggregate expression. + +-- Test data. +CREATE OR REPLACE TEMPORARY VIEW testData AS SELECT * FROM VALUES +(1, 1), (1, 2), (2, 1), (2, 2), (3, 1), (3, 2), (null, 1), (3, null), (null, null) +AS testData(a, b); + +CREATE OR REPLACE TEMPORARY VIEW EMP AS SELECT * FROM VALUES + (100, "emp 1", date "2005-01-01", 100.00D, 10), + (100, "emp 1", date "2005-01-01", 100.00D, 10), + (200, "emp 2", date "2003-01-01", 200.00D, 10), + (300, "emp 3", date "2002-01-01", 300.00D, 20), + (400, "emp 4", date "2005-01-01", 400.00D, 30), + (500, "emp 5", date "2001-01-01", 400.00D, NULL), + (600, "emp 6 - no dept", date "2001-01-01", 400.00D, 100), + (700, "emp 7", date "2010-01-01", 400.00D, 100), + (800, "emp 8", date "2016-01-01", 150.00D, 70) +AS EMP(id, emp_name, hiredate, salary, dept_id); + +CREATE OR REPLACE TEMPORARY VIEW DEPT AS SELECT * FROM VALUES + (10, "dept 1", "CA"), + (20, "dept 2", "NY"), + (30, "dept 3", "TX"), + (40, "dept 4 - unassigned", "OR"), + (50, "dept 5 - unassigned", "NJ"), + (70, "dept 7", "FL") +AS DEPT(dept_id, dept_name, state); + +-- Aggregate with filter and empty GroupBy expressions. 
+SELECT a, COUNT(b) FILTER (WHERE a >= 2) FROM testData; +SELECT COUNT(a) FILTER (WHERE a = 1), COUNT(b) FILTER (WHERE a > 1) FROM testData; +SELECT COUNT(id) FILTER (WHERE hiredate = date "2001-01-01") FROM emp; +SELECT COUNT(id) FILTER (WHERE hiredate = to_date('2001-01-01 00:00:00')) FROM emp; +SELECT COUNT(id) FILTER (WHERE hiredate = to_timestamp("2001-01-01 00:00:00")) FROM emp; +SELECT COUNT(id) FILTER (WHERE date_format(hiredate, "-MM-dd") = "2001-01-01") FROM emp; +-- [SPARK-30276] Support Filter expression allows simultaneous use of DISTINCT +-- SELECT COUNT(DISTINCT id) FILTER (WHERE date_format(hiredate, "-MM-dd HH:mm:ss") = "2001-01-01 00:00:00") FROM emp; + +-- Aggregate with filter and non-empty GroupBy expressions. +SELECT a, COUNT(b) FILTER (WHERE a >= 2) FROM testData GROUP BY a; +SELECT a, COUNT(b) FILTER (WHERE a != 2) FROM testData GROUP BY b; +SELECT COUNT(a) FILTER (WHERE a >= 0), COUNT(b) FILTER (WHERE a >= 3) FROM testData GROUP BY a; +SELECT dept_id, SUM(salary) FILTER (WHERE hiredate > date "2003-01-01") FROM emp GROUP BY dept_id; +SELECT dept_id, SUM(salary) FILTER (WHERE hiredate > to_date("2003-01-01")) FROM emp GROUP BY dept_id; +SELECT dept_id, SUM(salary) FILTER (WHERE hiredate > to_timestamp("2003-01-01 00:00:00")) FROM emp GROUP BY dept_id; +SELECT dept_id, SUM(salary) FILTER (WHERE date_format(hiredate, "-MM-dd") > "2003-01-01") FROM emp GROUP BY dept_id; +-- [SPARK-30276] Support Filter expression allows simultaneous use of DISTINCT +-- SELECT dept_id, SUM(DISTINCT salary) FILTER (WHERE date_format(hiredate, "-MM-dd HH:mm:ss") > "2001-01-01 00:00:00") FROM emp GROUP BY dept_id; + +-- Aggregate with filter and grouped by literals. 
+SELECT 'foo', COUNT(a) FILTER (WHERE b <= 2) FROM testData GROUP BY 1; +SELECT 'foo', SUM(salary) FILTER (WHERE hiredate >= date "2003-01-01") FROM emp GROUP BY 1; +SELECT 'foo', SUM(salary) FILTER (WHERE hiredate >= to_date("2003-01-01")) FROM emp GROUP BY 1; +SELECT 'foo', SUM(salary) FILTER (WHERE hiredate >= to_timestamp("2003-01-01")) FROM emp GROUP BY 1; + +-- Aggregate with filter, more than one aggregate function goes with distinct. +select dept_id, count(distinct emp_name), count(distinct hiredate), sum(salary), sum(salary) filter (where id > 200) from emp group by dept_id; +select dept_id, count(distinct emp_name), count(distinct hiredate), sum(salary) filter (where salary < 400.00D), sum(salary) filter (where id > 200) from emp group by dept_id; +-- [SPARK-30276] Support Filter expression allows simultaneous use of DISTINCT +-- select dept_id, count(distinct emp_name) filter (where id > 200), count(distinct hiredate), sum(salary) from emp group by dept_id; +-- select dept_id, count(distinct emp_name) filter (where id > 200), count(distinct hiredate) filter (where hiredate > date "2003-01-01"), sum(salary) from emp group by dept_id; +-- select dept_id, count(distinct emp_name) filter (where id > 200), count(distinct hiredate) filter (where hiredate > date "2003-01-01"), sum(salary) filter (where salary < 400.00D) from emp group by dept_id; +-- select dept_id, count(distinct emp_name) filter (where id > 200), count(distinct hiredate) filter (where hiredate > date "2003-01-01"), sum(salary) filter (where salary < 400.00D), sum(salary) filter (where id > 200) from emp group by dept_id; +-- select dept_id, count(distinct emp_name) filter (where id > 200), count(distinct emp_name), sum(salary) from emp group by dept_id; +-- select dept_id,
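For the queries above, the semantics of the FILTER clause are that each aggregate only sees the input rows passing its predicate, with the aggregate's usual NULL-skipping on top. A Python sketch of `COUNT(b) FILTER (WHERE a >= 2)` against the same testData values (`None` stands in for SQL NULL):

```python
def count_filter(rows, value, predicate):
    # COUNT(value) FILTER (WHERE predicate): count rows that pass the
    # predicate and whose counted value is not NULL.
    return sum(1 for r in rows if predicate(r) and value(r) is not None)

test_data = [(1, 1), (1, 2), (2, 1), (2, 2), (3, 1), (3, 2),
             (None, 1), (3, None), (None, None)]

# SELECT COUNT(b) FILTER (WHERE a >= 2) FROM testData
n = count_filter(test_data,
                 value=lambda r: r[1],
                 predicate=lambda r: r[0] is not None and r[0] >= 2)
```

Five rows satisfy `a >= 2`, but `(3, NULL)` contributes nothing to `COUNT(b)`, so the result is 4.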
[GitHub] [spark] cloud-fan commented on issue #26968: [SPARK-29906][SQL][FOLLOWUP] Update the final plan in UI for AQE
cloud-fan commented on issue #26968: [SPARK-29906][SQL][FOLLOWUP] Update the final plan in UI for AQE URL: https://github.com/apache/spark/pull/26968#issuecomment-568347435 Unfortunately, we don't have a good test framework for the UI. It would be great if we had one.
[GitHub] [spark] cloud-fan commented on a change in pull request #26656: [SPARK-27986][SQL] Support ANSI SQL filter clause for aggregate expression
cloud-fan commented on a change in pull request #26656: [SPARK-27986][SQL] Support ANSI SQL filter clause for aggregate expression URL: https://github.com/apache/spark/pull/26656#discussion_r360761003 ## File path: sql/core/src/test/resources/sql-tests/inputs/group-by-filter.sql ## @@ -0,0 +1,156 @@ +-- Test filter clause for aggregate expression. + +-- Test data. +CREATE OR REPLACE TEMPORARY VIEW testData AS SELECT * FROM VALUES +(1, 1), (1, 2), (2, 1), (2, 2), (3, 1), (3, 2), (null, 1), (3, null), (null, null) +AS testData(a, b); + +CREATE OR REPLACE TEMPORARY VIEW EMP AS SELECT * FROM VALUES + (100, "emp 1", date "2005-01-01", 100.00D, 10), + (100, "emp 1", date "2005-01-01", 100.00D, 10), + (200, "emp 2", date "2003-01-01", 200.00D, 10), + (300, "emp 3", date "2002-01-01", 300.00D, 20), + (400, "emp 4", date "2005-01-01", 400.00D, 30), + (500, "emp 5", date "2001-01-01", 400.00D, NULL), + (600, "emp 6 - no dept", date "2001-01-01", 400.00D, 100), + (700, "emp 7", date "2010-01-01", 400.00D, 100), + (800, "emp 8", date "2016-01-01", 150.00D, 70) +AS EMP(id, emp_name, hiredate, salary, dept_id); + +CREATE OR REPLACE TEMPORARY VIEW DEPT AS SELECT * FROM VALUES + (10, "dept 1", "CA"), + (20, "dept 2", "NY"), + (30, "dept 3", "TX"), + (40, "dept 4 - unassigned", "OR"), + (50, "dept 5 - unassigned", "NJ"), + (70, "dept 7", "FL") +AS DEPT(dept_id, dept_name, state); + +-- Aggregate with filter and empty GroupBy expressions. 
+SELECT a, COUNT(b) FILTER (WHERE a >= 2) FROM testData; +SELECT COUNT(a) FILTER (WHERE a = 1), COUNT(b) FILTER (WHERE a > 1) FROM testData; +SELECT COUNT(id) FILTER (WHERE hiredate = date "2001-01-01") FROM emp; +SELECT COUNT(id) FILTER (WHERE hiredate = to_date('2001-01-01 00:00:00')) FROM emp; +SELECT COUNT(id) FILTER (WHERE hiredate = to_timestamp("2001-01-01 00:00:00")) FROM emp; +SELECT COUNT(id) FILTER (WHERE date_format(hiredate, "-MM-dd") = "2001-01-01") FROM emp; +-- [SPARK-30276] Support Filter expression allows simultaneous use of DISTINCT +-- SELECT COUNT(DISTINCT id) FILTER (WHERE date_format(hiredate, "-MM-dd HH:mm:ss") = "2001-01-01 00:00:00") FROM emp; + +-- Aggregate with filter and non-empty GroupBy expressions. +SELECT a, COUNT(b) FILTER (WHERE a >= 2) FROM testData GROUP BY a; +SELECT a, COUNT(b) FILTER (WHERE a != 2) FROM testData GROUP BY b; +SELECT COUNT(a) FILTER (WHERE a >= 0), COUNT(b) FILTER (WHERE a >= 3) FROM testData GROUP BY a; +SELECT dept_id, SUM(salary) FILTER (WHERE hiredate > date "2003-01-01") FROM emp GROUP BY dept_id; +SELECT dept_id, SUM(salary) FILTER (WHERE hiredate > to_date("2003-01-01")) FROM emp GROUP BY dept_id; +SELECT dept_id, SUM(salary) FILTER (WHERE hiredate > to_timestamp("2003-01-01 00:00:00")) FROM emp GROUP BY dept_id; +SELECT dept_id, SUM(salary) FILTER (WHERE date_format(hiredate, "-MM-dd") > "2003-01-01") FROM emp GROUP BY dept_id; +-- [SPARK-30276] Support Filter expression allows simultaneous use of DISTINCT +-- SELECT dept_id, SUM(DISTINCT salary) FILTER (WHERE date_format(hiredate, "-MM-dd HH:mm:ss") > "2001-01-01 00:00:00") FROM emp GROUP BY dept_id; + +-- Aggregate with filter and grouped by literals. 
+SELECT 'foo', COUNT(a) FILTER (WHERE b <= 2) FROM testData GROUP BY 1; +SELECT 'foo', SUM(salary) FILTER (WHERE hiredate >= date "2003-01-01") FROM emp GROUP BY 1; +SELECT 'foo', SUM(salary) FILTER (WHERE hiredate >= to_date("2003-01-01")) FROM emp GROUP BY 1; +SELECT 'foo', SUM(salary) FILTER (WHERE hiredate >= to_timestamp("2003-01-01")) FROM emp GROUP BY 1; + +-- Aggregate with filter, more than one aggregate function goes with distinct. +select dept_id, count(distinct emp_name), count(distinct hiredate), sum(salary), sum(salary) filter (where id > 200) from emp group by dept_id; +select dept_id, count(distinct emp_name), count(distinct hiredate), sum(salary) filter (where salary < 400.00D), sum(salary) filter (where id > 200) from emp group by dept_id; +-- [SPARK-30276] Support Filter expression allows simultaneous use of DISTINCT +-- select dept_id, count(distinct emp_name) filter (where id > 200), count(distinct hiredate), sum(salary) from emp group by dept_id; +-- select dept_id, count(distinct emp_name) filter (where id > 200), count(distinct hiredate) filter (where hiredate > date "2003-01-01"), sum(salary) from emp group by dept_id; +-- select dept_id, count(distinct emp_name) filter (where id > 200), count(distinct hiredate) filter (where hiredate > date "2003-01-01"), sum(salary) filter (where salary < 400.00D) from emp group by dept_id; +-- select dept_id, count(distinct emp_name) filter (where id > 200), count(distinct hiredate) filter (where hiredate > date "2003-01-01"), sum(salary) filter (where salary < 400.00D), sum(salary) filter (where id > 200) from emp group by dept_id; +-- select dept_id, count(distinct emp_name) filter (where id > 200), count(distinct emp_name), sum(salary) from emp group by dept_id; +-- select
[GitHub] [spark] viirya commented on a change in pull request #26946: [SPARK-30036][SQL] Fix: REPARTITION hint does not work with order by
viirya commented on a change in pull request #26946: [SPARK-30036][SQL] Fix: REPARTITION hint does not work with order by URL: https://github.com/apache/spark/pull/26946#discussion_r360760641 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala ## @@ -55,6 +55,8 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] { child case (child, BroadcastDistribution(mode)) => BroadcastExchangeExec(mode, child) + case (ShuffleExchangeExec(partitioning, child, _), distribution: OrderedDistribution) => + ShuffleExchangeExec(distribution.createPartitioning(partitioning.numPartitions), child) Review comment: I just tried a few possible cases but could not construct a concrete case like this. Maybe this is the only possible case, so I think this should be fine.
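The two added lines in the diff collapse a redundant exchange: when the child of an OrderedDistribution requirement is already a shuffle (e.g. introduced by a REPARTITION hint), its partitioning is replaced with the required range partitioning instead of stacking a second shuffle on top. A Python sketch of that rewrite with toy plan nodes (not Spark's classes):

```python
from dataclasses import dataclass

@dataclass
class ShuffleExchange:
    partitioning: str
    num_partitions: int
    child: object

DEFAULT_SHUFFLE_PARTITIONS = 200  # stand-in for spark.sql.shuffle.partitions

def ensure_ordered_distribution(child, sort_columns):
    required = f"range({', '.join(sort_columns)})"
    if isinstance(child, ShuffleExchange):
        # Reuse the existing shuffle: swap in range partitioning and keep
        # its partition count, rather than adding a second exchange.
        return ShuffleExchange(required, child.num_partitions, child.child)
    return ShuffleExchange(required, DEFAULT_SHUFFLE_PARTITIONS, child)
```

Applied to a round-robin shuffle from a REPARTITION hint, this produces a single range-partitioning exchange over the hint's original child.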
[GitHub] [spark] Ngone51 edited a comment on issue #26981: [SPARK-26389][SS][FOLLOW-UP]Format config name to follow the other boolean conf naming convention
Ngone51 edited a comment on issue #26981: [SPARK-26389][SS][FOLLOW-UP]Format config name to follow the other boolean conf naming convention URL: https://github.com/apache/spark/pull/26981#issuecomment-568345502 > Given this is FOLLOW-UP of the feature which is not released officially, maybe "No" is OK as well. Makes sense. And thank you for your quick response.
[GitHub] [spark] manuzhang commented on issue #26968: [SPARK-29906][SQL][FOLLOWUP] Update the final plan in UI for AQE
manuzhang commented on issue #26968: [SPARK-29906][SQL][FOLLOWUP] Update the final plan in UI for AQE URL: https://github.com/apache/spark/pull/26968#issuecomment-568345424 @cloud-fan Is there any unit test to catch such a mistake so that it won't happen again?
[GitHub] [spark] seayoun commented on issue #26975: [SPARK-26975][CORE] Stage retry and executor crash cause app hung up forever
seayoun commented on issue #26975: [SPARK-26975][CORE] Stage retry and executor crash cause app hung up forever URL: https://github.com/apache/spark/pull/26975#issuecomment-568343977 cc @dongjoon-hyun @wangshuo128 @squito
[GitHub] [spark] HeartSaVioR edited a comment on issue #26981: [SPARK-26389][SS][FOLLOW-UP]Format config name to follow the other boolean conf naming convention
HeartSaVioR edited a comment on issue #26981: [SPARK-26389][SS][FOLLOW-UP]Format config name to follow the other boolean conf naming convention URL: https://github.com/apache/spark/pull/26981#issuecomment-568343316 It would be great if we made it clear, under "Does this PR introduce any user-facing change?", which versions are affected by such an incompatible change. If officially released versions are affected we should also link both names via AlternateConfig. I know this only applies to 3.0.0, so I don't expect we want to use AlternateConfig to couple them, but given the PR description answers "yes" on that statement, it is better to be clear about the reason for "yes". (Given this is a FOLLOW-UP of a feature which is not released officially, maybe "No" is OK as well.)
[GitHub] [spark] HeartSaVioR commented on issue #26981: [SPARK-26389][SS][FOLLOW-UP]Format config name to follow the other boolean conf naming convention
HeartSaVioR commented on issue #26981: [SPARK-26389][SS][FOLLOW-UP]Format config name to follow the other boolean conf naming convention URL: https://github.com/apache/spark/pull/26981#issuecomment-568343316 It would be great if we made it clear, under "Does this PR introduce any user-facing change?", which versions are affected by such an incompatible change. If officially released versions are affected we should also link both names via AlternateConfig. I know this only applies to 3.0.0, so I don't expect we want to use AlternateConfig to couple them, but given the PR description answers "yes" on that statement, it is better to be clear. (Given this is a FOLLOW-UP of a feature which is not released officially, maybe "No" is OK as well.)
[GitHub] [spark] Ngone51 commented on issue #26981: [SPARK-26389][SS][FOLLOW-UP]Format config name to follow the other boolean conf naming convention
Ngone51 commented on issue #26981: [SPARK-26389][SS][FOLLOW-UP]Format config name to follow the other boolean conf naming convention URL: https://github.com/apache/spark/pull/26981#issuecomment-568341936 cc @gatorsmile @cloud-fan @gaborgsomogyi @HeartSaVioR
[GitHub] [spark] Ngone51 commented on a change in pull request #23732: [SPARK-26389][SS] Add force delete temp checkpoint configuration
Ngone51 commented on a change in pull request #23732: [SPARK-26389][SS] Add force delete temp checkpoint configuration URL: https://github.com/apache/spark/pull/23732#discussion_r360756686 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ## @@ -907,6 +907,12 @@ object SQLConf { .stringConf .createOptional + val FORCE_DELETE_TEMP_CHECKPOINT_LOCATION = +buildConf("spark.sql.streaming.forceDeleteTempCheckpointLocation") Review comment: Submitted #26981
[GitHub] [spark] Ngone51 opened a new pull request #26981: [SPARK-26389][SS][FOLLOW-UP]Format config name to follow the other boolean conf naming convention
Ngone51 opened a new pull request #26981: [SPARK-26389][SS][FOLLOW-UP]Format config name to follow the other boolean conf naming convention URL: https://github.com/apache/spark/pull/26981 ### What changes were proposed in this pull request? Rename `spark.sql.streaming.forceDeleteTempCheckpointLocation` to `spark.sql.streaming.forceDeleteTempCheckpointLocation.enabled`. ### Why are the changes needed? To follow the other boolean conf naming convention. ### Does this PR introduce any user-facing change? Yes, config name changed. ### How was this patch tested? Pass Jenkins.
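As a rough illustration of what such a rename involves, here is a minimal, self-contained Scala sketch of a boolean conf that answers to its new `.enabled` name while still honoring the old key as a fallback (Spark's real ConfigBuilder supports this idea via `withAlternative`; the `BoolConf` type and `readFrom` helper below are hypothetical, for illustration only):

```scala
// Hypothetical sketch: a tiny conf entry where a renamed boolean key still
// honors its old name. Not Spark's actual ConfigBuilder implementation.
object ConfSketch {
  final case class BoolConf(key: String, alternatives: Seq[String], default: Boolean) {
    // Look up the new key first, then fall back to any alternative (old) keys.
    def readFrom(settings: Map[String, String]): Boolean =
      (key +: alternatives)
        .flatMap(settings.get)
        .headOption
        .map(_.toBoolean)
        .getOrElse(default)
  }

  val forceDeleteTempCheckpointLocation: BoolConf = BoolConf(
    key = "spark.sql.streaming.forceDeleteTempCheckpointLocation.enabled",
    alternatives = Seq("spark.sql.streaming.forceDeleteTempCheckpointLocation"),
    default = false)
}
```

With this shape, jobs that still set the old key keep working while new code reads the `.enabled` name, which is why the discussion above weighs linking both names via AlternateConfig.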
[GitHub] [spark] Ngone51 edited a comment on issue #26980: [SPARK-27348][Core] HeartbeatReceiver should remove lost executors from CoarseGrainedSchedulerBackend
Ngone51 edited a comment on issue #26980: [SPARK-27348][Core] HeartbeatReceiver should remove lost executors from CoarseGrainedSchedulerBackend URL: https://github.com/apache/spark/pull/26980#issuecomment-568275384 cc @xuanyuanking @cloud-fan @jiangxb1987
[GitHub] [spark] beliefer commented on a change in pull request #26656: [SPARK-27986][SQL] Support ANSI SQL filter clause for aggregate expression
beliefer commented on a change in pull request #26656: [SPARK-27986][SQL] Support ANSI SQL filter clause for aggregate expression URL: https://github.com/apache/spark/pull/26656#discussion_r360752300 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala ## @@ -194,7 +238,10 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] { val regularAggOperatorMap = regularAggExprs.map { e => // Perform the actual aggregation in the initial aggregate. val af = patchAggregateFunctionChildren(e.aggregateFunction)(regularAggChildAttrLookup.get) -val operator = Alias(e.copy(aggregateFunction = af), e.sql)() +val filterOpt = e.filter.map(_.transform { Review comment: OK. The comments are indeed needed here.
[GitHub] [spark] zhengruifeng commented on issue #26948: [SPARK-30120][ML] LSH approxNearestNeighbors should use BoundedPriorityQueue when numNearestNeighbors is small
zhengruifeng commented on issue #26948: [SPARK-30120][ML] LSH approxNearestNeighbors should use BoundedPriorityQueue when numNearestNeighbors is small URL: https://github.com/apache/spark/pull/26948#issuecomment-568335734 @srowen @huaxingao I rethought this, and I found another solution: 1. The current impl needs a threshold, so we no longer need `BoundedPriorityQueue`. The current impl computes the count and the quantile in two passes; we can compute them together in a single pass. So the advantage that `BoundedPriorityQueue` does not need an extra pass no longer exists; `approxQuantile` is enough, but we need to use `QuantileSummaries` directly. 2. Still, for top-K when nn is small (maybe < 1000 at first?), why not use `BoundedPriorityQueue` to directly collect the top-K **rows (now distCol for a threshold)**? Then we do not need `modelSubsetWithDistCol.sort(distCol).limit(numNearestNeighbors)` at the end. We can collect them to the driver and then use sc.parallelize to generate an RDD/DF, or collect them to some executor as an RDD/DF.
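The top-K idea in the comment above can be sketched without Spark at all: a bounded max-heap of size K keeps the K smallest distances seen so far, so no global sort-and-limit is needed. This mirrors what Spark's internal BoundedPriorityQueue does; the function name and the sample distances below are illustrative, not the actual ML code:

```scala
import scala.collection.mutable

// Hedged sketch: collect the K candidates with the smallest distance in one
// pass, instead of sorting everything and taking limit(K).
def topKNearest[T](items: Iterator[(T, Double)], k: Int): Seq[(T, Double)] = {
  // Max-heap on distance: the head is the *worst* of the current best K,
  // so it is the one to evict when a closer candidate arrives.
  val heap = mutable.PriorityQueue.empty[(T, Double)](Ordering.by((p: (T, Double)) => p._2))
  items.foreach { cand =>
    if (heap.size < k) heap.enqueue(cand)
    else if (cand._2 < heap.head._2) { heap.dequeue(); heap.enqueue(cand) }
  }
  heap.dequeueAll.reverse // ascending by distance
}

// Example: the two nearest of four candidates.
val nn = topKNearest(Iterator(("a", 3.0), ("b", 1.0), ("c", 2.5), ("d", 0.5)), 2)
// nn == Seq(("d", 0.5), ("b", 1.0))
```

Each element costs at most O(log K) heap work, which is why the approach is attractive only when numNearestNeighbors is small, as the comment says.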
[GitHub] [spark] beliefer commented on a change in pull request #26656: [SPARK-27986][SQL] Support ANSI SQL filter clause for aggregate expression
beliefer commented on a change in pull request #26656: [SPARK-27986][SQL] Support ANSI SQL filter clause for aggregate expression URL: https://github.com/apache/spark/pull/26656#discussion_r360751512 ## File path: sql/core/src/test/resources/sql-tests/results/group-by-filter.sql.out ## @@ -0,0 +1,569 @@ +-- Automatically generated by SQLQueryTestSuite +-- Number of queries: 47 + + +-- !query 0 +CREATE OR REPLACE TEMPORARY VIEW testData AS SELECT * FROM VALUES +(1, 1), (1, 2), (2, 1), (2, 2), (3, 1), (3, 2), (null, 1), (3, null), (null, null) +AS testData(a, b) +-- !query 0 schema +struct<> +-- !query 0 output + + + +-- !query 1 +CREATE OR REPLACE TEMPORARY VIEW EMP AS SELECT * FROM VALUES + (100, "emp 1", date "2005-01-01", 100.00D, 10), + (100, "emp 1", date "2005-01-01", 100.00D, 10), + (200, "emp 2", date "2003-01-01", 200.00D, 10), + (300, "emp 3", date "2002-01-01", 300.00D, 20), + (400, "emp 4", date "2005-01-01", 400.00D, 30), + (500, "emp 5", date "2001-01-01", 400.00D, NULL), + (600, "emp 6 - no dept", date "2001-01-01", 400.00D, 100), + (700, "emp 7", date "2010-01-01", 400.00D, 100), + (800, "emp 8", date "2016-01-01", 150.00D, 70) +AS EMP(id, emp_name, hiredate, salary, dept_id) +-- !query 1 schema +struct<> +-- !query 1 output + + + +-- !query 2 +CREATE OR REPLACE TEMPORARY VIEW DEPT AS SELECT * FROM VALUES + (10, "dept 1", "CA"), + (20, "dept 2", "NY"), + (30, "dept 3", "TX"), + (40, "dept 4 - unassigned", "OR"), + (50, "dept 5 - unassigned", "NJ"), + (70, "dept 7", "FL") +AS DEPT(dept_id, dept_name, state) +-- !query 2 schema +struct<> +-- !query 2 output + + + +-- !query 3 +SELECT a, COUNT(b) FILTER (WHERE a >= 2) FROM testData +-- !query 3 schema +struct<> +-- !query 3 output +org.apache.spark.sql.AnalysisException +grouping expressions sequence is empty, and 'testdata.`a`' is not an aggregate function. 
Wrap '(count(testdata.`b`) AS `count(b)`)' in windowing function(s) or wrap 'testdata.`a`' in first() (or first_value) if you don't care which value you get.; + + +-- !query 4 +SELECT COUNT(a) FILTER (WHERE a = 1), COUNT(b) FILTER (WHERE a > 1) FROM testData +-- !query 4 schema +struct +-- !query 4 output +2 4 + + +-- !query 5 +SELECT COUNT(id) FILTER (WHERE hiredate = date "2001-01-01") FROM emp +-- !query 5 schema +struct +-- !query 5 output +2 + + +-- !query 6 +SELECT COUNT(id) FILTER (WHERE hiredate = to_date('2001-01-01 00:00:00')) FROM emp +-- !query 6 schema +struct +-- !query 6 output +2 + + +-- !query 7 +SELECT COUNT(id) FILTER (WHERE hiredate = to_timestamp("2001-01-01 00:00:00")) FROM emp +-- !query 7 schema +struct +-- !query 7 output +2 + + +-- !query 8 +SELECT COUNT(id) FILTER (WHERE date_format(hiredate, "yyyy-MM-dd") = "2001-01-01") FROM emp +-- !query 8 schema +struct +-- !query 8 output +2 + + +-- !query 9 +SELECT a, COUNT(b) FILTER (WHERE a >= 2) FROM testData GROUP BY a +-- !query 9 schema +struct +-- !query 9 output +1 0 +2 2 +3 2 +NULL 0 + + +-- !query 10 +SELECT a, COUNT(b) FILTER (WHERE a != 2) FROM testData GROUP BY b +-- !query 10 schema +struct<> +-- !query 10 output +org.apache.spark.sql.AnalysisException +expression 'testdata.`a`' is neither present in the group by, nor is it an aggregate function. 
Add to group by or wrap in first() (or first_value) if you don't care which value you get.; + + +-- !query 11 +SELECT COUNT(a) FILTER (WHERE a >= 0), COUNT(b) FILTER (WHERE a >= 3) FROM testData GROUP BY a +-- !query 11 schema +struct +-- !query 11 output +0 0 +2 0 +2 0 +3 2 + + +-- !query 12 +SELECT dept_id, SUM(salary) FILTER (WHERE hiredate > date "2003-01-01") FROM emp GROUP BY dept_id +-- !query 12 schema +struct +-- !query 12 output +10 200.0 +100 400.0 +20 NULL +30 400.0 +70 150.0 +NULL NULL + + +-- !query 13 +SELECT dept_id, SUM(salary) FILTER (WHERE hiredate > to_date("2003-01-01")) FROM emp GROUP BY dept_id +-- !query 13 schema +struct +-- !query 13 output +10 200.0 +100 400.0 +20 NULL +30 400.0 +70 150.0 +NULL NULL + + +-- !query 14 +SELECT dept_id, SUM(salary) FILTER (WHERE hiredate > to_timestamp("2003-01-01 00:00:00")) FROM emp GROUP BY dept_id +-- !query 14 schema +struct +-- !query 14 output +10 200.0 +100 400.0 +20 NULL +30 400.0 +70 150.0 +NULL NULL + + +-- !query 15 +SELECT dept_id, SUM(salary) FILTER (WHERE date_format(hiredate, "yyyy-MM-dd") > "2003-01-01") FROM emp GROUP BY dept_id +-- !query 15 schema +struct +-- !query 15 output +10 200.0 +100 400.0 +20 NULL +30 400.0 +70 150.0 +NULL NULL + + +-- !query 16 +SELECT 'foo', COUNT(a) FILTER (WHERE b <= 2) FROM testData GROUP BY 1 +-- !query 16 schema +struct +-- !query 16 output +foo 6 + + +-- !query 17 +SELECT 'foo', SUM(salary) FILTER (WHERE hiredate >= date "2003-01-01") FROM emp GROUP BY 1 +-- !query 17 schema +struct +-- !query 17 output +foo 1350.0 + + +-- !query 18 +SELECT 'foo', SUM(salary) FILTER
[GitHub] [spark] beliefer commented on a change in pull request #26656: [SPARK-27986][SQL] Support ANSI SQL filter clause for aggregate expression
beliefer commented on a change in pull request #26656: [SPARK-27986][SQL] Support ANSI SQL filter clause for aggregate expression URL: https://github.com/apache/spark/pull/26656#discussion_r360750781 ## File path: sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala ## @@ -2835,6 +2835,84 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession { checkAnswer(df, Row(1, 3, 4) :: Row(2, 3, 4) :: Row(3, 3, 4) :: Nil) } + test("Support filter clause for aggregate function with hash aggregate") { +Seq(("APPROX_COUNT_DISTINCT(a)", 3), ("COUNT(a)", 3), ("FIRST(a)", 1), ("LAST(a)", 3), + ("MAX(a)", 3), ("AVG(a)", 2.0), ("MIN(a)", 1), ("SUM(a)", 6), ("PERCENTILE(a, 1)", 3), + ("PERCENTILE_APPROX(a, 0.5, 100)", 2.0), ("COLLECT_LIST(a)", Seq(1, 2, 3)), + ("COLLECT_SET(a)", Seq(1, 2, 3))).foreach { funcToResult => + val query = s"SELECT ${funcToResult._1} FILTER (WHERE b > 1) FROM testData2" + val df = sql(query) + val physical = df.queryExecution.sparkPlan + val aggregateExpressions = physical.collectFirst { +case agg: HashAggregateExec => agg.aggregateExpressions +case agg: ObjectHashAggregateExec => agg.aggregateExpressions + } + assert(aggregateExpressions.isDefined) + assert(aggregateExpressions.get.size == 1) + aggregateExpressions.get.foreach { expr => +assert(expr.filter.isDefined) + } + checkAnswer(df, Row(funcToResult._2) :: Nil) +} + } + + test("Support filter clause for aggregate function uses SortAggregateExec") { +withSQLConf(SQLConf.USE_OBJECT_HASH_AGG.key -> "false") { + Seq(("PERCENTILE(a, 1)", 3), +("PERCENTILE_APPROX(a, 0.5, 100)", 2.0), +("COLLECT_LIST(a)", Seq(1, 2, 3)), +("COLLECT_SET(a)", Seq(1, 2, 3))).foreach { funcToResult => +val query = s"SELECT ${funcToResult._1} FILTER (WHERE b > 1) FROM testData2" +val df = sql(query) +val physical = df.queryExecution.sparkPlan +val aggregateExpressions = physical.collectFirst { + case agg: SortAggregateExec => agg.aggregateExpressions +} +assert(aggregateExpressions.isDefined) 
+assert(aggregateExpressions.get.size == 1) +aggregateExpressions.get.foreach { expr => + assert(expr.filter.isDefined) +} +checkAnswer(df, Row(funcToResult._2) :: Nil) + } +} + } + + test("Support filter clause for multiple aggregate function") { +val query = + """ +| SELECT +| COUNT(a), COUNT(a) FILTER (WHERE b >= 2), +| SUM(a), SUM(a) FILTER (WHERE b < 2), +| MAX(a), MAX(a) FILTER (WHERE b > 0), +| MIN(a), MIN(a) FILTER (WHERE b = 1), +| AVG(a), AVG(a) FILTER (WHERE b IN (1, 2)) FROM testData2 + """.stripMargin +val df = sql(query) +val physical = df.queryExecution.sparkPlan +val aggregateExpressions = physical.collectFirst { + case agg: HashAggregateExec => agg.aggregateExpressions +} +assert(aggregateExpressions.isDefined) +assert(aggregateExpressions.get.size == 10) +checkAnswer(df, Row(6, 3, 12, 6, 3, 3, 1, 1, 2, 2) :: Nil) + } + + test("Support filter clause for aggregate function with group") { Review comment: OK
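The expected values in the quoted multi-aggregate test follow directly from the FILTER semantics: the predicate only restricts which rows feed each aggregate. A plain-Scala sanity check over six (a, b) pairs (a fixture reconstructed from the expected answers above, so treat the data as an assumption, not the suite's actual testData2):

```scala
// Illustrative testData2-style fixture of (a, b) pairs.
case class R(a: Int, b: Int)
val data = Seq(R(1, 1), R(1, 2), R(2, 1), R(2, 2), R(3, 1), R(3, 2))

// COUNT(a) and COUNT(a) FILTER (WHERE b >= 2)
val countAll = data.size            // 6
val countF   = data.count(_.b >= 2) // 3

// SUM(a) and SUM(a) FILTER (WHERE b < 2)
val sumAll = data.map(_.a).sum                 // 12
val sumF   = data.filter(_.b < 2).map(_.a).sum // 6

// MIN(a) FILTER (WHERE b = 1) and AVG(a) FILTER (WHERE b IN (1, 2))
val minF = data.filter(_.b == 1).map(_.a).min // 1
val avgF = {
  val xs = data.filter(x => Set(1, 2)(x.b)).map(_.a)
  xs.sum.toDouble / xs.size // 2.0
}
```

These match the `Row(6, 3, 12, 6, 3, 3, 1, 1, 2, 2)` expectation in the test above for the corresponding columns.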
[GitHub] [spark] gatorsmile commented on a change in pull request #23732: [SPARK-26389][SS] Add force delete temp checkpoint configuration
gatorsmile commented on a change in pull request #23732: [SPARK-26389][SS] Add force delete temp checkpoint configuration URL: https://github.com/apache/spark/pull/23732#discussion_r360750758 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ## @@ -907,6 +907,12 @@ object SQLConf { .stringConf .createOptional + val FORCE_DELETE_TEMP_CHECKPOINT_LOCATION = +buildConf("spark.sql.streaming.forceDeleteTempCheckpointLocation") Review comment: cc @Ngone51 could you submit a PR to make a change?
[GitHub] [spark] beliefer commented on a change in pull request #26656: [SPARK-27986][SQL] Support ANSI SQL filter clause for aggregate expression
beliefer commented on a change in pull request #26656: [SPARK-27986][SQL] Support ANSI SQL filter clause for aggregate expression URL: https://github.com/apache/spark/pull/26656#discussion_r360750209 ## File path: sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala ## @@ -2835,6 +2835,84 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession { checkAnswer(df, Row(1, 3, 4) :: Row(2, 3, 4) :: Row(3, 3, 4) :: Nil) } + test("Support filter clause for aggregate function with hash aggregate") { +Seq(("APPROX_COUNT_DISTINCT(a)", 3), ("COUNT(a)", 3), ("FIRST(a)", 1), ("LAST(a)", 3), + ("MAX(a)", 3), ("AVG(a)", 2.0), ("MIN(a)", 1), ("SUM(a)", 6), ("PERCENTILE(a, 1)", 3), + ("PERCENTILE_APPROX(a, 0.5, 100)", 2.0), ("COLLECT_LIST(a)", Seq(1, 2, 3)), + ("COLLECT_SET(a)", Seq(1, 2, 3))).foreach { funcToResult => + val query = s"SELECT ${funcToResult._1} FILTER (WHERE b > 1) FROM testData2" + val df = sql(query) + val physical = df.queryExecution.sparkPlan + val aggregateExpressions = physical.collectFirst { +case agg: HashAggregateExec => agg.aggregateExpressions +case agg: ObjectHashAggregateExec => agg.aggregateExpressions + } + assert(aggregateExpressions.isDefined) + assert(aggregateExpressions.get.size == 1) + aggregateExpressions.get.foreach { expr => +assert(expr.filter.isDefined) + } + checkAnswer(df, Row(funcToResult._2) :: Nil) +} + } + + test("Support filter clause for aggregate function uses SortAggregateExec") { +withSQLConf(SQLConf.USE_OBJECT_HASH_AGG.key -> "false") { + Seq(("PERCENTILE(a, 1)", 3), Review comment: I will keep only one function.