[GitHub] [spark] c21 commented on pull request #36733: [SPARK-39344][SQL] Only disable bucketing when autoBucketedScan is enabled if bucket columns are not in scan output
c21 commented on PR #36733: URL: https://github.com/apache/spark/pull/36733#issuecomment-1143188178 @manuzhang - from my understanding, you want to introduce a feature that forces the number of Spark tasks to equal the number of table buckets when the query does not read the bucket column(s). I agree with @cloud-fan in https://github.com/apache/spark/pull/27924#issuecomment-1139340835 that controlling the number of Spark tasks should not be a design goal for bucketed tables. If you really want to control the number of tasks, you can either tune `spark.sql.files.maxPartitionBytes` or add an extra shuffle with `repartition()`/`DISTRIBUTE BY`. I understand your concern per https://github.com/apache/spark/pull/27924#issuecomment-1139360593, but I am afraid we would be introducing a feature that is not actually used by many other Spark users. To be honest, in my experience the requested feature does not seem popular. My 2 cents: it might help to post on the Spark dev mailing list to gather more feedback from developers and users on whether they indeed have a similar requirement. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] nyingping commented on pull request #36737: [SPARK-39347] [SS] Generate wrong time window when (timestamp-startTime) % slideDuration…
nyingping commented on PR #36737: URL: https://github.com/apache/spark/pull/36737#issuecomment-1143157052 This bug was caused by my previous PR. I'm sorry. Could you have a look when you have time, @HeartSaVioR @viirya? Thanks in advance.
[GitHub] [spark] Ngone51 commented on a diff in pull request #36716: [SPARK-39062][CORE] Add stage level resource scheduling support for standalone cluster
Ngone51 commented on code in PR #36716: URL: https://github.com/apache/spark/pull/36716#discussion_r886345400 ## core/src/main/scala/org/apache/spark/deploy/master/ResourceDescription.scala: ## @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.master + +import org.apache.spark.resource.ResourceRequirement + +/** + * Describe resource requests for different resource profiles. Used for executor schedule. + * + * @param coresPerExecutor cores for each executor. + * @param memoryMbPerExecutor memory for each executor. + * @param customResourcesPerExecutor custom resource requests for each executor. + */ +private[spark] case class ResourceDescription( Review Comment: Had another look around `ExecutorResourcesOrDefaults`, it looks like it's a general abstraction of executor resources that is shared by various cluster managers. So I think it makes sense to extract a specific resource description separately for the Standalone itself.
[GitHub] [spark] nyingping opened a new pull request, #36737: [SPARK-39347] [SS] Generate wrong time window when (timestamp-startTime) % slideDuration…
nyingping opened a new pull request, #36737: URL: https://github.com/apache/spark/pull/36737

### What changes were proposed in this pull request?

Fix a bug that generates the wrong time window when `(timestamp - startTime) % slideDuration < 0`.

The original time window generation rule

```
lastStart <- timestamp - (timestamp - startTime + slideDuration) % slideDuration
```

is changed to

```
remainder <- (timestamp - startTime) % slideDuration
lastStart <- if (remainder < 0) timestamp - remainder - slideDuration else timestamp - remainder
```

reference: [https://github.com/apache/flink/pull/18982](https://github.com/apache/flink/pull/18982)

### Why are the changes needed?

The sliding window generation strategy was changed to the current one in PR [#35362](https://github.com/apache/spark/pull/35362), and that change introduced a new problem. A window generation error occurs when the timestamp of the record being processed is negative and its remainder modulo the window length is less than 0.

The current test cases do not expose this bug.

[ test("negative timestamps")](https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala#L299)

```
val df1 = Seq(
  ("1970-01-01 00:00:02", 1),
  ("1970-01-01 00:00:12", 2)).toDF("time", "value")
val df2 = Seq(
  (LocalDateTime.parse("1970-01-01T00:00:02"), 1),
  (LocalDateTime.parse("1970-01-01T00:00:12"), 2)).toDF("time", "value")

Seq(df1, df2).foreach { df =>
  checkAnswer(
    df.select(window($"time", "10 seconds", "10 seconds", "5 seconds"), $"value")
      .orderBy($"window.start".asc)
      .select($"window.start".cast(StringType), $"window.end".cast(StringType), $"value"),
    Seq(
      Row("1969-12-31 23:59:55", "1970-01-01 00:00:05", 1),
      Row("1970-01-01 00:00:05", "1970-01-01 00:00:15", 2))
  )
}
```

The timestamps in the test data above are not negative, and their values modulo the window length are not negative, so the test case passes.

The bug shows up when the timestamps look like this:

```
val df3 = Seq(
  ("1969-12-31 00:00:02", 1),
  ("1969-12-31 00:00:12", 2)).toDF("time", "value")
val df4 = Seq(
  (LocalDateTime.parse("1969-12-31T00:00:02"), 1),
  (LocalDateTime.parse("1969-12-31T00:00:12"), 2)).toDF("time", "value")

Seq(df3, df4).foreach { df =>
  checkAnswer(
    df.select(window($"time", "10 seconds", "10 seconds", "5 seconds"), $"value")
      .orderBy($"window.start".asc)
      .select($"window.start".cast(StringType), $"window.end".cast(StringType), $"value"),
    Seq(
      Row("1969-12-30 23:59:55", "1969-12-31 00:00:05", 1),
      Row("1969-12-31 00:00:05", "1969-12-31 00:00:15", 2))
  )
}
```

Running it gives an unexpected result:

```
== Results ==
!== Correct Answer - 2 ==                        == Spark Answer - 2 ==
!struct<>                                        struct
![1969-12-30 23:59:55,1969-12-31 00:00:05,1]     [1969-12-31 00:00:05,1969-12-31 00:00:15,1]
![1969-12-31 00:00:05,1969-12-31 00:00:15,2]     [1969-12-31 00:00:15,1969-12-31 00:00:25,2]
```

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Add new unit test.

**benchmark result**

old logic ([#18364](https://github.com/apache/spark/pull/18364)) vs. fix version

```
Running benchmark: tumbling windows
  Running case: old logic
  Stopped after 407 iterations, 10012 ms
  Running case: new logic
  Stopped after 615 iterations, 10007 ms

Java HotSpot(TM) 64-Bit Server VM 1.8.0_181-b13 on Windows 10 10.0
Intel64 Family 6 Model 158 Stepping 10, GenuineIntel
tumbling windows:    Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
old logic                       17             25           9        580.1           1.7       1.0X
new logic                       15             16           2        680.8           1.5       1.2X

Running benchmark: sliding windows
  Running case: old logic
  Stopped after 10 iterations, 10296 ms
  Running case: new logic
  Stopped after 15 iterations, 10391 ms

Java HotSpot(TM) 64-Bit Server VM 1.8.0_181-b13 on Windows 10 10.0
Intel64 Family 6 Model 158 Stepping 10, GenuineIntel
sliding windows:     Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
-
```
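The two window-start rules quoted in the pull request description for #36737 can be compared side by side in a small stand-alone sketch. The class and method names below are hypothetical, and time values are plain `long` seconds rather than Spark's internal representation. The sketch relies on the fact that `%` in Java (as in Scala) keeps the sign of the dividend.

```java
// Sketch of the two window-start rules from the PR description.
// Names are hypothetical; time values are plain longs (seconds).
final class WindowStartSketch {
    // Old rule: adding slideDuration before % does not help once the
    // difference is below -slideDuration, because % keeps the sign of
    // the dividend and the remainder stays negative.
    static long oldLastStart(long timestamp, long startTime, long slideDuration) {
        return timestamp - (timestamp - startTime + slideDuration) % slideDuration;
    }

    // Fixed rule: move back by one slide when the remainder is negative.
    static long newLastStart(long timestamp, long startTime, long slideDuration) {
        long remainder = (timestamp - startTime) % slideDuration;
        return remainder < 0
            ? timestamp - remainder - slideDuration
            : timestamp - remainder;
    }
}
```

Treating `1969-12-31 00:00:02` as -86398 seconds from the epoch (UTC assumed), with `startTime = 5` and `slideDuration = 10`, the old rule gives -86395 (`00:00:05`, a window that does not contain the record) and the fixed rule gives -86405 (`1969-12-30 23:59:55`), which matches the two columns of the result listing. In Java, `timestamp - Math.floorMod(timestamp - startTime, slideDuration)` would be an equivalent one-liner for positive slide durations.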
[GitHub] [spark] beliefer commented on pull request #36593: [SPARK-39139][SQL] DS V2 push-down framework supports DS V2 UDF
beliefer commented on PR #36593: URL: https://github.com/apache/spark/pull/36593#issuecomment-1143143171 > glad to see interest/progress in cross platform SQL/UDFs pushdown. Have you considered doing this leveraging frameworks such as Transport [[1](https://github.com/linkedin/transport), [2](https://engineering.linkedin.com/blog/2018/11/using-translatable-portable-UDFs)] for UDFs and Coral [[1](https://github.com/linkedin/coral), [2](https://engineering.linkedin.com/blog/2020/coral)] for SQL? Spark DS V2 has a dedicated UDF API. Users can implement UDFs with that API, and the DS V2 push-down framework can support them. Using Transport and Coral would seem to introduce more components.
[GitHub] [spark] huaxingao commented on a diff in pull request #36696: [SPARK-39312][SQL] Use parquet native In predicate for in filter push down
huaxingao commented on code in PR #36696: URL: https://github.com/apache/spark/pull/36696#discussion_r886336186 ## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala: ## @@ -440,94 +440,126 @@ class ParquetFilters( } private val makeInPredicate: -PartialFunction[ParquetSchemaType, - (Array[String], Array[Any], ParquetStatistics[_]) => FilterPredicate] = { +PartialFunction[ParquetSchemaType, (Array[String], Any) => FilterPredicate] = { +case ParquetBooleanType => + (n: Array[String], v: Any) => +val values = Option(v).map(_.asInstanceOf[Array[Object]]).orNull Review Comment: Thanks for taking a look. I don't think `v` could be null, but this line is actually not needed, so I deleted it.
[GitHub] [spark] Ngone51 commented on a diff in pull request #36162: [SPARK-32170][CORE] Improve the speculation through the stage task metrics.
Ngone51 commented on code in PR #36162: URL: https://github.com/apache/spark/pull/36162#discussion_r886322452 ## core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala: ## @@ -853,8 +857,11 @@ private[spark] class TaskSchedulerImpl( // (taskId, stageId, stageAttemptId, accumUpdates) val accumUpdatesWithTaskIds: Array[(Long, Int, Int, Seq[AccumulableInfo])] = { accumUpdates.flatMap { case (id, updates) => -val accInfos = updates.map(acc => acc.toInfo(Some(acc.value), None)) Option(taskIdToTaskSetManager.get(id)).map { taskSetMgr => + val (accInfos, taskProgressRate) = getTaskAccumulableInfosAndProgressRate(updates) Review Comment: @mridulm your idea sounds good to me!
[GitHub] [spark] Ngone51 commented on a diff in pull request #36162: [SPARK-32170][CORE] Improve the speculation through the stage task metrics.
Ngone51 commented on code in PR #36162: URL: https://github.com/apache/spark/pull/36162#discussion_r886318441 ## core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala: ## @@ -853,8 +857,11 @@ private[spark] class TaskSchedulerImpl( // (taskId, stageId, stageAttemptId, accumUpdates) val accumUpdatesWithTaskIds: Array[(Long, Int, Int, Seq[AccumulableInfo])] = { accumUpdates.flatMap { case (id, updates) => -val accInfos = updates.map(acc => acc.toInfo(Some(acc.value), None)) Option(taskIdToTaskSetManager.get(id)).map { taskSetMgr => + val (accInfos, taskProgressRate) = getTaskAccumulableInfosAndProgressRate(updates) Review Comment: > The cost for `getTaskAccumulableInfosAndProgressRate` should be similar to what we currently have already, right ? Sorry, I commented in the wrong place. I actually meant the invocation of `setTaskRecordsAndRunTime()` in `handleSuccessfulTask()`.
[GitHub] [spark] weixiuli commented on a diff in pull request #36162: [SPARK-32170][CORE] Improve the speculation through the stage task metrics.
weixiuli commented on code in PR #36162: URL: https://github.com/apache/spark/pull/36162#discussion_r886302383 ## core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala: ## @@ -853,8 +857,11 @@ private[spark] class TaskSchedulerImpl( // (taskId, stageId, stageAttemptId, accumUpdates) val accumUpdatesWithTaskIds: Array[(Long, Int, Int, Seq[AccumulableInfo])] = { accumUpdates.flatMap { case (id, updates) => -val accInfos = updates.map(acc => acc.toInfo(Some(acc.value), None)) Option(taskIdToTaskSetManager.get(id)).map { taskSetMgr => + val (accInfos, taskProgressRate) = getTaskAccumulableInfosAndProgressRate(updates) Review Comment: Sounds good. I'll update it. Thanks @mridulm @Ngone51
[GitHub] [spark] Yikun commented on pull request #36699: [SPARK-39317][PYTHON][PS] Add explicitly pdf/pser infer when infer schema in groupby.apply
Yikun commented on PR #36699: URL: https://github.com/apache/spark/pull/36699#issuecomment-1143095923 @gatorsmile Thanks for the post-review. It's not a breaking change for pandas-on-Spark behavior; there are no breaking changes from the user's point of view. It just keeps the behavior of pandas 1.3. Did you notice any other behavior changes after this PR? Of course, we can simply mention something like: `After Spark 3.4, the infer schema process of groupby.apply in Pandas on Spark, will first infer the pandas type to ensure the accuracy of the pandas dtype as much as possible`.
[GitHub] [spark] Yikun commented on pull request #36353: [SPARK-38946][PYTHON][PS] Generates a new dataframe instead of operating inplace in df.eval/update/fillna/setitem
Yikun commented on PR #36353: URL: https://github.com/apache/spark/pull/36353#issuecomment-1143090488 > The condition to generate a new dataframe seems a bit more complex? @ueshin Yes, judging from your example, it really is. `setitem` influences the behavior of several functions and changes all or part of their behavior (but pandas does not mention this behavior change). Anyway, I will raise an issue with the pandas community to get their detailed position on these behavior changes.
[GitHub] [spark] AngersZhuuuu commented on pull request #36736: [SPARK-39351][SQL] SHOW CREATE TABLE should redact properties
AngersZhuuuu commented on PR #36736: URL: https://github.com/apache/spark/pull/36736#issuecomment-1143087613 ping @cloud-fan
[GitHub] [spark] AngersZhuuuu opened a new pull request, #36736: [SPARK-39351][SQL] SHOW CREATE TABLE should redact properties
AngersZhuuuu opened a new pull request, #36736: URL: https://github.com/apache/spark/pull/36736

### What changes were proposed in this pull request?

`SHOW CREATE TABLE` should redact properties

### Why are the changes needed?

Protect sensitive properties

### Does this PR introduce _any_ user-facing change?

When a user runs `SHOW CREATE TABLE`, sensitive properties will be redacted.

### How was this patch tested?

Added UT
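As a rough illustration of what "redacting properties" means here, the following stand-alone sketch masks the value of every property whose key matches a sensitive-key pattern. It is not Spark's implementation: the class name, the pattern, and the replacement text are assumptions chosen for the example, and in Spark the pattern would come from its redaction configuration rather than being hard-coded.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical helper, not Spark code: masks values of sensitive keys.
final class PropertyRedactionSketch {
    // Assumed pattern and replacement text, for illustration only.
    static final Pattern SENSITIVE_KEY = Pattern.compile("(?i)secret|password|token");
    static final String REPLACEMENT = "*********(redacted)";

    // Returns a new map in the same key order; the input is left untouched.
    static Map<String, String> redact(Map<String, String> properties) {
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : properties.entrySet()) {
            boolean sensitive = SENSITIVE_KEY.matcher(e.getKey()).find();
            result.put(e.getKey(), sensitive ? REPLACEMENT : e.getValue());
        }
        return result;
    }
}
```

Matching on the key rather than the value keeps the output stable: the property still appears in the statement, so the shape of the table definition is visible while the secret itself is not.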
[GitHub] [spark] beliefer commented on a diff in pull request #36593: [SPARK-39139][SQL] DS V2 push-down framework supports DS V2 UDF
beliefer commented on code in PR #36593: URL: https://github.com/apache/spark/pull/36593#discussion_r886292143 ## sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/GeneralScalarExpression.java: ## @@ -235,8 +235,8 @@ public String toString() { try { return builder.build(this); } catch (Throwable e) { - return name + "(" + -Arrays.stream(children).map(child -> child.toString()).reduce((a,b) -> a + "," + b) + ")"; Review Comment: The previous code made `toString` render the arguments as `Optional[...]`, because `reduce` returns an `Optional`.
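The effect described in this review comment can be reproduced outside Spark. The sketch below mirrors the shape of the removed lines but uses plain strings instead of expression children; the class and method names are hypothetical. `Collectors.joining` is shown as one possible alternative, not necessarily the replacement used in the PR.

```java
import java.util.Arrays;
import java.util.stream.Collectors;

// Hypothetical stand-alone demo using plain strings as "children".
final class JoinChildrenSketch {
    // Stream.reduce(BinaryOperator) returns Optional<String>, so string
    // concatenation embeds Optional's own toString in the result.
    static String withReduce(String name, String[] children) {
        return name + "(" +
            Arrays.stream(children).reduce((a, b) -> a + "," + b) + ")";
    }

    // Collectors.joining yields a plain String, also for an empty array.
    static String withJoining(String name, String[] children) {
        return name + "(" +
            Arrays.stream(children).collect(Collectors.joining(",")) + ")";
    }
}
```

For children `a` and `b`, `withReduce("F", ...)` produces `F(Optional[a,b])` while `withJoining("F", ...)` produces `F(a,b)`. With no children the first form prints `F(Optional.empty)` and the second `F()`.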
[GitHub] [spark] wangyum commented on pull request #36724: [SPARK-39338][SQL] Remove dynamic pruning subquery if pruningKey's references is empty
wangyum commented on PR #36724: URL: https://github.com/apache/spark/pull/36724#issuecomment-1143081288 Merged to master.
[GitHub] [spark] wangyum closed pull request #36724: [SPARK-39338][SQL] Remove dynamic pruning subquery if pruningKey's references is empty
wangyum closed pull request #36724: [SPARK-39338][SQL] Remove dynamic pruning subquery if pruningKey's references is empty URL: https://github.com/apache/spark/pull/36724
[GitHub] [spark] AngersZhuuuu commented on pull request #36735: [SPARK-39350][SQL] DESC NAMESPACE EXTENDED should redact properties
AngersZhuuuu commented on PR #36735: URL: https://github.com/apache/spark/pull/36735#issuecomment-1143072617 ping @cloud-fan
[GitHub] [spark] AngersZhuuuu opened a new pull request, #36735: [SPARK-39350][SQL] DESC NAMESPACE EXTENDED should redact properties
AngersZhuuuu opened a new pull request, #36735: URL: https://github.com/apache/spark/pull/36735

### What changes were proposed in this pull request?

`DESC NAMESPACE EXTENDED` should redact namespace properties

### Why are the changes needed?

Protect sensitive data

### Does this PR introduce _any_ user-facing change?

When a user runs `DESC NAMESPACE EXTENDED`, sensitive information in properties will be redacted.

### How was this patch tested?

Added UT
[GitHub] [spark] manuzhang commented on pull request #36733: [SPARK-39344][SQL] Only disable bucketing when autoBucketedScan is enabled if bucket columns are not in scan output
manuzhang commented on PR #36733: URL: https://github.com/apache/spark/pull/36733#issuecomment-1143066028 @cloud-fan Quoting our conversation from https://github.com/apache/spark/pull/27924. Could you explain more about why there would be a correctness issue if HashPartitioning is not a hard requirement for bucketed scan? > As to my original question, is HashPartitioning a hard requirement for bucketed scan? > I don't think so
[GitHub] [spark] pralabhkumar commented on pull request #36701: [SPARK-39179][PYTHON][TESTS] Improve the test coverage for pyspark/shuffle.py
pralabhkumar commented on PR #36701: URL: https://github.com/apache/spark/pull/36701#issuecomment-1143066365 @HyukjinKwon I have made the changes as suggested.
[GitHub] [spark] cloud-fan commented on a diff in pull request #36704: [SPARK-39346][SQL] Convert asserts/illegal state exception to internal errors on each phase
cloud-fan commented on code in PR #36704: URL: https://github.com/apache/spark/pull/36704#discussion_r886278462 ## connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala: ## @@ -666,10 +669,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { testUtils.sendMessages(topic2, Array("6")) }, StartStream(), - ExpectFailure[IllegalStateException](e => { -// The offset of `topic2` should be changed from 2 to 1 -assert(e.getMessage.contains("was changed from 2 to 1")) - }) + expectOffsetChange() Review Comment: well, if we build a product based on Spark, this is still an internal error of the product.
[GitHub] [spark] cloud-fan commented on pull request #36733: [SPARK-39344][SQL] Only disable bucketing when autoBucketedScan is enabled if bucket columns are not in scan output
cloud-fan commented on PR #36733: URL: https://github.com/apache/spark/pull/36733#issuecomment-1143063443 Does it mean we still have a correctness issue if `autoBucketedScan` is false?
[GitHub] [spark] zhouyejoe commented on pull request #35906: [SPARK-33236][shuffle] Enable Push-based shuffle service to store state in NM level DB for work preserving restart
zhouyejoe commented on PR #35906: URL: https://github.com/apache/spark/pull/35906#issuecomment-1143062528 Thanks for reviewing. Will try to update the PR tonight or tomorrow.
[GitHub] [spark] gatorsmile commented on pull request #36699: [SPARK-39317][PYTHON][PS] Add explicitly pdf/pser infer when infer schema in groupby.apply
gatorsmile commented on PR #36699: URL: https://github.com/apache/spark/pull/36699#issuecomment-1143055777 @Yikun Is it a breaking change? Should we add it to the migration guide?
[GitHub] [spark] dtenedor commented on a diff in pull request #36675: [SPARK-39294][SQL] Support vectorized Orc scans with DEFAULT values
dtenedor commented on code in PR #36675: URL: https://github.com/apache/spark/pull/36675#discussion_r886266739 ## sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java: ## @@ -270,13 +271,40 @@ private void initBatch( vectors[i + partitionIdx].setIsConstant(); } } + +// For Parquet tables whose columns have associated DEFAULT values, this reader must return +// those values instead of NULL when the corresponding columns are not present in storage (i.e. +// belong to the 'missingColumns' field in this class). +ColumnVector[] finalColumns = new ColumnVector[sparkSchema.fields().length]; +for (int i = 0; i < columnVectors.length; i++) { + Object defaultValue = sparkRequestedSchema.existenceDefaultValues()[i]; + if (defaultValue == null) { +finalColumns[i] = vectors[i]; + } else { +WritableColumnVector writable; +if (memMode == MemoryMode.OFF_HEAP) { + writable = new OffHeapColumnVector(capacity, vectors[i].dataType()); +} else { + writable = new OnHeapColumnVector(capacity, vectors[i].dataType()); +} +Optional appended = writable.appendObjects(capacity, defaultValue); +if (!appended.isPresent()) { + throw new IOException("Cannot assign default column value to result column batch in " + +"vectorized Parquet reader because the data type is not supported: " + defaultValue); +} Review Comment: Good catch, I updated the behavior to be the same.
[GitHub] [spark] dtenedor commented on a diff in pull request #36675: [SPARK-39294][SQL] Support vectorized Orc scans with DEFAULT values
dtenedor commented on code in PR #36675: URL: https://github.com/apache/spark/pull/36675#discussion_r886266583 ## sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java: ## @@ -270,13 +271,40 @@ private void initBatch( vectors[i + partitionIdx].setIsConstant(); } } + +// For Parquet tables whose columns have associated DEFAULT values, this reader must return +// those values instead of NULL when the corresponding columns are not present in storage (i.e. +// belong to the 'missingColumns' field in this class). +ColumnVector[] finalColumns = new ColumnVector[sparkSchema.fields().length]; Review Comment: Yes, apologies for the confusion. This PR is based on https://github.com/apache/spark/pull/36672, which changes the Parquet reader. This PR only adds changes for the Orc reader on top of that. Please feel free to wait to review this PR again until the base PR is merged.
[GitHub] [spark] mridulm commented on a diff in pull request #36162: [SPARK-32170][CORE] Improve the speculation through the stage task metrics.
mridulm commented on code in PR #36162: URL: https://github.com/apache/spark/pull/36162#discussion_r886264430 ## core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala: ## @@ -853,8 +857,11 @@ private[spark] class TaskSchedulerImpl( // (taskId, stageId, stageAttemptId, accumUpdates) val accumUpdatesWithTaskIds: Array[(Long, Int, Int, Seq[AccumulableInfo])] = { accumUpdates.flatMap { case (id, updates) => -val accInfos = updates.map(acc => acc.toInfo(Some(acc.value), None)) Option(taskIdToTaskSetManager.get(id)).map { taskSetMgr => + val (accInfos, taskProgressRate) = getTaskAccumulableInfosAndProgressRate(updates) Review Comment: Having said that, I was thinking along the lines of the following:
1. For successful tasks, as part of TSM.`handleSuccessfulTask`, update the metrics required for efficiency computation.
   1. That is, in `handleSuccessfulTask`, we maintain the state required for efficiency computation, without needing to recompute it for all tasks in `maybeRecompute` -> just do it for running tasks.
   2. Note: also clean up state from the calculator (see below).
   3. This is similar to what we do with medianHeap.
2. With this in place, I was not sure if we want to maintain the state in `TaskInfo` anymore.
   1. We could simply maintain a concurrent map for running tasks -> efficiency details in `InefficientTaskCalculator`.
   2. In `executorHeartbeatReceived`, we could simply delegate to TSM to update its calculator with the state given the task id.

Thoughts?
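The bookkeeping proposed in this comment can be sketched roughly as follows. This is a minimal illustration in Python rather than Scala, and not the PR's code: the class name is borrowed from the comment, but every method name, the `TaskProgress` record, the records-per-millisecond progress rate, and the `multiplier` threshold are assumptions made for the example.

```python
import threading
from dataclasses import dataclass


@dataclass
class TaskProgress:
    """Hypothetical per-task state: records processed and elapsed run time."""
    records: int
    run_time_ms: int


class InefficientTaskCalculator:
    """Illustrative only: keeps state for running tasks apart from finished ones."""

    def __init__(self, multiplier: float = 0.5):
        self._lock = threading.Lock()
        self._running = {}        # task_id -> TaskProgress, running tasks only
        self._done_records = 0    # aggregate over successful tasks
        self._done_time_ms = 0
        self._multiplier = multiplier  # assumed threshold factor

    def on_heartbeat(self, task_id, records, run_time_ms):
        # Called when a heartbeat reports progress for a running task.
        with self._lock:
            self._running[task_id] = TaskProgress(records, run_time_ms)

    def on_success(self, task_id, records, run_time_ms):
        # Fold the finished task into the aggregate and drop its running state.
        with self._lock:
            self._running.pop(task_id, None)
            self._done_records += records
            self._done_time_ms += run_time_ms

    def inefficient_tasks(self):
        # Only running tasks are scanned; finished tasks are already aggregated.
        with self._lock:
            if self._done_time_ms == 0:
                return []
            threshold = (self._done_records / self._done_time_ms) * self._multiplier
            return sorted(
                tid for tid, p in self._running.items()
                if p.run_time_ms > 0 and p.records / p.run_time_ms < threshold
            )
```

The point of the design is that the per-check cost depends on the number of running tasks, not on all tasks in the stage, which is the same idea as maintaining medianHeap incrementally.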
[GitHub] [spark] AngersZhuuuu commented on pull request #33457: [SPARK-36237][UI][SQL] Attach and start handler after application started in UI
AngersZhuuuu commented on PR #33457: URL: https://github.com/apache/spark/pull/33457#issuecomment-1143043489 @gongzh021 Maybe you can check this commit https://github.com/apache/spark/pull/33457/commits/dba26cd5bd1aaacb01e08cfcfef9f02ffe96d018
[GitHub] [spark] manuzhang commented on pull request #36733: [SPARK-39344][SQL] Only disable bucketing when autoBucketedScan is enabled if bucket columns are not in scan output
manuzhang commented on PR #36733: URL: https://github.com/apache/spark/pull/36733#issuecomment-1143042809 cc @wzhfy @cloud-fan @c21
[GitHub] [spark] manuzhang commented on pull request #27924: [SPARK-31164][SQL] Inconsistent rdd and output partitioning for bucket table when output doesn't contain all bucket columns
manuzhang commented on PR #27924: URL: https://github.com/apache/spark/pull/27924#issuecomment-1143042046 @cloud-fan please help check https://github.com/apache/spark/pull/36733
[GitHub] [spark] mridulm commented on a diff in pull request #36162: [SPARK-32170][CORE] Improve the speculation through the stage task metrics.
mridulm commented on code in PR #36162: URL: https://github.com/apache/spark/pull/36162#discussion_r886259895 ## core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala: ## @@ -853,8 +857,11 @@ private[spark] class TaskSchedulerImpl( // (taskId, stageId, stageAttemptId, accumUpdates) val accumUpdatesWithTaskIds: Array[(Long, Int, Int, Seq[AccumulableInfo])] = { accumUpdates.flatMap { case (id, updates) => -val accInfos = updates.map(acc => acc.toInfo(Some(acc.value), None)) Option(taskIdToTaskSetManager.get(id)).map { taskSetMgr => + val (accInfos, taskProgressRate) = getTaskAccumulableInfosAndProgressRate(updates) Review Comment: The cost for `getTaskAccumulableInfosAndProgressRate` should be similar to what we currently have already, right ? This is replacing an existing `updates.map` - though there is an additional `if` check included - are you concerned about that ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on pull request #36703: [SPARK-39321][SQL] Refactor TryCast to use RuntimeReplaceable
cloud-fan commented on PR #36703: URL: https://github.com/apache/spark/pull/36703#issuecomment-1143035116 cc @gengliangwang @beliefer
[GitHub] [spark] mridulm commented on pull request #35906: [SPARK-33236][shuffle] Enable Push-based shuffle service to store state in NM level DB for work preserving restart
mridulm commented on PR #35906: URL: https://github.com/apache/spark/pull/35906#issuecomment-1143029732 @zhouyejoe While looking at the comment [above](https://github.com/apache/spark/pull/35906#discussion_r836949853), an additional query: unless there is a cleanup request to ESS (right now, application termination, but in future, as part of SPARK-38005), we need to preserve the finalization marker for a successful application + appAttempt + shuffleId + shuffleMergeId, to prevent future pushes from overwriting data. We need to additionally handle this in this PR.
[GitHub] [spark] mridulm commented on a diff in pull request #35906: [SPARK-33236][shuffle] Enable Push-based shuffle service to store state in NM level DB for work preserving restart
mridulm commented on code in PR #35906: URL: https://github.com/apache/spark/pull/35906#discussion_r886252782 ## common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java: ## @@ -536,9 +645,20 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) { } } // Even when the mergePartitionsInfo is null, we mark the shuffle as finalized but the results - // sent to the driver will be empty. This cam happen when the service didn't receive any + // sent to the driver will be empty. This can happen when the service didn't receive any // blocks for the shuffle yet and the driver didn't wait for enough time to finalize the // shuffle. + if (db != null) { Review Comment: There are 4 cases here:
1. When `mergePartitionsInfo == null`.
   1. ESS never received a push for this mergeId, but we want to prevent future pushes, and so add a marker entry.
   2. In this case, it makes sense to add the entry to the level db as well.
2. When `mergePartitionsInfo != null`, we have three cases:
   1. The first condition, in the `if`, results in an exception, so that does not hit this case.
   2. The second is when `msg.shuffleMergeId > mergePartitionsInfo.shuffleMergeId`.
      1. We are scheduling a cleanup in this case, so all keys are going to get deleted.
   3. The happy path: we do want the entry to be added.

On further thought, 2.2 is the issue above. Should we be doing `cleanUpAppShufflePartitionInfoInDB` in `closeAndDeletePartitionFiles`? This is removing the finalization marker from the level db, which will continue to exist in the map.
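The four cases enumerated in the comment above can be written down as a small classifier. This is only a reading aid in Python, not the Java code under review: the enum, the function, and its parameters are hypothetical, and the condition used for the "rejected" case (a stale merge id or an already finalized shuffle) is an assumption, since the comment only says that the first `if` results in an exception.

```python
from enum import Enum
from typing import Optional


class FinalizeCase(Enum):
    NO_PUSH_RECEIVED = "1: no partition info, add marker entry"
    REJECTED = "2.1: first `if` throws, marker write is not reached"
    NEWER_MERGE_ID = "2.2: cleanup of older merge id is scheduled"
    HAPPY_PATH = "2.3: same merge id, marker entry is added"


def classify_finalize(
    msg_merge_id: int,
    existing_merge_id: Optional[int],
    existing_finalized: bool = False,
) -> FinalizeCase:
    """Map a finalize request onto the cases from the review comment.

    existing_merge_id is None when the service has no partition info for
    the shuffle, i.e. it never received a push for it.
    """
    if existing_merge_id is None:
        return FinalizeCase.NO_PUSH_RECEIVED
    # Assumed rejection condition; the comment does not spell it out.
    if msg_merge_id < existing_merge_id or existing_finalized:
        return FinalizeCase.REJECTED
    if msg_merge_id > existing_merge_id:
        return FinalizeCase.NEWER_MERGE_ID
    return FinalizeCase.HAPPY_PATH
```

Case 2.2 is the one the comment flags: the cleanup scheduled for the older merge id may also delete the finalization marker from the DB while the in-memory map still holds it.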
[GitHub] [spark] ulysses-you commented on pull request #36698: [SPARK-39316][SQL] Merge PromotePrecision and CheckOverflow into decimal binary arithmetic
ulysses-you commented on PR #36698: URL: https://github.com/apache/spark/pull/36698#issuecomment-1143024193 Thank you @cloud-fan, I have addressed all comments.
[GitHub] [spark] mridulm commented on a diff in pull request #36665: [SPARK-39287][CORE] TaskSchedulerImpl should quickly ignore task finished event if its task was finished state.
mridulm commented on code in PR #36665: URL: https://github.com/apache/spark/pull/36665#discussion_r886246382 ## core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala: ## @@ -102,6 +102,10 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul (deserializedResult, size) } + // quickly return if the task has finished + if (scheduler.isFinishedTask(taskSetManager, tid)) { +return Review Comment: Agree with @Ngone51, this is something which is checked below (in `handleSuccessfulTask`) - the potential improvement, for an unlikely corner case, is not worth the additional complexity. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] dtenedor commented on pull request #36672: [SPARK-39265][SQL] Support vectorized Parquet scans with DEFAULT values
dtenedor commented on PR #36672: URL: https://github.com/apache/spark/pull/36672#issuecomment-1142910763
> Also, can we add a test to check that the DEFAULT values work? Thanks.

@sadikovi Sure, this is done in `InsertSuite`, by adding a new configuration to the case covering Parquet files (previously it only covered the non-vectorized case, but now with `Config(None)` it also runs the test over the vectorized case):
```
TestCase(
  dataSource = "parquet",
  Seq(
    Config(
      None),
    Config(
      Some(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false"),
      insertNullsToStorage = false)))
```
[GitHub] [spark] dtenedor commented on a diff in pull request #36672: [SPARK-39265][SQL] Support vectorized Parquet scans with DEFAULT values
dtenedor commented on code in PR #36672: URL: https://github.com/apache/spark/pull/36672#discussion_r886234949 ## sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java: ## @@ -270,13 +271,40 @@ private void initBatch( vectors[i + partitionIdx].setIsConstant(); } } + +// For Parquet tables whose columns have associated DEFAULT values, this reader must return +// those values instead of NULL when the corresponding columns are not present in storage (i.e. +// belong to the 'missingColumns' field in this class). +ColumnVector[] finalColumns = new ColumnVector[sparkSchema.fields().length]; +for (int i = 0; i < columnVectors.length; i++) { + Object defaultValue = sparkRequestedSchema.existenceDefaultValues()[i]; + if (defaultValue == null) { +finalColumns[i] = vectors[i]; + } else { +WritableColumnVector writable; +if (memMode == MemoryMode.OFF_HEAP) { + writable = new OffHeapColumnVector(capacity, vectors[i].dataType()); Review Comment: Sure, I added a comment explaining this (`appendObjects` delegates to other existing methods like `appendFloats`). And I made a change to reuse the existing ColumnVector instead of creating a new one. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
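The idea in the quoted code comment, returning DEFAULT values instead of NULL for columns that are absent from storage, can be illustrated with plain Python lists standing in for column vectors. This is a simplified sketch and not the reader's implementation: `fill_existence_defaults` and its parameters are hypothetical names, and treating a `None` default as "no DEFAULT declared" mirrors the `defaultValue == null` check in the diff.

```python
def fill_existence_defaults(columns, missing, defaults, capacity):
    """Return batch columns with defaults substituted for missing columns.

    columns  -- one list of values per requested column
    missing  -- per-column flag: True if the column is absent from storage
    defaults -- per-column existence default, or None if none is declared
    capacity -- number of rows in the batch
    """
    result = []
    for column, is_missing, default in zip(columns, missing, defaults):
        if is_missing and default is not None:
            # Constant column: every row carries the declared default.
            result.append([default] * capacity)
        else:
            # Present in storage, or no default declared: keep as read.
            result.append(column)
    return result
```

For example, a batch of three rows where the second column is missing and declares a default of 42 would come back with that column filled with 42, while a missing column without a default stays all NULL.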
[GitHub] [spark] ueshin commented on a diff in pull request #36353: [SPARK-38946][PYTHON][PS] Generates a new dataframe instead of operating inplace in df.eval/update/fillna/setitem
ueshin commented on code in PR #36353: URL: https://github.com/apache/spark/pull/36353#discussion_r886205809 ## python/pyspark/pandas/tests/test_dataframe.py: ## @@ -2910,7 +2920,12 @@ def get_data(left_columns=None, right_columns=None): left_pdf.update(right_pdf) left_psdf.update(right_psdf) self.assert_eq(left_pdf.sort_values(by=["A", "B"]), left_psdf.sort_values(by=["A", "B"])) -self.assert_eq(psser.sort_index(), pser.sort_index()) +# SPARK-38946: Since Spark 3.4, df.update generate a new dataframe to follow +# pandas 1.4 behaviors +if LooseVersion(pd.__version__) >= LooseVersion("1.4"): +self.assert_eq(psser.sort_index(), pser.sort_index()) Review Comment: ditto. ## python/pyspark/pandas/tests/test_dataframe.py: ## @@ -270,7 +270,12 @@ def test_inplace(self): psdf["a"] = psdf["a"] + 10 self.assert_eq(psdf, pdf) -self.assert_eq(psser, pser) +# SPARK-38946: Since Spark 3.4, inplace set generate a new dataframe to follow +# pandas 1.4 behaviors +if LooseVersion(pd.__version__) >= LooseVersion("1.4"): +self.assert_eq(psser, pser) Review Comment: Seems like this is a common test with the old pandas? Shall we move it out of `if LooseVersion(pd.__version__) >= LooseVersion("1.4"):`? ## python/pyspark/pandas/tests/test_dataframe.py: ## @@ -1449,7 +1454,12 @@ def test_fillna(self): pdf.fillna({"x": -1, "y": -2, "z": -5}, inplace=True) psdf.fillna({"x": -1, "y": -2, "z": -5}, inplace=True) self.assert_eq(psdf, pdf) -self.assert_eq(psser, pser) +# SPARK-38946: Since Spark 3.4, fillna with inplace generate a new dataframe to follow +# pandas 1.4 behaviors +if LooseVersion(pd.__version__) >= LooseVersion("1.4"): +self.assert_eq(psser, pser) Review Comment: ditto. 
## python/pyspark/pandas/tests/test_dataframe.py: ## @@ -5151,7 +5166,11 @@ def test_eval(self): pdf.eval("A = B + C", inplace=True) psdf.eval("A = B + C", inplace=True) self.assert_eq(pdf, psdf) -self.assert_eq(pser, psser) +# SPARK-38946: Since Pandas 1.4, eval with inplace generate a new dataframe +if LooseVersion(pd.__version__) >= LooseVersion("1.4"): +self.assert_eq(pser, psser) Review Comment: ditto. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] ueshin commented on a diff in pull request #36353: [SPARK-38946][PYTHON][PS] Generates a new dataframe instead of operating inplace in df.eval/update/fillna/setitem
ueshin commented on code in PR #36353: URL: https://github.com/apache/spark/pull/36353#discussion_r886192259 ## python/pyspark/pandas/frame.py: ## @@ -8412,7 +8430,11 @@ def update(self, other: "DataFrame", join: str = "left", overwrite: bool = True) *HIDDEN_COLUMNS, ) internal = self._internal.with_new_sdf(sdf, data_fields=data_fields) -self._update_internal_frame(internal, requires_same_anchor=False) +# Since Spark 3.4, df.update generates a new dataframe instead of operating +# in-place to follow pandas v1.4 behavior, see also SPARK-38946. +self._update_internal_frame( +internal, requires_same_anchor=False, anchor_force_disconnect=True Review Comment: We don't need to specify `requires_same_anchor` any more if `anchor_force_disconnect=True`? ## python/pyspark/pandas/frame.py: ## @@ -11944,7 +11966,11 @@ def eval_func(pdf): # type: ignore[no-untyped-def] if inplace: # Here, the result is always a frame because the error is thrown during schema inference # from pandas. -self._update_internal_frame(result._internal, requires_same_anchor=False) +# Since Spark 3.4, eval with inplace generates a new dataframe instead of operating +# in-place to follow pandas v1.4 behavior, see also SPARK-38946. +self._update_internal_frame( +result._internal, requires_same_anchor=False, anchor_force_disconnect=True Review Comment: ditto. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] wmoustafa commented on pull request #36593: [SPARK-39139][SQL] DS V2 push-down framework supports DS V2 UDF
wmoustafa commented on PR #36593: URL: https://github.com/apache/spark/pull/36593#issuecomment-1142712351 @beliefer, glad to see interest/progress in cross platform SQL/UDFs pushdown. Have you considered doing this leveraging frameworks such as Transport [[1](https://github.com/linkedin/transport), [2](https://engineering.linkedin.com/blog/2018/11/using-translatable-portable-UDFs)] for UDFs and Coral [[1](https://github.com/linkedin/coral), [2](https://engineering.linkedin.com/blog/2020/coral)] for SQL? With Transport, one can implement a function that is executable in Spark as well as other data sources, using one implementation. All function variants (automatically generated) will natively access the in-memory records of the corresponding engine/data source. With Coral, one can apply transformations/rewrites to built-in functions/SQL expressions so they translate to the same semantics in an underlying engine/data source. For example, it can be used to push down complex functions/SQL expressions from Spark to Trino despite having different syntax. This PR might not be the best place to discuss this in detail, but happy to file a JIRA ticket to carry this forward. cc: @xkrogen. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] JoshRosen closed pull request #36680: [SPARK-39283][CORE] Fix deadlock between TaskMemoryManager and UnsafeExternalSorter.SpillableIterator
JoshRosen closed pull request #36680: [SPARK-39283][CORE] Fix deadlock between TaskMemoryManager and UnsafeExternalSorter.SpillableIterator URL: https://github.com/apache/spark/pull/36680
[GitHub] [spark] sandeepvinayak commented on pull request #36680: [SPARK-39283][CORE] Fix deadlock between TaskMemoryManager and UnsafeExternalSorter.SpillableIterator
sandeepvinayak commented on PR #36680: URL: https://github.com/apache/spark/pull/36680#issuecomment-1142682897 @JoshRosen Can you please review this when you get a chance? Also, it would be great if we could get this fix into the next release. Thanks!
[GitHub] [spark] otterc commented on a diff in pull request #35906: [SPARK-33236][shuffle] Enable Push-based shuffle service to store state in NM level DB for work preserving restart
otterc commented on code in PR #35906: URL: https://github.com/apache/spark/pull/35906#discussion_r886137214 ## common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java: ## @@ -536,9 +619,11 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) { } } // Even when the mergePartitionsInfo is null, we mark the shuffle as finalized but the results - // sent to the driver will be empty. This cam happen when the service didn't receive any + // sent to the driver will be empty. This can happen when the service didn't receive any // blocks for the shuffle yet and the driver didn't wait for enough time to finalize the // shuffle. + writeAppAttemptShuffleMergeInfo( Review Comment: We are writing this to the db even before we actually finalize the partition, that is, before we close all the partition files? What happens when there is an error while finalizing the files?
[GitHub] [spark] akpatnam25 commented on pull request #36601: [SPARK-38987][SHUFFLE] Throw FetchFailedException when merged shuffle blocks are corrupted and spark.shuffle.detectCorrupt is set to true
akpatnam25 commented on PR #36601: URL: https://github.com/apache/spark/pull/36601#issuecomment-1142632317 I am not, that was an accident. Please check the diff on the other PR again. It was added accidentally; I removed it.
[GitHub] [spark] otterc commented on pull request #36601: [SPARK-38987][SHUFFLE] Throw FetchFailedException when merged shuffle blocks are corrupted and spark.shuffle.detectCorrupt is set to true
otterc commented on PR #36601: URL: https://github.com/apache/spark/pull/36601#issuecomment-1142631465 @akpatnam25 Can you check if you are adding .idea/vcs.xml? If yes, could you remove that change and re-open this, since the review was done on this PR?
[GitHub] [spark] akpatnam25 closed pull request #36601: [SPARK-38987][SHUFFLE] Throw FetchFailedException when merged shuffle blocks are corrupted and spark.shuffle.detectCorrupt is set to true
akpatnam25 closed pull request #36601: [SPARK-38987][SHUFFLE] Throw FetchFailedException when merged shuffle blocks are corrupted and spark.shuffle.detectCorrupt is set to true URL: https://github.com/apache/spark/pull/36601
[GitHub] [spark] akpatnam25 commented on pull request #36601: [SPARK-38987][SHUFFLE] Throw FetchFailedException when merged shuffle blocks are corrupted and spark.shuffle.detectCorrupt is set to true
akpatnam25 commented on PR #36601: URL: https://github.com/apache/spark/pull/36601#issuecomment-1142627535 Closing this PR due to some build weirdness. This PR is now recreated in PR https://github.com/apache/spark/pull/36734
[GitHub] [spark] akpatnam25 opened a new pull request, #36734: [SPARK-38987][SHUFFLE] Throw FetchFailedException when merged shuffle blocks are corrupted and spark.shuffle.detectCorrupt is set to true
akpatnam25 opened a new pull request, #36734: URL: https://github.com/apache/spark/pull/36734

### What changes were proposed in this pull request?
Adds corruption exception handling for merged shuffle chunks when spark.shuffle.detectCorrupt is set to true (the default value is true).

### Why are the changes needed?
Prior to Spark 3.0, spark.shuffle.detectCorrupt is set to true by default, and this configuration is one of the knobs for early corruption detection, so the fallback can be triggered as expected. After Spark 3.0, even though spark.shuffle.detectCorrupt is still set to true by default, the early corruption detection knob is controlled by a new configuration, spark.shuffle.detectCorrupt.useExtraMemory, which is set to false by default. Thus the default behavior, with only Magnet enabled after Spark 3.2.0 (internal li-3.1.1), disables early corruption detection, so no fallback is triggered. It then falls through to throwing an exception when it starts to read the corrupted blocks. We handle the corrupted stream for merged blocks by throwing a FetchFailedException in this case. This will trigger a retry based on the values of spark.shuffle.detectCorrupt.useExtraMemory and spark.shuffle.detectCorrupt.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
- Tested on internal cluster
- Added UT

This is a PR to tackle some of the build weirdness found in PR 36601 (https://github.com/apache/spark/pull/36601). It contains the exact same diff. Closed that one out and recreated it here.
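The interaction between the two settings described in this PR can be summarized in a few lines. The sketch below only restates the description's claim (early detection needs both flags, and the second defaults to false); the helper function is hypothetical and is not a Spark API, and the defaults are taken from the PR text rather than checked against a specific Spark release.

```python
# Defaults as stated in the PR description above (assumed, not verified here).
DEFAULTS = {
    "spark.shuffle.detectCorrupt": "true",
    "spark.shuffle.detectCorrupt.useExtraMemory": "false",
}


def early_corruption_detection_enabled(conf: dict) -> bool:
    """Hypothetical helper: True only when both knobs are set to true."""
    merged = {**DEFAULTS, **conf}
    return all(merged[key].lower() == "true" for key in DEFAULTS)
```

Under the stated defaults the helper returns False, which is the situation the PR addresses: corruption in merged blocks surfaces only at read time, so the reader needs to raise a FetchFailedException there.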
[GitHub] [spark] otterc commented on a diff in pull request #35906: [SPARK-33236][shuffle] Enable Push-based shuffle service to store state in NM level DB for work preserving restart
otterc commented on code in PR #35906: URL: https://github.com/apache/spark/pull/35906#discussion_r837698827 ## common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java: ## @@ -88,13 +103,28 @@ private static final ByteBuffer SUCCESS_RESPONSE = new BlockPushReturnCode(ReturnCode.SUCCESS.id(), "").toByteBuffer().asReadOnlyBuffer(); + private static final ObjectMapper mapper = new ObjectMapper(); + + /** + * This a common prefix to the key for each app shuffle partition we stick in leveldb, so they Review Comment: Nit: "stick in" -> "add to" ## common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java: ## @@ -655,6 +744,156 @@ public void registerExecutor(String appId, ExecutorShuffleInfo executorInfo) { } } + + @Override + public void close() { +if (db != null) { + try { +db.close(); + } catch (IOException e) { +logger.error("Exception closing leveldb with registered app paths info and " ++ "shuffle partition info", e); + } +} + } + + private void writeAppPathsInfoToDb(String appId, int attemptId, AppPathsInfo appPathsInfo) { +if (db != null) { + try { +byte[] key = getDbAppAttemptPathsKey(new AppAttemptId(appId, attemptId)); +String valueStr = mapper.writeValueAsString(appPathsInfo); +byte[] value = valueStr.getBytes(StandardCharsets.UTF_8); +db.put(key, value); + } catch (Exception e) { +logger.error("Error saving registered app paths info", e); + } +} + } + + private void writeAppAttemptShuffleMergeInfo( + String appId, + int appAttemptId, + int shuffleId, + int shuffleMergeId) { +if (db != null) { + // Write AppAttemptShuffleMergeId into LevelDB for finalized shuffles + try{ +byte[] dbKey = getDbAppAttemptShufflePartitionKey( +new AppAttemptShuffleMergeId(appId, appAttemptId, shuffleId, shuffleMergeId)); +db.put(dbKey, new byte[0]); + } catch (Exception e) { +logger.error("Error saving active app shuffle partition", e); + } +} + + } + + private T parseDbKey(String key, String 
prefix, Class valueType) throws IOException { +if (!key.startsWith(prefix + DB_KEY_DELIMITER)) { + throw new IllegalArgumentException("expected a string starting with " + prefix); +} +String json = key.substring(prefix.length() + 1); +return mapper.readValue(json, valueType); + } + + private AppAttemptId parseDbAppAttemptPathsKey(String key) throws IOException { +return parseDbKey(key, APP_ATTEMPT_PATH_KEY_PREFIX, AppAttemptId.class); + } + + private AppAttemptShuffleMergeId parseDbAppAttemptShufflePartitionKey( + String key, + String prefix) throws IOException { +return parseDbKey(key, prefix, AppAttemptShuffleMergeId.class); + } + + private byte[] getDbKey(Object key, String prefix) throws IOException { +// we stick a common prefix on all the keys so we can find them in the DB +String keyJsonString = prefix + DB_KEY_DELIMITER + mapper.writeValueAsString(key); +return keyJsonString.getBytes(StandardCharsets.UTF_8); + } + + private byte[] getDbAppAttemptShufflePartitionKey( + AppAttemptShuffleMergeId appAttemptShuffleMergeId) throws IOException { +return getDbKey(appAttemptShuffleMergeId, APP_ATTEMPT_SHUFFLE_FINALIZE_STATUS_KEY_PREFIX); + } + + private byte[] getDbAppAttemptPathsKey(AppAttemptId appAttemptId) throws IOException { +return getDbKey(appAttemptId, APP_ATTEMPT_PATH_KEY_PREFIX); + } + + @VisibleForTesting + void reloadAppShuffleInfo(DB db) throws IOException { +logger.info("Reload applications merged shuffle information from DB"); +reloadActiveAppAttemptsPathInfo(db); +reloadFinalizedAppAttemptsShuffleMergeInfo(db); + } + + private void reloadActiveAppAttemptsPathInfo(DB db) throws IOException { +if (db != null) { + DBIterator itr = db.iterator(); + itr.seek(APP_ATTEMPT_PATH_KEY_PREFIX.getBytes(StandardCharsets.UTF_8)); + while (itr.hasNext()) { +Map.Entry e = itr.next(); +String key = new String(e.getKey(), StandardCharsets.UTF_8); +if (!key.startsWith(APP_ATTEMPT_PATH_KEY_PREFIX)) { + break; +} +AppAttemptId appAttemptId = 
parseDbAppAttemptPathsKey(key); +try{ + AppPathsInfo appPathsInfo = mapper.readValue(e.getValue(), AppPathsInfo.class); + logger.info("Reloading active application {}_{} merged shuffle files paths", + appAttemptId.appId, appAttemptId.attemptId); + appsShuffleInfo.compute(appAttemptId.appId, + (appId, existingAppShuffleInfo) -> { +if (existingAppShuffleInfo == null || +existingAppShuffleInfo.attemptId < appAttemptId.attemptId) { + return new AppShuffleInfo( +
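For readers following the key layout in the diff above, here is a minimal Python sketch of the same idea: a common prefix, a delimiter, and a JSON-encoded key, followed by a seek-and-scan over sorted keys. It is not the Java implementation. The delimiter and prefix values, the in-memory `dict` standing in for LevelDB, and the helper names are all assumptions made for illustration.

```python
import json

# Assumed values for illustration only; the real constants live in
# RemoteBlockPushResolver and may differ.
DB_KEY_DELIMITER = ";"
APP_ATTEMPT_PATH_KEY_PREFIX = "AppAttemptPathInfo"


def get_db_key(key_fields, prefix):
    """Build prefix + delimiter + JSON, encoded as UTF-8 bytes."""
    text = prefix + DB_KEY_DELIMITER + json.dumps(key_fields, sort_keys=True)
    return text.encode("utf-8")


def parse_db_key(raw, prefix):
    """Reverse of get_db_key; rejects keys that carry a different prefix."""
    key = raw.decode("utf-8")
    if not key.startswith(prefix + DB_KEY_DELIMITER):
        raise ValueError("expected a string starting with " + prefix)
    return json.loads(key[len(prefix) + 1:])


def scan_prefix(store, prefix):
    """Mimic iterator.seek(prefix) followed by a loop that stops at the
    first key that no longer starts with the prefix."""
    start = prefix.encode("utf-8")
    for raw in sorted(store):
        if raw < start:
            continue  # still before the seek position
        if not raw.startswith(start):
            break  # left the contiguous block of matching keys
        yield raw, store[raw]


store = {
    get_db_key({"appId": "app_1", "attemptId": 1}, APP_ATTEMPT_PATH_KEY_PREFIX): b"{}",
    get_db_key({"appId": "app_2", "attemptId": 3}, APP_ATTEMPT_PATH_KEY_PREFIX): b"{}",
    get_db_key({"appId": "app_1", "shuffleId": 0}, "SomeOtherPrefix"): b"",
}
reloaded = [parse_db_key(raw, APP_ATTEMPT_PATH_KEY_PREFIX)
            for raw, _ in scan_prefix(store, APP_ATTEMPT_PATH_KEY_PREFIX)]
```

Because keys sharing a prefix are contiguous in sorted order, the scan can stop at the first non-matching key, which is what the `break` in the reload loop relies on.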
[GitHub] [spark] ueshin commented on a diff in pull request #36640: [SPARK-39262][PYTHON] Correct the behavior of creating DataFrame from an RDD
ueshin commented on code in PR #36640: URL: https://github.com/apache/spark/pull/36640#discussion_r886035398 ## python/pyspark/sql/session.py: ## @@ -611,8 +611,8 @@ def _inferSchema( :class:`pyspark.sql.types.StructType` """ first = rdd.first() -if not first: -raise ValueError("The first row in RDD is empty, " "can not infer schema") +if first is None: Review Comment: What about `""`, btw?
[GitHub] [spark] ueshin commented on a diff in pull request #36640: [SPARK-39262][PYTHON] Correct the behavior of creating DataFrame from an RDD
ueshin commented on code in PR #36640: URL: https://github.com/apache/spark/pull/36640#discussion_r885963200 ## python/pyspark/sql/session.py: ## @@ -611,8 +611,8 @@ def _inferSchema( :class:`pyspark.sql.types.StructType` """ first = rdd.first() -if not first: -raise ValueError("The first row in RDD is empty, " "can not infer schema") +if first is None: Review Comment: The `first` can be `None`. ```py >>> print(sc.parallelize([None, 2, 3, 4]).first()) None ``` We should also check if `first is None` because `None` is not `Sized`. ```py >>> isinstance(None, Sized) False ``` We also need a test that checks that the call fails when the first row is `None`.
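The two checks mentioned in this comment can be tried without a Spark cluster. The sketch below is plain Python, not the PySpark source: `check_first_row` is a hypothetical helper, and combining the `None` check with a `Sized` emptiness check is an assumption about where the review is heading rather than the merged code.

```python
from collections.abc import Sized

EMPTY_ROW_MESSAGE = "The first row in RDD is empty, can not infer schema"


def check_first_row(first):
    """Hypothetical validation: reject None and empty sized rows only."""
    if first is None:
        # None is not Sized, so it needs its own explicit check.
        raise ValueError(EMPTY_ROW_MESSAGE)
    if isinstance(first, Sized) and len(first) == 0:
        # Covers [], {}, () and also the empty string "".
        raise ValueError(EMPTY_ROW_MESSAGE)
    return first


def is_rejected(value):
    """Small helper for trying values interactively."""
    try:
        check_first_row(value)
    except ValueError:
        return True
    return False
```

Under this sketch a first row of `0` passes, because an `int` is neither `None` nor `Sized`, whereas the old `if not first:` test would have rejected it.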
[GitHub] [spark] MaxGekk commented on a diff in pull request #36704: [SPARK-39346][SQL] Convert asserts/illegal state exception to internal errors on each phase
MaxGekk commented on code in PR #36704: URL: https://github.com/apache/spark/pull/36704#discussion_r885955542 ## connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala: ## @@ -666,10 +669,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { testUtils.sendMessages(topic2, Array("6")) }, StartStream(), - ExpectFailure[IllegalStateException](e => { -// The offset of `topic2` should be changed from 2 to 1 -assert(e.getMessage.contains("was changed from 2 to 1")) - }) + expectOffsetChange() Review Comment: @cloud-fan @srielau Do you agree that we wrap `IllegalStateException` coming from connectors (like the Kafka connector) in `INTERNAL_ERROR`? Strictly speaking, this is not Spark's internal error, and users should report bugs to the connector devs.
[GitHub] [spark] MaxGekk commented on a diff in pull request #36704: [SPARK-39346][SQL] Convert asserts/illegal state exception to internal errors on each phase
MaxGekk commented on code in PR #36704: URL: https://github.com/apache/spark/pull/36704#discussion_r885953095 ## sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala: ## @@ -486,4 +489,22 @@ object QueryExecution { val preparationRules = preparations(session, Option(InsertAdaptiveSparkPlan(context)), true) prepareForExecution(preparationRules, sparkPlan.clone()) } + + /** + * Catches asserts and illegal state exceptions, and converts them to internal errors. + */ + private[sql] def withInternalError[T](msg: String)(block: => T): T = { +try { + block +} catch { + case e: SparkThrowable => throw e + case e @ (_: java.lang.IllegalStateException | _: java.lang.AssertionError) => Review Comment: added handling of `NullPointerException`
[GitHub] [spark] ueshin commented on a diff in pull request #36640: [SPARK-39262][PYTHON] Correct the behavior of creating DataFrame from an RDD
ueshin commented on code in PR #36640: URL: https://github.com/apache/spark/pull/36640#discussion_r885944686 ## python/pyspark/sql/session.py: ## @@ -611,8 +611,8 @@ def _inferSchema( :class:`pyspark.sql.types.StructType` """ first = rdd.first() -if not first: -raise ValueError("The first row in RDD is empty, " "can not infer schema") +if first is None: Review Comment: If `rdd.first()` uses an `if rs:` check, `first` will never be `[]`, `{}`, `0` or `""` either, will it? I guess we should fix `rdd.first()` to properly return the first row, and the `if first is None:` check here is valid according to the error message `"The first row in RDD is empty, can not infer schema"`.
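Whether an `if rs:` check can swallow falsy rows depends on what `rs` is. The sketch below is a plain-Python stand-in, not the PySpark source: it assumes `rs` is the list returned by something like `take(1)`, in which case the truthiness test only asks whether the list is empty and says nothing about the element inside it. That assumption should be confirmed against `rdd.py` before drawing conclusions.

```python
def first(items):
    """Stand-in for an RDD-style first(): take one element, then test the list."""
    rs = list(items)[:1]  # assumed equivalent of take(1): a list of 0 or 1 items
    if rs:  # truthiness of the list, not of the element
        return rs[0]
    raise ValueError("RDD is empty")


falsy_rows = [0, "", [], {}, None]
returned = [first([row, "next"]) for row in falsy_rows]
```

With this shape, `[0]` and `[""]` are non-empty lists and therefore truthy, so falsy first elements are returned unchanged and only a genuinely empty input raises.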
[GitHub] [spark] MaxGekk commented on a diff in pull request #36704: [SPARK-39346][SQL] Convert asserts/illegal state exception to internal errors on each phase
MaxGekk commented on code in PR #36704: URL: https://github.com/apache/spark/pull/36704#discussion_r885944596 ## connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala: ## @@ -1376,6 +1376,13 @@ class KafkaMicroBatchV1SourceSuite extends KafkaMicroBatchSourceSuiteBase { classOf[KafkaSourceProvider].getCanonicalName) } + override def expectOffsetChange(): ExpectFailure[_] = { +ExpectFailure[IllegalStateException](e => { Review Comment: The top level method which the exception comes from is `runStream()`: https://github.com/apache/spark/blob/ea215279b0a4785d48723f5f24c96b8d7d9aa355/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala#L267 ```java "stream execution thread for [id = 382fb843-79a4-42c0-be4c-4c4cb40e46a8, runId = 0d8b1a41-c8be-4337-99e8-f0f557c0acc1]@17862" daemon prio=5 tid=0xda nid=NA runnable java.lang.Thread.State: RUNNABLE at org.apache.spark.sql.kafka010.KafkaSource.reportDataLoss(KafkaSource.scala:346) at org.apache.spark.sql.kafka010.KafkaSource.$anonfun$getBatch$2(KafkaSource.scala:314) at org.apache.spark.sql.kafka010.KafkaSource.$anonfun$getBatch$2$adapted(KafkaSource.scala:314) at org.apache.spark.sql.kafka010.KafkaSource$$Lambda$3305.201433992.apply(Unknown Source:-1) at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.$anonfun$getOffsetRangesFromResolvedOffsets$6(KafkaOffsetReaderConsumer.scala:535) at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer$$Lambda$3311.430021093.apply(Unknown Source:-1) at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286) at scala.collection.TraversableLike$$Lambda$67.1881129850.apply(Unknown Source:-1) at scala.collection.Iterator.foreach(Iterator.scala:943) at scala.collection.Iterator.foreach$(Iterator.scala:943) at scala.collection.AbstractIterator.foreach(Iterator.scala:1431) at scala.collection.IterableLike.foreach(IterableLike.scala:74) at 
scala.collection.IterableLike.foreach$(IterableLike.scala:73) at scala.collection.AbstractIterable.foreach(Iterable.scala:56) at scala.collection.TraversableLike.map(TraversableLike.scala:286) at scala.collection.TraversableLike.map$(TraversableLike.scala:279) at scala.collection.AbstractTraversable.map(Traversable.scala:108) at org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.getOffsetRangesFromResolvedOffsets(KafkaOffsetReaderConsumer.scala:530) at org.apache.spark.sql.kafka010.KafkaSource.getBatch(KafkaSource.scala:314) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$3(MicroBatchExecution.scala:548) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$Lambda$3300.1940457280.apply(Unknown Source:-1) at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:293) at scala.collection.TraversableLike$$Lambda$272.176683244.apply(Unknown Source:-1) at scala.collection.Iterator.foreach(Iterator.scala:943) at scala.collection.Iterator.foreach$(Iterator.scala:943) at scala.collection.AbstractIterator.foreach(Iterator.scala:1431) at scala.collection.IterableLike.foreach(IterableLike.scala:74) at scala.collection.IterableLike.foreach$(IterableLike.scala:73) at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:27) at scala.collection.TraversableLike.flatMap(TraversableLike.scala:293) at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:290) at org.apache.spark.sql.execution.streaming.StreamProgress.flatMap(StreamProgress.scala:27) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$2(MicroBatchExecution.scala:544) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$Lambda$3299.1391090515.apply(Unknown Source:-1) at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:384) at 
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:382) at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:544) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:255) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$Lambda$3021.1812599301.apply$mcV$sp(Unknown Source:-1) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at
[GitHub] [spark] MaxGekk commented on a diff in pull request #36704: [SPARK-39346][SQL] Convert asserts/illegal state exception to internal errors on each phase
MaxGekk commented on code in PR #36704: URL: https://github.com/apache/spark/pull/36704#discussion_r885936568 ## sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala: ## @@ -486,4 +489,22 @@ object QueryExecution { val preparationRules = preparations(session, Option(InsertAdaptiveSparkPlan(context)), true) prepareForExecution(preparationRules, sparkPlan.clone()) } + + /** + * Catches asserts and illegal state exceptions, and converts them to internal errors. + */ + private[sql] def withInternalError[T](msg: String)(block: => T): T = { +try { + block +} catch { + case e: SparkThrowable => throw e Review Comment: We needed it when we had `SparkIllegalStateException`, which extends `IllegalStateException`. Now that it has been removed, we no longer need to handle `SparkThrowable` specially. I will remove this line. Thank you.
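The reasoning about the pass-through clause can be illustrated outside Scala. The following Python sketch is an analogy only: every class here is a hypothetical stand-in (for `SparkThrowable`, `IllegalStateException`, the removed `SparkIllegalStateException`, and an internal-error wrapper), and `with_internal_error` mirrors the shape of `withInternalError` rather than its actual behavior.

```python
class SparkThrowableLike(Exception):
    """Hypothetical marker for errors that already carry an error class."""


class IllegalState(Exception):
    """Stand-in for java.lang.IllegalStateException."""


class SparkIllegalState(IllegalState, SparkThrowableLike):
    """Stand-in for the removed exception that belonged to both families."""


class InternalError(Exception):
    """Stand-in for the wrapper that reports an internal error."""


def with_internal_error(msg, block, passthrough=()):
    """Run block; convert illegal-state and assertion failures to InternalError.

    `passthrough` lists exception types to rethrow untouched. It only matters
    while some exception type is both pass-through and convertible.
    """
    try:
        return block()
    except passthrough:
        raise
    except (IllegalState, AssertionError) as e:
        raise InternalError(msg) from e


def raiser(exc):
    """Return a zero-argument callable that raises exc."""
    def block():
        raise exc
    return block
```

As long as a type like `SparkIllegalState` exists, omitting the pass-through clause would wrap an already classified error a second time. Once no type sits in both families, the first `except` can never change the outcome, which is the argument for deleting it.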
[GitHub] [spark] akpatnam25 commented on pull request #36601: [SPARK-38987][SHUFFLE] Throw FetchFailedException when merged shuffle blocks are corrupted and spark.shuffle.detectCorrupt is set to true
akpatnam25 commented on PR #36601: URL: https://github.com/apache/spark/pull/36601#issuecomment-1142403811 @HyukjinKwon do you know why this PR build is failing? The build is failing in code that I did not touch, and it seems to be working for other contributors. I have already merged the latest master into this branch and am not sure why it continues to fail.
[GitHub] [spark] dongjoon-hyun commented on pull request #36689: [SPARK-39306][SQL] Support scalar subquery in time travel
dongjoon-hyun commented on PR #36689: URL: https://github.com/apache/spark/pull/36689#issuecomment-1142334174 Thank you, @cloud-fan and all! +1, LGTM.
[GitHub] [spark] cloud-fan closed pull request #36730: [SPARK-39342][SQL] ShowTablePropertiesCommand/ShowTablePropertiesExec should redact properties.
cloud-fan closed pull request #36730: [SPARK-39342][SQL] ShowTablePropertiesCommand/ShowTablePropertiesExec should redact properties. URL: https://github.com/apache/spark/pull/36730
[GitHub] [spark] cloud-fan commented on pull request #36730: [SPARK-39342][SQL] ShowTablePropertiesCommand/ShowTablePropertiesExec should redact properties.
cloud-fan commented on PR #36730: URL: https://github.com/apache/spark/pull/36730#issuecomment-1142300520 thanks, merging to master!
[GitHub] [spark] cloud-fan closed pull request #36731: [SPARK-39343][SQL] DescribeTableExec should redact properties
cloud-fan closed pull request #36731: [SPARK-39343][SQL] DescribeTableExec should redact properties URL: https://github.com/apache/spark/pull/36731
[GitHub] [spark] cloud-fan commented on pull request #36731: [SPARK-39343][SQL] DescribeTableExec should redact properties
cloud-fan commented on PR #36731: URL: https://github.com/apache/spark/pull/36731#issuecomment-1142298692 thanks, merging to master!
[GitHub] [spark] cloud-fan commented on a diff in pull request #36698: [SPARK-39316][SQL] Merge PromotePrecision and CheckOverflow into decimal binary arithmetic
cloud-fan commented on code in PR #36698: URL: https://github.com/apache/spark/pull/36698#discussion_r885787936 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala: ## @@ -244,8 +314,7 @@ abstract class BinaryArithmetic extends BinaryOperator with NullIntolerant override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = dataType match { case _: DecimalType => - // Overflow is handled in the CheckOverflow operator - defineCodeGen(ctx, ev, (eval1, eval2) => s"$eval1.$decimalMethod($eval2)") + throw QueryExecutionErrors.cannotEvalDecimalTypeError() Review Comment: We do not need to put the error in `QueryExecutionErrors` if it means a bug.
[GitHub] [spark] cloud-fan commented on a diff in pull request #36698: [SPARK-39316][SQL] Merge PromotePrecision and CheckOverflow into decimal binary arithmetic
cloud-fan commented on code in PR #36698: URL: https://github.com/apache/spark/pull/36698#discussion_r885787249 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/decimalExpressions.scala: ## @@ -232,3 +216,36 @@ case class CheckOverflowInSum( override protected def withNewChildInternal(newChild: Expression): CheckOverflowInSum = copy(child = newChild) } + +/** + * An add expression which is only used internally by Sum/Avg. + * + * Nota that, this expression does not check overflow which is different with `Add`. + */ +case class DecimalAddNoOverflowCheck( +left: Expression, +right: Expression, +override val dataType: DataType, +failOnError: Boolean = SQLConf.get.ansiEnabled) + extends BinaryArithmetic with DecimalArithmeticSupport { Review Comment: Why do we need to extend `DecimalArithmeticSupport` here? It doesn't seem like we can reuse much code.
[GitHub] [spark] cloud-fan commented on a diff in pull request #36698: [SPARK-39316][SQL] Merge PromotePrecision and CheckOverflow into decimal binary arithmetic
cloud-fan commented on code in PR #36698: URL: https://github.com/apache/spark/pull/36698#discussion_r885784544 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala: ## @@ -778,16 +1002,24 @@ case class Pmod( val javaType = CodeGenerator.javaType(dataType) lazy val errorContext = ctx.addReferenceObj("errCtx", queryContext) val result = dataType match { - case DecimalType.Fixed(_, _) => + case DecimalType.Fixed(precision, scale) => +val errorContextCode = if (nullOnOverflow) { + "\"\"" +} else { + errorContext +} val decimalAdd = "$plus" s""" - $javaType $remainder = ${eval1.value}.remainder(${eval2.value}); - if ($remainder.compare(new org.apache.spark.sql.types.Decimal().set(0)) < 0) { - ${ev.value}=($remainder.$decimalAdd(${eval2.value})).remainder(${eval2.value}); - } else { -${ev.value}=$remainder; - } -""" + |$javaType $remainder = ${eval1.value}.remainder(${eval2.value}); + |if ($remainder.compare(new org.apache.spark.sql.types.Decimal().set(0)) < 0) { + | ${ev.value}=($remainder.$decimalAdd(${eval2.value})).remainder(${eval2.value}); + |} else { + | ${ev.value}=$remainder; + |} + |${ev.value} = ${ev.value}.toPrecision( + | $precision, $scale, Decimal.ROUND_HALF_UP(), $nullOnOverflow, $errorContextCode); Review Comment: do we need to update `ev.isNull`?
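To see why the `ev.isNull` question arises, it helps to trace the generated arithmetic by hand. The sketch below uses Python's `decimal` module as a stand-in for Spark's `Decimal`; `pmod`, `to_precision` and `pmod_checked` are hypothetical names, and returning `None` on overflow is an assumption that models the `nullOnOverflow = true` path, not the behavior of the real `toPrecision`.

```python
from decimal import Decimal, ROUND_HALF_UP


def pmod(a, n):
    """Positive modulus as in the generated code: remainder, then fix the sign."""
    r = a % n  # for Decimal, the result takes the sign of the dividend
    if r < 0:
        r = (r + n) % n
    return r


def to_precision(value, precision, scale, null_on_overflow=True):
    """Round to `scale` digits, then check the value fits in `precision` digits."""
    rounded = value.quantize(Decimal(1).scaleb(-scale), rounding=ROUND_HALF_UP)
    limit = Decimal(10) ** (precision - scale)  # first value that no longer fits
    if abs(rounded) >= limit:
        if null_on_overflow:
            return None  # the caller must now treat the result as null
        raise ArithmeticError(f"{value} does not fit DECIMAL({precision}, {scale})")
    return rounded


def pmod_checked(a, n, precision, scale):
    """pmod followed by the precision check added in the diff above."""
    return to_precision(pmod(a, n), precision, scale)
```

For example, `pmod_checked(Decimal("25"), Decimal("100"), 3, 2)` overflows `DECIMAL(3, 2)` and yields `None` in this sketch. If the real precision check can likewise produce a null, the surrounding generated code has to record that in its null flag, which is what the review question is probing.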
[GitHub] [spark] cloud-fan commented on a diff in pull request #36698: [SPARK-39316][SQL] Merge PromotePrecision and CheckOverflow into decimal binary arithmetic
cloud-fan commented on code in PR #36698: URL: https://github.com/apache/spark/pull/36698#discussion_r885782952 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala: ## @@ -778,16 +1002,24 @@ case class Pmod( val javaType = CodeGenerator.javaType(dataType) lazy val errorContext = ctx.addReferenceObj("errCtx", queryContext) val result = dataType match { - case DecimalType.Fixed(_, _) => + case DecimalType.Fixed(precision, scale) => +val errorContextCode = if (nullOnOverflow) { + "\"\"" +} else { + errorContext +} val decimalAdd = "$plus" s""" - $javaType $remainder = ${eval1.value}.remainder(${eval2.value}); - if ($remainder.compare(new org.apache.spark.sql.types.Decimal().set(0)) < 0) { - ${ev.value}=($remainder.$decimalAdd(${eval2.value})).remainder(${eval2.value}); - } else { -${ev.value}=$remainder; - } -""" + |$javaType $remainder = ${eval1.value}.remainder(${eval2.value}); Review Comment: ```suggestion |$javaType $remainder = ${eval1.value}.$decimalMethod(${eval2.value}); ```
[GitHub] [spark] cloud-fan commented on a diff in pull request #36698: [SPARK-39316][SQL] Merge PromotePrecision and CheckOverflow into decimal binary arithmetic
cloud-fan commented on code in PR #36698: URL: https://github.com/apache/spark/pull/36698#discussion_r885779230 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala: ## @@ -490,12 +622,27 @@ trait DivModLike extends BinaryArithmetic { s"${eval2.value} == 0" } val javaType = CodeGenerator.javaType(dataType) +val errorContext = if (nullOnOverflow) { + "\"\"" +} else { + ctx.addReferenceObj("errCtx", queryContext) +} val operation = if (operandsDataType.isInstanceOf[DecimalType]) { - decimalToDataTypeCodeGen(s"${eval1.value}.$decimalMethod(${eval2.value})") + val decimal = super.dataType.asInstanceOf[DecimalType] Review Comment: We don't need this. The code can be `Decimal $decimalValue = ...`
[GitHub] [spark] cloud-fan commented on a diff in pull request #36698: [SPARK-39316][SQL] Merge PromotePrecision and CheckOverflow into decimal binary arithmetic
cloud-fan commented on code in PR #36698: URL: https://github.com/apache/spark/pull/36698#discussion_r885776567 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala: ## @@ -208,6 +210,79 @@ case class Abs(child: Expression, failOnError: Boolean = SQLConf.get.ansiEnabled override protected def withNewChildInternal(newChild: Expression): Abs = copy(child = newChild) } +/** + * The child class should override decimalType method to report the result data type. + * + * When `spark.sql.decimalOperations.allowPrecisionLoss` is set to true, if the precision / scale + * needed are out of the range of available values, the scale is reduced up to 6, in order to + * prevent the truncation of the integer part of the decimals. + * + * Rounds the decimal to given scale and check whether the decimal can fit in provided precision + * or not. If not, if `nullOnOverflow` is `true`, it returns `null`; otherwise an + * `ArithmeticException` is thrown. + */ +trait DecimalArithmeticSupport extends BinaryArithmetic { + protected val nullOnOverflow: Boolean = !failOnError + protected val allowPrecisionLoss: Boolean = SQLConf.get.decimalOperationsAllowPrecisionLoss + + override def checkInputDataTypes(): TypeCheckResult = (left.dataType, right.dataType) match { +case (_: DecimalType, _: DecimalType) => + // We allow eval decimal type with different precision and scale, and change the precision + // and scale before return result. + TypeCheckResult.TypeCheckSuccess +case _ => super.checkInputDataTypes() + } + + /** Name of the function for this expression on a [[Decimal]] type. 
*/ + protected def decimalMethod: String + protected def decimalType(p1: Int, s1: Int, p2: Int, s2: Int): DecimalType + + override def nullable: Boolean = dataType match { +case _: DecimalType => nullOnOverflow +case _ => super.nullable + } + + override def dataType: DataType = (left, right) match { +case (DecimalType.Expression(p1, s1), DecimalType.Expression(p2, s2)) => + decimalType(p1, s1, p2, s2) +case _ => super.dataType + } + + def checkOverflow(value: Decimal, decimalType: DecimalType): Decimal = { +value.toPrecision( + decimalType.precision, + decimalType.scale, + Decimal.ROUND_HALF_UP, + nullOnOverflow, + queryContext) + } + + override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = dataType match { +case decimalType: DecimalType => + val errorContextCode = if (nullOnOverflow) { +"\"\"" + } else { +ctx.addReferenceObj("errCtx", queryContext) + } + val isNull = if (nullOnOverflow) { Review Comment: ```suggestion val updateisNull = if (nullOnOverflow) { ```
[GitHub] [spark] cloud-fan commented on a diff in pull request #36698: [SPARK-39316][SQL] Merge PromotePrecision and CheckOverflow into decimal binary arithmetic
cloud-fan commented on code in PR #36698: URL: https://github.com/apache/spark/pull/36698#discussion_r885774715 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala: ## @@ -208,6 +210,79 @@ case class Abs(child: Expression, failOnError: Boolean = SQLConf.get.ansiEnabled override protected def withNewChildInternal(newChild: Expression): Abs = copy(child = newChild) } +/** + * The child class should override decimalType method to report the result data type. + * + * When `spark.sql.decimalOperations.allowPrecisionLoss` is set to true, if the precision / scale + * needed are out of the range of available values, the scale is reduced up to 6, in order to + * prevent the truncation of the integer part of the decimals. + * + * Rounds the decimal to given scale and check whether the decimal can fit in provided precision + * or not. If not, if `nullOnOverflow` is `true`, it returns `null`; otherwise an + * `ArithmeticException` is thrown. + */ +trait DecimalArithmeticSupport extends BinaryArithmetic { + protected val nullOnOverflow: Boolean = !failOnError + protected val allowPrecisionLoss: Boolean = SQLConf.get.decimalOperationsAllowPrecisionLoss + + override def checkInputDataTypes(): TypeCheckResult = (left.dataType, right.dataType) match { +case (_: DecimalType, _: DecimalType) => + // We allow eval decimal type with different precision and scale, and change the precision + // and scale before return result. + TypeCheckResult.TypeCheckSuccess +case _ => super.checkInputDataTypes() + } + + /** Name of the function for this expression on a [[Decimal]] type. */ + protected def decimalMethod: String + protected def decimalType(p1: Int, s1: Int, p2: Int, s2: Int): DecimalType + + override def nullable: Boolean = dataType match { +case _: DecimalType => nullOnOverflow Review Comment: should it be `super.nullable || nullOnOverflow`?
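The question in the review comment above can be illustrated with a minimal sketch. It uses stand-in types rather than Spark's actual expression classes, and it assumes that the parent's `nullable` is the disjunction of the children's nullability. The names `Expr`, `Leaf`, `DecimalOpOverflowOnly`, and `DecimalOpCombined` are hypothetical and exist only for this illustration.

```scala
// Stand-in for an expression that only reports nullability (not Spark's Expression).
sealed trait Expr { def nullable: Boolean }

final case class Leaf(nullable: Boolean) extends Expr

// Variant A: nullability of a decimal result depends on overflow handling only.
final case class DecimalOpOverflowOnly(left: Expr, right: Expr, nullOnOverflow: Boolean)
    extends Expr {
  def nullable: Boolean = nullOnOverflow
}

// Variant B: the result is also nullable when either input is nullable
// (assumed here to be what the parent's `nullable` would report).
final case class DecimalOpCombined(left: Expr, right: Expr, nullOnOverflow: Boolean)
    extends Expr {
  private def inputsNullable: Boolean = left.nullable || right.nullable
  def nullable: Boolean = inputsNullable || nullOnOverflow
}

object NullableDemo {
  def main(args: Array[String]): Unit = {
    val nullableInput = Leaf(nullable = true)
    val nonNullInput = Leaf(nullable = false)
    // With nullOnOverflow = false, variant A reports a non-nullable result
    // even though one input may be null; variant B does not.
    println(DecimalOpOverflowOnly(nullableInput, nonNullInput, nullOnOverflow = false).nullable)
    println(DecimalOpCombined(nullableInput, nonNullInput, nullOnOverflow = false).nullable)
  }
}
```

Under these assumptions, the two variants disagree exactly when overflow raises an error but an input can still be null, which appears to be the case the reviewer is asking about.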
[GitHub] [spark] ivoson commented on a diff in pull request #36716: [SPARK-39062][CORE] Add stage level resource scheduling support for standalone cluster
ivoson commented on code in PR #36716: URL: https://github.com/apache/spark/pull/36716#discussion_r885759220 ## core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala: ## @@ -43,8 +43,8 @@ private[ui] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app") return UIUtils.basicSparkPage(request, msg, "Not Found") } -val executorHeaders = Seq("ExecutorID", "Worker", "Cores", "Memory", "Resources", - "State", "Logs") +val executorHeaders = Seq("ExecutorID", "Worker", "Resource Profile Id", "Cores", "Memory", + "Resources", "State", "Logs") Review Comment: Sure, of course. Will make the change. Thanks.
[GitHub] [spark] ivoson commented on a diff in pull request #36716: [SPARK-39062][CORE] Add stage level resource scheduling support for standalone cluster
ivoson commented on code in PR #36716: URL: https://github.com/apache/spark/pull/36716#discussion_r885758451 ## core/src/main/scala/org/apache/spark/deploy/master/ResourceDescription.scala: ## @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.master + +import org.apache.spark.resource.ResourceRequirement + +/** + * Describe resource requests for different resource profiles. Used for executor schedule. + * + * @param coresPerExecutor cores for each executor. + * @param memoryMbPerExecutor memory for each executor. + * @param customResourcesPerExecutor custom resource requests for each executor. + */ +private[spark] case class ResourceDescription( Review Comment: Thanks for the advice. `ExecutorResourceDescription` sounds better. The reason I introduced this class is that the standalone cluster schedules based on `coresPerExecutor`, `memoryMbPerExecutor`, and also custom resources. The main difference is that `customResourcesPerExecutor` is described here as a `Seq` of [ResourceRequirement](https://github.com/apache/spark/blob/v3.3.0-rc3/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala#L116), which is friendlier for scheduling in the master.
[GitHub] [spark] ivoson commented on a diff in pull request #36716: [SPARK-39062][CORE] Add stage level resource scheduling support for standalone cluster
ivoson commented on code in PR #36716: URL: https://github.com/apache/spark/pull/36716#discussion_r885749761 ## core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala: ## @@ -65,7 +66,70 @@ private[spark] class ApplicationInfo( appSource = new ApplicationSource(this) nextExecutorId = 0 removedExecutors = new ArrayBuffer[ExecutorDesc] -executorLimit = desc.initialExecutorLimit.getOrElse(Integer.MAX_VALUE) +val initialExecutorLimit = desc.initialExecutorLimit.getOrElse(Integer.MAX_VALUE) + +rpIdToResourceProfile = new mutable.HashMap[Int, ResourceProfile]() +rpIdToResourceProfile(DEFAULT_RESOURCE_PROFILE_ID) = desc.defaultProfile +rpIdToResourceDesc = new mutable.HashMap[Int, ResourceDescription]() +createResourceDescForResourceProfile(desc.defaultProfile) + +targetNumExecutorsPerResourceProfileId = new mutable.HashMap[Int, Int]() +targetNumExecutorsPerResourceProfileId(DEFAULT_RESOURCE_PROFILE_ID) = initialExecutorLimit Review Comment: I'm not sure I have this right, so please help check it. In [ApplicationInfo](https://github.com/apache/spark/blob/v3.3.0-rc3/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala#L49) we can see that `initialExecutorLimit` is used for dynamic allocation, to limit the number of executors the app starts with. When [dynamic allocation is enabled](https://github.com/apache/spark/blob/v3.3.0-rc3/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala#L120), the limit is 0; otherwise it is `None` for now. The `ApplicationDescription` is created during `SparkContext` initialization, and at that point only the default resource profile exists. For other resource profiles, we rely on [`ExecutorAllocationManager`](https://github.com/apache/spark/blob/v3.3.0-rc3/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala#L431) to update the requests for different resource profiles.
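The initialization described in the comment above can be sketched roughly as follows. This is only an illustration under stated assumptions, not the PR's code: `ExecutorTargets`, `initialTargets`, and `DefaultResourceProfileId` are hypothetical names, and the value `0` for the default profile id is assumed here.

```scala
import scala.collection.mutable

object ExecutorTargets {
  // Assumed id of the default resource profile (hypothetical constant for this sketch).
  val DefaultResourceProfileId: Int = 0

  // At application start only the default profile is known, so only its target is set.
  // Some(0) models "dynamic allocation enabled"; None models "no limit".
  def initialTargets(initialExecutorLimit: Option[Int]): mutable.HashMap[Int, Int] = {
    val targets = mutable.HashMap.empty[Int, Int]
    targets(DefaultResourceProfileId) = initialExecutorLimit.getOrElse(Int.MaxValue)
    targets
  }
}
```

In this sketch, targets for any other profile id would be added later, when the application requests executors for that profile.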
[GitHub] [spark] Ngone51 commented on a diff in pull request #36665: [SPARK-39287][CORE] TaskSchedulerImpl should quickly ignore task finished event if its task was finished state.
Ngone51 commented on code in PR #36665: URL: https://github.com/apache/spark/pull/36665#discussion_r885746852 ## core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala: ## @@ -102,6 +102,10 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul (deserializedResult, size) } + // quickly return if the task has finished + if (scheduler.isFinishedTask(taskSetManager, tid)) { +return Review Comment: This is only useful when you hit the race condition, which is a corner case. In SPARK-37300, I think our goal was to fix the bug, not to make an improvement. I personally think this introduces more complexity than the benefit we could get justifies.
[GitHub] [spark] ivoson commented on a diff in pull request #36716: [SPARK-39062][CORE] Add stage level resource scheduling support for standalone cluster
ivoson commented on code in PR #36716: URL: https://github.com/apache/spark/pull/36716#discussion_r885722489 ## core/src/main/scala/org/apache/spark/deploy/ExecutorDescription.scala: ## @@ -25,10 +25,13 @@ package org.apache.spark.deploy private[deploy] class ExecutorDescription( val appId: String, val execId: Int, +val rpId: Int, val cores: Int, +val memoryMb: Int, Review Comment: We will need `memoryMb` here during [master failover](https://github.com/apache/spark/blob/v3.3.0-rc3/core/src/main/scala/org/apache/spark/deploy/master/Master.scala#L351). The master will reconstruct the app's executor information and the worker's resource usage information.
```
for (exec <- validExecutors) {
  val (execDesc, execResources) = (exec.desc, exec.resources)
  val app = idToApp(execDesc.appId)
  val execInfo = app.addExecutor(
    worker, execDesc.cores, execResources, Some(execDesc.execId))
  worker.addExecutor(execInfo)
  worker.recoverResources(execResources)
  execInfo.copyState(execDesc)
}
```
We need to know the executor's `memoryMb` when creating [`ExecutorDesc`](https://github.com/apache/spark/blob/v3.3.0-rc3/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala#L88). Previously, we could get the `memoryMb` information from `ApplicationDescription`, which can be recovered from the state store during recovery. But now we may have multiple resource profiles, and the resource profile information is not written to the state store, so we need the worker to record the memory information. In the master, we can only reconstruct the resource profile information in `ApplicationInfo` after the client sends the resource request `RequestExecutors`.
[GitHub] [spark] Ngone51 commented on pull request #36447: [SPARK-38807][CORE] Fix the startup error of spark shell on Windows S…
Ngone51 commented on PR #36447: URL: https://github.com/apache/spark/pull/36447#issuecomment-1142218019 cc @pingsutw @HyukjinKwon
[GitHub] [spark] cloud-fan closed pull request #36727: [SPARK-39340][SQL][3.2] DS v2 agg pushdown should allow dots in the name of top-level columns
cloud-fan closed pull request #36727: [SPARK-39340][SQL][3.2] DS v2 agg pushdown should allow dots in the name of top-level columns URL: https://github.com/apache/spark/pull/36727
[GitHub] [spark] cloud-fan commented on pull request #36727: [SPARK-39340][SQL][3.2] DS v2 agg pushdown should allow dots in the name of top-level columns
cloud-fan commented on PR #36727: URL: https://github.com/apache/spark/pull/36727#issuecomment-1142194033 The GA job says `org.apache.spark.sql.TPCDSV1_4_PlanStabilitySuite` failed, but I can't reproduce it locally and this PR definitely won't affect TPCDS queries. The GA job also says pyspark test failed, which is surely unrelated to this PR. I'm merging it to 3.2, thanks for review!
[GitHub] [spark] AngersZhuuuu commented on pull request #33457: [SPARK-36237][UI][SQL] Attach and start handler after application started in UI
AngersZh commented on PR #33457: URL: https://github.com/apache/spark/pull/33457#issuecomment-1142188571 @gongzh021 Maybe you can check this commit https://github.com/apache/spark/pull/33457/commits/dba26cd5bd1aaacb01e08cfcfef9f02ffe96d018
[GitHub] [spark] LuciferYang commented on pull request #36732: [SPARK-39345][CORE][SQL][DSTREAM][ML][MESOS][SS] Replace `filter(!condition)` with `filterNot(condition)`
LuciferYang commented on PR #36732: URL: https://github.com/apache/spark/pull/36732#issuecomment-1142188003 cc @wangyum
[GitHub] [spark] gongzh021 commented on pull request #33457: [SPARK-36237][UI][SQL] Attach and start handler after application started in UI
gongzh021 commented on PR #33457: URL: https://github.com/apache/spark/pull/33457#issuecomment-1142177922 > @AngersZh Before this change, the status code `500` is returned and helpful error message is shown if we access to `/jobs` before the UI is prepared. ![SPARK-36237-500](https://user-images.githubusercontent.com/4736016/126939072-522f0fbf-cb12-467a-b56c-84cdb42d1ff8.png) > > But after this change, the status code `404` is returned and no helpful error message is shown. ![SPARK-36237-404](https://user-images.githubusercontent.com/4736016/126939133-65927264-348f-4587-847f-563897d95c6f.png) > > It might be confusable for users. I have the same bug. Excuse me, how did you solve it?
[GitHub] [spark] cloud-fan closed pull request #36689: [SPARK-39306][SQL] Support scalar subquery in time travel
cloud-fan closed pull request #36689: [SPARK-39306][SQL] Support scalar subquery in time travel URL: https://github.com/apache/spark/pull/36689
[GitHub] [spark] cloud-fan commented on pull request #36689: [SPARK-39306][SQL] Support scalar subquery in time travel
cloud-fan commented on PR #36689: URL: https://github.com/apache/spark/pull/36689#issuecomment-1142175473 thanks for the review, merging to master!
[GitHub] [spark] cloud-fan commented on a diff in pull request #36704: [SPARK-39346][SQL] Convert asserts/illegal state exception to internal errors on each phase
cloud-fan commented on code in PR #36704: URL: https://github.com/apache/spark/pull/36704#discussion_r885677872 ## sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala: ## @@ -486,4 +489,22 @@ object QueryExecution { val preparationRules = preparations(session, Option(InsertAdaptiveSparkPlan(context)), true) prepareForExecution(preparationRules, sparkPlan.clone()) } + + /** + * Catches asserts and illegal state exceptions, and converts them to internal errors. + */ + private[sql] def withInternalError[T](msg: String)(block: => T): T = { +try { + block +} catch { + case e: SparkThrowable => throw e + case e @ (_: java.lang.IllegalStateException | _: java.lang.AssertionError) => Review Comment: Shall we catch a few more exceptions? I have one in mind: `NullPointerException`
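As a rough illustration of the suggestion above, here is a self-contained sketch of a wrapper that also converts `NullPointerException`. It does not use Spark's real types: `MarkedThrowable` and `WrappedInternalError` are hypothetical stand-ins for an already-classified error marker and an internal-error exception, and `InternalErrors` is a made-up holder object.

```scala
// Hypothetical marker for errors that are already classified (stand-in, not Spark's type).
trait MarkedThrowable extends Throwable

// Hypothetical internal-error exception that keeps the original failure as its cause.
final class WrappedInternalError(msg: String, cause: Throwable)
    extends RuntimeException(msg, cause) with MarkedThrowable

object InternalErrors {
  def withInternalError[T](msg: String)(block: => T): T = {
    try {
      block
    } catch {
      // Already-classified errors pass through unchanged.
      case e: MarkedThrowable => throw e
      // Treat these as symptoms of a bug and wrap them, including NullPointerException.
      case e @ (_: IllegalStateException | _: AssertionError | _: NullPointerException) =>
        throw new WrappedInternalError(msg, e)
    }
  }
}
```

Any other exception type propagates untouched in this sketch, so widening the set of wrapped exceptions is a matter of extending the second pattern.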
[GitHub] [spark] cloud-fan commented on a diff in pull request #36704: [SPARK-39346][SQL] Convert asserts/illegal state exception to internal errors on each phase
cloud-fan commented on code in PR #36704: URL: https://github.com/apache/spark/pull/36704#discussion_r885675714 ## connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala: ## @@ -1376,6 +1376,13 @@ class KafkaMicroBatchV1SourceSuite extends KafkaMicroBatchSourceSuiteBase { classOf[KafkaSourceProvider].getCanonicalName) } + override def expectOffsetChange(): ExpectFailure[_] = { +ExpectFailure[IllegalStateException](e => { Review Comment: can we look into the stacktrace and see if we can catch the internal error for streaming v1 as well?
[GitHub] [spark] cloud-fan commented on a diff in pull request #36704: [SPARK-39346][SQL] Convert asserts/illegal state exception to internal errors on each phase
cloud-fan commented on code in PR #36704: URL: https://github.com/apache/spark/pull/36704#discussion_r885674491 ## sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala: ## @@ -486,4 +489,22 @@ object QueryExecution { val preparationRules = preparations(session, Option(InsertAdaptiveSparkPlan(context)), true) prepareForExecution(preparationRules, sparkPlan.clone()) } + + /** + * Catches asserts and illegal state exceptions, and converts them to internal errors. + */ + private[sql] def withInternalError[T](msg: String)(block: => T): T = { +try { + block +} catch { + case e: SparkThrowable => throw e Review Comment: do we need this line?
[GitHub] [spark] cloud-fan commented on a diff in pull request #36704: [SPARK-39346][SQL] Convert asserts/illegal state exception to internal errors on each phase
cloud-fan commented on code in PR #36704: URL: https://github.com/apache/spark/pull/36704#discussion_r885671871 ## connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala: ## @@ -1401,6 +1408,14 @@ class KafkaMicroBatchV1SourceSuite extends KafkaMicroBatchSourceSuiteBase { class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase { + override def expectOffsetChange(): ExpectFailure[_] = { +ExpectFailure[SparkException](e => { Review Comment: ```suggestion ExpectFailure[SparkThrowable](e => { ```
[GitHub] [spark] Ngone51 commented on a diff in pull request #36162: [SPARK-32170][CORE] Improve the speculation through the stage task metrics.
Ngone51 commented on code in PR #36162: URL: https://github.com/apache/spark/pull/36162#discussion_r885645567 ## core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala: ## @@ -853,8 +857,11 @@ private[spark] class TaskSchedulerImpl( // (taskId, stageId, stageAttemptId, accumUpdates) val accumUpdatesWithTaskIds: Array[(Long, Int, Int, Seq[AccumulableInfo])] = { accumUpdates.flatMap { case (id, updates) => -val accInfos = updates.map(acc => acc.toInfo(Some(acc.value), None)) Option(taskIdToTaskSetManager.get(id)).map { taskSetMgr => + val (accInfos, taskProgressRate) = getTaskAccumulableInfosAndProgressRate(updates) Review Comment: I'm a bit worried about the scheduler's throughput if our concerns about the efficiency of traversing the accumulators turn out to matter. I still think we could do the traversal only inside the speculation thread, to decouple it from the scheduling thread. If we move this to the speculation thread, we can also avoid unnecessary traversals, since one is only needed when `checkSpeculatableTasks` requires it, while the current implementation traverses on every heartbeat update and successful task completion. Moving it to the speculation thread could also make the implementation a bit simpler. At `TaskSchedulerImpl.executorHeartbeatReceived()`, we should only set `_accumulables`. And we don't need to set `_accumulables` ourselves, which is already covered by `DAGScheduler.updateAccumulators()`. Then we'd only need to focus on the calculation/traversal in `InefficientTaskCalculator`. The first traversal might be a bit slow, but we can cache the records/runtime for the finished tasks or the progress rate for the running tasks. And even if it's slow, I think it's still better than slowing down the scheduling threads. @weixiuli @mridulm WDYT?
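The caching idea in the comment above could look roughly like the sketch below. It is only an illustration under assumptions, not the PR's `InefficientTaskCalculator`: `TaskSnapshot`, `ProgressRateCache`, and the records-per-millisecond definition of progress rate are hypothetical choices, and the class is meant to be called from a periodic speculation check rather than from the heartbeat path.

```scala
import scala.collection.mutable

// Hypothetical snapshot of a task's metrics (stand-in, not Spark's TaskInfo).
final case class TaskSnapshot(taskId: Long, recordsRead: Long, runtimeMs: Long, finished: Boolean)

// Computes progress rates on demand and caches them for finished tasks,
// whose metrics no longer change.
final class ProgressRateCache {
  private val finishedRates = mutable.HashMap.empty[Long, Double]
  private var computations = 0

  // Assumed definition of progress rate: records processed per millisecond.
  private def rate(t: TaskSnapshot): Double = {
    computations += 1
    if (t.runtimeMs <= 0) 0.0 else t.recordsRead.toDouble / t.runtimeMs
  }

  def progressRates(tasks: Seq[TaskSnapshot]): Map[Long, Double] =
    tasks.map { t =>
      // Finished tasks hit the cache after the first pass; running tasks are recomputed.
      val r = if (t.finished) finishedRates.getOrElseUpdate(t.taskId, rate(t)) else rate(t)
      t.taskId -> r
    }.toMap

  // Number of rate computations performed so far, exposed to show the effect of caching.
  def computationCount: Int = computations
}
```

With this shape, the first pass pays for every task, while later passes recompute only the tasks that are still running.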
[GitHub] [spark] 1104056452 commented on pull request #36447: [SPARK-38807][CORE] Fix the startup error of spark shell on Windows S…
1104056452 commented on PR #36447: URL: https://github.com/apache/spark/pull/36447#issuecomment-1142132713 cc @Ngone51 @jiangxb1987 @xuanyuanking, could you please help review this PR? Thanks.
[GitHub] [spark] cloud-fan commented on a diff in pull request #36698: [SPARK-39316][SQL] Merge PromotePrecision and CheckOverflow into decimal binary arithmetic
cloud-fan commented on code in PR #36698: URL: https://github.com/apache/spark/pull/36698#discussion_r885616172 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala: ## @@ -490,10 +621,26 @@ trait DivModLike extends BinaryArithmetic { s"${eval2.value} == 0" } val javaType = CodeGenerator.javaType(dataType) -val operation = if (operandsDataType.isInstanceOf[DecimalType]) { - decimalToDataTypeCodeGen(s"${eval1.value}.$decimalMethod(${eval2.value})") +val checkOverflow = if (operandsDataType.isInstanceOf[DecimalType]) { + val decimal = super.dataType.asInstanceOf[DecimalType] + val errorContextCode = if (nullOnOverflow) { +"\"\"" + } else { +ctx.addReferenceObj("errCtx", queryContext) + } + val decimalValue = ctx.freshName("decimalValue") + // scalastyle:off line.size.limit + s""" + |${CodeGenerator.javaType(decimal)} $decimalValue = ${eval1.value}.$decimalMethod(${eval2.value}).toPrecision( Review Comment: Ah I see
[GitHub] [spark] Ngone51 commented on a diff in pull request #36716: [SPARK-39062][CORE] Add stage level resource scheduling support for standalone cluster
Ngone51 commented on code in PR #36716: URL: https://github.com/apache/spark/pull/36716#discussion_r885557272 ## core/src/main/scala/org/apache/spark/deploy/master/ResourceDescription.scala: ## @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.master + +import org.apache.spark.resource.ResourceRequirement + +/** + * Describe resource requests for different resource profiles. Used for executor schedule. + * + * @param coresPerExecutor cores for each executor. + * @param memoryMbPerExecutor memory for each executor. + * @param customResourcesPerExecutor custom resource requests for each executor. + */ +private[spark] case class ResourceDescription( Review Comment: Shall we reuse `ExecutorResourcesOrDefaults` to replace `ResourceDescription`?
[GitHub] [spark] manuzhang commented on pull request #36615: [SPARK-39238][SQL] Apply WidenSetOperationTypes at last to fix decimal precision loss
manuzhang commented on PR #36615: URL: https://github.com/apache/spark/pull/36615#issuecomment-1142042879 Superseded by https://github.com/apache/spark/pull/36698
[GitHub] [spark] manuzhang closed pull request #36615: [SPARK-39238][SQL] Apply WidenSetOperationTypes at last to fix decimal precision loss
manuzhang closed pull request #36615: [SPARK-39238][SQL] Apply WidenSetOperationTypes at last to fix decimal precision loss URL: https://github.com/apache/spark/pull/36615
[GitHub] [spark] ulysses-you commented on a diff in pull request #36698: [SPARK-39316][SQL] Merge PromotePrecision and CheckOverflow into decimal binary arithmetic
ulysses-you commented on code in PR #36698: URL: https://github.com/apache/spark/pull/36698#discussion_r885533013 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala: ## @@ -208,6 +210,78 @@ case class Abs(child: Expression, failOnError: Boolean = SQLConf.get.ansiEnabled override protected def withNewChildInternal(newChild: Expression): Abs = copy(child = newChild) } +/** + * The child class should override decimalType method to report the result data type. + * + * When `spark.sql.decimalOperations.allowPrecisionLoss` is set to true, if the precision / scale + * needed are out of the range of available values, the scale is reduced up to 6, in order to + * prevent the truncation of the integer part of the decimals. + * + * Rounds the decimal to given scale and check whether the decimal can fit in provided precision + * or not. If not, if `nullOnOverflow` is `true`, it returns `null`; otherwise an + * `ArithmeticException` is thrown. + */ +trait DecimalArithmeticSupport extends BinaryArithmetic { + protected val nullOnOverflow: Boolean = !failOnError + protected val allowPrecisionLoss: Boolean = SQLConf.get.decimalOperationsAllowPrecisionLoss + + override def checkInputDataTypes(): TypeCheckResult = (left.dataType, right.dataType) match { +case (_: DecimalType, _: DecimalType) => + // We allow eval decimal type with different precision and scale, and change the precision + // and scale before return result. + TypeCheckResult.TypeCheckSuccess +case _ => super.checkInputDataTypes() + } + + /** Name of the function for this expression on a [[Decimal]] type. 
*/ + protected def decimalMethod: String = +throw QueryExecutionErrors.notOverrideExpectedMethodsError("DecimalArithmetic", + "decimalMethod", "genCode") + protected def decimalType(p1: Int, s1: Int, p2: Int, s2: Int): DecimalType = +throw QueryExecutionErrors.notOverrideExpectedMethodsError("DecimalArithmetic", + "decimalType", "dataType") + + override def nullable: Boolean = dataType match { +case _: DecimalType => true Review Comment: After some thought, we cannot simply use `nullOnOverflow` here. If we want to use `nullOnOverflow`, the code-gen should be:
```
if (nullOnOverflow) {
  ${ev.isNull} = ${ev.value} == null;
}
```
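For readers following the `nullOnOverflow` discussion, here is a minimal sketch of the two overflow behaviours the doc comment in the diff describes: round to the target scale, then either return `null` or throw an `ArithmeticException` when the result does not fit the target precision. It uses only `java.math.BigDecimal`; the object name `DecimalOverflowSketch` and the helper `toPrecision` with this signature are hypothetical illustrations, not Spark's `Decimal` API or the generated code under review.

```scala
import java.math.{BigDecimal => JBigDecimal, RoundingMode}

object DecimalOverflowSketch {
  // Hypothetical helper (not Spark's API): round `value` to `scale`, then
  // check that the rounded value fits in `precision` total digits.
  def toPrecision(
      value: JBigDecimal,
      precision: Int,
      scale: Int,
      nullOnOverflow: Boolean): JBigDecimal = {
    val rounded = value.setScale(scale, RoundingMode.HALF_UP)
    if (rounded.precision() <= precision) {
      rounded
    } else if (nullOnOverflow) {
      // Non-failing mode: overflow surfaces as a null result.
      null
    } else {
      // Failing mode: overflow surfaces as an exception.
      throw new ArithmeticException(
        s"$value cannot be represented with precision $precision and scale $scale")
    }
  }
}
```

Because the non-failing branch can produce `null` for non-null inputs, any expression built on such a check has to report itself as nullable whenever that branch is reachable, which is the concern behind the `nullable` override being discussed.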
[GitHub] [spark] ulysses-you commented on a diff in pull request #36698: [SPARK-39316][SQL] Merge PromotePrecision and CheckOverflow into decimal binary arithmetic
ulysses-you commented on code in PR #36698: URL: https://github.com/apache/spark/pull/36698#discussion_r885529640 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala: ## @@ -208,6 +210,78 @@ case class Abs(child: Expression, failOnError: Boolean = SQLConf.get.ansiEnabled override protected def withNewChildInternal(newChild: Expression): Abs = copy(child = newChild) } +/** + * The child class should override decimalType method to report the result data type. + * + * When `spark.sql.decimalOperations.allowPrecisionLoss` is set to true, if the precision / scale + * needed are out of the range of available values, the scale is reduced up to 6, in order to + * prevent the truncation of the integer part of the decimals. + * + * Rounds the decimal to given scale and check whether the decimal can fit in provided precision + * or not. If not, if `nullOnOverflow` is `true`, it returns `null`; otherwise an + * `ArithmeticException` is thrown. + */ +trait DecimalArithmeticSupport extends BinaryArithmetic { + protected val nullOnOverflow: Boolean = !failOnError + protected val allowPrecisionLoss: Boolean = SQLConf.get.decimalOperationsAllowPrecisionLoss + + override def checkInputDataTypes(): TypeCheckResult = (left.dataType, right.dataType) match { +case (_: DecimalType, _: DecimalType) => + // We allow eval decimal type with different precision and scale, and change the precision + // and scale before return result. + TypeCheckResult.TypeCheckSuccess +case _ => super.checkInputDataTypes() + } + + /** Name of the function for this expression on a [[Decimal]] type. 
*/ + protected def decimalMethod: String = +throw QueryExecutionErrors.notOverrideExpectedMethodsError("DecimalArithmetic", + "decimalMethod", "genCode") + protected def decimalType(p1: Int, s1: Int, p2: Int, s2: Int): DecimalType = +throw QueryExecutionErrors.notOverrideExpectedMethodsError("DecimalArithmetic", Review Comment: Removed the default implementations of `decimalType` and `decimalMethod`.
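The doc comment in this diff says that, with `spark.sql.decimalOperations.allowPrecisionLoss` enabled, an out-of-range result type has its scale "reduced up to 6" so the integer part is not truncated. The sketch below shows one way such an adjustment can work. The object `PrecisionLossSketch`, the function `adjust`, and the constant `MaxPrecision = 38` are assumptions made for illustration; only the floor of 6 comes from the comment itself, and this is not presented as the code in the PR.

```scala
object PrecisionLossSketch {
  val MaxPrecision = 38     // assumed maximum decimal precision
  val MinAdjustedScale = 6  // the "up to 6" floor mentioned in the doc comment

  // Takes the (precision, scale) an operation would ideally need and returns
  // a (precision, scale) pair that fits within MaxPrecision.
  def adjust(precision: Int, scale: Int): (Int, Int) = {
    require(scale >= 0 && precision >= scale, "expects 0 <= scale <= precision")
    if (precision <= MaxPrecision) {
      // Already representable: nothing to give up.
      (precision, scale)
    } else {
      // Keep the integer digits if possible and sacrifice fractional digits,
      // but never drop below min(scale, MinAdjustedScale).
      val intDigits = precision - scale
      val minScale = math.min(scale, MinAdjustedScale)
      val adjustedScale = math.max(MaxPrecision - intDigits, minScale)
      (MaxPrecision, adjustedScale)
    }
  }
}
```

Under these assumptions, `PrecisionLossSketch.adjust(40, 10)` yields `(38, 8)`: the 30 integer digits are kept and two fractional digits are dropped. `adjust(77, 20)` yields `(38, 6)`, where the scale stops at the floor and an overflow check is still needed at runtime.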
[GitHub] [spark] codecov-commenter commented on pull request #36726: [SPARK-39339][SQL] Support TimestampNTZ type in JDBC data source
codecov-commenter commented on PR #36726: URL: https://github.com/apache/spark/pull/36726#issuecomment-1141997915

# [Codecov](https://codecov.io/gh/apache/spark/pull/36726?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
> Merging [#36726](https://codecov.io/gh/apache/spark/pull/36726?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (8bbbdb5) into [master](https://codecov.io/gh/apache/spark/commit/f45cdda2cd55012fab85a17baec71f5ab637c400?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (f45cdda) will **decrease** coverage by `10.40%`.
> The diff coverage is `95.00%`.

> :exclamation: Current head 8bbbdb5 differs from pull request most recent head 3cd7eb4. Consider uploading reports for the commit 3cd7eb4 to get more accurate results

```diff
@@             Coverage Diff             @@
##           master   #36726       +/-   ##
===========================================
- Coverage   86.89%   76.49%   -10.41%
===========================================
  Files         259      217       -42
  Lines       57743    51963     -5780
  Branches     9156     8543      -613
===========================================
- Hits        50177    39748    -10429
- Misses       6277    11159     +4882
+ Partials     1289     1056      -233
```

| Flag | Coverage Δ | |
|---|---|---|
| unittests | `76.47% <95.00%> (-10.40%)` | :arrow_down: |

Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.

| [Impacted Files](https://codecov.io/gh/apache/spark/pull/36726?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | | |---|---|---| | [python/pyspark/pandas/window.py](https://codecov.io/gh/apache/spark/pull/36726/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHl0aG9uL3B5c3BhcmsvcGFuZGFzL3dpbmRvdy5weQ==) | `88.88% <ø> (-0.07%)` | :arrow_down: | | [python/pyspark/sql/session.py](https://codecov.io/gh/apache/spark/pull/36726/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHl0aG9uL3B5c3Bhcmsvc3FsL3Nlc3Npb24ucHk=) | `60.62% <33.33%> (-7.56%)` | :arrow_down: | | [python/pyspark/pandas/groupby.py](https://codecov.io/gh/apache/spark/pull/36726/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHl0aG9uL3B5c3BhcmsvcGFuZGFzL2dyb3VwYnkucHk=) | `95.39% <93.93%> (-0.17%)` | :arrow_down: | | [python/pyspark/pandas/generic.py](https://codecov.io/gh/apache/spark/pull/36726/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHl0aG9uL3B5c3BhcmsvcGFuZGFzL2dlbmVyaWMucHk=) | `87.94% <100.00%> (-0.07%)` | :arrow_down: | | [python/pyspark/pandas/series.py](https://codecov.io/gh/apache/spark/pull/36726/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHl0aG9uL3B5c3BhcmsvcGFuZGFzL3Nlcmllcy5weQ==) | `94.81% <100.00%> (+0.02%)` | :arrow_up: | | 
[python/pyspark/pandas/spark/functions.py](https://codecov.io/gh/apache/spark/pull/36726/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHl0aG9uL3B5c3BhcmsvcGFuZGFzL3NwYXJrL2Z1bmN0aW9ucy5weQ==) | `95.12% <100.00%> (+0.83%)` | :arrow_up: | | [python/pyspark/pandas/tests/test\_groupby.py](https://codecov.io/gh/apache/spark/pull/36726/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHl0aG9uL3B5c3BhcmsvcGFuZGFzL3Rlc3RzL3Rlc3RfZ3JvdXBieS5weQ==) | `95.30% <100.00%> (+0.06%)` | :arrow_up: | | [python/pyspark/pandas/tests/test\_series.py](https://codecov.io/gh/apache/spark/pull/36726/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHl0aG9uL3B5c3BhcmsvcGFuZGFzL3Rlc3RzL3Rlc3Rfc2VyaWVzLnB5) | `96.36% <100.00%> (+0.01%)` | :arrow_up: | | [python/pyspark/join.py]
[GitHub] [spark] ulysses-you commented on a diff in pull request #36698: [SPARK-39316][SQL] Merge PromotePrecision and CheckOverflow into decimal binary arithmetic
ulysses-you commented on code in PR #36698: URL: https://github.com/apache/spark/pull/36698#discussion_r885462208 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala: ## @@ -490,10 +621,26 @@ trait DivModLike extends BinaryArithmetic { s"${eval2.value} == 0" } val javaType = CodeGenerator.javaType(dataType) -val operation = if (operandsDataType.isInstanceOf[DecimalType]) { - decimalToDataTypeCodeGen(s"${eval1.value}.$decimalMethod(${eval2.value})") +val checkOverflow = if (operandsDataType.isInstanceOf[DecimalType]) { Review Comment: changed back