[GitHub] [spark] beliefer commented on a diff in pull request #36593: [SPARK-39139][SQL] DS V2 push-down framework supports DS V2 UDF
beliefer commented on code in PR #36593: URL: https://github.com/apache/spark/pull/36593#discussion_r877803218

## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCCatalog.scala:
## @@ -32,11 +35,14 @@ import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap

-class JDBCTableCatalog extends TableCatalog with SupportsNamespaces with Logging {
+class JDBCCatalog extends TableCatalog with SupportsNamespaces with FunctionCatalog with Logging {

   private var catalogName: String = null
   private var options: JDBCOptions = _
   private var dialect: JdbcDialect = _

+  private val functions: util.Map[Identifier, UnboundFunction] =
+    new ConcurrentHashMap[Identifier, UnboundFunction]()

Review Comment:
OK
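For context on what the new `functions` map is for, below is a rough sketch of how a `FunctionCatalog` implementation can be backed by such a map. The trait name, the `registerFunction` helper, and the exception used in `loadFunction` are assumptions made for illustration; this is not the PR's code.

```scala
import java.util
import java.util.concurrent.ConcurrentHashMap

import org.apache.spark.sql.connector.catalog.{FunctionCatalog, Identifier}
import org.apache.spark.sql.connector.catalog.functions.UnboundFunction

// Hypothetical sketch: back the FunctionCatalog methods with a concurrent map of
// registered DS V2 UDFs.
trait MapBackedFunctionCatalog extends FunctionCatalog {
  private val functions: util.Map[Identifier, UnboundFunction] =
    new ConcurrentHashMap[Identifier, UnboundFunction]()

  // Assumed helper so tests or dialects can expose a UDF through the catalog.
  def registerFunction(ident: Identifier, fn: UnboundFunction): Unit =
    functions.put(ident, fn)

  // Namespace filtering is omitted for brevity.
  override def listFunctions(namespace: Array[String]): Array[Identifier] =
    functions.keySet().toArray(Array.empty[Identifier])

  override def loadFunction(ident: Identifier): UnboundFunction = {
    val fn = functions.get(ident)
    if (fn == null) {
      throw new NoSuchElementException(s"Function not found: $ident") // placeholder exception
    }
    fn
  }
}
```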
[GitHub] [spark] wangyum commented on pull request #36588: [SPARK-39217][SQL] Makes DPP support the pruning side has Union
wangyum commented on PR #36588: URL: https://github.com/apache/spark/pull/36588#issuecomment-1132514681

A case from production: ![image](https://user-images.githubusercontent.com/5399861/169463931-65bfd0c0-1759-4f9d-8a0a-66b32463b76a.png)
[GitHub] [spark] cloud-fan commented on a diff in pull request #36608: [SPARK-39230][SQL] Support ANSI Aggregate Function: regr_slope
cloud-fan commented on code in PR #36608: URL: https://github.com/apache/spark/pull/36608#discussion_r877763059

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Covariance.scala:
## @@ -34,7 +34,7 @@ abstract class Covariance(val left: Expression, val right: Expression, nullOnDiv
   override def dataType: DataType = DoubleType
   override def inputTypes: Seq[AbstractDataType] = Seq(DoubleType, DoubleType)

-  protected val n = AttributeReference("n", DoubleType, nullable = false)()
+  protected val count = AttributeReference("count", DoubleType, nullable = false)()

Review Comment:
shall we make it `protected[sql]` so that we can access it directly in the new expression?
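A minimal, self-contained sketch of what the `protected[sql]` suggestion buys (illustrative class names, not Spark's code): the member becomes visible to everything under the enclosing `sql` package, not only to subclasses, so a sibling expression such as a `regr_slope` implementation can reuse the buffer attribute without re-declaring it.

```scala
// Illustrative only: demonstrates Scala's package-qualified visibility under assumed names.
package org.apache.spark.sql.sketch {

  abstract class CovarianceLike {
    // Visible anywhere under the `sql` package, not just in subclasses.
    protected[sql] val count: Long = 0L
  }

  class RegrSlopeLike(cov: CovarianceLike) {
    // Allowed: this class also lives under the `sql` package.
    def bufferedCount: Long = cov.count
  }
}
```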
[GitHub] [spark] amaliujia commented on a diff in pull request #36586: [SPARK-39236][SQL] Make CreateTable and ListTables be compatible with 3 layer namespace
amaliujia commented on code in PR #36586: URL: https://github.com/apache/spark/pull/36586#discussion_r877754283

## sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala:
## @@ -97,8 +97,18 @@
    */
   @throws[AnalysisException]("database does not exist")
   override def listTables(dbName: String): Dataset[Table] = {
-    val tables = sessionCatalog.listTables(dbName).map(makeTable)
-    CatalogImpl.makeDataset(tables, sparkSession)
+    if (sessionCatalog.databaseExists(dbName)) {
+      val tables = sessionCatalog.listTables(dbName).map(makeTable)
+      CatalogImpl.makeDataset(tables, sparkSession)
+    } else {
+      val multiParts = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(dbName)
+      val plan = ShowTables(UnresolvedNamespace(multiParts), None)
+      val ret = sparkSession.sessionState.executePlan(plan).toRdd.collect()
+      val tables = ret
+        .map(row => TableIdentifier(row.getString(1), Some(row.getString(0))))
+        .map(makeTable)

Review Comment:
done
[GitHub] [spark] LuciferYang commented on pull request #36616: [WIP][SPARK-39231][SQL] Change to use `ConstantColumnVector` to store partition columns in `VectorizedParquetRecordReader`
LuciferYang commented on PR #36616: URL: https://github.com/apache/spark/pull/36616#issuecomment-1132495258

This PR mainly focuses on `Parquet`. If this is acceptable, I will change ORC in another PR.
[GitHub] [spark] cloud-fan commented on a diff in pull request #36608: [SPARK-39230][SQL] Support ANSI Aggregate Function: regr_slope
cloud-fan commented on code in PR #36608: URL: https://github.com/apache/spark/pull/36608#discussion_r877745569

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Covariance.scala:
## @@ -69,7 +69,7 @@
   }

   protected def updateExpressionsDef: Seq[Expression] = {
-    val newN = n + 1.0
+    val newN = count + 1.0

Review Comment:
```suggestion
    val newCount = count + 1.0
```
[GitHub] [spark] cloud-fan commented on a diff in pull request #36614: [SPARK-39237][DOCS] Update the ANSI SQL mode documentation
cloud-fan commented on code in PR #36614: URL: https://github.com/apache/spark/pull/36614#discussion_r877742782

## docs/sql-ref-ansi-compliance.md:
## @@ -28,10 +28,10 @@ The casting behaviours are defined as store assignment rules in the standard.
 When `spark.sql.storeAssignmentPolicy` is set to `ANSI`, Spark SQL complies with the ANSI store assignment rules. This is a separate configuration because its default value is `ANSI`, while the configuration `spark.sql.ansi.enabled` is disabled by default.

-|Property Name|Default|Meaning|Since Version|
-|-|---|---|-|
-|`spark.sql.ansi.enabled`|false|(Experimental) When true, Spark tries to conform to the ANSI SQL specification: 1. Spark will throw a runtime exception if an overflow occurs in any operation on integral/decimal field. 2. Spark will forbid using the reserved keywords of ANSI SQL as identifiers in the SQL parser.|3.0.0|
-|`spark.sql.storeAssignmentPolicy`|ANSI|(Experimental) When inserting a value into a column with different data type, Spark will perform type conversion. Currently, we support 3 policies for the type coercion rules: ANSI, legacy and strict. With ANSI policy, Spark performs the type coercion as per ANSI SQL. In practice, the behavior is mostly the same as PostgreSQL. It disallows certain unreasonable type conversions such as converting string to int or double to boolean. With legacy policy, Spark allows the type coercion as long as it is a valid Cast, which is very loose. e.g. converting string to int or double to boolean is allowed. It is also the only behavior in Spark 2.x and it is compatible with Hive. With strict policy, Spark doesn't allow any possible precision loss or data truncation in type coercion, e.g. converting double to int or decimal to double is not allowed.|3.0.0|
+|Property Name|Default| Meaning |Since Version|
+|-|---|---|-|
+|`spark.sql.ansi.enabled`|false| When true, Spark tries to conform to the ANSI SQL specification: 1. Spark will throw a runtime exception if an overflow occurs in any operation on integral/decimal field. 2. Spark will use different type coercion rules for resolving conflicts among data types. The rules are consistently based on data type precedence. |3.0.0|
+|`spark.sql.storeAssignmentPolicy`|ANSI| When inserting a value into a column with different data type, Spark will perform type conversion. Currently, we support 3 policies for the type coercion rules: ANSI, legacy and strict. With ANSI policy, Spark performs the type coercion as per ANSI SQL. In practice, the behavior is mostly the same as PostgreSQL. It disallows certain unreasonable type conversions such as converting string to int or double to boolean. With legacy policy, S
[GitHub] [spark] cloud-fan commented on a diff in pull request #36614: [SPARK-39237][DOCS] Update the ANSI SQL mode documentation
cloud-fan commented on code in PR #36614: URL: https://github.com/apache/spark/pull/36614#discussion_r877742454

## docs/sql-ref-ansi-compliance.md:
## @@ -28,10 +28,10 @@ The casting behaviours are defined as store assignment rules in the standard.
 When `spark.sql.storeAssignmentPolicy` is set to `ANSI`, Spark SQL complies with the ANSI store assignment rules. This is a separate configuration because its default value is `ANSI`, while the configuration `spark.sql.ansi.enabled` is disabled by default.

-|Property Name|Default|Meaning|Since Version|
-|-|---|---|-|
-|`spark.sql.ansi.enabled`|false|(Experimental) When true, Spark tries to conform to the ANSI SQL specification: 1. Spark will throw a runtime exception if an overflow occurs in any operation on integral/decimal field. 2. Spark will forbid using the reserved keywords of ANSI SQL as identifiers in the SQL parser.|3.0.0|
-|`spark.sql.storeAssignmentPolicy`|ANSI|(Experimental) When inserting a value into a column with different data type, Spark will perform type conversion. Currently, we support 3 policies for the type coercion rules: ANSI, legacy and strict. With ANSI policy, Spark performs the type coercion as per ANSI SQL. In practice, the behavior is mostly the same as PostgreSQL. It disallows certain unreasonable type conversions such as converting string to int or double to boolean. With legacy policy, Spark allows the type coercion as long as it is a valid Cast, which is very loose. e.g. converting string to int or double to boolean is allowed. It is also the only behavior in Spark 2.x and it is compatible with Hive. With strict policy, Spark doesn't allow any possible precision loss or data truncation in type coercion, e.g. converting double to int or decimal to double is not allowed.|3.0.0|
+|Property Name|Default| Meaning |Since Version|
+|-|---|---|-|
+|`spark.sql.ansi.enabled`|false| When true, Spark tries to conform to the ANSI SQL specification: 1. Spark will throw a runtime exception if an overflow occurs in any operation on integral/decimal field. 2. Spark will use different type coercion rules for resolving conflicts among data types. The rules are consistently based on data type precedence. |3.0.0|

Review Comment:
Since we are touching it, let's make the doc more accurate. It's not only overflow, but all illegal operations, including overflow, parsing invalid strings to numbers, etc.
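To make the "illegal operations" point concrete, here is a hedged spark-shell illustration (it assumes an active `spark` session); the exact error class raised is not asserted.

```scala
// Under ANSI mode, an invalid string-to-number cast fails at runtime instead of
// silently returning NULL; overflow behaves the same way.
spark.conf.set("spark.sql.ansi.enabled", true)
spark.sql("SELECT CAST('abc' AS INT)").show()   // throws a runtime error

spark.conf.set("spark.sql.ansi.enabled", false)
spark.sql("SELECT CAST('abc' AS INT)").show()   // returns NULL under the default mode
```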
[GitHub] [spark] LuciferYang commented on pull request #36616: [WIP][SPARK-39231][SQL] Change to use `ConstantColumnVector` to store partition columns in `VectorizedParquetRecordReader`
LuciferYang commented on PR #36616: URL: https://github.com/apache/spark/pull/36616#issuecomment-1132482921

Will update the PR description later.
[GitHub] [spark] cloud-fan commented on pull request #36615: [SPARK-39238][SQL] Apply WidenSetOperationTypes at last to fix decimal precision loss
cloud-fan commented on PR #36615: URL: https://github.com/apache/spark/pull/36615#issuecomment-1132481952

Good catch! This is a long-standing issue. The type coercion for decimal types is really messy, as it's not bound to `Expression.resolved`. Changing the rule order does fix this simple query, but I'm afraid it's still fragile, as rule order is quite unreliable. I'd like to have a more aggressive refactor: let's not require the operands of these math operations to be the same decimal type, and define the return decimal type for each math operation individually. Then the only thing that needs to be done in type coercion is to cast non-decimal operands to decimal types.
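To illustrate the direction being proposed, here is a hedged sketch (not Spark's code) of per-operation result types for decimals. The object and method names are assumptions, and the precision/scale formulas follow the usual SQL arithmetic rules; Spark's real precision adjustment is subtler than the naive cap shown here.

```scala
import org.apache.spark.sql.types.DecimalType

// Sketch: each arithmetic operation declares its own result DecimalType from its operand
// types, so type coercion no longer needs to widen both sides to a common decimal type.
object DecimalResultType {
  // Naive cap at the maximum supported precision/scale.
  private def bounded(precision: Int, scale: Int): DecimalType =
    DecimalType(math.min(precision, DecimalType.MAX_PRECISION), math.min(scale, DecimalType.MAX_SCALE))

  // Addition: scale = max(s1, s2), precision = max(p1 - s1, p2 - s2) + scale + 1
  def forAdd(l: DecimalType, r: DecimalType): DecimalType = {
    val scale = math.max(l.scale, r.scale)
    bounded(math.max(l.precision - l.scale, r.precision - r.scale) + scale + 1, scale)
  }

  // Multiplication: precision = p1 + p2 + 1, scale = s1 + s2
  def forMultiply(l: DecimalType, r: DecimalType): DecimalType =
    bounded(l.precision + r.precision + 1, l.scale + r.scale)
}
```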
[GitHub] [spark] LuciferYang opened a new pull request, #36616: [SPARK-39231][SQL] Change to use `ConstantColumnVector` to store partition columns in `VectorizedParquetRecordReader`
LuciferYang opened a new pull request, #36616: URL: https://github.com/apache/spark/pull/36616

### What changes were proposed in this pull request?

### Why are the changes needed?

### Does this PR introduce _any_ user-facing change?

### How was this patch tested?
[GitHub] [spark] HyukjinKwon commented on pull request #36486: [SPARK-39129][PS] Implement GroupBy.ewm
HyukjinKwon commented on PR #36486: URL: https://github.com/apache/spark/pull/36486#issuecomment-1132476982

I haven't taken a close look but seems fine from a cursory look. Should be good to go.
[GitHub] [spark] zhengruifeng commented on pull request #36486: [SPARK-39129][PS] Implement GroupBy.ewm
zhengruifeng commented on PR #36486: URL: https://github.com/apache/spark/pull/36486#issuecomment-1132454863

cc @HyukjinKwon @xinrong-databricks @itholic would you mind taking a look when you have some time? Thanks!
[GitHub] [spark] manuzhang commented on pull request #36615: [SPARK-39238][SQL] Apply WidenSetOperationTypes at last to fix decimal precision loss
manuzhang commented on PR #36615: URL: https://github.com/apache/spark/pull/36615#issuecomment-1132451324

cc @gengliangwang @cloud-fan @turboFei
[GitHub] [spark] manuzhang opened a new pull request, #36615: [SPARK-39238][SQL] Apply WidenSetOperationTypes at last to fix decimal precision loss
manuzhang opened a new pull request, #36615: URL: https://github.com/apache/spark/pull/36615

### What changes were proposed in this pull request?
When analyzing, apply WidenSetOperationTypes after other rules.

### Why are the changes needed?
The following SQL returns 1.00 while 1. is expected, since the union should pick the wider precision, the `Decimal(38,20)` from `v / v`.
```
CREATE OR REPLACE TEMPORARY VIEW t3 AS VALUES (decimal(1)) tbl(v);
CREATE OR REPLACE TEMPORARY VIEW t4 AS SELECT CAST(v AS DECIMAL(18, 2)) AS v FROM t3;
SELECT CAST(1 AS DECIMAL(28, 2)) UNION ALL SELECT v / v FROM t4;
```
Checking the analyzed logical plan of the above SQL, `Project [cast((v / v)#236 as decimal(28,2)) AS (v / v)#237]` is added by `WidenSetOperationTypes` before `DecimalPrecision` promotes the precision for the divide. The result of `v / v` is cast to the narrower `decimal(28,2)`.
```
== Analyzed Logical Plan ==
CAST(1 AS DECIMAL(28,2)): decimal(28,2)
Union false, false
:- Project [CAST(1 AS DECIMAL(28,2))#235]
:  +- Project [cast(1 as decimal(28,2)) AS CAST(1 AS DECIMAL(28,2))#235]
:     +- OneRowRelation
+- Project [cast((v / v)#236 as decimal(28,2)) AS (v / v)#237]
   +- Project [CheckOverflow((promote_precision(cast(v#228 as decimal(18,2))) / promote_precision(cast(v#228 as decimal(18,2)))), DecimalType(38,20), false) AS (v / v)#236]
      +- SubqueryAlias t4
         +- Project [cast(v#226 as decimal(18,2)) AS v#228]
            +- SubqueryAlias t3
               +- SubqueryAlias tbl
                  +- LocalRelation [v#226]
```
Hence, I propose to apply `WidenSetOperationTypes` after `DecimalPrecision`.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Add UT.
[GitHub] [spark] HyukjinKwon commented on pull request #36589: [SPARK-39218][SS][PYTHON] Make foreachBatch streaming query stop gracefully
HyukjinKwon commented on PR #36589: URL: https://github.com/apache/spark/pull/36589#issuecomment-1132433602

Merged to master and branch-3.3.
[GitHub] [spark] HyukjinKwon closed pull request #36589: [SPARK-39218][SS][PYTHON] Make foreachBatch streaming query stop gracefully
HyukjinKwon closed pull request #36589: [SPARK-39218][SS][PYTHON] Make foreachBatch streaming query stop gracefully URL: https://github.com/apache/spark/pull/36589
[GitHub] [spark] gengliangwang commented on pull request #36614: [SPARK-39237][DOCS] Update the ANSI SQL mode documentation
gengliangwang commented on PR #36614: URL: https://github.com/apache/spark/pull/36614#issuecomment-1132430487

cc @tanvn as well. Thanks for pointing it out!
[GitHub] [spark] gengliangwang opened a new pull request, #36614: [SPARK-39237][DOCS] Update the ANSI SQL mode documentation
gengliangwang opened a new pull request, #36614: URL: https://github.com/apache/spark/pull/36614

### What changes were proposed in this pull request?
1. Remove the Experimental notation in the ANSI SQL compliance doc
2. Update the description of `spark.sql.ansi.enabled`, since the ANSI reserved keywords check is disabled by default now

### Why are the changes needed?
1. The ANSI SQL dialect was GAed in the Spark 3.2 release: https://spark.apache.org/releases/spark-release-3-2-0.html We should not mark it as "Experimental" in the doc.
2. The ANSI reserved keywords check is disabled by default now

### Does this PR introduce _any_ user-facing change?
No, just a doc change

### How was this patch tested?
Doc preview:
https://user-images.githubusercontent.com/1097932/169444094-de9c33c2-1b01-4fc3-b583-b752c71e16d8.png
https://user-images.githubusercontent.com/1097932/169446841-a945bffe-84eb-45e9-9ebb-501387e216cf.png
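A hedged spark-shell illustration of the "reserved keywords check is disabled by default" point (it assumes Spark 3.3-era configs and an active `spark` session): with ANSI mode on, reserved words are only rejected as identifiers when `spark.sql.ansi.enforceReservedKeywords` is also enabled.

```scala
spark.conf.set("spark.sql.ansi.enabled", true)
spark.sql("SELECT 1 AS select").show()   // accepted: keyword enforcement is off by default

spark.conf.set("spark.sql.ansi.enforceReservedKeywords", true)
spark.sql("SELECT 1 AS select").show()   // now rejected by the parser: 'select' is reserved
```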
[GitHub] [spark] dongjoon-hyun commented on pull request #36358: [SPARK-39023] [K8s] Add Executor Pod inter-pod anti-affinity
dongjoon-hyun commented on PR #36358: URL: https://github.com/apache/spark/pull/36358#issuecomment-1132427215

Thank you so much, @zwangsheng.
[GitHub] [spark] amaliujia commented on a diff in pull request #36586: [SPARK-39236][SQL] Make CreateTable and ListTables be compatible with 3 layer namespace
amaliujia commented on code in PR #36586: URL: https://github.com/apache/spark/pull/36586#discussion_r877706031

## sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala:
## @@ -367,24 +377,40 @@
       schema: StructType,
       description: String,
       options: Map[String, String]): DataFrame = {
-    val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
+    val idents = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(tableName)
     val storage = DataSource.buildStorageFormatFromOptions(options)
     val tableType = if (storage.locationUri.isDefined) {
       CatalogTableType.EXTERNAL
     } else {
       CatalogTableType.MANAGED
     }
-    val tableDesc = CatalogTable(
-      identifier = tableIdent,
-      tableType = tableType,
-      storage = storage,
-      schema = schema,
-      provider = Some(source),
-      comment = { if (description.isEmpty) None else Some(description) }
-    )
-    val plan = CreateTable(tableDesc, SaveMode.ErrorIfExists, None)
+    val location = if (storage.locationUri.isDefined) {
+      val locationStr = storage.locationUri.get.toString
+      Some(locationStr)
+    } else {
+      None
+    }
+
+    val tableSpec =
+      TableSpec(
+        properties = Map(),
+        provider = Some(source),
+        options = options,
+        location = location,
+        comment = { if (description.isEmpty) None else Some(description) },
+        serde = None,
+        external = tableType == CatalogTableType.EXTERNAL)
+
+    val plan =
+      CreateTable(
+        name = UnresolvedDBObjectName(idents, isNamespace = true),

Review Comment:
oops yes it should be false.
[GitHub] [spark] amaliujia commented on a diff in pull request #36586: [SPARK-39236][SQL] Make CreateTable and ListTables be compatible with 3 layer namespace
amaliujia commented on code in PR #36586: URL: https://github.com/apache/spark/pull/36586#discussion_r877705884

## sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala:
## @@ -97,8 +97,18 @@
    */
   @throws[AnalysisException]("database does not exist")
   override def listTables(dbName: String): Dataset[Table] = {
-    val tables = sessionCatalog.listTables(dbName).map(makeTable)
-    CatalogImpl.makeDataset(tables, sparkSession)
+    if (sessionCatalog.databaseExists(dbName)) {
+      val tables = sessionCatalog.listTables(dbName).map(makeTable)
+      CatalogImpl.makeDataset(tables, sparkSession)
+    } else {
+      val multiParts = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(dbName)
+      val plan = ShowTables(UnresolvedNamespace(multiParts), None)
+      val ret = sparkSession.sessionState.executePlan(plan).toRdd.collect()
+      val tables = ret
+        .map(row => TableIdentifier(row.getString(1), Some(row.getString(0))))
+        .map(makeTable)

Review Comment:
oh yes you are right. Let me add a version that looks up through analyzer.
[GitHub] [spark] zwangsheng closed pull request #36358: [SPARK-39023] [K8s] Add Executor Pod inter-pod anti-affinity
zwangsheng closed pull request #36358: [SPARK-39023] [K8s] Add Executor Pod inter-pod anti-affinity URL: https://github.com/apache/spark/pull/36358
[GitHub] [spark] zwangsheng commented on pull request #36358: [SPARK-39023] [K8s] Add Executor Pod inter-pod anti-affinity
zwangsheng commented on PR #36358: URL: https://github.com/apache/spark/pull/36358#issuecomment-1132423502

> Hi, @zwangsheng. Thank you for making a PR. However, Apache Spark community wants to avoid feature duplications like this. The proposed feature is already delivered to many production environments via PodTemplate and has been used by the customers without any problem. Adding another configuration only makes the users confused.

@dongjoon-hyun Thanks for your reply. I can understand the above and accept it. Thanks, all, for reviewing this PR! I will close it and look forward to meeting in another PR.
[GitHub] [spark] Ngone51 commented on a diff in pull request #36162: [SPARK-32170][CORE] Improve the speculation through the stage task metrics.
Ngone51 commented on code in PR #36162: URL: https://github.com/apache/spark/pull/36162#discussion_r877700761

## core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
## @@ -769,6 +785,25 @@
     }
   }

+  def setTaskRecordsAndRunTime(
+      info: TaskInfo,
+      result: DirectTaskResult[_]): Unit = {
+    var records = 0L
+    var runTime = 0L
+    result.accumUpdates.foreach { a =>
+      if (a.name == Some(shuffleRead.RECORDS_READ) ||
+          a.name == Some(input.RECORDS_READ)) {
+        val acc = a.asInstanceOf[LongAccumulator]
+        records += acc.value
+      } else if (a.name == Some(InternalAccumulator.EXECUTOR_RUN_TIME)) {
+        val acc = a.asInstanceOf[LongAccumulator]
+        runTime = acc.value
+      }
+    }
+    info.setRecords(records)
+    info.setRunTime(runTime)

Review Comment:
Same here.. I think we should centralize the calculation of this stuff into `InefficientTaskCalculator`.

## core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala:
## @@ -863,6 +870,29 @@
       executorUpdates)
   }

+  private def getTaskAccumulableInfosAndProgressRate(
+      updates: Seq[AccumulatorV2[_, _]]): (Seq[AccumulableInfo], Double) = {
+    var records = 0L
+    var runTime = 0L
+    val accInfos = updates.map { acc =>
+      if (calculateTaskProgressRate && acc.name.isDefined) {
+        val name = acc.name.get
+        if (name == shuffleRead.RECORDS_READ || name == input.RECORDS_READ) {
+          records += acc.value.asInstanceOf[Long]
+        } else if (name == InternalAccumulator.EXECUTOR_RUN_TIME) {
+          runTime = acc.value.asInstanceOf[Long]
+        }
+      }
+      acc.toInfo(Some(acc.value), None)
+    }
+    val taskProgressRate = if (calculateTaskProgressRate && runTime > 0) {
+      records / (runTime / 1000.0)
+    } else {
+      0.0D
+    }

Review Comment:
Can we centralize the calculation of the task progress rate to the `InefficientTaskCalculator` only? It seems not every calculation is necessary here, since the speculation check only happens under certain conditions, e.g., `numSuccessfulTasks >= minFinishedForSpeculation`. And I think we can reuse the existing `TaskInfo._accumulables` directly, which could make the code cleaner.
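A minimal sketch of the centralization being suggested (illustrative names, not the PR's API): compute the progress rate on demand from the metric values a task already reports, instead of caching `records`/`runTime` on `TaskInfo`.

```scala
// Records processed per second, derived from accumulator values the task already carries
// (shuffle/input records read and executor run time in milliseconds).
object TaskProgressRate {
  def apply(recordsRead: Long, executorRunTimeMs: Long): Double =
    if (executorRunTimeMs > 0) recordsRead.toDouble / (executorRunTimeMs / 1000.0) else 0.0
}
```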
[GitHub] [spark] beliefer commented on pull request #36608: [SPARK-39230][SQL] Support ANSI Aggregate Function: regr_slope
beliefer commented on PR #36608: URL: https://github.com/apache/spark/pull/36608#issuecomment-1132419433

ping @cloud-fan
[GitHub] [spark] gengliangwang commented on pull request #27590: [SPARK-30703][SQL][DOCS][FollowUp] Declare the ANSI SQL compliance options as experimental
gengliangwang commented on PR #27590: URL: https://github.com/apache/spark/pull/27590#issuecomment-1132410903

@tanvn nice catch! @cloud-fan Yes I will update the docs on 3.2 and above
[GitHub] [spark] HyukjinKwon commented on a diff in pull request #36599: [SPARK-39228][PYTHON][PS] Implement `skipna` of `Series.argmax`
HyukjinKwon commented on code in PR #36599: URL: https://github.com/apache/spark/pull/36599#discussion_r877689895

## python/pyspark/pandas/series.py:
## @@ -6239,13 +6239,19 @@ def argsort(self) -> "Series":
             ps.concat([psser, self.loc[self.isnull()].spark.transform(lambda _: SF.lit(-1))]),
         )

-    def argmax(self) -> int:
+    def argmax(self, skipna: bool = True) -> int:

Review Comment:
Yeah, maybe we can add that parameter, and document which value in `axis` is not supported then.
[GitHub] [spark] cloud-fan commented on pull request #27590: [SPARK-30703][SQL][DOCS][FollowUp] Declare the ANSI SQL compliance options as experimental
cloud-fan commented on PR #27590: URL: https://github.com/apache/spark/pull/27590#issuecomment-1132399814

I think we can remove the experimental mark now. What do you think? @gengliangwang
[GitHub] [spark] cloud-fan commented on a diff in pull request #36586: [SPARK-39236][SQL] Make CreateTable and ListTables be compatible with 3 layer namespace
cloud-fan commented on code in PR #36586: URL: https://github.com/apache/spark/pull/36586#discussion_r877685094

## sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala:
## @@ -367,24 +377,40 @@
       schema: StructType,
       description: String,
       options: Map[String, String]): DataFrame = {
-    val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
+    val idents = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(tableName)
     val storage = DataSource.buildStorageFormatFromOptions(options)
     val tableType = if (storage.locationUri.isDefined) {
       CatalogTableType.EXTERNAL
     } else {
       CatalogTableType.MANAGED
     }
-    val tableDesc = CatalogTable(
-      identifier = tableIdent,
-      tableType = tableType,
-      storage = storage,
-      schema = schema,
-      provider = Some(source),
-      comment = { if (description.isEmpty) None else Some(description) }
-    )
-    val plan = CreateTable(tableDesc, SaveMode.ErrorIfExists, None)
+    val location = if (storage.locationUri.isDefined) {
+      val locationStr = storage.locationUri.get.toString
+      Some(locationStr)
+    } else {
+      None
+    }
+
+    val tableSpec =
+      TableSpec(
+        properties = Map(),
+        provider = Some(source),
+        options = options,
+        location = location,
+        comment = { if (description.isEmpty) None else Some(description) },
+        serde = None,
+        external = tableType == CatalogTableType.EXTERNAL)
+
+    val plan =
+      CreateTable(
+        name = UnresolvedDBObjectName(idents, isNamespace = true),

Review Comment:
`isNamespace` should be false?
[GitHub] [spark] cloud-fan commented on a diff in pull request #36586: [SPARK-39236][SQL] Make CreateTable and ListTables be compatible with 3 layer namespace
cloud-fan commented on code in PR #36586: URL: https://github.com/apache/spark/pull/36586#discussion_r877684804

## sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala:
## @@ -367,24 +377,40 @@
       schema: StructType,
       description: String,
       options: Map[String, String]): DataFrame = {
-    val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
+    val idents = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(tableName)
     val storage = DataSource.buildStorageFormatFromOptions(options)
     val tableType = if (storage.locationUri.isDefined) {
       CatalogTableType.EXTERNAL
     } else {
       CatalogTableType.MANAGED
     }
-    val tableDesc = CatalogTable(
-      identifier = tableIdent,
-      tableType = tableType,
-      storage = storage,
-      schema = schema,
-      provider = Some(source),
-      comment = { if (description.isEmpty) None else Some(description) }
-    )
-    val plan = CreateTable(tableDesc, SaveMode.ErrorIfExists, None)
+    val location = if (storage.locationUri.isDefined) {
+      val locationStr = storage.locationUri.get.toString
+      Some(locationStr)
+    } else {
+      None
+    }
+
+    val tableSpec =
+      TableSpec(
+        properties = Map(),
+        provider = Some(source),
+        options = options,
+        location = location,
+        comment = { if (description.isEmpty) None else Some(description) },
+        serde = None,
+        external = tableType == CatalogTableType.EXTERNAL)
+
+    val plan =
+      CreateTable(
+        name = UnresolvedDBObjectName(idents, isNamespace = true),
+        tableSchema = schema,
+        partitioning = Seq(),
+        tableSpec = tableSpec,
+        ignoreIfExists = false)

Review Comment:
```suggestion
    val plan = CreateTable(
      name = UnresolvedDBObjectName(idents, isNamespace = true),
      tableSchema = schema,
      partitioning = Seq(),
      tableSpec = tableSpec,
      ignoreIfExists = false)
```
[GitHub] [spark] cloud-fan commented on a diff in pull request #36586: [SPARK-39236][SQL] Make CreateTable and ListTables be compatible with 3 layer namespace
cloud-fan commented on code in PR #36586: URL: https://github.com/apache/spark/pull/36586#discussion_r877684610

## sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala:
## @@ -367,24 +377,40 @@
       schema: StructType,
       description: String,
       options: Map[String, String]): DataFrame = {
-    val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
+    val idents = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(tableName)
     val storage = DataSource.buildStorageFormatFromOptions(options)
     val tableType = if (storage.locationUri.isDefined) {
       CatalogTableType.EXTERNAL
     } else {
       CatalogTableType.MANAGED
     }
-    val tableDesc = CatalogTable(
-      identifier = tableIdent,
-      tableType = tableType,
-      storage = storage,
-      schema = schema,
-      provider = Some(source),
-      comment = { if (description.isEmpty) None else Some(description) }
-    )
-    val plan = CreateTable(tableDesc, SaveMode.ErrorIfExists, None)
+    val location = if (storage.locationUri.isDefined) {
+      val locationStr = storage.locationUri.get.toString
+      Some(locationStr)
+    } else {
+      None
+    }
+
+    val tableSpec =
+      TableSpec(
+        properties = Map(),
+        provider = Some(source),
+        options = options,
+        location = location,
+        comment = { if (description.isEmpty) None else Some(description) },
+        serde = None,
+        external = tableType == CatalogTableType.EXTERNAL)

Review Comment:
```suggestion
    val tableSpec = TableSpec(
      properties = Map(),
      provider = Some(source),
      options = options,
      location = location,
      comment = { if (description.isEmpty) None else Some(description) },
      serde = None,
      external = tableType == CatalogTableType.EXTERNAL)
```
[GitHub] [spark] ulysses-you commented on pull request #34785: [SPARK-37523][SQL] Support optimize skewed partitions in Distribution and Ordering if numPartitions is not specified
ulysses-you commented on PR #34785: URL: https://github.com/apache/spark/pull/34785#issuecomment-1132397474

Looks correct to me. BTW, since Spark 3.3 `RebalancePartitions` supports specifying `initialNumPartition`, so the demo code can be:
```scala
val optNumPartitions = if (numPartitions == 0) None else Some(numPartitions)
if (!write.distributionStrictlyRequired()) {
  RebalancePartitions(distribution, query, optNumPartitions)
} else {
  RepartitionByExpression(distribution, query, optNumPartitions)
}
```
[GitHub] [spark] cloud-fan commented on a diff in pull request #36586: [SPARK-39236][SQL] Make CreateTable and ListTables be compatible with 3 layer namespace
cloud-fan commented on code in PR #36586: URL: https://github.com/apache/spark/pull/36586#discussion_r877684350

## sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala:
## @@ -367,24 +377,40 @@
       schema: StructType,
       description: String,
       options: Map[String, String]): DataFrame = {
-    val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
+    val idents = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(tableName)

Review Comment:
```suggestion
    val ident = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(tableName)
```
[GitHub] [spark] cloud-fan commented on a diff in pull request #36586: [SPARK-39236][SQL] Make CreateTable and ListTables be compatible with 3 layer namespace
cloud-fan commented on code in PR #36586: URL: https://github.com/apache/spark/pull/36586#discussion_r877684235

## sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala:
## @@ -97,8 +97,18 @@
    */
   @throws[AnalysisException]("database does not exist")
   override def listTables(dbName: String): Dataset[Table] = {
-    val tables = sessionCatalog.listTables(dbName).map(makeTable)
-    CatalogImpl.makeDataset(tables, sparkSession)
+    if (sessionCatalog.databaseExists(dbName)) {
+      val tables = sessionCatalog.listTables(dbName).map(makeTable)
+      CatalogImpl.makeDataset(tables, sparkSession)
+    } else {
+      val multiParts = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(dbName)
+      val plan = ShowTables(UnresolvedNamespace(multiParts), None)
+      val ret = sparkSession.sessionState.executePlan(plan).toRdd.collect()
+      val tables = ret
+        .map(row => TableIdentifier(row.getString(1), Some(row.getString(0))))
+        .map(makeTable)

Review Comment:
`makeTable` only looks up the table from the Hive metastore. I think we need a new `makeTable` which takes `Seq[String]` and looks up the table through the analyzer.
[GitHub] [spark] cloud-fan commented on a diff in pull request #36586: [SPARK-39236][SQL] Make CreateTable and ListTables be compatible with 3 layer namespace
cloud-fan commented on code in PR #36586: URL: https://github.com/apache/spark/pull/36586#discussion_r877682958

## sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala:
## @@ -97,8 +97,18 @@
    */
   @throws[AnalysisException]("database does not exist")
   override def listTables(dbName: String): Dataset[Table] = {
-    val tables = sessionCatalog.listTables(dbName).map(makeTable)
-    CatalogImpl.makeDataset(tables, sparkSession)
+    if (sessionCatalog.databaseExists(dbName)) {
+      val tables = sessionCatalog.listTables(dbName).map(makeTable)
+      CatalogImpl.makeDataset(tables, sparkSession)
+    } else {
+      val multiParts = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(dbName)

Review Comment:
```suggestion
      val ident = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(dbName)
```
[GitHub] [spark] Yikun commented on a diff in pull request #36599: [SPARK-39228][PYTHON][PS] Implement `skipna` of `Series.argmax`
Yikun commented on code in PR #36599: URL: https://github.com/apache/spark/pull/36599#discussion_r877673383

## python/pyspark/pandas/series.py:
## @@ -6239,13 +6239,19 @@ def argsort(self) -> "Series":
             ps.concat([psser, self.loc[self.isnull()].spark.transform(lambda _: SF.lit(-1))]),
         )

-    def argmax(self) -> int:
+    def argmax(self, skipna: bool = True) -> int:

Review Comment:
And the upstream pandas's `axis` is also a dummy parameter (this is really special), so just keeping the same doc/parameters as pandas is enough. After this, it is right that the missing parameter would be removed.
[GitHub] [spark] Yikun commented on a diff in pull request #36599: [SPARK-39228][PYTHON][PS] Implement `skipna` of `Series.argmax`
Yikun commented on code in PR #36599: URL: https://github.com/apache/spark/pull/36599#discussion_r877669800

## python/pyspark/pandas/series.py:
## @@ -6239,13 +6239,19 @@ def argsort(self) -> "Series":
             ps.concat([psser, self.loc[self.isnull()].spark.transform(lambda _: SF.lit(-1))]),
         )

-    def argmax(self) -> int:
+    def argmax(self, skipna: bool = True) -> int:

Review Comment:
![image](https://user-images.githubusercontent.com/1736354/169432658-ab51f8eb-2014-4d6b-b95d-67e78e077029.png)

It would be listed, if we don't add it.
```python
>>> from inspect import signature
>>> signature(pd.Series.argmax).parameters
mappingproxy(OrderedDict([('self', ), ('axis', ), ('skipna', ), ('args', ), ('kwargs', )]))
>>> signature(ps.Series.argmax).parameters
mappingproxy(OrderedDict([('self', ), ('skipna', )]))
```
[GitHub] [spark] LuciferYang commented on a diff in pull request #36611: [SPARK-39204][CORE] Change `Utils.createTempDir` and `Utils.createDirectory` call the same logic method in `JavaUtils`
LuciferYang commented on code in PR #36611: URL: https://github.com/apache/spark/pull/36611#discussion_r877674754

## core/src/main/scala/org/apache/spark/util/Utils.scala:
## @@ -308,28 +308,7 @@
    * newly created, and is not marked for automatic deletion.
    */
   def createDirectory(root: String, namePrefix: String = "spark"): File = {
-    var attempts = 0
-    val maxAttempts = MAX_DIR_CREATION_ATTEMPTS
-    var dir: File = null
-    while (dir == null) {
-      attempts += 1
-      if (attempts > maxAttempts) {
-        throw new IOException("Failed to create a temp directory (under " + root + ") after " +
-          maxAttempts + " attempts!")
-      }
-      try {
-        dir = new File(root, namePrefix + "-" + UUID.randomUUID.toString)
-        // SPARK-35907:
-        // This could throw more meaningful exception information if directory creation failed.
-        Files.createDirectories(dir.toPath)
-      } catch {
-        case e @ (_ : IOException | _ : SecurityException) =>
-          logError(s"Failed to create directory $dir", e)
-          dir = null
-      }
-    }
-
-    dir.getCanonicalFile
+    JavaUtils.createDirectory(root, namePrefix)

Review Comment:
https://github.com/apache/spark/blob/5ee6f72744143cc5e19aa058df209f7156e51cee/common/network-common/src/main/java/org/apache/spark/network/util/JavaUtils.java#L399-L419
[GitHub] [spark] LuciferYang commented on a diff in pull request #36611: [SPARK-39204][CORE] Change `Utils.createTempDir` and `Utils.createDirectory` call the same logic method in `JavaUtils`
LuciferYang commented on code in PR #36611: URL: https://github.com/apache/spark/pull/36611#discussion_r877674586

## core/src/main/scala/org/apache/spark/util/Utils.scala:
## @@ -339,9 +318,7 @@
   def createTempDir(
       root: String = System.getProperty("java.io.tmpdir"),
       namePrefix: String = "spark"): File = {
-    val dir = createDirectory(root, namePrefix)
-    ShutdownHookManager.registerShutdownDeleteDir(dir)
-    dir
+    JavaUtils.createTempDir(root, namePrefix)

Review Comment:
https://github.com/apache/spark/blob/5ee6f72744143cc5e19aa058df209f7156e51cee/common/network-common/src/main/java/org/apache/spark/network/util/JavaUtils.java#L379-L385
[GitHub] [spark] yaooqinn commented on pull request #36592: [SPARK-39221][SQL] Make sensitive information be redacted correctly for thrift server job/stage tab
yaooqinn commented on PR #36592: URL: https://github.com/apache/spark/pull/36592#issuecomment-1132373620

thanks, merged to master
[GitHub] [spark] yaooqinn closed pull request #36592: [SPARK-39221][SQL] Make sensitive information be redacted correctly for thrift server job/stage tab
yaooqinn closed pull request #36592: [SPARK-39221][SQL] Make sensitive information be redacted correctly for thrift server job/stage tab URL: https://github.com/apache/spark/pull/36592
[GitHub] [spark] LuciferYang commented on pull request #36611: [SPARK-39204][CORE] Change `Utils.createTempDir` and `Utils.createDirectory` call the same logic method in `JavaUtils`
LuciferYang commented on PR #36611: URL: https://github.com/apache/spark/pull/36611#issuecomment-1132373369

> Yeah, I think we should better fix `Utils.createTempDir`.

Yeah ~ now this PR only changes one file and achieves the goal.
[GitHub] [spark] huaxingao commented on pull request #34785: [SPARK-37523][SQL] Support optimize skewed partitions in Distribution and Ordering if numPartitions is not specified
huaxingao commented on PR #34785: URL: https://github.com/apache/spark/pull/34785#issuecomment-1132364552 cc @cloud-fan -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] HyukjinKwon commented on a diff in pull request #36599: [SPARK-39228][PYTHON][PS] Implement `skipna` of `Series.argmax`
HyukjinKwon commented on code in PR #36599: URL: https://github.com/apache/spark/pull/36599#discussion_r877664483 ## python/pyspark/pandas/series.py: ## @@ -6239,13 +6239,19 @@ def argsort(self) -> "Series": ps.concat([psser, self.loc[self.isnull()].spark.transform(lambda _: SF.lit(-1))]), ) -def argmax(self) -> int: +def argmax(self, skipna: bool = True) -> int: Review Comment: Good point. But I guess it won't be listed in the missing parameters in the documentation (?) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] huaxingao commented on pull request #34785: [SPARK-37523][SQL] Support optimize skewed partitions in Distribution and Ordering if numPartitions is not specified
huaxingao commented on PR #34785: URL: https://github.com/apache/spark/pull/34785#issuecomment-1132363307 Thanks @aokolnychyi for the proposal. I agree that we should support both strictly required distribution and best effort distribution. For best effort distribution, if the user doesn't request a specific number of partitions, we will split skewed partitions and coalesce small partitions. For strictly required distribution, if the user doesn't request a specific number of partitions, we will coalesce small partitions but we will NOT split skewed partitions since this changes the required distribution. In interface `RequiresDistributionAndOrdering`, I will add
```
default boolean distributionStrictlyRequired() { return true; }
```
Then in `DistributionAndOrderingUtils`.`prepareQuery`, I will change the code to something like this:
```
val queryWithDistribution = if (distribution.nonEmpty) {
  if (!write.distributionStrictlyRequired() && numPartitions == 0) {
    RebalancePartitions(distribution, query)
  } else {
    if (numPartitions > 0) {
      RepartitionByExpression(distribution, query, numPartitions)
    } else {
      RepartitionByExpression(distribution, query, None)
    }
  }
  ...
```
Basically, in the best effort case, if the requested numPartitions is 0, we will use `RebalancePartitions` so both `OptimizeSkewInRebalancePartitions` and `CoalesceShufflePartitions` will be applied. In the strictly required distribution case, if the requested numPartitions is 0, we will use `RepartitionByExpression(distribution, query, None)` so `CoalesceShufflePartitions` will be applied. Does this sound correct for everyone? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] beliefer commented on a diff in pull request #36330: [SPARK-38897][SQL] DS V2 supports push down string functions
beliefer commented on code in PR #36330: URL: https://github.com/apache/spark/pull/36330#discussion_r877653817 ## sql/catalyst/src/main/java/org/apache/spark/sql/connector/util/V2ExpressionSQLBuilder.java: ## @@ -228,4 +244,18 @@ protected String visitSQLFunction(String funcName, String[] inputs) { protected String visitUnexpectedExpr(Expression expr) throws IllegalArgumentException { throw new IllegalArgumentException("Unexpected V2 expression: " + expr); } + + protected String visitOverlay(String[] inputs) { +throw new UnsupportedOperationException("Function: OVERLAY does not support "); Review Comment: @chenzhx The default is just used for display. It should return the `OVERLAY` syntax. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
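For illustration, a minimal sketch (in Scala, though the actual builder class is Java) of a default `visitOverlay` that renders standard `OVERLAY` syntax instead of throwing; the assumed input order `(source, replacement, position[, length])` is an assumption, not taken from the PR:
```scala
object OverlaySqlSketch {
  // Assumes inputs arrive as (source, replacement, position) or (source, replacement, position, length).
  def visitOverlay(inputs: Array[String]): String = inputs match {
    case Array(src, replace, pos) =>
      s"OVERLAY($src PLACING $replace FROM $pos)"
    case Array(src, replace, pos, len) =>
      s"OVERLAY($src PLACING $replace FROM $pos FOR $len)"
    case _ =>
      throw new IllegalArgumentException(s"OVERLAY expects 3 or 4 inputs, got ${inputs.length}")
  }
}
```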
[GitHub] [spark] HyukjinKwon commented on a diff in pull request #36589: [SPARK-39218][SS][PYTHON] Make foreachBatch streaming query stop gracefully
HyukjinKwon commented on code in PR #36589: URL: https://github.com/apache/spark/pull/36589#discussion_r877653141 ## python/pyspark/sql/tests/test_streaming.py: ## @@ -592,6 +592,18 @@ def collectBatch(df, id): if q: q.stop() +def test_streaming_foreachBatch_graceful_stop(self): +# SPARK-39218: Make foreachBatch streaming query stop gracefully +def func(batch_df, _): +time.sleep(10) Review Comment: Yeah, that should work too. ## python/pyspark/sql/tests/test_streaming.py: ## @@ -592,6 +592,18 @@ def collectBatch(df, id): if q: q.stop() +def test_streaming_foreachBatch_graceful_stop(self): +# SPARK-39218: Make foreachBatch streaming query stop gracefully +def func(batch_df, _): +time.sleep(10) Review Comment: Let me fix 👍 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] zsxwing commented on a diff in pull request #36589: [SPARK-39218][SS][PYTHON] Make foreachBatch streaming query stop gracefully
zsxwing commented on code in PR #36589: URL: https://github.com/apache/spark/pull/36589#discussion_r877648746 ## python/pyspark/sql/tests/test_streaming.py: ## @@ -592,6 +592,18 @@ def collectBatch(df, id): if q: q.stop() +def test_streaming_foreachBatch_graceful_stop(self): +# SPARK-39218: Make foreachBatch streaming query stop gracefully +def func(batch_df, _): +time.sleep(10) Review Comment: I see. `batch_df.count()` probably will touch some code that can be interrupted. Can we use `self.spark._jvm.java.lang.Thread.sleep(1)` to save 5 seconds in this test? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] HyukjinKwon commented on a diff in pull request #36589: [SPARK-39218][SS][PYTHON] Make foreachBatch streaming query stop gracefully
HyukjinKwon commented on code in PR #36589: URL: https://github.com/apache/spark/pull/36589#discussion_r877647489 ## python/pyspark/sql/tests/test_streaming.py: ## @@ -592,6 +592,18 @@ def collectBatch(df, id): if q: q.stop() +def test_streaming_foreachBatch_graceful_stop(self): +# SPARK-39218: Make foreachBatch streaming query stop gracefully +def func(batch_df, _): +time.sleep(10) Review Comment: Actually, `batch_df.count()` below triggers it via accessing to the interrupted Java thread if I'm not mistaken. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] zsxwing commented on a diff in pull request #36589: [SPARK-39218][SS][PYTHON] Make foreachBatch streaming query stop gracefully
zsxwing commented on code in PR #36589: URL: https://github.com/apache/spark/pull/36589#discussion_r877642301 ## python/pyspark/sql/tests/test_streaming.py: ## @@ -592,6 +592,18 @@ def collectBatch(df, id): if q: q.stop() +def test_streaming_foreachBatch_graceful_stop(self): +# SPARK-39218: Make foreachBatch streaming query stop gracefully +def func(batch_df, _): +time.sleep(10) +batch_df.count() + +q = self.spark.readStream.format("rate").load().writeStream.foreachBatch(func).start() +time.sleep(5) +q.stop() +time.sleep(15) # Wait enough for the exception to be propagated if exists. Review Comment: this is not needed. `q.stop()` will wait until the streaming thread is dead. ## python/pyspark/sql/tests/test_streaming.py: ## @@ -592,6 +592,18 @@ def collectBatch(df, id): if q: q.stop() +def test_streaming_foreachBatch_graceful_stop(self): +# SPARK-39218: Make foreachBatch streaming query stop gracefully +def func(batch_df, _): +time.sleep(10) Review Comment: How does this func trigger InterruptedException? I would expect codes like `self.spark._jvm.java.lang.Thread.sleep(1)` instead. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] HyukjinKwon commented on pull request #36611: [SPARK-39204][BUILD][CORE][SQL][DSTREAM][GRAPHX][K8S][ML][MLLIB][SS][YARN][EXAMPLES][SHELL] Replace `Utils.createTempDir` with `JavaUtils
HyukjinKwon commented on PR #36611: URL: https://github.com/apache/spark/pull/36611#issuecomment-1132340311 Yeah, I think we should better fix `Utils.createTempDir`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] github-actions[bot] closed pull request #34785: [SPARK-37523][SQL] Support optimize skewed partitions in Distribution and Ordering if numPartitions is not specified
github-actions[bot] closed pull request #34785: [SPARK-37523][SQL] Support optimize skewed partitions in Distribution and Ordering if numPartitions is not specified URL: https://github.com/apache/spark/pull/34785 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] github-actions[bot] closed pull request #35049: [SPARK-37757][BUILD] Enable Spark test scheduled job on ARM runner
github-actions[bot] closed pull request #35049: [SPARK-37757][BUILD] Enable Spark test scheduled job on ARM runner URL: https://github.com/apache/spark/pull/35049 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] github-actions[bot] closed pull request #35402: [SPARK-37536][SQL] Allow for API user to disable Shuffle on Local Mode
github-actions[bot] closed pull request #35402: [SPARK-37536][SQL] Allow for API user to disable Shuffle on Local Mode URL: https://github.com/apache/spark/pull/35402 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] github-actions[bot] commented on pull request #35424: [WIP][SPARK-38116] Add auto commit option to JDBC PostgreSQL driver and set the option false default
github-actions[bot] commented on PR #35424: URL: https://github.com/apache/spark/pull/35424#issuecomment-1132318749 We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable. If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] dongjoon-hyun commented on pull request #36004: [SPARK-38681][SQL] Support nested generic case classes
dongjoon-hyun commented on PR #36004: URL: https://github.com/apache/spark/pull/36004#issuecomment-1132318738 Thank you, @eejbyfeldt , @cloud-fan , @srowen ! cc @MaxGekk -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] srowen commented on pull request #36004: [SPARK-38681][SQL] Support nested generic case classes
srowen commented on PR #36004: URL: https://github.com/apache/spark/pull/36004#issuecomment-1132316150 Merged to master/3.3 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] srowen closed pull request #36004: [SPARK-38681][SQL] Support nested generic case classes
srowen closed pull request #36004: [SPARK-38681][SQL] Support nested generic case classes URL: https://github.com/apache/spark/pull/36004 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] dongjoon-hyun commented on pull request #36004: [SPARK-38681][SQL] Support nested generic case classes
dongjoon-hyun commented on PR #36004: URL: https://github.com/apache/spark/pull/36004#issuecomment-1132295581 Thank you, @eejbyfeldt . cc @srowen -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] hai-tao-1 commented on pull request #36606: [SPARK-39232][CORE] History Server Main Page App List Filtering
hai-tao-1 commented on PR #36606: URL: https://github.com/apache/spark/pull/36606#issuecomment-1132271280 The PR test fails with ```[error] spark-core: Failed binary compatibility check against org.apache.spark:spark-core_2.12:3.2.0! Found 9 potential problems (filtered 924)```. Could anyone advise what may be wrong? Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] dongjoon-hyun commented on pull request #36597: [SPARK-39225][CORE] Support `spark.history.fs.update.batchSize`
dongjoon-hyun commented on PR #36597: URL: https://github.com/apache/spark/pull/36597#issuecomment-1132159846 Merged to master. I added you to the Apache Spark contributor group and assigned SPARK-39225 to you, @hai-tao-1 . Welcome to the Apache Spark community. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] dongjoon-hyun closed pull request #36597: [SPARK-39225][CORE] Support `spark.history.fs.update.batchSize`
dongjoon-hyun closed pull request #36597: [SPARK-39225][CORE] Support `spark.history.fs.update.batchSize` URL: https://github.com/apache/spark/pull/36597 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] amaliujia commented on pull request #36586: [SPARK-39236][SQL] Make CreateTable and ListTables be compatible with 3 layer namespace
amaliujia commented on PR #36586: URL: https://github.com/apache/spark/pull/36586#issuecomment-1132121981 R: @cloud-fan this PR is ready to review. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] huaxingao opened a new pull request, #34785: [SPARK-37523][SQL] Support optimize skewed partitions in Distribution and Ordering if numPartitions is not specified
huaxingao opened a new pull request, #34785: URL: https://github.com/apache/spark/pull/34785
### What changes were proposed in this pull request?
Support optimize skewed partitions in Distribution and Ordering if numPartitions is not specified
### Why are the changes needed?
When doing repartition in distribution and sort, we will use Rebalance operator instead of RepartitionByExpression to optimize skewed partitions when 1. numPartitions is not specified by the data source, and 2. sortOrder is specified. This is because the requested distribution needs to be guaranteed, which can only be achieved by using RangePartitioning, not HashPartitioning.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing and new tests
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] otterc commented on a diff in pull request #36601: [SPARK-38987][SHUFFLE] Throw FetchFailedException when merged shuffle blocks are corrupted and spark.shuffle.detectCorrupt is set to
otterc commented on code in PR #36601: URL: https://github.com/apache/spark/pull/36601#discussion_r877366840 ## core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala: ## @@ -1885,6 +1885,14 @@ private[spark] class DAGScheduler( mapOutputTracker. unregisterMergeResult(shuffleId, reduceId, bmAddress, Option(mapIndex)) } + } else { +// Unregister the merge result of if +// there is a FetchFailed event and is not a +// MetaDataFetchException which is signified by bmAddress being null +if (bmAddress != null) { + mapOutputTracker. +unregisterMergeResult(shuffleId, reduceId, bmAddress, None) +} Review Comment: Can you also check line 2031 where `removeExecutorAndUnregisterOutputs` is called. The `executorId` for ShuffleMergedChunk is `shuffle-push-merger`. Will that cause any issues? We should probably add a UT for this as well with the `unRegisterOutputOnHostOnFetchFailure` enabled. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] aokolnychyi commented on pull request #34785: [SPARK-37523][SQL] Support optimize skewed partitions in Distribution and Ordering if numPartitions is not specified
aokolnychyi commented on PR #34785: URL: https://github.com/apache/spark/pull/34785#issuecomment-1132014116 Thanks for the PR, @huaxingao. I think it is a great feature and it would be awesome to get it done. I spent some time thinking about this and have a few questions/proposals. If I understand correctly, we currently hard-code the number of shuffle partitions in `RepartitionByExpression`, which prohibits both coalescing and skew split optimizations. It seems reasonable to support cases when the requested distribution is best-effort but I also think there are valid cases when the distribution is required for correctness and it is actually the current API contract. What about extending `RequiredDistributionAndOrdering` to indicate the distribution is not strictly required? We can add some boolean method and default it to keep the existing behavior. If the distribution is required, we can still benefit from coalescing as I think `CoalesceShufflePartitions` and `AQEShuffleReadExec` would keep the original distribution in coalesce cases. That’s already a huge win. We can avoid too small files while keeping the requested distribution. I also agree about using `RebalancePartitions` when the distribution is not strictly required. What about extending `RebalancePartitions` to also support range partitioning? It currently supports only hash and round-robin. If we make that change, we will be able to remove unnecessary shuffles in the optimizer and keep the original distribution as long as there is no skew and we only coalesce. If there is a skew, an extra shuffle and changed distribution seems like a reasonable overhead. What does everybody else think? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] mridulm commented on a diff in pull request #36601: [SPARK-38987][SHUFFLE] Throw FetchFailedException when merged shuffle blocks are corrupted and spark.shuffle.detectCorrupt is set t
mridulm commented on code in PR #36601: URL: https://github.com/apache/spark/pull/36601#discussion_r877361726 ## core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala: ## @@ -1885,6 +1885,14 @@ private[spark] class DAGScheduler( mapOutputTracker. unregisterMergeResult(shuffleId, reduceId, bmAddress, Option(mapIndex)) } + } else { +// Unregister the merge result of if +// the FetchFailed event contains a mapIndex of -1, and is not a +// MetaDataFetchException which is signified by bmAddress being null +if (bmAddress != null && pushBasedShuffleEnabled) { Review Comment: I am trying to understand, can we have a case where `mapIndex == -1` and `bmAddress != null` for non-push based shuffle ? If no, we should add an assertion there (not drop the check entirely). Note, L1882 has the check because the earlier check is for `mapIndex != -1` - which can be satisfied for all shuffles -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] MaxGekk commented on pull request #36603: [SPARK-39163][SQL] Throw an exception w/ error class for an invalid bucket file
MaxGekk commented on PR #36603: URL: https://github.com/apache/spark/pull/36603#issuecomment-1132003245 @panbingkun Could you backport this to branch-3.3, please. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] MaxGekk closed pull request #36603: [SPARK-39163][SQL] Throw an exception w/ error class for an invalid bucket file
MaxGekk closed pull request #36603: [SPARK-39163][SQL] Throw an exception w/ error class for an invalid bucket file URL: https://github.com/apache/spark/pull/36603 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] otterc commented on a diff in pull request #36601: [SPARK-38987][SHUFFLE] Throw FetchFailedException when merged shuffle blocks are corrupted and spark.shuffle.detectCorrupt is set to
otterc commented on code in PR #36601: URL: https://github.com/apache/spark/pull/36601#discussion_r877340985 ## core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala: ## @@ -1786,4 +1786,32 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT ShuffleBlockId(0, 5, 2), ShuffleBlockId(0, 6, 2))) } + test("SPARK-38987: failure to fetch corrupted shuffle block chunk should " + Review Comment: Nit: modify this name so it's clear that when corruption goes undetected then it should throw fetch failure. Otherwise, it is confusing because one test says we should fallback on corruption and the other says it should throw fetch failure ## core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala: ## @@ -1166,6 +1166,9 @@ final class ShuffleBlockFetcherIterator( case ShuffleBlockBatchId(shuffleId, mapId, startReduceId, _) => throw SparkCoreErrors.fetchFailedError(address, shuffleId, mapId, mapIndex, startReduceId, msg, e) + case ShuffleBlockChunkId(shuffleId, _, reduceId, _) => +SparkCoreErrors.fetchFailedError(address, shuffleId, + -1L, SHUFFLE_PUSH_MAP_ID, reduceId, msg, e) Review Comment: Nit: Can we use SHUFFLE_PUSH_MAP for mapId as well? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] hai-tao-1 commented on pull request #36597: [SPARK-39225][CORE] Support `spark.history.fs.update.batchSize`
hai-tao-1 commented on PR #36597: URL: https://github.com/apache/spark/pull/36597#issuecomment-1131988355 > Thank you for updates, @hai-tao-1 . Yes, the only remaining comment is the test case. > > > We need a test case for the configuration. Please check the corner cases especially. @dongjoon-hyun Added unit test test("SPARK-39225: Support spark.history.fs.update.batchSize"). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] otterc commented on a diff in pull request #36601: [SPARK-38987][SHUFFLE] Throw FetchFailedException when merged shuffle blocks are corrupted and spark.shuffle.detectCorrupt is set to
otterc commented on code in PR #36601: URL: https://github.com/apache/spark/pull/36601#discussion_r877329983 ## core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala: ## @@ -1885,6 +1885,14 @@ private[spark] class DAGScheduler( mapOutputTracker. unregisterMergeResult(shuffleId, reduceId, bmAddress, Option(mapIndex)) } + } else { +// Unregister the merge result of if +// the FetchFailed event contains a mapIndex of -1, and is not a +// MetaDataFetchException which is signified by bmAddress being null +if (bmAddress != null && pushBasedShuffleEnabled) { Review Comment: I think we should check for `pushBasedShuffleEnabled` here just for consistency with lines 1882-1886. There aren't going to be any merge results if push-based shuffle is not enabled but since we do it at the other places we should do it here as well. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] LuciferYang commented on pull request #36611: [SPARK-39204][BUILD][CORE][SQL][DSTREAM][GRAPHX][K8S][ML][MLLIB][SS][YARN][EXAMPLES][SHELL] Replace `Utils.createTempDir` with `JavaUtils
LuciferYang commented on PR #36611: URL: https://github.com/apache/spark/pull/36611#issuecomment-1131979146 It seems that this change is big. Another way to keep one `createTempDir` is to let `Utils.createTempDir` call `JavaUtils.createTempDir`. Is this acceptable? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
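As a rough sketch of that alternative (assuming the `JavaUtils.createTempDir(root, namePrefix)` helper referenced earlier in this thread also registers the directory for deletion on shutdown), `Utils.createTempDir` would reduce to a thin wrapper:
```scala
import java.io.File

import org.apache.spark.network.util.JavaUtils

object UtilsSketch {
  // Thin wrapper: directory creation and shutdown-hook registration live in JavaUtils.
  def createTempDir(
      root: String = System.getProperty("java.io.tmpdir"),
      namePrefix: String = "spark"): File = {
    JavaUtils.createTempDir(root, namePrefix)
  }
}
```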
[GitHub] [spark] akpatnam25 commented on a diff in pull request #36601: [SPARK-38987][SHUFFLE] Throw FetchFailedException when merged shuffle blocks are corrupted and spark.shuffle.detectCorrupt is se
akpatnam25 commented on code in PR #36601: URL: https://github.com/apache/spark/pull/36601#discussion_r877316473 ## core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala: ## @@ -1885,6 +1885,14 @@ private[spark] class DAGScheduler( mapOutputTracker. unregisterMergeResult(shuffleId, reduceId, bmAddress, Option(mapIndex)) } + } else { +// Unregister the merge result of if +// the FetchFailed event contains a mapIndex of -1, and is not a +// MetaDataFetchException which is signified by bmAddress being null +if (bmAddress != null && pushBasedShuffleEnabled) { Review Comment: we dont need that check here, removed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] akpatnam25 commented on a diff in pull request #36601: [SPARK-38987][SHUFFLE] Throw FetchFailedException when merged shuffle blocks are corrupted and spark.shuffle.detectCorrupt is se
akpatnam25 commented on code in PR #36601: URL: https://github.com/apache/spark/pull/36601#discussion_r877316296 ## core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala: ## @@ -1885,6 +1885,14 @@ private[spark] class DAGScheduler( mapOutputTracker. unregisterMergeResult(shuffleId, reduceId, bmAddress, Option(mapIndex)) } + } else { +// Unregister the merge result of if +// the FetchFailed event contains a mapIndex of -1, and is not a +// MetaDataFetchException which is signified by bmAddress being null +if (bmAddress != null && pushBasedShuffleEnabled) { + mapOutputTracker. +unregisterMergeResult(shuffleId, reduceId, bmAddress, Option(mapIndex)) Review Comment: yep, passing in `None` now. this way we do not have to explicitly pass in -1 in the tests. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] nkronenfeld opened a new pull request, #36613: [WIP][SPARK-30983] Support typed select in Datasets up to the max tuple size
nkronenfeld opened a new pull request, #36613: URL: https://github.com/apache/spark/pull/36613
### What changes were proposed in this pull request?
This PR simply adds typed select methods to Dataset up to the max Tuple size of 22. This has been bugging me for years, so I finally decided to get off my backside and do something about it :-). As noted in the JIRA issue, technically, this is a breaking change - indeed, I had to remove an old test that specifically tested that Spark didn't support typed select for tuples larger than 5. However, breaking anything would take someone explicitly relying on select returning a DataFrame instead of a Dataset when using select on large tuples of typed columns (though I guess that test I had to remove exhibits one case where this may happen). I've set the PR as WIP because I've been unable to run all tests so far - not due to the fix, but rather due to not having things set up correctly on my computer. Still working on that.
### Why are the changes needed?
Arbitrarily supporting only up to 5-tuples is weird and unpredictable.
### Does this PR introduce _any_ user-facing change?
Yes, select on tuples of all typed columns larger than 5 will now return a Dataset instead of a DataFrame.
### How was this patch tested?
I've run all sql tests, and they all pass (though testing itself still fails on my machine, I think with a path-too-long error). I've added a test to make sure the typed select works on all sizes - mostly this is a compile issue, not a run-time issue, but I checked values too, just to double-check that I didn't miss anything (which is a big potential problem with long tuples and copy-paste errors).
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
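As a usage sketch of the proposed behavior (column names and types are invented for illustration; the type ascription below only compiles with the new overloads, since current Spark degrades a typed select of more than five columns to a DataFrame):
```scala
import org.apache.spark.sql.{Dataset, SparkSession}

object TypedSelectSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("typed-select").getOrCreate()
    import spark.implicits._

    val df = Seq((1, "a", 1.0, 1L, true, 2.0f))
      .toDF("id", "name", "score", "count", "flag", "ratio")

    // Six typed columns: with the proposed overloads this stays a typed Dataset.
    val ds: Dataset[(Int, String, Double, Long, Boolean, Float)] =
      df.select($"id".as[Int], $"name".as[String], $"score".as[Double],
        $"count".as[Long], $"flag".as[Boolean], $"ratio".as[Float])

    ds.show()
    spark.stop()
  }
}
```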
[GitHub] [spark] vli-databricks commented on pull request #36584: [SPARK-39213][SQL] Create ANY_VALUE aggregate function
vli-databricks commented on PR #36584: URL: https://github.com/apache/spark/pull/36584#issuecomment-1131948634 Yes, the purpose is ease of migration. I removed the change to `functions.scala` to limit the scope to Spark SQL only. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] MaxGekk commented on pull request #36584: [SPARK-39213][SQL] Create ANY_VALUE aggregate function
MaxGekk commented on PR #36584: URL: https://github.com/apache/spark/pull/36584#issuecomment-1131939361 How about adding the function to other APIs like first() in:
- PySpark: https://github.com/apache/spark/blob/b63674ea5f746306a96ab8c39c23a230a6cb9566/sql/core/src/main/scala/org/apache/spark/sql/functions.scala#L500
- R: https://github.com/apache/spark/blob/16d1c68e8b185457ae86a248d0874e61c3bc6f3a/R/pkg/R/functions.R#L1178
BTW, if the purpose of this new feature is to make migrations to Spark SQL from other systems easier, I would propose adding it to Spark SQL only (and not extending functions.scala). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] mridulm commented on a diff in pull request #36601: [SPARK-38987][SHUFFLE] Throw FetchFailedException when merged shuffle blocks are corrupted and spark.shuffle.detectCorrupt is set t
mridulm commented on code in PR #36601: URL: https://github.com/apache/spark/pull/36601#discussion_r877275914 ## core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala: ## @@ -1885,6 +1885,14 @@ private[spark] class DAGScheduler( mapOutputTracker. unregisterMergeResult(shuffleId, reduceId, bmAddress, Option(mapIndex)) } + } else { +// Unregister the merge result of if +// the FetchFailed event contains a mapIndex of -1, and is not a +// MetaDataFetchException which is signified by bmAddress being null +if (bmAddress != null && pushBasedShuffleEnabled) { + mapOutputTracker. +unregisterMergeResult(shuffleId, reduceId, bmAddress, Option(mapIndex)) Review Comment: Given `mapIndex` is -1, should this not be `None` passed in to `unregisterMergeResult` instead of `Option(-1)` ? The tests are passing because you explicitly added `-1` to the bitmap, which looks incorrect. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] mridulm commented on a diff in pull request #36601: [SPARK-38987][SHUFFLE] Throw FetchFailedException when merged shuffle blocks are corrupted and spark.shuffle.detectCorrupt is set t
mridulm commented on code in PR #36601: URL: https://github.com/apache/spark/pull/36601#discussion_r877273610 ## core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala: ## @@ -1885,6 +1885,14 @@ private[spark] class DAGScheduler( mapOutputTracker. unregisterMergeResult(shuffleId, reduceId, bmAddress, Option(mapIndex)) } + } else { +// Unregister the merge result of if +// the FetchFailed event contains a mapIndex of -1, and is not a +// MetaDataFetchException which is signified by bmAddress being null +if (bmAddress != null && pushBasedShuffleEnabled) { Review Comment: Why do we need `pushBasedShuffleEnabled` check here ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] MaxGekk commented on a diff in pull request #36580: [SPARK-39167][SQL] Throw an exception w/ an error class for multiple rows from a subquery used as an expression
MaxGekk commented on code in PR #36580: URL: https://github.com/apache/spark/pull/36580#discussion_r877269334 ## sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala: ## @@ -1971,4 +1971,10 @@ object QueryExecutionErrors extends QueryErrorsBase { s"add ${toSQLValue(amount, IntegerType)} $unit to " + s"${toSQLValue(DateTimeUtils.microsToInstant(micros), TimestampType)}")) } + + def multipleRowSubqueryError(plan: String): Throwable = { +new SparkIllegalStateException( Review Comment: Please use SparkException or SparkRuntimeException. I removed that exception in https://github.com/apache/spark/pull/36550. BTW, we shouldn't throw `IllegalStateException` if the exception can be triggered by user code in regular cases. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] MaxGekk closed pull request #36612: [SPARK-39234][SQL] Code clean up in SparkThrowableHelper.getMessage
MaxGekk closed pull request #36612: [SPARK-39234][SQL] Code clean up in SparkThrowableHelper.getMessage URL: https://github.com/apache/spark/pull/36612 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] MaxGekk commented on pull request #36612: [SPARK-39234][SQL] Code clean up in SparkThrowableHelper.getMessage
MaxGekk commented on PR #36612: URL: https://github.com/apache/spark/pull/36612#issuecomment-1131917484 +1, LGTM. Merging to master. Thank you, @gengliangwang and @cloud-fan for review. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] vli-databricks commented on pull request #36584: [SPARK-39213] Create ANY_VALUE aggregate function
vli-databricks commented on PR #36584: URL: https://github.com/apache/spark/pull/36584#issuecomment-1131915487 @MaxGekk please review and help me merge this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] mridulm commented on a diff in pull request #36601: [SPARK-38987][SHUFFLE] Throw FetchFailedException when merged shuffle blocks are corrupted and spark.shuffle.detectCorrupt is set t
mridulm commented on code in PR #36601: URL: https://github.com/apache/spark/pull/36601#discussion_r877251054 ## core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala: ## @@ -4342,6 +4342,56 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti assertDataStructuresEmpty() } + test("SPARK-38987: corrupted shuffle block FetchFailure should unregister merge result") { +initPushBasedShuffleConfs(conf) +DAGSchedulerSuite.clearMergerLocs() +DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", "host5")) + +scheduler = new MyDAGScheduler( + sc, + taskScheduler, + sc.listenerBus, + mapOutputTracker, + blockManagerMaster, + sc.env, + shuffleMergeFinalize = false, + shuffleMergeRegister = false) +dagEventProcessLoopTester = new DAGSchedulerEventProcessLoopTester(scheduler) + +val parts = 2 +val shuffleMapRdd = new MyRDD(sc, parts, Nil) +val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(parts)) +val reduceRdd = new MyRDD(sc, parts, List(shuffleDep), tracker = mapOutputTracker) + +// Submit a reduce job that depends which will create a map stage +submit(reduceRdd, (0 until parts).toArray) + +// Pass in custom bitmap so that the mergeStatus can store +// the correct mapIndex. +val bitmap = new RoaringBitmap() +bitmap.add(-1) + +val shuffleMapStage = scheduler.stageIdToStage(0).asInstanceOf[ShuffleMapStage] +scheduler.handleRegisterMergeStatuses(shuffleMapStage, + Seq((0, MergeStatus(makeBlockManagerId("hostA"), shuffleDep.shuffleMergeId, bitmap, 1000L +scheduler.handleShuffleMergeFinalized(shuffleMapStage, + shuffleMapStage.shuffleDep.shuffleMergeId) +scheduler.handleRegisterMergeStatuses(shuffleMapStage, + Seq((1, MergeStatus(makeBlockManagerId("hostA"), shuffleDep.shuffleMergeId, bitmap, 1000L + Review Comment: You are right, we need the bitmap to be valid, and not mocked. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] tanvn commented on pull request #27590: [SPARK-30703][SQL][DOCS][FollowUp] Declare the ANSI SQL compliance options as experimental
tanvn commented on PR #27590: URL: https://github.com/apache/spark/pull/27590#issuecomment-1131855914 @gengliangwang @dongjoon-hyun Hi, I have a question. In Spark 3.2.1, are `spark.sql.ansi.enabled` and `spark.sql.storeAssignmentPolicy` still considered as experimental options ? I understand that there is an epic named `ANSI enhancements in Spark 3.3` https://issues.apache.org/jira/browse/SPARK-38860 which means there will be new features for the `spark.sql.ansi.enabled`. But as in https://spark.apache.org/releases/spark-release-3-2-0.html, ANSI SQL mode GA ([SPARK-35030](https://issues.apache.org/jira/browse/SPARK-35030)) is mentioned in the Highlights section, so I think it is not `experimental` anymore. Could you please give me your opinion? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] dongjoon-hyun commented on pull request #36377: [SPARK-39043][SQL] Spark SQL Hive client should not gather statistic by default.
dongjoon-hyun commented on PR #36377: URL: https://github.com/apache/spark/pull/36377#issuecomment-1131851884 Thank you for the reverting decision, @cloud-fan and @AngersZh . -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan closed pull request #35850: [SPARK-38529][SQL] Prevent GeneratorNestedColumnAliasing to be applied to non-Explode generators
cloud-fan closed pull request #35850: [SPARK-38529][SQL] Prevent GeneratorNestedColumnAliasing to be applied to non-Explode generators URL: https://github.com/apache/spark/pull/35850 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on pull request #35850: [SPARK-38529][SQL] Prevent GeneratorNestedColumnAliasing to be applied to non-Explode generators
cloud-fan commented on PR #35850: URL: https://github.com/apache/spark/pull/35850#issuecomment-1131827928 thanks, merging to master/3.3! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] cloud-fan commented on a diff in pull request #35850: [SPARK-38529][SQL] Prevent GeneratorNestedColumnAliasing to be applied to non-Explode generators
cloud-fan commented on code in PR #35850: URL: https://github.com/apache/spark/pull/35850#discussion_r877169574 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala: ## @@ -321,6 +321,38 @@ object GeneratorNestedColumnAliasing { // need to prune nested columns through Project and under Generate. The difference is // when `nestedSchemaPruningEnabled` is on, nested columns will be pruned further at // file format readers if it is supported. + +// There are [[ExtractValue]] expressions on or not on the output of the generator. Generator +// can also have different types: +// 1. For [[ExtractValue]]s not on the output of the generator, theoretically speaking, there +//lots of expressions that we can push down, including non ExtractValues and GetArrayItem +//and GetMapValue. But to be safe, we only handle GetStructField and GetArrayStructFields. +// 2. For [[ExtractValue]]s on the output of the generator, the situation depends on the type +//of the generator expression. *For now, we only support Explode*. +// 2.1 Inline +// Inline takes an input of ARRAY>, and returns an output of +// STRUCT, the output field can be directly accessed by name "field1". +// In this case, we should not try to push down the ExtractValue expressions to the +// input of the Inline. For example: +// Project[field1.x AS x] +// - Generate[ARRAY, field2:int>>, ..., field1, field2] +// It is incorrect to push down the .x to the input of the Inline. +// A valid field pruning would be to extract all the fields that are accessed by the +// Project, and manually reconstruct an expression using those fields. +// 2.2 Explode +// Explode takes an input of ARRAY and returns an output of +// STRUCT. The default field name "col" can be overwritten. +// If the input is MAP, it returns STRUCT. +// For the array case, it is only valid to push down GetStructField. After push down, +// the GetStructField becomes a GetArrayStructFields. Note that we cannot push down +// GetArrayStructFields, since the pushed down expression will operate on an array of +// array which is invalid. +// 2.3 Stack +// Stack takes a sequence of expressions, and returns an output of +// STRUCT +// The push down is doable but more complicated in this case as the expression that +// operates on the col_i of the output needs to pushed down to every (kn+i)-th input +// expression where n is the total number of columns (or struct fields) of the output. Review Comment: actually, I find it's useful to understand why we only support explode today. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
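To ground case 2.2 of the quoted comment, here is a small self-contained sketch (the schema and column names are invented for illustration); inspecting the optimized plan shows how the single field access on the exploded struct is handled:
```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.explode

object ExplodePruningExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("explode-pruning").getOrCreate()
    import spark.implicits._

    // items: array<struct<_1: int, _2: int>>
    val df = Seq(Seq((1, 2), (3, 4))).toDF("items")

    // Only field _1 of the exploded struct is referenced downstream.
    val pruned = df.select(explode($"items").as("item")).select($"item._1")
    pruned.explain(true)

    spark.stop()
  }
}
```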
[GitHub] [spark] cloud-fan commented on a diff in pull request #36295: [SPARK-38978][SQL] Support push down OFFSET to JDBC data source V2
cloud-fan commented on code in PR #36295: URL: https://github.com/apache/spark/pull/36295#discussion_r877141680 ## sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala: ## @@ -203,6 +204,245 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel checkAnswer(df5, Seq(Row(1.00, 1000.0, "amy"))) } + private def checkOffsetRemoved(df: DataFrame, removed: Boolean = true): Unit = { +val offsets = df.queryExecution.optimizedPlan.collect { + case offset: Offset => offset +} +if (removed) { + assert(offsets.isEmpty) +} else { + assert(offsets.nonEmpty) +} + } + + test("simple scan with OFFSET") { +val df1 = spark.read + .table("h2.test.employee") + .where($"dept" === 1) + .offset(1) +checkOffsetRemoved(df1) +checkPushedInfo(df1, + "PushedFilters: [DEPT IS NOT NULL, DEPT = 1], PushedOffset: OFFSET 1,") +checkAnswer(df1, Seq(Row(1, "cathy", 9000.00, 1200.0, false))) + +val df2 = spark.read + .option("pushDownOffset", "false") + .table("h2.test.employee") + .where($"dept" === 1) + .offset(1) +checkOffsetRemoved(df2, false) +checkPushedInfo(df2, + "PushedFilters: [DEPT IS NOT NULL, DEPT = 1], ReadSchema:") +checkAnswer(df2, Seq(Row(1, "cathy", 9000.00, 1200.0, false))) + +val df3 = spark.read + .option("partitionColumn", "dept") + .option("lowerBound", "0") + .option("upperBound", "2") + .option("numPartitions", "2") + .table("h2.test.employee") + .filter($"dept" > 1) + .offset(1) +checkOffsetRemoved(df3, false) +checkPushedInfo(df3, "PushedFilters: [DEPT IS NOT NULL, DEPT > 1], ReadSchema:") +checkAnswer(df3, Seq(Row(2, "david", 1, 1300, true), Row(6, "jen", 12000, 1200, true))) + +val df4 = spark.read + .table("h2.test.employee") + .groupBy("DEPT").sum("SALARY") + .offset(1) +checkOffsetRemoved(df4, false) +checkPushedInfo(df4, + "PushedAggregates: [SUM(SALARY)], PushedFilters: [], PushedGroupByExpressions: [DEPT], ") +checkAnswer(df4, Seq(Row(2, 22000.00), Row(6, 12000.00))) + +val name = udf { (x: String) => x.matches("cat|dav|amy") } +val sub = udf { (x: String) => x.substring(0, 3) } +val df5 = spark.read + .table("h2.test.employee") + .select($"SALARY", $"BONUS", sub($"NAME").as("shortName")) + .filter(name($"shortName")) + .offset(1) +checkOffsetRemoved(df5, false) +// OFFSET is pushed down only if all the filters are pushed down +checkPushedInfo(df5, "PushedFilters: [], ") +checkAnswer(df5, Seq(Row(1.00, 1300.0, "dav"), Row(9000.00, 1200.0, "cat"))) + } + + test("simple scan with LIMIT and OFFSET") { +val df1 = spark.read + .table("h2.test.employee") + .where($"dept" === 1) + .limit(2) + .offset(1) +checkLimitRemoved(df1) +checkOffsetRemoved(df1) +checkPushedInfo(df1, + "PushedFilters: [DEPT IS NOT NULL, DEPT = 1], PushedLimit: LIMIT 1, PushedOffset: OFFSET 1,") Review Comment: This does not match https://github.com/apache/spark/pull/36295/files#diff-85c754089fc8e0db142a16714e92b127001bab9e6433684d1e3a15af04cb219aR26 Assume that I have a local array data source. According to the API doc, Spark pushes down LIMIT first. For this query, I'll do `array.limit(1).drop(1)`. This is wrong and doesn't match the query `df.limit(2).offset(1)`. we should either fix the API doc, or fix the pushdown logic -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
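A plain Scala sketch of the ordering issue raised above (illustrative values only, not the data source API itself): for `df.limit(2).offset(1)` the source must conceptually keep two rows and then skip one, whereas applying the pushed-down LIMIT 1 before the OFFSET 1 yields an empty result.

```scala
// Illustration with a local array standing in for a trivial data source.
val rows = Array("row1", "row2", "row3")

// Semantics of df.limit(2).offset(1): keep 2 rows, then skip 1 -> Array("row2")
val expected = rows.take(2).drop(1)

// What a source computes if it applies the pushed-down LIMIT 1 first and the
// OFFSET 1 afterwards, as the current API doc ordering suggests -> Array()
val limitFirst = rows.take(1).drop(1)

assert(expected.sameElements(Array("row2")))
assert(limitFirst.isEmpty) // wrong result for the original query
```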
[GitHub] [spark] cloud-fan commented on a diff in pull request #36593: [SPARK-39139][SQL] DS V2 push-down framework supports DS V2 UDF
cloud-fan commented on code in PR #36593: URL: https://github.com/apache/spark/pull/36593#discussion_r877123725 ## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCCatalog.scala: ## @@ -32,11 +35,14 @@ import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects} import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CaseInsensitiveStringMap -class JDBCTableCatalog extends TableCatalog with SupportsNamespaces with Logging { +class JDBCCatalog extends TableCatalog with SupportsNamespaces with FunctionCatalog with Logging { private var catalogName: String = null private var options: JDBCOptions = _ private var dialect: JdbcDialect = _ + private val functions: util.Map[Identifier, UnboundFunction] = +new ConcurrentHashMap[Identifier, UnboundFunction]() Review Comment: We should clearly define how this can be used. I thought each JDBC dialect should have APIs to register its own UDFs. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
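As a rough sketch of the dialect-driven registration being suggested (the `functions` hook and the `FunctionRegistrySketch` class are assumptions for illustration, not the PR's API): the dialect enumerates its own UDFs and the catalog copies them into its concurrent map, so nothing outside the dialect ever mutates it.

```scala
import java.util
import java.util.concurrent.ConcurrentHashMap

import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.connector.catalog.functions.UnboundFunction

// Hypothetical hook: a JDBC dialect lists the UDFs it understands.
trait DialectFunctions {
  def functions: Seq[(String, UnboundFunction)] = Nil
}

// Sketch of a catalog that populates its function map from the dialect only.
class FunctionRegistrySketch(dialect: DialectFunctions) {
  private val functions: util.Map[Identifier, UnboundFunction] =
    new ConcurrentHashMap[Identifier, UnboundFunction]()

  dialect.functions.foreach { case (name, fn) =>
    functions.put(Identifier.of(Array.empty[String], name), fn)
  }

  def loadFunction(ident: Identifier): UnboundFunction = {
    val fn = functions.get(ident)
    if (fn == null) {
      throw new NoSuchElementException(s"Function ${ident.name()} not found")
    }
    fn
  }
}
```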
[GitHub] [spark] cloud-fan commented on a diff in pull request #36593: [SPARK-39139][SQL] DS V2 push-down framework supports DS V2 UDF
cloud-fan commented on code in PR #36593: URL: https://github.com/apache/spark/pull/36593#discussion_r877113355 ## sql/core/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala: ## @@ -201,6 +203,14 @@ class V2ExpressionBuilder( None } // TODO supports other expressions +case ApplyFunctionExpression(function, children) => + val childrenExpressions = children.flatMap(generateExpression(_)) + if (childrenExpressions.length == children.length) { +Some(new GeneralScalarExpression(function.name().toUpperCase(Locale.ROOT), Review Comment: UDF is kind of special and I think it's better to have a new v2 API for it, instead of reusing `GeneralScalarExpression`. e.g., we can add a `UserDefinedScalarFunction`, which defines the function name, canonical name and inputs. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
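For illustration only, a sketch of what such a dedicated v2 node could look like (the class name and fields are assumptions mirroring `GeneralScalarExpression`; the real API shape was still being discussed in this PR):

```scala
import org.apache.spark.sql.connector.expressions.Expression

// Hypothetical v2 expression that carries a scalar UDF's identity explicitly
// instead of encoding it as a GeneralScalarExpression with an uppercased name.
class UserDefinedScalarFuncSketch(
    val name: String,           // name as registered in the FunctionCatalog
    val canonicalName: String,  // stable, fully qualified name for sources to match on
    inputs: Array[Expression]) extends Expression {

  override def children(): Array[Expression] = inputs

  override def describe(): String =
    s"$name(${inputs.map(_.describe()).mkString(", ")})"
}
```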
[GitHub] [spark] cloud-fan commented on a diff in pull request #36593: [SPARK-39139][SQL] DS V2 push-down framework supports DS V2 UDF
cloud-fan commented on code in PR #36593: URL: https://github.com/apache/spark/pull/36593#discussion_r877115137 ## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala: ## @@ -744,6 +744,14 @@ object DataSourceStrategy PushableColumnWithoutNestedColumn(right), _) => Some(new GeneralAggregateFunc("CORR", agg.isDistinct, Array(FieldReference.column(left), FieldReference.column(right +case aggregate.V2Aggregator(aggrFunc, children, _, _) => + val translatedExprs = children.flatMap(PushableExpression.unapply(_)) + if (translatedExprs.length == children.length) { +Some(new GeneralAggregateFunc(aggrFunc.name().toUpperCase(Locale.ROOT), agg.isDistinct, Review Comment: ditto, let's create a dedicated v2 api, such as `UserDefinedAggregateFunction` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
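And the aggregate counterpart, again purely as an illustrative sketch (names are assumptions; it simply mirrors `GeneralAggregateFunc` with an extra canonical name):

```scala
import org.apache.spark.sql.connector.expressions.Expression
import org.apache.spark.sql.connector.expressions.aggregate.AggregateFunc

// Hypothetical v2 node for aggregate UDFs, analogous to the scalar sketch above.
class UserDefinedAggregateFuncSketch(
    val name: String,
    val canonicalName: String,
    val isDistinct: Boolean,
    inputs: Array[Expression]) extends AggregateFunc {

  override def children(): Array[Expression] = inputs

  override def describe(): String = {
    val distinct = if (isDistinct) "DISTINCT " else ""
    s"$name($distinct${inputs.map(_.describe()).mkString(", ")})"
  }
}
```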