[GitHub] [spark] beliefer commented on a diff in pull request #36593: [SPARK-39139][SQL] DS V2 push-down framework supports DS V2 UDF

2022-05-19 Thread GitBox


beliefer commented on code in PR #36593:
URL: https://github.com/apache/spark/pull/36593#discussion_r877803218


##
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCCatalog.scala:
##
@@ -32,11 +35,14 @@ import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
-class JDBCTableCatalog extends TableCatalog with SupportsNamespaces with Logging {
+class JDBCCatalog extends TableCatalog with SupportsNamespaces with FunctionCatalog with Logging {
   private var catalogName: String = null
   private var options: JDBCOptions = _
   private var dialect: JdbcDialect = _
 
+  private val functions: util.Map[Identifier, UnboundFunction] =
+    new ConcurrentHashMap[Identifier, UnboundFunction]()

Review Comment:
   OK






[GitHub] [spark] wangyum commented on pull request #36588: [SPARK-39217][SQL] Makes DPP support the pruning side has Union

2022-05-19 Thread GitBox


wangyum commented on PR #36588:
URL: https://github.com/apache/spark/pull/36588#issuecomment-1132514681

   A case from production:
   
![image](https://user-images.githubusercontent.com/5399861/169463931-65bfd0c0-1759-4f9d-8a0a-66b32463b76a.png)
   





[GitHub] [spark] cloud-fan commented on a diff in pull request #36608: [SPARK-39230][SQL] Support ANSI Aggregate Function: regr_slope

2022-05-19 Thread GitBox


cloud-fan commented on code in PR #36608:
URL: https://github.com/apache/spark/pull/36608#discussion_r877763059


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Covariance.scala:
##
@@ -34,7 +34,7 @@ abstract class Covariance(val left: Expression, val right: Expression, nullOnDiv
   override def dataType: DataType = DoubleType
   override def inputTypes: Seq[AbstractDataType] = Seq(DoubleType, DoubleType)
 
-  protected val n = AttributeReference("n", DoubleType, nullable = false)()
+  protected val count = AttributeReference("count", DoubleType, nullable = false)()

Review Comment:
   shall we make it `protected[sql]` so that we can access it directly in the 
new expression?
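
   A minimal sketch of what `protected[sql]` buys here, using an illustrative class rather than the actual Spark source: the qualifier widens visibility from subclasses only to any code under the `org.apache.spark.sql` package, which is what would let a new expression such as `regr_slope` read the buffer attribute directly.

   ```scala
   package org.apache.spark.sql.catalyst.expressions.aggregate

   // Hedged sketch: the member below stands in for the real AttributeReference.
   abstract class CovarianceLike {
     // Visible to subclasses AND to any code under org.apache.spark.sql.
     protected[sql] val count: Double = 0.0
   }
   ```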






[GitHub] [spark] amaliujia commented on a diff in pull request #36586: [SPARK-39236][SQL] Make CreateTable and ListTables be compatible with 3 layer namespace

2022-05-19 Thread GitBox


amaliujia commented on code in PR #36586:
URL: https://github.com/apache/spark/pull/36586#discussion_r877754283


##
sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala:
##
@@ -97,8 +97,18 @@ class CatalogImpl(sparkSession: SparkSession) extends 
Catalog {
*/
   @throws[AnalysisException]("database does not exist")
   override def listTables(dbName: String): Dataset[Table] = {
-val tables = sessionCatalog.listTables(dbName).map(makeTable)
-CatalogImpl.makeDataset(tables, sparkSession)
+if (sessionCatalog.databaseExists(dbName)) {
+  val tables = sessionCatalog.listTables(dbName).map(makeTable)
+  CatalogImpl.makeDataset(tables, sparkSession)
+} else {
+  val multiParts = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(dbName)
+  val plan = ShowTables(UnresolvedNamespace(multiParts), None)
+  val ret = sparkSession.sessionState.executePlan(plan).toRdd.collect()
+  val tables = ret
+    .map(row => TableIdentifier(row.getString(1), Some(row.getString(0))))
+    .map(makeTable)

Review Comment:
   done






[GitHub] [spark] LuciferYang commented on pull request #36616: [WIP][SPARK-39231][SQL] Change to use `ConstantColumnVector` to store partition columns in `VectorizedParquetRecordReader`

2022-05-19 Thread GitBox


LuciferYang commented on PR #36616:
URL: https://github.com/apache/spark/pull/36616#issuecomment-1132495258

   This PR mainly focuses on `Parquet`. If this is acceptable, I will change ORC in another PR.





[GitHub] [spark] cloud-fan commented on a diff in pull request #36608: [SPARK-39230][SQL] Support ANSI Aggregate Function: regr_slope

2022-05-19 Thread GitBox


cloud-fan commented on code in PR #36608:
URL: https://github.com/apache/spark/pull/36608#discussion_r877745569


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Covariance.scala:
##
@@ -69,7 +69,7 @@ abstract class Covariance(val left: Expression, val right: Expression, nullOnDiv
   }
 
   protected def updateExpressionsDef: Seq[Expression] = {
-val newN = n + 1.0
+val newN = count + 1.0

Review Comment:
   ```suggestion
   val newCount = count + 1.0
   ```






[GitHub] [spark] cloud-fan commented on a diff in pull request #36614: [SPARK-39237][DOCS] Update the ANSI SQL mode documentation

2022-05-19 Thread GitBox


cloud-fan commented on code in PR #36614:
URL: https://github.com/apache/spark/pull/36614#discussion_r877742782


##
docs/sql-ref-ansi-compliance.md:
##
@@ -28,10 +28,10 @@ The casting behaviours are defined as store assignment 
rules in the standard.
 
 When `spark.sql.storeAssignmentPolicy` is set to `ANSI`, Spark SQL complies 
with the ANSI store assignment rules. This is a separate configuration because 
its default value is `ANSI`, while the configuration `spark.sql.ansi.enabled` 
is disabled by default.
 
-|Property Name|Default|Meaning|Since Version|
-|-|---|---|-|
-|`spark.sql.ansi.enabled`|false|(Experimental) When true, Spark tries to 
conform to the ANSI SQL specification:  1. Spark will throw a runtime 
exception if an overflow occurs in any operation on integral/decimal field. 
 2. Spark will forbid using the reserved keywords of ANSI SQL as 
identifiers in the SQL parser.|3.0.0|
-|`spark.sql.storeAssignmentPolicy`|ANSI|(Experimental) When inserting a value 
into a column with different data type, Spark will perform type conversion.  
Currently, we support 3 policies for the type coercion rules: ANSI, legacy and 
strict. With ANSI policy, Spark performs the type coercion as per ANSI SQL. In 
practice, the behavior is mostly the same as PostgreSQL.  It disallows certain 
unreasonable type conversions such as converting string to int or double to 
boolean.  With legacy policy, Spark allows the type coercion as long as it is a 
valid Cast, which is very loose.  e.g. converting string to int or double to 
boolean is allowed.  It is also the only behavior in Spark 2.x and it is 
compatible with Hive.  With strict policy, Spark doesn't allow any possible 
precision loss or data truncation in type coercion, e.g. converting double to 
int or decimal to double is not allowed.|3.0.0|
+|Property Name|Default| Meaning |Since Version|
+|-|---|---|-|
+|`spark.sql.ansi.enabled`|false| When true, Spark tries to conform to the ANSI SQL specification:  1. Spark will throw a runtime exception if an overflow occurs in any operation on integral/decimal field.  2. Spark will use different type coercion rules for resolving conflicts among data types. The rules are consistently based on data type precedence. |3.0.0|
+|`spark.sql.storeAssignmentPolicy`|ANSI| When inserting a value into a column with different data type, Spark will perform type conversion.  Currently, we support 3 policies for the type coercion rules: ANSI, legacy and strict. With ANSI policy, Spark performs the type coercion as per ANSI SQL. In practice, the behavior is mostly the same as PostgreSQL.  It disallows certain unreasonable type conversions such as converting string to int or double to boolean.  With legacy policy, S

[GitHub] [spark] cloud-fan commented on a diff in pull request #36614: [SPARK-39237][DOCS] Update the ANSI SQL mode documentation

2022-05-19 Thread GitBox


cloud-fan commented on code in PR #36614:
URL: https://github.com/apache/spark/pull/36614#discussion_r877742454


##
docs/sql-ref-ansi-compliance.md:
##
@@ -28,10 +28,10 @@ The casting behaviours are defined as store assignment 
rules in the standard.
 
 When `spark.sql.storeAssignmentPolicy` is set to `ANSI`, Spark SQL complies 
with the ANSI store assignment rules. This is a separate configuration because 
its default value is `ANSI`, while the configuration `spark.sql.ansi.enabled` 
is disabled by default.
 
-|Property Name|Default|Meaning|Since Version|
-|-|---|---|-|
-|`spark.sql.ansi.enabled`|false|(Experimental) When true, Spark tries to 
conform to the ANSI SQL specification:  1. Spark will throw a runtime 
exception if an overflow occurs in any operation on integral/decimal field. 
 2. Spark will forbid using the reserved keywords of ANSI SQL as 
identifiers in the SQL parser.|3.0.0|
-|`spark.sql.storeAssignmentPolicy`|ANSI|(Experimental) When inserting a value 
into a column with different data type, Spark will perform type conversion.  
Currently, we support 3 policies for the type coercion rules: ANSI, legacy and 
strict. With ANSI policy, Spark performs the type coercion as per ANSI SQL. In 
practice, the behavior is mostly the same as PostgreSQL.  It disallows certain 
unreasonable type conversions such as converting string to int or double to 
boolean.  With legacy policy, Spark allows the type coercion as long as it is a 
valid Cast, which is very loose.  e.g. converting string to int or double to 
boolean is allowed.  It is also the only behavior in Spark 2.x and it is 
compatible with Hive.  With strict policy, Spark doesn't allow any possible 
precision loss or data truncation in type coercion, e.g. converting double to 
int or decimal to double is not allowed.|3.0.0|
+|Property Name|Default| Meaning |Since Version|
+|-|---|---|-|
+|`spark.sql.ansi.enabled`|false| When true, Spark tries to conform to the ANSI SQL specification:  1. Spark will throw a runtime exception if an overflow occurs in any operation on integral/decimal field.  2. Spark will use different type coercion rules for resolving conflicts among data types. The rules are consistently based on data type precedence. |3.0.0|

Review Comment:
   Since we are touching it, let's make the doc more accurate. It's not only 
overflow, but all illegal operations, including overflow, parsing invalid 
string to numbers, etc.
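
   A hedged spark-shell sketch of the behaviours meant here (exact error types and messages vary by version):

   ```scala
   // With ANSI mode on, illegal operations raise runtime errors instead of
   // silently returning NULL or wrapping around.
   spark.conf.set("spark.sql.ansi.enabled", "true")

   // Parsing an invalid string into a number fails at runtime.
   spark.sql("SELECT CAST('abc' AS INT)").show()

   // Integral overflow fails as well, instead of wrapping.
   spark.sql("SELECT 2147483647 + 1").show()
   ```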




[GitHub] [spark] LuciferYang commented on pull request #36616: [WIP][SPARK-39231][SQL] Change to use `ConstantColumnVector` to store partition columns in `VectorizedParquetRecordReader`

2022-05-19 Thread GitBox


LuciferYang commented on PR #36616:
URL: https://github.com/apache/spark/pull/36616#issuecomment-1132482921

   Will update the PR description later.





[GitHub] [spark] cloud-fan commented on pull request #36615: [SPARK-39238][SQL] Apply WidenSetOperationTypes at last to fix decimal precision loss

2022-05-19 Thread GitBox


cloud-fan commented on PR #36615:
URL: https://github.com/apache/spark/pull/36615#issuecomment-1132481952

   Good catch!
   
   This is a long-standing issue. The type coercion for decimal types is really 
messy as it's not bound to `Expression.resolved`. Changing the rule order does 
fix this simple query, but I'm afraid it's still fragile as rule order is quite 
unreliable.
   
   I'd like to have a more aggressive refactor: let's not require the operands of these math operations to be the same decimal type, and instead define the return decimal type for each math operation individually. Then the only thing that needs to be done in type coercion is to cast non-decimal operands to decimal types.
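
   To make that direction concrete, a hedged sketch of per-operation result types (the formulas are the usual SQL-standard precision/scale rules already described in Spark's docs; the names below are illustrative, not an actual Spark API, and capping at the maximum precision of 38 is omitted):

   ```scala
   import scala.math.max

   // Illustrative result-type rules for decimal + and *; a refactor along these
   // lines would attach such a rule to each arithmetic expression instead of
   // forcing both operands to a common decimal type up front.
   final case class Dec(precision: Int, scale: Int)

   object DecimalResultType {
     def forAdd(l: Dec, r: Dec): Dec = {
       val scale = max(l.scale, r.scale)
       Dec(max(l.precision - l.scale, r.precision - r.scale) + scale + 1, scale)
     }

     def forMultiply(l: Dec, r: Dec): Dec =
       Dec(l.precision + r.precision + 1, l.scale + r.scale)
   }
   ```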





[GitHub] [spark] LuciferYang opened a new pull request, #36616: [SPARK-39231][SQL] Change to use `ConstantColumnVector` to store partition columns in `VectorizedParquetRecordReader`

2022-05-19 Thread GitBox


LuciferYang opened a new pull request, #36616:
URL: https://github.com/apache/spark/pull/36616

   ### What changes were proposed in this pull request?
   
   
   
   ### Why are the changes needed?
   
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   
   
   ### How was this patch tested?
   
   





[GitHub] [spark] HyukjinKwon commented on pull request #36486: [SPARK-39129][PS] Implement GroupBy.ewm

2022-05-19 Thread GitBox


HyukjinKwon commented on PR #36486:
URL: https://github.com/apache/spark/pull/36486#issuecomment-1132476982

   I haven't taken a close look, but it seems fine from a cursory glance. Should be good to go.





[GitHub] [spark] zhengruifeng commented on pull request #36486: [SPARK-39129][PS] Implement GroupBy.ewm

2022-05-19 Thread GitBox


zhengruifeng commented on PR #36486:
URL: https://github.com/apache/spark/pull/36486#issuecomment-1132454863

   cc @HyukjinKwon @xinrong-databricks @itholic would you mind taking a look when you have some time? Thanks!





[GitHub] [spark] manuzhang commented on pull request #36615: [SPARK-39238][SQL] Apply WidenSetOperationTypes at last to fix decimal precision loss

2022-05-19 Thread GitBox


manuzhang commented on PR #36615:
URL: https://github.com/apache/spark/pull/36615#issuecomment-1132451324

   cc @gengliangwang @cloud-fan @turboFei 





[GitHub] [spark] manuzhang opened a new pull request, #36615: [SPARK-39238][SQL] Apply WidenSetOperationTypes at last to fix decimal precision loss

2022-05-19 Thread GitBox


manuzhang opened a new pull request, #36615:
URL: https://github.com/apache/spark/pull/36615

   ### What changes were proposed in this pull request?
   When analyzing, apply WidenSetOperationTypes after other rules.
   
   
   ### Why are the changes needed?
   The following SQL returns 1.00 while 1. (the full-scale value) is expected, since the union should pick the wider precision, the `Decimal(38,20)` from `v / v`.
   
   ```
   CREATE OR REPLACE TEMPORARY VIEW t3 AS VALUES (decimal(1)) tbl(v);
   CREATE OR REPLACE TEMPORARY VIEW t4 AS SELECT CAST(v AS DECIMAL(18, 2)) AS v 
FROM t3;
   
   SELECT CAST(1 AS DECIMAL(28, 2))
   UNION ALL
   SELECT v / v FROM t4;
   ```
   
   Checking the analyzed logical plan of the SQL above, `Project [cast((v / v)#236 as decimal(28,2)) AS (v / v)#237]` is added by `WidenSetOperationTypes` before `DecimalPrecision` promotes the precision for the divide, so the result of `v / v` is cast to the narrower `decimal(28,2)`.
   
   ```
   == Analyzed Logical Plan ==
   CAST(1 AS DECIMAL(28,2)): decimal(28,2)
   Union false, false
   :- Project [CAST(1 AS DECIMAL(28,2))#235]
   :  +- Project [cast(1 as decimal(28,2)) AS CAST(1 AS DECIMAL(28,2))#235]
   : +- OneRowRelation
   +- Project [cast((v / v)#236 as decimal(28,2)) AS (v / v)#237]
  +- Project [CheckOverflow((promote_precision(cast(v#228 as 
decimal(18,2))) / promote_precision(cast(v#228 as decimal(18,2, 
DecimalType(38,20), false) AS (v / v)#236]
 +- SubqueryAlias t4
+- Project [cast(v#226 as decimal(18,2)) AS v#228]
   +- SubqueryAlias t3
  +- SubqueryAlias tbl
 +- LocalRelation [v#226]
   ```
   
   Hence, I propose to apply `WidenSetOperationTypes` after `DecimalPrecision`.
   
   ### Does this PR introduce _any_ user-facing change?
   No.
   
   
   ### How was this patch tested?
   Add UT.
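
   A sketch of the kind of regression test this implies, assuming a SQLTestUtils-style suite where `sql` and `withTempView` are available; the asserted type follows the `Decimal(38,20)` the description identifies as the wider one:

   ```scala
   test("SPARK-39238: union keeps the wider decimal type from the divide") {
     withTempView("t3", "t4") {
       sql("CREATE OR REPLACE TEMPORARY VIEW t3 AS VALUES (decimal(1)) tbl(v)")
       sql("CREATE OR REPLACE TEMPORARY VIEW t4 AS SELECT CAST(v AS DECIMAL(18, 2)) AS v FROM t3")
       val df = sql("SELECT CAST(1 AS DECIMAL(28, 2)) UNION ALL SELECT v / v FROM t4")
       // The union output should not be narrowed back to decimal(28,2).
       assert(df.schema.head.dataType === DecimalType(38, 20))
     }
   }
   ```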
   





[GitHub] [spark] HyukjinKwon commented on pull request #36589: [SPARK-39218][SS][PYTHON] Make foreachBatch streaming query stop gracefully

2022-05-19 Thread GitBox


HyukjinKwon commented on PR #36589:
URL: https://github.com/apache/spark/pull/36589#issuecomment-1132433602

   Merged to master and branch-3.3.





[GitHub] [spark] HyukjinKwon closed pull request #36589: [SPARK-39218][SS][PYTHON] Make foreachBatch streaming query stop gracefully

2022-05-19 Thread GitBox


HyukjinKwon closed pull request #36589: [SPARK-39218][SS][PYTHON] Make 
foreachBatch streaming query stop gracefully
URL: https://github.com/apache/spark/pull/36589





[GitHub] [spark] gengliangwang commented on pull request #36614: [SPARK-39237][DOCS] Update the ANSI SQL mode documentation

2022-05-19 Thread GitBox


gengliangwang commented on PR #36614:
URL: https://github.com/apache/spark/pull/36614#issuecomment-1132430487

   cc @tanvn as well. Thanks for pointing it out!





[GitHub] [spark] gengliangwang opened a new pull request, #36614: [SPARK-39237][DOCS] Update the ANSI SQL mode documentation

2022-05-19 Thread GitBox


gengliangwang opened a new pull request, #36614:
URL: https://github.com/apache/spark/pull/36614

   
   
   ### What changes were proposed in this pull request?
   
   1. Remove the Experimental notation in ANSI SQL compliance doc
   2. Update the description of `spark.sql.ansi.enabled`, since enforcement of ANSI reserved keywords is now disabled by default
   
   ### Why are the changes needed?
   
   1. The ANSI SQL dialect was GA'd in the Spark 3.2 release: https://spark.apache.org/releases/spark-release-3-2-0.html
   We should not mark it as "Experimental" in the doc.
   2. Enforcement of ANSI reserved keywords is now disabled by default
   
   ### Does this PR introduce _any_ user-facing change?
   
   
   No, just doc change
   ### How was this patch tested?
   
   Doc preview:
   https://user-images.githubusercontent.com/1097932/169444094-de9c33c2-1b01-4fc3-b583-b752c71e16d8.png

   https://user-images.githubusercontent.com/1097932/169446841-a945bffe-84eb-45e9-9ebb-501387e216cf.png
   





[GitHub] [spark] dongjoon-hyun commented on pull request #36358: [SPARK-39023] [K8s] Add Executor Pod inter-pod anti-affinity

2022-05-19 Thread GitBox


dongjoon-hyun commented on PR #36358:
URL: https://github.com/apache/spark/pull/36358#issuecomment-1132427215

   Thank you so much, @zwangsheng .





[GitHub] [spark] amaliujia commented on a diff in pull request #36586: [SPARK-39236][SQL] Make CreateTable and ListTables be compatible with 3 layer namespace

2022-05-19 Thread GitBox


amaliujia commented on code in PR #36586:
URL: https://github.com/apache/spark/pull/36586#discussion_r877706031


##
sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala:
##
@@ -367,24 +377,40 @@ class CatalogImpl(sparkSession: SparkSession) extends 
Catalog {
   schema: StructType,
   description: String,
   options: Map[String, String]): DataFrame = {
-val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
+val idents = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(tableName)
 val storage = DataSource.buildStorageFormatFromOptions(options)
 val tableType = if (storage.locationUri.isDefined) {
   CatalogTableType.EXTERNAL
 } else {
   CatalogTableType.MANAGED
 }
-val tableDesc = CatalogTable(
-  identifier = tableIdent,
-  tableType = tableType,
-  storage = storage,
-  schema = schema,
-  provider = Some(source),
-  comment = { if (description.isEmpty) None else Some(description) }
-)
-val plan = CreateTable(tableDesc, SaveMode.ErrorIfExists, None)
+val location = if (storage.locationUri.isDefined) {
+  val locationStr = storage.locationUri.get.toString
+  Some(locationStr)
+} else {
+  None
+}
+
+val tableSpec =
+  TableSpec(
+properties = Map(),
+provider = Some(source),
+options = options,
+location = location,
+comment = { if (description.isEmpty) None else Some(description) },
+serde = None,
+external = tableType == CatalogTableType.EXTERNAL)
+
+val plan =
+  CreateTable(
+name = UnresolvedDBObjectName(idents, isNamespace = true),

Review Comment:
   Oops, yes, it should be false.






[GitHub] [spark] amaliujia commented on a diff in pull request #36586: [SPARK-39236][SQL] Make CreateTable and ListTables be compatible with 3 layer namespace

2022-05-19 Thread GitBox


amaliujia commented on code in PR #36586:
URL: https://github.com/apache/spark/pull/36586#discussion_r877705884


##
sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala:
##
@@ -97,8 +97,18 @@ class CatalogImpl(sparkSession: SparkSession) extends 
Catalog {
*/
   @throws[AnalysisException]("database does not exist")
   override def listTables(dbName: String): Dataset[Table] = {
-val tables = sessionCatalog.listTables(dbName).map(makeTable)
-CatalogImpl.makeDataset(tables, sparkSession)
+if (sessionCatalog.databaseExists(dbName)) {
+  val tables = sessionCatalog.listTables(dbName).map(makeTable)
+  CatalogImpl.makeDataset(tables, sparkSession)
+} else {
+  val multiParts = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(dbName)
+  val plan = ShowTables(UnresolvedNamespace(multiParts), None)
+  val ret = sparkSession.sessionState.executePlan(plan).toRdd.collect()
+  val tables = ret
+    .map(row => TableIdentifier(row.getString(1), Some(row.getString(0))))
+    .map(makeTable)

Review Comment:
   Oh yes, you are right. Let me add a version that looks up the table through the analyzer.






[GitHub] [spark] zwangsheng closed pull request #36358: [SPARK-39023] [K8s] Add Executor Pod inter-pod anti-affinity

2022-05-19 Thread GitBox


zwangsheng closed pull request #36358: [SPARK-39023] [K8s] Add Executor Pod 
inter-pod anti-affinity
URL: https://github.com/apache/spark/pull/36358





[GitHub] [spark] zwangsheng commented on pull request #36358: [SPARK-39023] [K8s] Add Executor Pod inter-pod anti-affinity

2022-05-19 Thread GitBox


zwangsheng commented on PR #36358:
URL: https://github.com/apache/spark/pull/36358#issuecomment-1132423502

   > Hi, @zwangsheng . Thank you for making a PR.
   However, Apache Spark community wants to avoid feature duplications like 
this.
   The proposed feature is already delivered to many production environments 
via PodTemplate and has been used by the customers without any problem. Adding another configuration only makes the users confused.
   
   @dongjoon-hyun Thanks for your reply. I understand the above and accept it.

   Thanks, all, for reviewing this PR!!!

   I will close this PR and look forward to meeting in another PR.





[GitHub] [spark] Ngone51 commented on a diff in pull request #36162: [SPARK-32170][CORE] Improve the speculation through the stage task metrics.

2022-05-19 Thread GitBox


Ngone51 commented on code in PR #36162:
URL: https://github.com/apache/spark/pull/36162#discussion_r877700761


##
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
##
@@ -769,6 +785,25 @@ private[spark] class TaskSetManager(
 }
   }
 
+  def setTaskRecordsAndRunTime(
+  info: TaskInfo,
+  result: DirectTaskResult[_]): Unit = {
+var records = 0L
+var runTime = 0L
+result.accumUpdates.foreach { a =>
+  if (a.name == Some(shuffleRead.RECORDS_READ) ||
+a.name == Some(input.RECORDS_READ)) {
+val acc = a.asInstanceOf[LongAccumulator]
+records += acc.value
+  } else if (a.name == Some(InternalAccumulator.EXECUTOR_RUN_TIME)) {
+val acc = a.asInstanceOf[LongAccumulator]
+runTime = acc.value
+  }
+}
+info.setRecords(records)
+info.setRunTime(runTime)

Review Comment:
   Same here... I think we should centralize the calculation of this stuff into `InefficientTaskCalculator`.



##
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala:
##
@@ -863,6 +870,29 @@ private[spark] class TaskSchedulerImpl(
   executorUpdates)
   }
 
+ private def getTaskAccumulableInfosAndProgressRate(
+  updates: Seq[AccumulatorV2[_, _]]): (Seq[AccumulableInfo], Double) = {
+   var records = 0L
+   var runTime = 0L
+   val accInfos = updates.map { acc =>
+ if (calculateTaskProgressRate && acc.name.isDefined) {
+   val name = acc.name.get
+   if (name == shuffleRead.RECORDS_READ || name == input.RECORDS_READ) {
+ records += acc.value.asInstanceOf[Long]
+   } else if (name == InternalAccumulator.EXECUTOR_RUN_TIME) {
+ runTime = acc.value.asInstanceOf[Long]
+   }
+ }
+ acc.toInfo(Some(acc.value), None)
+   }
+   val taskProgressRate = if (calculateTaskProgressRate && runTime > 0) {
+ records / (runTime / 1000.0)
+   } else {
+ 0.0D
+   }

Review Comment:
   Can we centralize the calculation of the task progress rate in `InefficientTaskCalculator` only? It seems not every calculation is necessary here, since the speculation check only happens under certain conditions, e.g., `numSuccessfulTasks >= minFinishedForSpeculation`. And I think we can reuse the existing `TaskInfo._accumulables` directly, which could make the code cleaner.
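
   For reference, the rate itself is a one-liner, which is part of why keeping it in a single helper is attractive; a self-contained sketch with illustrative names (not the PR's actual classes):

   ```scala
   // Records-per-second progress rate, as computed from the records-read and
   // executor-run-time accumulators; 0.0 when no run time has been recorded yet.
   object TaskProgress {
     def progressRate(recordsRead: Long, runTimeMs: Long): Double =
       if (runTimeMs > 0) recordsRead / (runTimeMs / 1000.0) else 0.0
   }

   // Example: 50000 records over 2500 ms gives 20000.0 records/sec.
   // TaskProgress.progressRate(50000L, 2500L) == 20000.0
   ```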






[GitHub] [spark] beliefer commented on pull request #36608: [SPARK-39230][SQL] Support ANSI Aggregate Function: regr_slope

2022-05-19 Thread GitBox


beliefer commented on PR #36608:
URL: https://github.com/apache/spark/pull/36608#issuecomment-1132419433

   ping @cloud-fan 





[GitHub] [spark] gengliangwang commented on pull request #27590: [SPARK-30703][SQL][DOCS][FollowUp] Declare the ANSI SQL compliance options as experimental

2022-05-19 Thread GitBox


gengliangwang commented on PR #27590:
URL: https://github.com/apache/spark/pull/27590#issuecomment-1132410903

   @tanvn nice catch!
   @cloud-fan Yes I will update the docs on 3.2 and above





[GitHub] [spark] HyukjinKwon commented on a diff in pull request #36599: [SPARK-39228][PYTHON][PS] Implement `skipna` of `Series.argmax`

2022-05-19 Thread GitBox


HyukjinKwon commented on code in PR #36599:
URL: https://github.com/apache/spark/pull/36599#discussion_r877689895


##
python/pyspark/pandas/series.py:
##
@@ -6239,13 +6239,19 @@ def argsort(self) -> "Series":
 ps.concat([psser, self.loc[self.isnull()].spark.transform(lambda 
_: SF.lit(-1))]),
 )
 
-def argmax(self) -> int:
+def argmax(self, skipna: bool = True) -> int:

Review Comment:
   Yeah, maybe we can add that parameter, and document which value in `axis` is 
not supported then.






[GitHub] [spark] cloud-fan commented on pull request #27590: [SPARK-30703][SQL][DOCS][FollowUp] Declare the ANSI SQL compliance options as experimental

2022-05-19 Thread GitBox


cloud-fan commented on PR #27590:
URL: https://github.com/apache/spark/pull/27590#issuecomment-1132399814

   I think we can remove the experimental mark now. What do you think? 
@gengliangwang 





[GitHub] [spark] cloud-fan commented on a diff in pull request #36586: [SPARK-39236][SQL] Make CreateTable and ListTables be compatible with 3 layer namespace

2022-05-19 Thread GitBox


cloud-fan commented on code in PR #36586:
URL: https://github.com/apache/spark/pull/36586#discussion_r877685094


##
sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala:
##
@@ -367,24 +377,40 @@ class CatalogImpl(sparkSession: SparkSession) extends 
Catalog {
   schema: StructType,
   description: String,
   options: Map[String, String]): DataFrame = {
-val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
+val idents = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(tableName)
 val storage = DataSource.buildStorageFormatFromOptions(options)
 val tableType = if (storage.locationUri.isDefined) {
   CatalogTableType.EXTERNAL
 } else {
   CatalogTableType.MANAGED
 }
-val tableDesc = CatalogTable(
-  identifier = tableIdent,
-  tableType = tableType,
-  storage = storage,
-  schema = schema,
-  provider = Some(source),
-  comment = { if (description.isEmpty) None else Some(description) }
-)
-val plan = CreateTable(tableDesc, SaveMode.ErrorIfExists, None)
+val location = if (storage.locationUri.isDefined) {
+  val locationStr = storage.locationUri.get.toString
+  Some(locationStr)
+} else {
+  None
+}
+
+val tableSpec =
+  TableSpec(
+properties = Map(),
+provider = Some(source),
+options = options,
+location = location,
+comment = { if (description.isEmpty) None else Some(description) },
+serde = None,
+external = tableType == CatalogTableType.EXTERNAL)
+
+val plan =
+  CreateTable(
+name = UnresolvedDBObjectName(idents, isNamespace = true),

Review Comment:
   `isNamespace` should be false?






[GitHub] [spark] cloud-fan commented on a diff in pull request #36586: [SPARK-39236][SQL] Make CreateTable and ListTables be compatible with 3 layer namespace

2022-05-19 Thread GitBox


cloud-fan commented on code in PR #36586:
URL: https://github.com/apache/spark/pull/36586#discussion_r877684804


##
sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala:
##
@@ -367,24 +377,40 @@ class CatalogImpl(sparkSession: SparkSession) extends 
Catalog {
   schema: StructType,
   description: String,
   options: Map[String, String]): DataFrame = {
-val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
+val idents = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(tableName)
 val storage = DataSource.buildStorageFormatFromOptions(options)
 val tableType = if (storage.locationUri.isDefined) {
   CatalogTableType.EXTERNAL
 } else {
   CatalogTableType.MANAGED
 }
-val tableDesc = CatalogTable(
-  identifier = tableIdent,
-  tableType = tableType,
-  storage = storage,
-  schema = schema,
-  provider = Some(source),
-  comment = { if (description.isEmpty) None else Some(description) }
-)
-val plan = CreateTable(tableDesc, SaveMode.ErrorIfExists, None)
+val location = if (storage.locationUri.isDefined) {
+  val locationStr = storage.locationUri.get.toString
+  Some(locationStr)
+} else {
+  None
+}
+
+val tableSpec =
+  TableSpec(
+properties = Map(),
+provider = Some(source),
+options = options,
+location = location,
+comment = { if (description.isEmpty) None else Some(description) },
+serde = None,
+external = tableType == CatalogTableType.EXTERNAL)
+
+val plan =
+  CreateTable(
+name = UnresolvedDBObjectName(idents, isNamespace = true),
+tableSchema = schema,
+partitioning = Seq(),
+tableSpec = tableSpec,
+ignoreIfExists = false)

Review Comment:
   ```suggestion
   val plan = CreateTable(
 name = UnresolvedDBObjectName(idents, isNamespace = true),
 tableSchema = schema,
 partitioning = Seq(),
 tableSpec = tableSpec,
 ignoreIfExists = false)
   ```






[GitHub] [spark] cloud-fan commented on a diff in pull request #36586: [SPARK-39236][SQL] Make CreateTable and ListTables be compatible with 3 layer namespace

2022-05-19 Thread GitBox


cloud-fan commented on code in PR #36586:
URL: https://github.com/apache/spark/pull/36586#discussion_r877684610


##
sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala:
##
@@ -367,24 +377,40 @@ class CatalogImpl(sparkSession: SparkSession) extends 
Catalog {
   schema: StructType,
   description: String,
   options: Map[String, String]): DataFrame = {
-val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
+val idents = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(tableName)
 val storage = DataSource.buildStorageFormatFromOptions(options)
 val tableType = if (storage.locationUri.isDefined) {
   CatalogTableType.EXTERNAL
 } else {
   CatalogTableType.MANAGED
 }
-val tableDesc = CatalogTable(
-  identifier = tableIdent,
-  tableType = tableType,
-  storage = storage,
-  schema = schema,
-  provider = Some(source),
-  comment = { if (description.isEmpty) None else Some(description) }
-)
-val plan = CreateTable(tableDesc, SaveMode.ErrorIfExists, None)
+val location = if (storage.locationUri.isDefined) {
+  val locationStr = storage.locationUri.get.toString
+  Some(locationStr)
+} else {
+  None
+}
+
+val tableSpec =
+  TableSpec(
+properties = Map(),
+provider = Some(source),
+options = options,
+location = location,
+comment = { if (description.isEmpty) None else Some(description) },
+serde = None,
+external = tableType == CatalogTableType.EXTERNAL)

Review Comment:
   ```suggestion
   val tableSpec = TableSpec(
 properties = Map(),
 provider = Some(source),
 options = options,
 location = location,
 comment = { if (description.isEmpty) None else Some(description) },
 serde = None,
 external = tableType == CatalogTableType.EXTERNAL)
   ```






[GitHub] [spark] ulysses-you commented on pull request #34785: [SPARK-37523][SQL] Support optimize skewed partitions in Distribution and Ordering if numPartitions is not specified

2022-05-19 Thread GitBox


ulysses-you commented on PR #34785:
URL: https://github.com/apache/spark/pull/34785#issuecomment-1132397474

   Looks correct to me. BTW, after Spark 3.3, `RebalancePartitions` supports specifying the initialNumPartition, so the demo code can be:
   ```scala
   val optNumPartitions = if (numPartitions == 0) None else Some(numPartitions)
   if (!write.distributionStrictlyRequired()) {
 RebalancePartitions(distribution, query, optNumPartitions)
   } else {
 RepartitionByExpression(distribution, query, optNumPartitions)
   }
   ```





[GitHub] [spark] cloud-fan commented on a diff in pull request #36586: [SPARK-39236][SQL] Make CreateTable and ListTables be compatible with 3 layer namespace

2022-05-19 Thread GitBox


cloud-fan commented on code in PR #36586:
URL: https://github.com/apache/spark/pull/36586#discussion_r877684350


##
sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala:
##
@@ -367,24 +377,40 @@ class CatalogImpl(sparkSession: SparkSession) extends 
Catalog {
   schema: StructType,
   description: String,
   options: Map[String, String]): DataFrame = {
-val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
+val idents = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(tableName)

Review Comment:
   ```suggestion
    val ident = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(tableName)
   ```






[GitHub] [spark] cloud-fan commented on a diff in pull request #36586: [SPARK-39236][SQL] Make CreateTable and ListTables be compatible with 3 layer namespace

2022-05-19 Thread GitBox


cloud-fan commented on code in PR #36586:
URL: https://github.com/apache/spark/pull/36586#discussion_r877684235


##
sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala:
##
@@ -97,8 +97,18 @@ class CatalogImpl(sparkSession: SparkSession) extends 
Catalog {
*/
   @throws[AnalysisException]("database does not exist")
   override def listTables(dbName: String): Dataset[Table] = {
-val tables = sessionCatalog.listTables(dbName).map(makeTable)
-CatalogImpl.makeDataset(tables, sparkSession)
+if (sessionCatalog.databaseExists(dbName)) {
+  val tables = sessionCatalog.listTables(dbName).map(makeTable)
+  CatalogImpl.makeDataset(tables, sparkSession)
+} else {
+  val multiParts = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(dbName)
+  val plan = ShowTables(UnresolvedNamespace(multiParts), None)
+  val ret = sparkSession.sessionState.executePlan(plan).toRdd.collect()
+  val tables = ret
+    .map(row => TableIdentifier(row.getString(1), Some(row.getString(0))))
+    .map(makeTable)

Review Comment:
   `makeTable` only looks up the table from the Hive metastore. I think we need a new `makeTable` that takes a `Seq[String]` and looks up the table through the analyzer.






[GitHub] [spark] cloud-fan commented on a diff in pull request #36586: [SPARK-39236][SQL] Make CreateTable and ListTables be compatible with 3 layer namespace

2022-05-19 Thread GitBox


cloud-fan commented on code in PR #36586:
URL: https://github.com/apache/spark/pull/36586#discussion_r877682958


##
sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala:
##
@@ -97,8 +97,18 @@ class CatalogImpl(sparkSession: SparkSession) extends 
Catalog {
*/
   @throws[AnalysisException]("database does not exist")
   override def listTables(dbName: String): Dataset[Table] = {
-val tables = sessionCatalog.listTables(dbName).map(makeTable)
-CatalogImpl.makeDataset(tables, sparkSession)
+if (sessionCatalog.databaseExists(dbName)) {
+  val tables = sessionCatalog.listTables(dbName).map(makeTable)
+  CatalogImpl.makeDataset(tables, sparkSession)
+} else {
+  val multiParts = 
sparkSession.sessionState.sqlParser.parseMultipartIdentifier(dbName)

Review Comment:
   ```suggestion
  val ident = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(dbName)
   ```






[GitHub] [spark] Yikun commented on a diff in pull request #36599: [SPARK-39228][PYTHON][PS] Implement `skipna` of `Series.argmax`

2022-05-19 Thread GitBox


Yikun commented on code in PR #36599:
URL: https://github.com/apache/spark/pull/36599#discussion_r877673383


##
python/pyspark/pandas/series.py:
##
@@ -6239,13 +6239,19 @@ def argsort(self) -> "Series":
 ps.concat([psser, self.loc[self.isnull()].spark.transform(lambda 
_: SF.lit(-1))]),
 )
 
-def argmax(self) -> int:
+def argmax(self, skipna: bool = True) -> int:

Review Comment:
   And upstream pandas's `axis` is also a dummy parameter (this is really special), so just keeping the same doc/parameters as pandas is enough. After this, removing it from the missing-parameters list would be right.






[GitHub] [spark] Yikun commented on a diff in pull request #36599: [SPARK-39228][PYTHON][PS] Implement `skipna` of `Series.argmax`

2022-05-19 Thread GitBox


Yikun commented on code in PR #36599:
URL: https://github.com/apache/spark/pull/36599#discussion_r877669800


##
python/pyspark/pandas/series.py:
##
@@ -6239,13 +6239,19 @@ def argsort(self) -> "Series":
 ps.concat([psser, self.loc[self.isnull()].spark.transform(lambda 
_: SF.lit(-1))]),
 )
 
-def argmax(self) -> int:
+def argmax(self, skipna: bool = True) -> int:

Review Comment:
   
![image](https://user-images.githubusercontent.com/1736354/169432658-ab51f8eb-2014-4d6b-b95d-67e78e077029.png)
   
   It would be listed, if we don't add it.
   
   ```python
   >>> from inspect import signature
   >>> signature(pd.Series.argmax).parameters
   mappingproxy(OrderedDict([('self', ), ('axis', ), ('skipna', ), ('args', 
), ('kwargs', )]))
   >>> signature(ps.Series.argmax).parameters
   mappingproxy(OrderedDict([('self', ), ('skipna', 
)]))
   ```






[GitHub] [spark] LuciferYang commented on a diff in pull request #36611: [SPARK-39204][CORE] Change `Utils.createTempDir` and `Utils.createDirectory` call the same logic method in `JavaUtils`

2022-05-19 Thread GitBox


LuciferYang commented on code in PR #36611:
URL: https://github.com/apache/spark/pull/36611#discussion_r877674754


##
core/src/main/scala/org/apache/spark/util/Utils.scala:
##
@@ -308,28 +308,7 @@ private[spark] object Utils extends Logging {
* newly created, and is not marked for automatic deletion.
*/
   def createDirectory(root: String, namePrefix: String = "spark"): File = {
-var attempts = 0
-val maxAttempts = MAX_DIR_CREATION_ATTEMPTS
-var dir: File = null
-while (dir == null) {
-  attempts += 1
-  if (attempts > maxAttempts) {
-throw new IOException("Failed to create a temp directory (under " + 
root + ") after " +
-  maxAttempts + " attempts!")
-  }
-  try {
-dir = new File(root, namePrefix + "-" + UUID.randomUUID.toString)
-// SPARK-35907:
-// This could throw more meaningful exception information if directory 
creation failed.
-Files.createDirectories(dir.toPath)
-  } catch {
-case e @ (_ : IOException | _ : SecurityException) =>
-  logError(s"Failed to create directory $dir", e)
-  dir = null
-  }
-}
-
-dir.getCanonicalFile
+JavaUtils.createDirectory(root, namePrefix)

Review Comment:
   
https://github.com/apache/spark/blob/5ee6f72744143cc5e19aa058df209f7156e51cee/common/network-common/src/main/java/org/apache/spark/network/util/JavaUtils.java#L399-L419






[GitHub] [spark] LuciferYang commented on a diff in pull request #36611: [SPARK-39204][CORE] Change `Utils.createTempDir` and `Utils.createDirectory` call the same logic method in `JavaUtils`

2022-05-19 Thread GitBox


LuciferYang commented on code in PR #36611:
URL: https://github.com/apache/spark/pull/36611#discussion_r877674586


##
core/src/main/scala/org/apache/spark/util/Utils.scala:
##
@@ -339,9 +318,7 @@ private[spark] object Utils extends Logging {
   def createTempDir(
   root: String = System.getProperty("java.io.tmpdir"),
   namePrefix: String = "spark"): File = {
-val dir = createDirectory(root, namePrefix)
-ShutdownHookManager.registerShutdownDeleteDir(dir)
-dir
+JavaUtils.createTempDir(root, namePrefix)

Review Comment:
   
https://github.com/apache/spark/blob/5ee6f72744143cc5e19aa058df209f7156e51cee/common/network-common/src/main/java/org/apache/spark/network/util/JavaUtils.java#L379-L385






[GitHub] [spark] yaooqinn commented on pull request #36592: [SPARK-39221][SQL] Make sensitive information be redacted correctly for thrift server job/stage tab

2022-05-19 Thread GitBox


yaooqinn commented on PR #36592:
URL: https://github.com/apache/spark/pull/36592#issuecomment-1132373620

   thanks, merged to master


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] yaooqinn closed pull request #36592: [SPARK-39221][SQL] Make sensitive information be redacted correctly for thrift server job/stage tab

2022-05-19 Thread GitBox


yaooqinn closed pull request #36592: [SPARK-39221][SQL] Make sensitive 
information be redacted correctly for thrift server job/stage tab
URL: https://github.com/apache/spark/pull/36592


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] LuciferYang commented on pull request #36611: [SPARK-39204][CORE] Change `Utils.createTempDir` and `Utils.createDirectory` call the same logic method in `JavaUtils`

2022-05-19 Thread GitBox


LuciferYang commented on PR #36611:
URL: https://github.com/apache/spark/pull/36611#issuecomment-1132373369

   > Yeah, I think we should better fix `Utils.createTempDir`.
   
   Yeah ~ now this PR only changes one file and achieves the goal.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] Yikun commented on a diff in pull request #36599: [SPARK-39228][PYTHON][PS] Implement `skipna` of `Series.argmax`

2022-05-19 Thread GitBox


Yikun commented on code in PR #36599:
URL: https://github.com/apache/spark/pull/36599#discussion_r877673383


##
python/pyspark/pandas/series.py:
##
@@ -6239,13 +6239,19 @@ def argsort(self) -> "Series":
 ps.concat([psser, self.loc[self.isnull()].spark.transform(lambda _: SF.lit(-1))]),
 )
 
-def argmax(self) -> int:
+def argmax(self, skipna: bool = True) -> int:

Review Comment:
   And upstream pandas's `axis` is also a dummy parameter (this is really
unusual). So just keeping the same parameters and docs as pandas is enough.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] Yikun commented on a diff in pull request #36599: [SPARK-39228][PYTHON][PS] Implement `skipna` of `Series.argmax`

2022-05-19 Thread GitBox


Yikun commented on code in PR #36599:
URL: https://github.com/apache/spark/pull/36599#discussion_r877669800


##
python/pyspark/pandas/series.py:
##
@@ -6239,13 +6239,19 @@ def argsort(self) -> "Series":
 ps.concat([psser, self.loc[self.isnull()].spark.transform(lambda _: SF.lit(-1))]),
 )
 
-def argmax(self) -> int:
+def argmax(self, skipna: bool = True) -> int:

Review Comment:
   
![image](https://user-images.githubusercontent.com/1736354/169432658-ab51f8eb-2014-4d6b-b95d-67e78e077029.png)
   
   It would be listed.
   
   ```python
   >>> from inspect import signature
   >>> signature(pd.Series.argmax).parameters
   mappingproxy(OrderedDict([('self', ), ('axis', ), ('skipna', ), ('args', 
), ('kwargs', )]))
   >>> signature(ps.Series.argmax).parameters
   mappingproxy(OrderedDict([('self', ), ('skipna', 
)]))
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] huaxingao commented on pull request #34785: [SPARK-37523][SQL] Support optimize skewed partitions in Distribution and Ordering if numPartitions is not specified

2022-05-19 Thread GitBox


huaxingao commented on PR #34785:
URL: https://github.com/apache/spark/pull/34785#issuecomment-1132364552

   cc @cloud-fan 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HyukjinKwon commented on a diff in pull request #36599: [SPARK-39228][PYTHON][PS] Implement `skipna` of `Series.argmax`

2022-05-19 Thread GitBox


HyukjinKwon commented on code in PR #36599:
URL: https://github.com/apache/spark/pull/36599#discussion_r877664483


##
python/pyspark/pandas/series.py:
##
@@ -6239,13 +6239,19 @@ def argsort(self) -> "Series":
 ps.concat([psser, self.loc[self.isnull()].spark.transform(lambda _: SF.lit(-1))]),
 )
 
-def argmax(self) -> int:
+def argmax(self, skipna: bool = True) -> int:

Review Comment:
   Good point. But I guess it won't be listed in the missing parameters in the 
documentation (?)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] huaxingao commented on pull request #34785: [SPARK-37523][SQL] Support optimize skewed partitions in Distribution and Ordering if numPartitions is not specified

2022-05-19 Thread GitBox


huaxingao commented on PR #34785:
URL: https://github.com/apache/spark/pull/34785#issuecomment-1132363307

   Thanks @aokolnychyi for the proposal. I agree that we should support both 
strictly required distribution and best effort distribution. For best effort 
distribution, if user doesn't request a specific number of partitions, we will 
split skewed partitions and coalesce small partitions. For strictly required 
distribution, if user doesn't request a specific number of partitions, we will 
coalesce small partitions but we will NOT split skewed partitions since this 
changes the required distribution.
   
   In interface `RequiresDistributionAndOrdering`, I will add
   ```
   default boolean distributionStrictlyRequired() { return true; }
   ```
   Then in `DistributionAndOrderingUtils`.`prepareQuery`, I will change the 
code to something like this:
   ```  
 val queryWithDistribution = if (distribution.nonEmpty) {
   if (!write.distributionStrictlyRequired() && numPartitions == 0) {
 RebalancePartitions(distribution, query)
   } else {
 if (numPartitions > 0) {
   RepartitionByExpression(distribution, query, numPartitions)
 } else {
   RepartitionByExpression(distribution, query, None)
 }
   }
   ...
   ``` 
   Basically, in the best effort case, if the requested numPartitions is 0, we 
will use `RebalancePartitions` so both `OptimizeSkewInRebalancePartitions` and 
`CoalesceShufflePartitions` will be applied. In the strictly required 
distribution case,  if the requested numPartitions is 0, we will use 
`RepartitionByExpression(distribution, query, None)` so 
`CoalesceShufflePartitions` will be applied. 
   
   Does this sound correct to everyone?
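   
   For illustration only, here is a self-contained toy sketch of the decision table
described above. It is not the actual `DistributionAndOrderingUtils` code; the
returned strings just name the logical operator that would be planned in each case:
   ```scala
// Toy sketch only: maps the two flags discussed above to the operator that would be used.
object DistributionChoiceSketch {
  def chooseOperator(distributionStrictlyRequired: Boolean, numPartitions: Int): String = {
    if (!distributionStrictlyRequired && numPartitions == 0) {
      // Best effort: skewed partitions may be split and small partitions coalesced.
      "RebalancePartitions(distribution, query)"
    } else if (numPartitions > 0) {
      // Exact partition count requested: no coalescing or skew splitting.
      "RepartitionByExpression(distribution, query, numPartitions)"
    } else {
      // Strict distribution, no fixed count: small partitions may still be coalesced.
      "RepartitionByExpression(distribution, query, None)"
    }
  }

  def main(args: Array[String]): Unit = {
    println(chooseOperator(distributionStrictlyRequired = false, numPartitions = 0))
    println(chooseOperator(distributionStrictlyRequired = true, numPartitions = 0))
    println(chooseOperator(distributionStrictlyRequired = true, numPartitions = 8))
  }
}
   ```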
   
   
   
   
   
   
   
   
   
   
   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] beliefer commented on a diff in pull request #36330: [SPARK-38897][SQL] DS V2 supports push down string functions

2022-05-19 Thread GitBox


beliefer commented on code in PR #36330:
URL: https://github.com/apache/spark/pull/36330#discussion_r877653817


##
sql/catalyst/src/main/java/org/apache/spark/sql/connector/util/V2ExpressionSQLBuilder.java:
##
@@ -228,4 +244,18 @@ protected String visitSQLFunction(String funcName, String[] inputs) {
   protected String visitUnexpectedExpr(Expression expr) throws IllegalArgumentException {
 throw new IllegalArgumentException("Unexpected V2 expression: " + expr);
   }
+
+  protected String visitOverlay(String[] inputs) {
+throw new UnsupportedOperationException("Function: OVERLAY does not support ");

Review Comment:
   @chenzhx The default is just used for display. It should return the `OVERLAY` syntax.
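   
   For reference, a rough sketch of what such a default rendering could look like,
written here as standalone Scala rather than the Java in `V2ExpressionSQLBuilder`.
The argument order (source, replacement, position, optional length) and the standard
`OVERLAY(... PLACING ... FROM ... [FOR ...])` form are assumptions, not the PR's actual code:
   ```scala
// Sketch only: renders the standard SQL OVERLAY syntax from already-rendered child SQL strings.
// Assumes inputs are (source, replacement, position[, length]).
def visitOverlaySketch(inputs: Array[String]): String = {
  require(inputs.length == 3 || inputs.length == 4, "OVERLAY takes 3 or 4 arguments")
  val base = s"OVERLAY(${inputs(0)} PLACING ${inputs(1)} FROM ${inputs(2)}"
  if (inputs.length == 4) s"$base FOR ${inputs(3)})" else s"$base)"
}
   ```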



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HyukjinKwon commented on a diff in pull request #36589: [SPARK-39218][SS][PYTHON] Make foreachBatch streaming query stop gracefully

2022-05-19 Thread GitBox


HyukjinKwon commented on code in PR #36589:
URL: https://github.com/apache/spark/pull/36589#discussion_r877653141


##
python/pyspark/sql/tests/test_streaming.py:
##
@@ -592,6 +592,18 @@ def collectBatch(df, id):
 if q:
 q.stop()
 
+def test_streaming_foreachBatch_graceful_stop(self):
+# SPARK-39218: Make foreachBatch streaming query stop gracefully
+def func(batch_df, _):
+time.sleep(10)

Review Comment:
   Yeah, that should work too.



##
python/pyspark/sql/tests/test_streaming.py:
##
@@ -592,6 +592,18 @@ def collectBatch(df, id):
 if q:
 q.stop()
 
+def test_streaming_foreachBatch_graceful_stop(self):
+# SPARK-39218: Make foreachBatch streaming query stop gracefully
+def func(batch_df, _):
+time.sleep(10)

Review Comment:
   Let me fix 👍 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] zsxwing commented on a diff in pull request #36589: [SPARK-39218][SS][PYTHON] Make foreachBatch streaming query stop gracefully

2022-05-19 Thread GitBox


zsxwing commented on code in PR #36589:
URL: https://github.com/apache/spark/pull/36589#discussion_r877648746


##
python/pyspark/sql/tests/test_streaming.py:
##
@@ -592,6 +592,18 @@ def collectBatch(df, id):
 if q:
 q.stop()
 
+def test_streaming_foreachBatch_graceful_stop(self):
+# SPARK-39218: Make foreachBatch streaming query stop gracefully
+def func(batch_df, _):
+time.sleep(10)

Review Comment:
   I see. `batch_df.count()` probably will touch some code that can be 
interrupted. Can we use `self.spark._jvm.java.lang.Thread.sleep(1)` to save 
5 seconds in this test?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HyukjinKwon commented on a diff in pull request #36589: [SPARK-39218][SS][PYTHON] Make foreachBatch streaming query stop gracefully

2022-05-19 Thread GitBox


HyukjinKwon commented on code in PR #36589:
URL: https://github.com/apache/spark/pull/36589#discussion_r877647489


##
python/pyspark/sql/tests/test_streaming.py:
##
@@ -592,6 +592,18 @@ def collectBatch(df, id):
 if q:
 q.stop()
 
+def test_streaming_foreachBatch_graceful_stop(self):
+# SPARK-39218: Make foreachBatch streaming query stop gracefully
+def func(batch_df, _):
+time.sleep(10)

Review Comment:
   Actually, `batch_df.count()` below triggers it by accessing the
interrupted Java thread, if I'm not mistaken.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] zsxwing commented on a diff in pull request #36589: [SPARK-39218][SS][PYTHON] Make foreachBatch streaming query stop gracefully

2022-05-19 Thread GitBox


zsxwing commented on code in PR #36589:
URL: https://github.com/apache/spark/pull/36589#discussion_r877642301


##
python/pyspark/sql/tests/test_streaming.py:
##
@@ -592,6 +592,18 @@ def collectBatch(df, id):
 if q:
 q.stop()
 
+def test_streaming_foreachBatch_graceful_stop(self):
+# SPARK-39218: Make foreachBatch streaming query stop gracefully
+def func(batch_df, _):
+time.sleep(10)
+batch_df.count()
+
+q = self.spark.readStream.format("rate").load().writeStream.foreachBatch(func).start()
+time.sleep(5)
+q.stop()
+time.sleep(15)  # Wait enough for the exception to be propagated if exists.

Review Comment:
   this is not needed. `q.stop()` will wait until the streaming thread is dead.



##
python/pyspark/sql/tests/test_streaming.py:
##
@@ -592,6 +592,18 @@ def collectBatch(df, id):
 if q:
 q.stop()
 
+def test_streaming_foreachBatch_graceful_stop(self):
+# SPARK-39218: Make foreachBatch streaming query stop gracefully
+def func(batch_df, _):
+time.sleep(10)

Review Comment:
   How does this func trigger InterruptedException? I would expect code like
`self.spark._jvm.java.lang.Thread.sleep(1)` instead.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HyukjinKwon commented on pull request #36611: [SPARK-39204][BUILD][CORE][SQL][DSTREAM][GRAPHX][K8S][ML][MLLIB][SS][YARN][EXAMPLES][SHELL] Replace `Utils.createTempDir` with `JavaUtils

2022-05-19 Thread GitBox


HyukjinKwon commented on PR #36611:
URL: https://github.com/apache/spark/pull/36611#issuecomment-1132340311

   Yeah, I think we should better fix `Utils.createTempDir`. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] github-actions[bot] closed pull request #34785: [SPARK-37523][SQL] Support optimize skewed partitions in Distribution and Ordering if numPartitions is not specified

2022-05-19 Thread GitBox


github-actions[bot] closed pull request #34785: [SPARK-37523][SQL] Support 
optimize skewed partitions in Distribution and Ordering if numPartitions is not 
specified
URL: https://github.com/apache/spark/pull/34785


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] github-actions[bot] closed pull request #35049: [SPARK-37757][BUILD] Enable Spark test scheduled job on ARM runner

2022-05-19 Thread GitBox


github-actions[bot] closed pull request #35049: [SPARK-37757][BUILD] Enable 
Spark test scheduled job on ARM runner
URL: https://github.com/apache/spark/pull/35049


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] github-actions[bot] closed pull request #35402: [SPARK-37536][SQL] Allow for API user to disable Shuffle on Local Mode

2022-05-19 Thread GitBox


github-actions[bot] closed pull request #35402: [SPARK-37536][SQL] Allow for 
API user to disable Shuffle on Local Mode
URL: https://github.com/apache/spark/pull/35402


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] github-actions[bot] commented on pull request #35424: [WIP][SPARK-38116] Add auto commit option to JDBC PostgreSQL driver and set the option false default

2022-05-19 Thread GitBox


github-actions[bot] commented on PR #35424:
URL: https://github.com/apache/spark/pull/35424#issuecomment-1132318749

   We're closing this PR because it hasn't been updated in a while. This isn't 
a judgement on the merit of the PR in any way. It's just a way of keeping the 
PR queue manageable.
   If you'd like to revive this PR, please reopen it and ask a committer to 
remove the Stale tag!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] dongjoon-hyun commented on pull request #36004: [SPARK-38681][SQL] Support nested generic case classes

2022-05-19 Thread GitBox


dongjoon-hyun commented on PR #36004:
URL: https://github.com/apache/spark/pull/36004#issuecomment-1132318738

   Thank you, @eejbyfeldt , @cloud-fan , @srowen !
   
   cc @MaxGekk 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] srowen commented on pull request #36004: [SPARK-38681][SQL] Support nested generic case classes

2022-05-19 Thread GitBox


srowen commented on PR #36004:
URL: https://github.com/apache/spark/pull/36004#issuecomment-1132316150

   Merged to master/3.3


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] srowen closed pull request #36004: [SPARK-38681][SQL] Support nested generic case classes

2022-05-19 Thread GitBox


srowen closed pull request #36004: [SPARK-38681][SQL] Support nested generic 
case classes
URL: https://github.com/apache/spark/pull/36004


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] dongjoon-hyun commented on pull request #36004: [SPARK-38681][SQL] Support nested generic case classes

2022-05-19 Thread GitBox


dongjoon-hyun commented on PR #36004:
URL: https://github.com/apache/spark/pull/36004#issuecomment-1132295581

   Thank you, @eejbyfeldt .
   
   cc @srowen 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] hai-tao-1 commented on pull request #36606: [SPARK-39232][CORE] History Server Main Page App List Filtering

2022-05-19 Thread GitBox


hai-tao-1 commented on PR #36606:
URL: https://github.com/apache/spark/pull/36606#issuecomment-1132271280

   The PR test fails with ```[error] spark-core: Failed binary compatibility
check against org.apache.spark:spark-core_2.12:3.2.0! Found 9 potential
problems (filtered 924)```. Could anyone advise what may be wrong? Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] dongjoon-hyun commented on pull request #36597: [SPARK-39225][CORE] Support `spark.history.fs.update.batchSize`

2022-05-19 Thread GitBox


dongjoon-hyun commented on PR #36597:
URL: https://github.com/apache/spark/pull/36597#issuecomment-1132159846

   Merged to master. I added you to the Apache Spark contributor group and 
assigned SPARK-39225 to you, @hai-tao-1 .
   Welcome to the Apache Spark community.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] dongjoon-hyun closed pull request #36597: [SPARK-39225][CORE] Support `spark.history.fs.update.batchSize`

2022-05-19 Thread GitBox


dongjoon-hyun closed pull request #36597: [SPARK-39225][CORE] Support 
`spark.history.fs.update.batchSize`
URL: https://github.com/apache/spark/pull/36597


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] amaliujia commented on pull request #36586: [SPARK-39236][SQL] Make CreateTable and ListTables be compatible with 3 layer namespace

2022-05-19 Thread GitBox


amaliujia commented on PR #36586:
URL: https://github.com/apache/spark/pull/36586#issuecomment-1132121981

   R: @cloud-fan this PR is ready to review.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] huaxingao opened a new pull request, #34785: [SPARK-37523][SQL] Support optimize skewed partitions in Distribution and Ordering if numPartitions is not specified

2022-05-19 Thread GitBox


huaxingao opened a new pull request, #34785:
URL: https://github.com/apache/spark/pull/34785

   
   ### What changes were proposed in this pull request?
   Support optimize skewed partitions in Distribution and Ordering if 
numPartitions is not specified
   
   ### Why are the changes needed?
   When repartitioning in distribution and sort, we will use the Rebalance
operator instead of RepartitionByExpression to optimize skewed partitions when
   1. numPartitions is not specified by the data source, and
   2. sortOrder is specified. This is because the requested distribution needs 
to be guaranteed, which can only be achieved by using RangePartitioning, not 
HashPartitioning.
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   
   ### How was this patch tested?
   Existing and new tests
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] otterc commented on a diff in pull request #36601: [SPARK-38987][SHUFFLE] Throw FetchFailedException when merged shuffle blocks are corrupted and spark.shuffle.detectCorrupt is set to

2022-05-19 Thread GitBox


otterc commented on code in PR #36601:
URL: https://github.com/apache/spark/pull/36601#discussion_r877366840


##
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##
@@ -1885,6 +1885,14 @@ private[spark] class DAGScheduler(
   mapOutputTracker.
 unregisterMergeResult(shuffleId, reduceId, bmAddress, 
Option(mapIndex))
 }
+  } else {
+// Unregister the merge result of  if
+// there is a FetchFailed event and is not a
+// MetaDataFetchException which is signified by bmAddress being 
null
+if (bmAddress != null) {
+  mapOutputTracker.
+unregisterMergeResult(shuffleId, reduceId, bmAddress, None)
+}

Review Comment:
   Can you also check line 2031 where `removeExecutorAndUnregisterOutputs` is 
called. The `executorId` for ShuffleMergedChunk is `shuffle-push-merger`. Will 
that cause any issues? We should probably add a UT for this as well with the 
`unRegisterOutputOnHostOnFetchFailure` enabled.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] aokolnychyi commented on pull request #34785: [SPARK-37523][SQL] Support optimize skewed partitions in Distribution and Ordering if numPartitions is not specified

2022-05-19 Thread GitBox


aokolnychyi commented on PR #34785:
URL: https://github.com/apache/spark/pull/34785#issuecomment-1132014116

   Thanks for the PR, @huaxingao. I think it is a great feature and it would be 
awesome to get it done.
   
   I spent some time thinking about this and have a few questions/proposals.
   
   If I understand correctly, we currently hard-code the number of shuffle 
partitions in `RepartitionByExpression`, which prohibits both coalescing and 
skew split optimizations.
   
   It seems reasonable to support cases when the requested distribution is 
best-effort but I also think there are valid cases when the distribution is 
required for correctness and it is actually the current API contract. What 
about extending `RequiredDistributionAndOrdering` to indicate the distribution 
is not strictly required? We can add some boolean method and default it to keep 
the existing behavior. If the distribution is required, we can still benefit 
from coalescing as I think `CoalesceShufflePartitions` and `AQEShuffleReadExec` 
would keep the original distribution in coalesce cases. That’s already a huge 
win. We can avoid too small files while keeping the requested distribution.
   
   I also agree about using `RebalancePartitions` when the distribution is not 
strictly required. What about extending `RebalancePartitions` to also support 
range partitioning? It currently supports only hash and round-robin. If we make 
that change, we will be able to remove unnecessary shuffles in the optimizer 
and keep the original distribution as long as there is no skew and we only 
coalesce. If there is a skew, an extra shuffle and changed distribution seems 
like a reasonable overhead.
   
   What does everybody else think?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] mridulm commented on a diff in pull request #36601: [SPARK-38987][SHUFFLE] Throw FetchFailedException when merged shuffle blocks are corrupted and spark.shuffle.detectCorrupt is set t

2022-05-19 Thread GitBox


mridulm commented on code in PR #36601:
URL: https://github.com/apache/spark/pull/36601#discussion_r877361726


##
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##
@@ -1885,6 +1885,14 @@ private[spark] class DAGScheduler(
   mapOutputTracker.
 unregisterMergeResult(shuffleId, reduceId, bmAddress, 
Option(mapIndex))
 }
+  } else {
+// Unregister the merge result of  if
+// the FetchFailed event contains a mapIndex of -1, and is not a
+// MetaDataFetchException which is signified by bmAddress being 
null
+if (bmAddress != null && pushBasedShuffleEnabled) {

Review Comment:
   I am trying to understand: can we have a case where `mapIndex == -1` and
`bmAddress != null` for non-push-based shuffle? If not, we should add an
assertion there (not drop the check entirely).
   
   Note, L1882 has the check because the earlier check is for `mapIndex != -1`,
which can be satisfied for all shuffles.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] MaxGekk commented on pull request #36603: [SPARK-39163][SQL] Throw an exception w/ error class for an invalid bucket file

2022-05-19 Thread GitBox


MaxGekk commented on PR #36603:
URL: https://github.com/apache/spark/pull/36603#issuecomment-1132003245

   @panbingkun Could you backport this to branch-3.3, please.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] MaxGekk closed pull request #36603: [SPARK-39163][SQL] Throw an exception w/ error class for an invalid bucket file

2022-05-19 Thread GitBox


MaxGekk closed pull request #36603: [SPARK-39163][SQL] Throw an exception w/ 
error class for an invalid bucket file
URL: https://github.com/apache/spark/pull/36603


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] otterc commented on a diff in pull request #36601: [SPARK-38987][SHUFFLE] Throw FetchFailedException when merged shuffle blocks are corrupted and spark.shuffle.detectCorrupt is set to

2022-05-19 Thread GitBox


otterc commented on code in PR #36601:
URL: https://github.com/apache/spark/pull/36601#discussion_r877340985


##
core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala:
##
@@ -1786,4 +1786,32 @@ class ShuffleBlockFetcherIteratorSuite extends 
SparkFunSuite with PrivateMethodT
   ShuffleBlockId(0, 5, 2), ShuffleBlockId(0, 6, 2)))
   }
 
+  test("SPARK-38987: failure to fetch corrupted shuffle block chunk should " +

Review Comment:
   Nit: modify this name so it's clear that when corruption goes undetected it
should throw a fetch failure. Otherwise, it is confusing because one test says
we should fall back on corruption and the other says it should throw a fetch
failure.



##
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala:
##
@@ -1166,6 +1166,9 @@ final class ShuffleBlockFetcherIterator(
   case ShuffleBlockBatchId(shuffleId, mapId, startReduceId, _) =>
 throw SparkCoreErrors.fetchFailedError(address, shuffleId, mapId, 
mapIndex, startReduceId,
   msg, e)
+  case ShuffleBlockChunkId(shuffleId, _, reduceId, _) =>
+SparkCoreErrors.fetchFailedError(address, shuffleId,
+  -1L, SHUFFLE_PUSH_MAP_ID, reduceId, msg, e)

Review Comment:
   Nit: Can we use SHUFFLE_PUSH_MAP for mapId as well?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] hai-tao-1 commented on pull request #36597: [SPARK-39225][CORE] Support `spark.history.fs.update.batchSize`

2022-05-19 Thread GitBox


hai-tao-1 commented on PR #36597:
URL: https://github.com/apache/spark/pull/36597#issuecomment-1131988355

   > Thank you for updates, @hai-tao-1 . Yes, the only remaining comment is the 
test case.
   > 
   > > We need a test case for the configuration. Please check the corner cases 
especially.
   
   @dongjoon-hyun Added unit test  test("SPARK-39225: Support 
spark.history.fs.update.batchSize").


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] otterc commented on a diff in pull request #36601: [SPARK-38987][SHUFFLE] Throw FetchFailedException when merged shuffle blocks are corrupted and spark.shuffle.detectCorrupt is set to

2022-05-19 Thread GitBox


otterc commented on code in PR #36601:
URL: https://github.com/apache/spark/pull/36601#discussion_r877329983


##
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##
@@ -1885,6 +1885,14 @@ private[spark] class DAGScheduler(
   mapOutputTracker.
 unregisterMergeResult(shuffleId, reduceId, bmAddress, 
Option(mapIndex))
 }
+  } else {
+// Unregister the merge result of  if
+// the FetchFailed event contains a mapIndex of -1, and is not a
+// MetaDataFetchException which is signified by bmAddress being 
null
+if (bmAddress != null && pushBasedShuffleEnabled) {

Review Comment:
   I think we should check for `pushBasedShuffleEnabled` here just for
consistency with lines 1882-1886. There aren't going to be any merge results if
push-based shuffle is not enabled, but since we do it in the other places we
should do it here as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] LuciferYang commented on pull request #36611: [SPARK-39204][BUILD][CORE][SQL][DSTREAM][GRAPHX][K8S][ML][MLLIB][SS][YARN][EXAMPLES][SHELL] Replace `Utils.createTempDir` with `JavaUtils

2022-05-19 Thread GitBox


LuciferYang commented on PR #36611:
URL: https://github.com/apache/spark/pull/36611#issuecomment-1131979146

   It seems that this change is big. Another way to keep a single `createTempDir`
is to let `Utils.createTempDir` call `JavaUtils.createTempDir`. Is this
acceptable?
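   
   For illustration, the delegation would look roughly like the sketch below. The
`JavaUtils.createTempDir(root, namePrefix)` call is taken from the diff quoted
earlier in this thread; everything else here is simplified:
   ```scala
// Rough sketch of Utils.createTempDir delegating to the shared Java helper; not the final code.
import java.io.File
import org.apache.spark.network.util.JavaUtils

object TempDirSketch {
  def createTempDir(
      root: String = System.getProperty("java.io.tmpdir"),
      namePrefix: String = "spark"): File = {
    // Creation, retries and (per this PR) deletion at shutdown are handled inside JavaUtils.
    JavaUtils.createTempDir(root, namePrefix)
  }
}
   ```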


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] akpatnam25 commented on a diff in pull request #36601: [SPARK-38987][SHUFFLE] Throw FetchFailedException when merged shuffle blocks are corrupted and spark.shuffle.detectCorrupt is se

2022-05-19 Thread GitBox


akpatnam25 commented on code in PR #36601:
URL: https://github.com/apache/spark/pull/36601#discussion_r877316473


##
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##
@@ -1885,6 +1885,14 @@ private[spark] class DAGScheduler(
   mapOutputTracker.
 unregisterMergeResult(shuffleId, reduceId, bmAddress, 
Option(mapIndex))
 }
+  } else {
+// Unregister the merge result of  if
+// the FetchFailed event contains a mapIndex of -1, and is not a
+// MetaDataFetchException which is signified by bmAddress being 
null
+if (bmAddress != null && pushBasedShuffleEnabled) {

Review Comment:
   we dont need that check here, removed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] akpatnam25 commented on a diff in pull request #36601: [SPARK-38987][SHUFFLE] Throw FetchFailedException when merged shuffle blocks are corrupted and spark.shuffle.detectCorrupt is se

2022-05-19 Thread GitBox


akpatnam25 commented on code in PR #36601:
URL: https://github.com/apache/spark/pull/36601#discussion_r877316296


##
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##
@@ -1885,6 +1885,14 @@ private[spark] class DAGScheduler(
   mapOutputTracker.
 unregisterMergeResult(shuffleId, reduceId, bmAddress, 
Option(mapIndex))
 }
+  } else {
+// Unregister the merge result of  if
+// the FetchFailed event contains a mapIndex of -1, and is not a
+// MetaDataFetchException which is signified by bmAddress being 
null
+if (bmAddress != null && pushBasedShuffleEnabled) {
+  mapOutputTracker.
+unregisterMergeResult(shuffleId, reduceId, bmAddress, 
Option(mapIndex))

Review Comment:
   Yep, passing in `None` now. This way we do not have to explicitly pass in -1
in the tests.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] nkronenfeld opened a new pull request, #36613: [WIP][SPARK-30983] Support typed select in Datasets up to the max tuple size

2022-05-19 Thread GitBox


nkronenfeld opened a new pull request, #36613:
URL: https://github.com/apache/spark/pull/36613

   ### What changes were proposed in this pull request?
   
   This PR simply adds typed select methods to Dataset up to the max Tuple size 
of 22.
   
   This has been bugging me for years, so I finally decided to get off my 
backside and do something about it :-).
   
   As noted in the JIRA issue, technically this is a breaking change - indeed,
I had to remove an old test that specifically tested that Spark didn't support
typed select for tuples larger than 5. However, to be affected, someone would
have to rely explicitly on select returning a DataFrame instead of a Dataset
when selecting large tuples of typed columns (though I guess that test I had
to remove exhibits one case where this may happen).
   
   I've set the PR as WIP because I've been unable to run all tests so far - 
not due to the fix, but rather due to not having things set up correctly on my 
computer.  Still working on that.
   
   ### Why are the changes needed?
   Arbitrarily supporting only up to 5-tuples is weird and unpredictable.
   
   ### Does this PR introduce _any_ user-facing change?
   Yes, select on tuples of all typed columns larger than 5 will now return a 
Dataset instead of a DataFrame
   
   ### How was this patch tested?
   I've run all SQL tests, and they all pass (though testing itself still fails
on my machine, I think with a path-too-long error).
   I've added a test to make sure the typed select works on all sizes - mostly 
this is a compile issue, not a run-time issue, but I checked values too, just 
to double-check that I didn't miss anything (which is a big potential problem 
with long tuples and copy-paste errors)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] vli-databricks commented on pull request #36584: [SPARK-39213][SQL] Create ANY_VALUE aggregate function

2022-05-19 Thread GitBox


vli-databricks commented on PR #36584:
URL: https://github.com/apache/spark/pull/36584#issuecomment-1131948634

   Yes, the purpose is ease of migration; I removed the change to
`functions.scala` to limit the scope to Spark SQL only.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] MaxGekk commented on pull request #36584: [SPARK-39213][SQL] Create ANY_VALUE aggregate function

2022-05-19 Thread GitBox


MaxGekk commented on PR #36584:
URL: https://github.com/apache/spark/pull/36584#issuecomment-1131939361

   How about adding the function to other APIs, like first(), in
   - PySpark: 
https://github.com/apache/spark/blob/b63674ea5f746306a96ab8c39c23a230a6cb9566/sql/core/src/main/scala/org/apache/spark/sql/functions.scala#L500
   - R: 
https://github.com/apache/spark/blob/16d1c68e8b185457ae86a248d0874e61c3bc6f3a/R/pkg/R/functions.R#L1178
   
   BTW, if the purpose of this new feature is to make migrations to Spark SQL 
from other systems easier, I would propose to add it to Spark SQL only (and not 
extend functions.scala). 
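   
   For context, a minimal sketch of how the new aggregate would be exercised from
SQL once this PR lands (table and column names are made up; `any_value` is assumed
to be registered as proposed here):
   ```scala
import org.apache.spark.sql.SparkSession

object AnyValueSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("any-value").getOrCreate()
    import spark.implicits._

    Seq(("a", 1), ("a", 2), ("b", 3)).toDF("k", "v").createOrReplaceTempView("t")

    // ANY_VALUE returns some value of v for each group; which one is not specified.
    spark.sql("SELECT k, any_value(v) FROM t GROUP BY k").show()

    spark.stop()
  }
}
   ```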


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] mridulm commented on a diff in pull request #36601: [SPARK-38987][SHUFFLE] Throw FetchFailedException when merged shuffle blocks are corrupted and spark.shuffle.detectCorrupt is set t

2022-05-19 Thread GitBox


mridulm commented on code in PR #36601:
URL: https://github.com/apache/spark/pull/36601#discussion_r877275914


##
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##
@@ -1885,6 +1885,14 @@ private[spark] class DAGScheduler(
   mapOutputTracker.
 unregisterMergeResult(shuffleId, reduceId, bmAddress, 
Option(mapIndex))
 }
+  } else {
+// Unregister the merge result of  if
+// the FetchFailed event contains a mapIndex of -1, and is not a
+// MetaDataFetchException which is signified by bmAddress being 
null
+if (bmAddress != null && pushBasedShuffleEnabled) {
+  mapOutputTracker.
+unregisterMergeResult(shuffleId, reduceId, bmAddress, 
Option(mapIndex))

Review Comment:
   Given `mapIndex` is -1, should this not be `None` passed in to 
`unregisterMergeResult` instead of `Option(-1)` ?
   The tests are passing because you explicitly added `-1` to the bitmap, which 
looks incorrect.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] mridulm commented on a diff in pull request #36601: [SPARK-38987][SHUFFLE] Throw FetchFailedException when merged shuffle blocks are corrupted and spark.shuffle.detectCorrupt is set t

2022-05-19 Thread GitBox


mridulm commented on code in PR #36601:
URL: https://github.com/apache/spark/pull/36601#discussion_r877273610


##
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##
@@ -1885,6 +1885,14 @@ private[spark] class DAGScheduler(
   mapOutputTracker.
 unregisterMergeResult(shuffleId, reduceId, bmAddress, 
Option(mapIndex))
 }
+  } else {
+// Unregister the merge result of  if
+// the FetchFailed event contains a mapIndex of -1, and is not a
+// MetaDataFetchException which is signified by bmAddress being 
null
+if (bmAddress != null && pushBasedShuffleEnabled) {

Review Comment:
   Why do we need `pushBasedShuffleEnabled` check here ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] MaxGekk commented on a diff in pull request #36580: [SPARK-39167][SQL] Throw an exception w/ an error class for multiple rows from a subquery used as an expression

2022-05-19 Thread GitBox


MaxGekk commented on code in PR #36580:
URL: https://github.com/apache/spark/pull/36580#discussion_r877269334


##
sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala:
##
@@ -1971,4 +1971,10 @@ object QueryExecutionErrors extends QueryErrorsBase {
 s"add ${toSQLValue(amount, IntegerType)} $unit to " +
 s"${toSQLValue(DateTimeUtils.microsToInstant(micros), 
TimestampType)}"))
   }
+
+  def multipleRowSubqueryError(plan: String): Throwable = {
+new SparkIllegalStateException(

Review Comment:
   Please use SparkException or SparkRuntimeException. I removed the
exception in https://github.com/apache/spark/pull/36550.
   
   BTW, we shouldn't throw `IllegalStateException` if the exception can be
triggered by user code in regular cases.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] MaxGekk closed pull request #36612: [SPARK-39234][SQL] Code clean up in SparkThrowableHelper.getMessage

2022-05-19 Thread GitBox


MaxGekk closed pull request #36612: [SPARK-39234][SQL] Code clean up in 
SparkThrowableHelper.getMessage
URL: https://github.com/apache/spark/pull/36612


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] MaxGekk commented on pull request #36612: [SPARK-39234][SQL] Code clean up in SparkThrowableHelper.getMessage

2022-05-19 Thread GitBox


MaxGekk commented on PR #36612:
URL: https://github.com/apache/spark/pull/36612#issuecomment-1131917484

   +1, LGTM. Merging to master.
   Thank you, @gengliangwang and @cloud-fan for review.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] vli-databricks commented on pull request #36584: [SPARK-39213] Create ANY_VALUE aggregate function

2022-05-19 Thread GitBox


vli-databricks commented on PR #36584:
URL: https://github.com/apache/spark/pull/36584#issuecomment-1131915487

   @MaxGekk please review and help me merge this.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] mridulm commented on a diff in pull request #36601: [SPARK-38987][SHUFFLE] Throw FetchFailedException when merged shuffle blocks are corrupted and spark.shuffle.detectCorrupt is set t

2022-05-19 Thread GitBox


mridulm commented on code in PR #36601:
URL: https://github.com/apache/spark/pull/36601#discussion_r877251054


##
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala:
##
@@ -4342,6 +4342,56 @@ class DAGSchedulerSuite extends SparkFunSuite with 
TempLocalSparkContext with Ti
 assertDataStructuresEmpty()
   }
 
+  test("SPARK-38987: corrupted shuffle block FetchFailure should unregister 
merge result") {
+initPushBasedShuffleConfs(conf)
+DAGSchedulerSuite.clearMergerLocs()
+DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", 
"host5"))
+
+scheduler = new MyDAGScheduler(
+  sc,
+  taskScheduler,
+  sc.listenerBus,
+  mapOutputTracker,
+  blockManagerMaster,
+  sc.env,
+  shuffleMergeFinalize = false,
+  shuffleMergeRegister = false)
+dagEventProcessLoopTester = new 
DAGSchedulerEventProcessLoopTester(scheduler)
+
+val parts = 2
+val shuffleMapRdd = new MyRDD(sc, parts, Nil)
+val shuffleDep = new ShuffleDependency(shuffleMapRdd, new 
HashPartitioner(parts))
+val reduceRdd = new MyRDD(sc, parts, List(shuffleDep), tracker = 
mapOutputTracker)
+
+// Submit a reduce job that depends on the shuffle, which will create a map stage
+submit(reduceRdd, (0 until parts).toArray)
+
+// Pass in custom bitmap so that the mergeStatus can store
+// the correct mapIndex.
+val bitmap = new RoaringBitmap()
+bitmap.add(-1)
+
+val shuffleMapStage = 
scheduler.stageIdToStage(0).asInstanceOf[ShuffleMapStage]
+scheduler.handleRegisterMergeStatuses(shuffleMapStage,
+  Seq((0, MergeStatus(makeBlockManagerId("hostA"), 
shuffleDep.shuffleMergeId, bitmap, 1000L
+scheduler.handleShuffleMergeFinalized(shuffleMapStage,
+  shuffleMapStage.shuffleDep.shuffleMergeId)
+scheduler.handleRegisterMergeStatuses(shuffleMapStage,
+  Seq((1, MergeStatus(makeBlockManagerId("hostA"), 
shuffleDep.shuffleMergeId, bitmap, 1000L
+

Review Comment:
   You are right, we need the bitmap to be valid, and not mocked.
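   For illustration, a minimal sketch of what a valid (non-mocked) bitmap could
   look like in this test -- the map indexes 0 and 1 below are assumptions for a
   two-task map stage, not values taken from the PR itself:

   import org.roaringbitmap.RoaringBitmap

   // Track which map tasks were merged into the merged shuffle block; use real
   // map indexes rather than a sentinel like -1.
   val bitmap = new RoaringBitmap()
   bitmap.add(0)
   bitmap.add(1)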



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] tanvn commented on pull request #27590: [SPARK-30703][SQL][DOCS][FollowUp] Declare the ANSI SQL compliance options as experimental

2022-05-19 Thread GitBox


tanvn commented on PR #27590:
URL: https://github.com/apache/spark/pull/27590#issuecomment-1131855914

   @gengliangwang @dongjoon-hyun 
   Hi, I have a question.
   In Spark 3.2.1, are `spark.sql.ansi.enabled` and 
`spark.sql.storeAssignmentPolicy` still considered experimental options?
   I understand that there is an epic named `ANSI enhancements in Spark 3.3` 
(https://issues.apache.org/jira/browse/SPARK-38860), which means there will be 
new features for `spark.sql.ansi.enabled`.
   But ANSI SQL mode GA 
([SPARK-35030](https://issues.apache.org/jira/browse/SPARK-35030)) is mentioned 
in the Highlights section of 
https://spark.apache.org/releases/spark-release-3-2-0.html, so I think it is no 
longer `experimental`. Could you please give me your opinion?
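   For reference, both options under discussion are ordinary runtime SQL configs;
   a minimal sketch of toggling them, e.g. in spark-shell on 3.2.x (the values
   shown are just the documented settings, not a recommendation):

   // `spark` is the SparkSession provided by spark-shell
   spark.conf.set("spark.sql.ansi.enabled", "true")
   spark.conf.set("spark.sql.storeAssignmentPolicy", "ANSI")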
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] dongjoon-hyun commented on pull request #36377: [SPARK-39043][SQL] Spark SQL Hive client should not gather statistic by default.

2022-05-19 Thread GitBox


dongjoon-hyun commented on PR #36377:
URL: https://github.com/apache/spark/pull/36377#issuecomment-1131851884

   Thank you for the decision to revert, @cloud-fan and @AngersZh.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan closed pull request #35850: [SPARK-38529][SQL] Prevent GeneratorNestedColumnAliasing to be applied to non-Explode generators

2022-05-19 Thread GitBox


cloud-fan closed pull request #35850: [SPARK-38529][SQL] Prevent 
GeneratorNestedColumnAliasing to be applied to non-Explode generators
URL: https://github.com/apache/spark/pull/35850


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on pull request #35850: [SPARK-38529][SQL] Prevent GeneratorNestedColumnAliasing to be applied to non-Explode generators

2022-05-19 Thread GitBox


cloud-fan commented on PR #35850:
URL: https://github.com/apache/spark/pull/35850#issuecomment-1131827928

   thanks, merging to master/3.3!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a diff in pull request #35850: [SPARK-38529][SQL] Prevent GeneratorNestedColumnAliasing to be applied to non-Explode generators

2022-05-19 Thread GitBox


cloud-fan commented on code in PR #35850:
URL: https://github.com/apache/spark/pull/35850#discussion_r877169574


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala:
##
@@ -321,6 +321,38 @@ object GeneratorNestedColumnAliasing {
 // need to prune nested columns through Project and under Generate. The 
difference is
 // when `nestedSchemaPruningEnabled` is on, nested columns will be pruned 
further at
 // file format readers if it is supported.
+
+// There are [[ExtractValue]] expressions on or not on the output of the 
generator. Generator
+// can also have different types:
+// 1. For [[ExtractValue]]s not on the output of the generator, 
theoretically speaking, there are
+//lots of expressions that we can push down, including non-ExtractValues, GetArrayItem
+//and GetMapValue. But to be safe, we only handle GetStructField and 
GetArrayStructFields.
+// 2. For [[ExtractValue]]s on the output of the generator, the situation 
depends on the type
+//of the generator expression. *For now, we only support Explode*.
+//   2.1 Inline
+//   Inline takes an input of ARRAY<STRUCT<field1, field2>>, and returns an output of
+//   STRUCT<field1, field2>, the output field can be directly accessed by name "field1".
+//   In this case, we should not try to push down the ExtractValue 
expressions to the
+//   input of the Inline. For example:
+//   Project[field1.x AS x]
+//   - Generate[ARRAY<STRUCT<field1:STRUCT<x:int>, field2:int>>, ..., field1, field2]
+//   It is incorrect to push down the .x to the input of the Inline.
+//   A valid field pruning would be to extract all the fields that are 
accessed by the
+//   Project, and manually reconstruct an expression using those 
fields.
+//   2.2 Explode
+//   Explode takes an input of ARRAY<data_type> and returns an output of
+//   STRUCT<col: data_type>. The default field name "col" can be overwritten.
+//   If the input is MAP<key, value>, it returns STRUCT<key, value>.
+//   For the array case, it is only valid to push down GetStructField. 
After push down,
+//   the GetStructField becomes a GetArrayStructFields. Note that we 
cannot push down
+//   GetArrayStructFields, since the pushed down expression will 
operate on an array of
+//   array which is invalid.
+//   2.3 Stack
+//   Stack takes a sequence of expressions, and returns an output of
+//   STRUCT<col0, col1, ..., col(n-1)>.
+//   The push down is doable but more complicated in this case as the expression that
+//   operates on the col_i of the output needs to be pushed down to every 
(kn+i)-th input
+//   expression where n is the total number of columns (or struct 
fields) of the output.

Review Comment:
   actually, I find it's useful to understand why we only support explode today.
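   To make the Explode-only case concrete, here is a small, hypothetical example
   (the column names and data are made up) of the one shape this rule handles: a
   single struct field extracted from the output of explode, which can be pruned
   down to the corresponding nested field of the generator input:

   import org.apache.spark.sql.SparkSession
   import org.apache.spark.sql.functions.{col, explode}

   val spark = SparkSession.builder().master("local[1]").getOrCreate()
   import spark.implicits._

   // items: ARRAY<STRUCT<_1: INT, _2: STRING>>
   val df = Seq(Seq((1, "a"), (2, "b"))).toDF("items")

   // Only items._1 is needed downstream, so with nested pruning the scan and the
   // generator input can be narrowed to that single struct field.
   df.select(explode(col("items")).as("item"))
     .select(col("item._1"))
     .explain(true)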



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a diff in pull request #36295: [SPARK-38978][SQL] Support push down OFFSET to JDBC data source V2

2022-05-19 Thread GitBox


cloud-fan commented on code in PR #36295:
URL: https://github.com/apache/spark/pull/36295#discussion_r877141680


##
sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala:
##
@@ -203,6 +204,245 @@ class JDBCV2Suite extends QueryTest with 
SharedSparkSession with ExplainSuiteHel
 checkAnswer(df5, Seq(Row(1.00, 1000.0, "amy")))
   }
 
+  private def checkOffsetRemoved(df: DataFrame, removed: Boolean = true): Unit 
= {
+val offsets = df.queryExecution.optimizedPlan.collect {
+  case offset: Offset => offset
+}
+if (removed) {
+  assert(offsets.isEmpty)
+} else {
+  assert(offsets.nonEmpty)
+}
+  }
+
+  test("simple scan with OFFSET") {
+val df1 = spark.read
+  .table("h2.test.employee")
+  .where($"dept" === 1)
+  .offset(1)
+checkOffsetRemoved(df1)
+checkPushedInfo(df1,
+  "PushedFilters: [DEPT IS NOT NULL, DEPT = 1], PushedOffset: OFFSET 1,")
+checkAnswer(df1, Seq(Row(1, "cathy", 9000.00, 1200.0, false)))
+
+val df2 = spark.read
+  .option("pushDownOffset", "false")
+  .table("h2.test.employee")
+  .where($"dept" === 1)
+  .offset(1)
+checkOffsetRemoved(df2, false)
+checkPushedInfo(df2,
+  "PushedFilters: [DEPT IS NOT NULL, DEPT = 1], ReadSchema:")
+checkAnswer(df2, Seq(Row(1, "cathy", 9000.00, 1200.0, false)))
+
+val df3 = spark.read
+  .option("partitionColumn", "dept")
+  .option("lowerBound", "0")
+  .option("upperBound", "2")
+  .option("numPartitions", "2")
+  .table("h2.test.employee")
+  .filter($"dept" > 1)
+  .offset(1)
+checkOffsetRemoved(df3, false)
+checkPushedInfo(df3, "PushedFilters: [DEPT IS NOT NULL, DEPT > 1], 
ReadSchema:")
+checkAnswer(df3, Seq(Row(2, "david", 1, 1300, true), Row(6, "jen", 
12000, 1200, true)))
+
+val df4 = spark.read
+  .table("h2.test.employee")
+  .groupBy("DEPT").sum("SALARY")
+  .offset(1)
+checkOffsetRemoved(df4, false)
+checkPushedInfo(df4,
+  "PushedAggregates: [SUM(SALARY)], PushedFilters: [], 
PushedGroupByExpressions: [DEPT], ")
+checkAnswer(df4, Seq(Row(2, 22000.00), Row(6, 12000.00)))
+
+val name = udf { (x: String) => x.matches("cat|dav|amy") }
+val sub = udf { (x: String) => x.substring(0, 3) }
+val df5 = spark.read
+  .table("h2.test.employee")
+  .select($"SALARY", $"BONUS", sub($"NAME").as("shortName"))
+  .filter(name($"shortName"))
+  .offset(1)
+checkOffsetRemoved(df5, false)
+// OFFSET is pushed down only if all the filters are pushed down
+checkPushedInfo(df5, "PushedFilters: [], ")
+checkAnswer(df5, Seq(Row(1.00, 1300.0, "dav"), Row(9000.00, 1200.0, 
"cat")))
+  }
+
+  test("simple scan with LIMIT and OFFSET") {
+val df1 = spark.read
+  .table("h2.test.employee")
+  .where($"dept" === 1)
+  .limit(2)
+  .offset(1)
+checkLimitRemoved(df1)
+checkOffsetRemoved(df1)
+checkPushedInfo(df1,
+  "PushedFilters: [DEPT IS NOT NULL, DEPT = 1], PushedLimit: LIMIT 1, 
PushedOffset: OFFSET 1,")

Review Comment:
   This does not match 
https://github.com/apache/spark/pull/36295/files#diff-85c754089fc8e0db142a16714e92b127001bab9e6433684d1e3a15af04cb219aR26
   
   Assume that I have a local array data source. According to the API doc, 
Spark pushes down LIMIT first. For this query, I'll do `array.take(1).drop(1)`. 
This is wrong and doesn't match the query `df.limit(2).offset(1)`.
   
   We should either fix the API doc or fix the pushdown logic.
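   A tiny, self-contained sketch of the ordering problem, with a plain Scala
   collection standing in for the hypothetical local array data source:

   val data = Array(1, 2, 3, 4)

   // df.limit(2).offset(1) means: apply LIMIT 2 first, then OFFSET 1.
   val expected = data.take(2).drop(1)   // Array(2)

   // What the source computes if it receives LIMIT 1 and OFFSET 1 and, per the
   // current API doc, applies the limit before the offset.
   val pushed = data.take(1).drop(1)     // Array() -- loses the row

   assert(!expected.sameElements(pushed))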



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a diff in pull request #36295: [SPARK-38978][SQL] Support push down OFFSET to JDBC data source V2

2022-05-19 Thread GitBox


cloud-fan commented on code in PR #36295:
URL: https://github.com/apache/spark/pull/36295#discussion_r877141680


##
sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala:
##
@@ -203,6 +204,245 @@ class JDBCV2Suite extends QueryTest with 
SharedSparkSession with ExplainSuiteHel
 checkAnswer(df5, Seq(Row(1.00, 1000.0, "amy")))
   }
 
+  private def checkOffsetRemoved(df: DataFrame, removed: Boolean = true): Unit 
= {
+val offsets = df.queryExecution.optimizedPlan.collect {
+  case offset: Offset => offset
+}
+if (removed) {
+  assert(offsets.isEmpty)
+} else {
+  assert(offsets.nonEmpty)
+}
+  }
+
+  test("simple scan with OFFSET") {
+val df1 = spark.read
+  .table("h2.test.employee")
+  .where($"dept" === 1)
+  .offset(1)
+checkOffsetRemoved(df1)
+checkPushedInfo(df1,
+  "PushedFilters: [DEPT IS NOT NULL, DEPT = 1], PushedOffset: OFFSET 1,")
+checkAnswer(df1, Seq(Row(1, "cathy", 9000.00, 1200.0, false)))
+
+val df2 = spark.read
+  .option("pushDownOffset", "false")
+  .table("h2.test.employee")
+  .where($"dept" === 1)
+  .offset(1)
+checkOffsetRemoved(df2, false)
+checkPushedInfo(df2,
+  "PushedFilters: [DEPT IS NOT NULL, DEPT = 1], ReadSchema:")
+checkAnswer(df2, Seq(Row(1, "cathy", 9000.00, 1200.0, false)))
+
+val df3 = spark.read
+  .option("partitionColumn", "dept")
+  .option("lowerBound", "0")
+  .option("upperBound", "2")
+  .option("numPartitions", "2")
+  .table("h2.test.employee")
+  .filter($"dept" > 1)
+  .offset(1)
+checkOffsetRemoved(df3, false)
+checkPushedInfo(df3, "PushedFilters: [DEPT IS NOT NULL, DEPT > 1], 
ReadSchema:")
+checkAnswer(df3, Seq(Row(2, "david", 1, 1300, true), Row(6, "jen", 
12000, 1200, true)))
+
+val df4 = spark.read
+  .table("h2.test.employee")
+  .groupBy("DEPT").sum("SALARY")
+  .offset(1)
+checkOffsetRemoved(df4, false)
+checkPushedInfo(df4,
+  "PushedAggregates: [SUM(SALARY)], PushedFilters: [], 
PushedGroupByExpressions: [DEPT], ")
+checkAnswer(df4, Seq(Row(2, 22000.00), Row(6, 12000.00)))
+
+val name = udf { (x: String) => x.matches("cat|dav|amy") }
+val sub = udf { (x: String) => x.substring(0, 3) }
+val df5 = spark.read
+  .table("h2.test.employee")
+  .select($"SALARY", $"BONUS", sub($"NAME").as("shortName"))
+  .filter(name($"shortName"))
+  .offset(1)
+checkOffsetRemoved(df5, false)
+// OFFSET is pushed down only if all the filters are pushed down
+checkPushedInfo(df5, "PushedFilters: [], ")
+checkAnswer(df5, Seq(Row(1.00, 1300.0, "dav"), Row(9000.00, 1200.0, 
"cat")))
+  }
+
+  test("simple scan with LIMIT and OFFSET") {
+val df1 = spark.read
+  .table("h2.test.employee")
+  .where($"dept" === 1)
+  .limit(2)
+  .offset(1)
+checkLimitRemoved(df1)
+checkOffsetRemoved(df1)
+checkPushedInfo(df1,
+  "PushedFilters: [DEPT IS NOT NULL, DEPT = 1], PushedLimit: LIMIT 1, 
PushedOffset: OFFSET 1,")

Review Comment:
   This does not match 
https://github.com/apache/spark/pull/36295/files#diff-85c754089fc8e0db142a16714e92b127001bab9e6433684d1e3a15af04cb219aR26
   
   Assume that I have a local array data source. According to the API doc, 
Spark pushes down LIMIT first. For this query, I'll do 
`array.limit(1).drop(1)`. This is wrong and doesn't match the query 
`df.limit(2).offset(1)`.
   
   We should either fix the API doc or fix the pushdown logic.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a diff in pull request #36593: [SPARK-39139][SQL] DS V2 push-down framework supports DS V2 UDF

2022-05-19 Thread GitBox


cloud-fan commented on code in PR #36593:
URL: https://github.com/apache/spark/pull/36593#discussion_r877123725


##
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCCatalog.scala:
##
@@ -32,11 +35,14 @@ import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
-class JDBCTableCatalog extends TableCatalog with SupportsNamespaces with 
Logging {
+class JDBCCatalog extends TableCatalog with SupportsNamespaces with 
FunctionCatalog with Logging {
   private var catalogName: String = null
   private var options: JDBCOptions = _
   private var dialect: JdbcDialect = _
 
+  private val functions: util.Map[Identifier, UnboundFunction] =
+new ConcurrentHashMap[Identifier, UnboundFunction]()

Review Comment:
   We should clearly define how this can be used. I thought each JDBC dialect 
should have APIs to register its own UDFs.
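   A sketch of the kind of dialect-level hook being suggested -- the trait and
   method names below are hypothetical, not an existing Spark API:

   import org.apache.spark.sql.connector.catalog.Identifier
   import org.apache.spark.sql.connector.catalog.functions.UnboundFunction

   trait DialectFunctions {
     // A JDBC dialect could expose the UDFs it understands, and the catalog
     // would serve them through FunctionCatalog instead of an ad-hoc map.
     def registerFunction(ident: Identifier, fn: UnboundFunction): Unit
     def listFunctions(): Map[Identifier, UnboundFunction]
   }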



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a diff in pull request #36593: [SPARK-39139][SQL] DS V2 push-down framework supports DS V2 UDF

2022-05-19 Thread GitBox


cloud-fan commented on code in PR #36593:
URL: https://github.com/apache/spark/pull/36593#discussion_r877113355


##
sql/core/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala:
##
@@ -201,6 +203,14 @@ class V2ExpressionBuilder(
 None
   }
 // TODO supports other expressions
+case ApplyFunctionExpression(function, children) =>
+  val childrenExpressions = children.flatMap(generateExpression(_))
+  if (childrenExpressions.length == children.length) {
+Some(new 
GeneralScalarExpression(function.name().toUpperCase(Locale.ROOT),

Review Comment:
   UDF is kind of special and I think it's better to have a new v2 API for it, 
instead of reusing `GeneralScalarExpression`.
   
   e.g., we can add a `UserDefinedScalarFunction`, which defines the function 
name, canonical name and inputs.



##
sql/core/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala:
##
@@ -201,6 +203,14 @@ class V2ExpressionBuilder(
 None
   }
 // TODO supports other expressions
+case ApplyFunctionExpression(function, children) =>
+  val childrenExpressions = children.flatMap(generateExpression(_))
+  if (childrenExpressions.length == children.length) {
+Some(new 
GeneralScalarExpression(function.name().toUpperCase(Locale.ROOT),

Review Comment:
   UDF is kind of special and I think it's better to have a new v2 API for it, 
instead of reusing `GeneralScalarExpression`.
   
   e.g., we can add a `UserDefinedScalarFunction`, which defines the function 
name, canonical name and inputs.
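   A rough sketch of what such a node could carry -- the class below is
   hypothetical and does not implement the real connector Expression interface:

   import org.apache.spark.sql.connector.expressions.Expression

   // name: how the function appears in the pushed-down expression;
   // canonicalName: a stable, fully qualified identifier the source can match on.
   case class UserDefinedScalarFuncSketch(
       name: String,
       canonicalName: String,
       children: Array[Expression])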



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a diff in pull request #36593: [SPARK-39139][SQL] DS V2 push-down framework supports DS V2 UDF

2022-05-19 Thread GitBox


cloud-fan commented on code in PR #36593:
URL: https://github.com/apache/spark/pull/36593#discussion_r877113355


##
sql/core/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala:
##
@@ -201,6 +203,14 @@ class V2ExpressionBuilder(
 None
   }
 // TODO supports other expressions
+case ApplyFunctionExpression(function, children) =>
+  val childrenExpressions = children.flatMap(generateExpression(_))
+  if (childrenExpressions.length == children.length) {
+Some(new 
GeneralScalarExpression(function.name().toUpperCase(Locale.ROOT),

Review Comment:
   UDF is kind of special and I think it's better to have a new v2 API for it, 
instead of reusing `GeneralScalarExpression`.
   
   e.g., we can add a `UserDefinedScalarExpression`, which defines the function 
name, canonical name and inputs.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] cloud-fan commented on a diff in pull request #36593: [SPARK-39139][SQL] DS V2 push-down framework supports DS V2 UDF

2022-05-19 Thread GitBox


cloud-fan commented on code in PR #36593:
URL: https://github.com/apache/spark/pull/36593#discussion_r877115137


##
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala:
##
@@ -744,6 +744,14 @@ object DataSourceStrategy
 PushableColumnWithoutNestedColumn(right), _) =>
   Some(new GeneralAggregateFunc("CORR", agg.isDistinct,
 Array(FieldReference.column(left), FieldReference.column(right
+case aggregate.V2Aggregator(aggrFunc, children, _, _) =>
+  val translatedExprs = children.flatMap(PushableExpression.unapply(_))
+  if (translatedExprs.length == children.length) {
+Some(new 
GeneralAggregateFunc(aggrFunc.name().toUpperCase(Locale.ROOT), agg.isDistinct,

Review Comment:
   ditto, let's create a dedicated v2 api, such as `UserDefinedAggregateFunction`



##
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala:
##
@@ -744,6 +744,14 @@ object DataSourceStrategy
 PushableColumnWithoutNestedColumn(right), _) =>
   Some(new GeneralAggregateFunc("CORR", agg.isDistinct,
 Array(FieldReference.column(left), FieldReference.column(right
+case aggregate.V2Aggregator(aggrFunc, children, _, _) =>
+  val translatedExprs = children.flatMap(PushableExpression.unapply(_))
+  if (translatedExprs.length == children.length) {
+Some(new 
GeneralAggregateFunc(aggrFunc.name().toUpperCase(Locale.ROOT), agg.isDistinct,

Review Comment:
   ditto, let's create a dedicated v2 api, such as 
`UserDefinedAggregateFunction`
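   The aggregate counterpart would mainly differ by carrying the DISTINCT flag;
   a hypothetical sketch mirroring the scalar one discussed above:

   import org.apache.spark.sql.connector.expressions.Expression

   case class UserDefinedAggregateFuncSketch(
       name: String,
       canonicalName: String,
       isDistinct: Boolean,
       children: Array[Expression])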



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


