[PR] [WIP][SQL] Deprecate the SQL config `spark.sql.legacy.allowZeroIndexInFormatString` [spark]

2024-02-07 Thread via GitHub


MaxGekk opened a new pull request, #45057:
URL: https://github.com/apache/spark/pull/45057

   
   
   ### What changes were proposed in this pull request?
   
   
   
   ### Why are the changes needed?
   
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   
   
   ### How was this patch tested?
   
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   No.





Re: [PR] [SPARK-45762][CORE] Support shuffle managers defined in user jars by changing startup order [spark]

2024-02-07 Thread via GitHub


dongjoon-hyun commented on PR #43627:
URL: https://github.com/apache/spark/pull/43627#issuecomment-1931550906

   @mridulm. Of course, it's legit if it's not easy or there is no other way. Also, we have a similar breaking proposal, #45052, too. While reviewing that PR, I double-checked this PR briefly.
   
   I'm totally fine if this is inevitable here and there. :) 
   





Re: [PR] [SPARK-43117][CONNECT] Make `ProtoUtils.abbreviate` support repeated fields [spark]

2024-02-07 Thread via GitHub


LuciferYang commented on code in PR #45056:
URL: https://github.com/apache/spark/pull/45056#discussion_r1481292685


##
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/ProtoUtils.scala:
##
@@ -69,23 +79,33 @@ private[connect] object ProtoUtils {
   .concat(createTruncatedByteString(size)))
 }
 
-  // TODO(SPARK-43117): should also support 1, repeated msg; 2, map
+  // TODO(SPARK-46988): should support map
   case (field: FieldDescriptor, msg: Message)
-  if field.getJavaType == FieldDescriptor.JavaType.MESSAGE && msg != null =>
+  if field.getJavaType == FieldDescriptor.JavaType.MESSAGE && !field.isRepeated
+&& msg != null =>
 builder.setField(field, abbreviate(msg, thresholds))
 
+  case (field: FieldDescriptor, msgs: java.lang.Iterable[_])
+  if field.getJavaType == FieldDescriptor.JavaType.MESSAGE && field.isRepeated
+&& msgs != null =>

Review Comment:
   Do we need to consider the scenario where `strings.iterator().hasNext` is 
`false`?
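
   For what it's worth, a tiny illustration of the empty case in plain Scala (a sketch, not the PR code): iterating an empty `Iterable` is a no-op, so an explicit `hasNext` guard would not change the outcome.

   ```scala
   // Sketch only: an empty repeated field yields an empty iterator,
   // so the foreach body below is never executed and nothing fails.
   import scala.jdk.CollectionConverters._

   val empty: java.lang.Iterable[String] = java.util.Collections.emptyList[String]()
   empty.iterator().asScala.zipWithIndex.foreach { case (s, i) =>
     println(s"$i -> $s") // never reached for an empty field
   }
   ```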






Re: [PR] [SPARK-43117][CONNECT] Make `ProtoUtils.abbreviate` support repeated fields [spark]

2024-02-07 Thread via GitHub


LuciferYang commented on code in PR #45056:
URL: https://github.com/apache/spark/pull/45056#discussion_r1481292685


##
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/ProtoUtils.scala:
##
@@ -69,23 +79,33 @@ private[connect] object ProtoUtils {
   .concat(createTruncatedByteString(size)))
 }
 
-  // TODO(SPARK-43117): should also support 1, repeated msg; 2, map
+  // TODO(SPARK-46988): should support map
   case (field: FieldDescriptor, msg: Message)
-  if field.getJavaType == FieldDescriptor.JavaType.MESSAGE && msg != null =>
+  if field.getJavaType == FieldDescriptor.JavaType.MESSAGE && !field.isRepeated
+&& msg != null =>
 builder.setField(field, abbreviate(msg, thresholds))
 
+  case (field: FieldDescriptor, msgs: java.lang.Iterable[_])
+  if field.getJavaType == FieldDescriptor.JavaType.MESSAGE && field.isRepeated
+&& msgs != null =>

Review Comment:
   nit: Do we need to consider the scenario where `strings.iterator().hasNext` 
is `false`?






Re: [PR] [SPARK-43117][CONNECT] Make `ProtoUtils.abbreviate` support repeated fields [spark]

2024-02-07 Thread via GitHub


LuciferYang commented on code in PR #45056:
URL: https://github.com/apache/spark/pull/45056#discussion_r1481292685


##
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/ProtoUtils.scala:
##
@@ -69,23 +79,33 @@ private[connect] object ProtoUtils {
   .concat(createTruncatedByteString(size)))
 }
 
-  // TODO(SPARK-43117): should also support 1, repeated msg; 2, map
+  // TODO(SPARK-46988): should support map
   case (field: FieldDescriptor, msg: Message)
-  if field.getJavaType == FieldDescriptor.JavaType.MESSAGE && msg != null =>
+  if field.getJavaType == FieldDescriptor.JavaType.MESSAGE && !field.isRepeated
+&& msg != null =>
 builder.setField(field, abbreviate(msg, thresholds))
 
+  case (field: FieldDescriptor, msgs: java.lang.Iterable[_])
+  if field.getJavaType == FieldDescriptor.JavaType.MESSAGE && field.isRepeated
+&& msgs != null =>

Review Comment:
   ~nit: Do we need to consider the scenario where `strings.iterator().hasNext` 
is `false`?~






Re: [PR] [SPARK-43117][CONNECT] Make `ProtoUtils.abbreviate` support repeated fields [spark]

2024-02-07 Thread via GitHub


LuciferYang commented on code in PR #45056:
URL: https://github.com/apache/spark/pull/45056#discussion_r1481327727


##
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/ProtoUtils.scala:
##
@@ -42,7 +42,17 @@ private[connect] object ProtoUtils {
 val size = string.length
 val threshold = thresholds.getOrElse(STRING, MAX_STRING_SIZE)
 if (size > threshold) {
-  builder.setField(field, createString(string.take(threshold), size))
+  builder.setField(field, truncateString(string, threshold))
+}
+
+  case (field: FieldDescriptor, strings: java.lang.Iterable[_])
+  if field.getJavaType == FieldDescriptor.JavaType.STRING && field.isRepeated
+&& strings != null =>
+val threshold = thresholds.getOrElse(STRING, MAX_STRING_SIZE)
+strings.iterator().asScala.zipWithIndex.foreach {
+  case (string: String, i) if string != null && string.length > threshold =>

Review Comment:
   Can `string` really be null?






Re: [PR] [SPARK-46998][SQL] Deprecate the SQL config `spark.sql.legacy.allowZeroIndexInFormatString` [spark]

2024-02-07 Thread via GitHub


cloud-fan commented on code in PR #45057:
URL: https://github.com/apache/spark/pull/45057#discussion_r1481365136


##
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##
@@ -2339,7 +2339,8 @@ object SQLConf {
   .doc("When false, the `strfmt` in `format_string(strfmt, obj, ...)` and 
" +
 "`printf(strfmt, obj, ...)` will no longer support to use \"0$\" to 
specify the first " +
 "argument, the first argument should always reference by \"1$\" when 
use argument index " +
-"to indicating the position of the argument in the argument list.")
+"to indicating the position of the argument in the argument list. " +
+"This config will be removed in the future releases.")

Review Comment:
   Since this doesn't work already, shall we put it in `removedSQLConfigs`?






Re: [PR] [SPARK-46996][SQL] Allow AQE coalesce final stage in SQL cached plan [spark]

2024-02-07 Thread via GitHub


cloud-fan commented on PR #45054:
URL: https://github.com/apache/spark/pull/45054#issuecomment-1931906138

   thanks, merging to master!





Re: [PR] [SPARK-46996][SQL] Allow AQE coalesce final stage in SQL cached plan [spark]

2024-02-07 Thread via GitHub


cloud-fan closed pull request #45054: [SPARK-46996][SQL] Allow AQE coalesce 
final stage in SQL cached plan
URL: https://github.com/apache/spark/pull/45054





Re: [PR] [SPARK-46996][SQL] Allow AQE coalesce final stage in SQL cached plan [spark]

2024-02-07 Thread via GitHub


cloud-fan commented on PR #45054:
URL: https://github.com/apache/spark/pull/45054#issuecomment-1931908740

   @liuzqt can you correct the JIRA ticket ID? It seems wrong.





Re: [PR] [SPARK-46989][SQL][CONNECT] Improve concurrency performance for SparkSession [spark]

2024-02-07 Thread via GitHub


hvanhovell commented on code in PR #45046:
URL: https://github.com/apache/spark/pull/45046#discussion_r1481443979


##
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala:
##
@@ -854,7 +855,7 @@ object SparkSession extends Logging {
 // the remote() function, as it takes precedence over the SPARK_REMOTE environment variable.
 private val builder = SparkConnectClient.builder().loadFromEnvironment()
 private var client: SparkConnectClient = _
-private[this] val options = new scala.collection.mutable.HashMap[String, String]
+private[this] val options = new ConcurrentHashMap[String, String]

Review Comment:
   Building a Spark session is typically single-threaded... Is someone actually having a problem here?






[PR] [SPARK-46999][SQL] ExpressionWithUnresolvedIdentifier should include other expressions in the expression tree [spark]

2024-02-07 Thread via GitHub


cloud-fan opened a new pull request, #45058:
URL: https://github.com/apache/spark/pull/45058

   
   
   ### What changes were proposed in this pull request?
   
   The plan/expression tree will be expanded during analysis, due to our 
implementation of the IDENTIFIER clause: we keep a lambda function in the 
plan/expression node to lazily return a new plan/expression.
   
   This is usually fine, but doesn't work well with query parameter binding, 
which needs to see the entire plan/expression tree and bind all parameters at 
once. The feature EXECUTE IMMEDIATE also needs to see the entire plan tree to 
determine if it's a `PosParameterizedQuery` or `NameParameterizedQuery`.
   
   This PR fixes the problem for `ExpressionWithUnresolvedIdentifier` by making the lambda function return only the expression node that needs the identifier; the other expressions are kept in `ExpressionWithUnresolvedIdentifier` and passed to the lambda function later.
   
   ### Why are the changes needed?
   
   To make IDENTIFIER clause work with query parameter binding
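
   As a hypothetical illustration of the kind of query this unblocks (the table name, column, and parameter names below are made up, not the exact test added here):

   ```scala
   // Hypothetical example: an IDENTIFIER clause combined with named query parameters.
   // Binding :tbl and :threshold requires seeing the whole expression tree at once,
   // which is what this change enables.
   val df = spark.sql(
     "SELECT * FROM IDENTIFIER(:tbl) WHERE value > :threshold",
     Map("tbl" -> "my_table", "threshold" -> 10))
   df.show()
   ```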
   
   ### Does this PR introduce _any_ user-facing change?
   
   Yes, certain queries that previously failed can now run. See the test.
   
   ### How was this patch tested?
   
   new test
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   No





Re: [PR] [SPARK-46999][SQL] ExpressionWithUnresolvedIdentifier should include other expressions in the expression tree [spark]

2024-02-07 Thread via GitHub


cloud-fan commented on PR #45058:
URL: https://github.com/apache/spark/pull/45058#issuecomment-1932049971

   cc @srielau @MaxGekk 





Re: [PR] [SPARK-46922][CORE][SQL] Do not wrap runtime user-facing errors [spark]

2024-02-07 Thread via GitHub


cloud-fan commented on PR #44953:
URL: https://github.com/apache/spark/pull/44953#issuecomment-1932057249

   thanks for the review, merging to master!





Re: [PR] [SPARK-46922][CORE][SQL] Do not wrap runtime user-facing errors [spark]

2024-02-07 Thread via GitHub


cloud-fan closed pull request #44953: [SPARK-46922][CORE][SQL] Do not wrap 
runtime user-facing errors
URL: https://github.com/apache/spark/pull/44953





Re: [PR] [SPARK-39910][SQL] Delegate path qualification to filesystem during DataSource file path globbing [spark]

2024-02-07 Thread via GitHub


tigrulya-exe commented on code in PR #43463:
URL: https://github.com/apache/spark/pull/43463#discussion_r1481571956


##
sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala:
##
@@ -1363,4 +1363,19 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSparkSession with
   }
 }
   }
+
+  test("SPARK-39910: read files from Hadoop archives") {
+val fileSchema = new StructType().add("str", StringType)
+val harPath = testFile("test-data/test-archive.har")
+  .replaceFirst("file:/", "har:/")
+
+testRead(spark.read.textFile(s"$harPath/test.txt").toDF(), data, textSchema)

Review Comment:
   OK, removed the file formats other than CSV.






Re: [PR] [SPARK-39910][SQL] Delegate path qualification to filesystem during DataSource file path globbing [spark]

2024-02-07 Thread via GitHub


tigrulya-exe commented on code in PR #43463:
URL: https://github.com/apache/spark/pull/43463#discussion_r1481586528


##
sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala:
##
@@ -1363,4 +1363,19 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSparkSession with
   }
 }
   }
+
+  test("SPARK-39910: read files from Hadoop archives") {
+val fileSchema = new StructType().add("str", StringType)
+val harPath = testFile("test-data/test-archive.har")
+  .replaceFirst("file:/", "har:/")

Review Comment:
   Yes, `HarFileSystem` support is included in the HDFS client by default. OK, I removed the tests from `DataSourceSuite` and left only the `MockFileSystem#getUri` method so that paths with the `mockFs://` scheme are qualified correctly.






Re: [PR] [SPARK-46997][CORE] Enable `spark.worker.cleanup.enabled` by default [spark]

2024-02-07 Thread via GitHub


dongjoon-hyun commented on PR #45055:
URL: https://github.com/apache/spark/pull/45055#issuecomment-1932273211

   Hi, @cloud-fan. Could you review this? I'm wondering if you have any concerns about it.





Re: [PR] [SPARK-46998][SQL] Deprecate the SQL config `spark.sql.legacy.allowZeroIndexInFormatString` [spark]

2024-02-07 Thread via GitHub


MaxGekk commented on code in PR #45057:
URL: https://github.com/apache/spark/pull/45057#discussion_r1481646652


##
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##
@@ -2339,7 +2339,8 @@ object SQLConf {
   .doc("When false, the `strfmt` in `format_string(strfmt, obj, ...)` and 
" +
 "`printf(strfmt, obj, ...)` will no longer support to use \"0$\" to 
specify the first " +
 "argument, the first argument should always reference by \"1$\" when 
use argument index " +
-"to indicating the position of the argument in the argument list.")
+"to indicating the position of the argument in the argument list. " +
+"This config will be removed in the future releases.")

Review Comment:
   I think we should not do that, because putting the config into `removedSQLConfigs` means removing it from `SQLConf`, which would break existing user code in two cases:
   - The user's code sets the config to the default value `false` explicitly. In that case, the user's app won't compile.
   - The user's code sets it to `true` but never refers to the zero index, so the user might never hit `java.util.IllegalFormatArgumentIndexException`. Removing the config would still break the app.
   
   I believe we should deprecate the config first, and only after that can we remove it safely.
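
   As a rough sketch of the second case (illustrative only, not taken from this PR):

   ```scala
   // The app sets the legacy config but never actually uses a "0$" argument index,
   // so its queries are unaffected by the legacy behavior. If the config were removed
   // outright, the set() call itself would start failing.
   spark.conf.set("spark.sql.legacy.allowZeroIndexInFormatString", "true")
   spark.sql("SELECT format_string('%1$s is %2$d years old', 'Alice', 30)").show()
   ```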






Re: [PR] [SPARK-46617][SQL] Create-table-if-not-exists should not silently overwrite existing data-files [spark]

2024-02-07 Thread via GitHub


adrians commented on code in PR #44622:
URL: https://github.com/apache/spark/pull/44622#discussion_r1474552639


##
sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala:
##
@@ -102,14 +102,17 @@ object DataWritingCommand {
   }
   /**
    * When execute CTAS operators, and the location is not empty, throw [[AnalysisException]].
-   * For CTAS, the SaveMode is always [[ErrorIfExists]]
+   * For CTAS, the SaveMode is always [[ErrorIfExists]].
+   * For Create-Table-If-Not-Exists, the SaveMode is [[Ignore]].
*
* @param tablePath Table location.
* @param saveMode  Save mode of the table.
* @param hadoopConf Configuration.
*/
   def assertEmptyRootPath(tablePath: URI, saveMode: SaveMode, hadoopConf: Configuration): Unit = {

Review Comment:
   I don't think a separate flag is needed. The SaveModes map reasonably onto the SQL syntax variants, so no additional logic would be needed; we can rely on the SaveMode.
   
   ```
   +------------------------------------------+---------------+---------------------------------------------------------+
   | SQL Syntax                               | SaveMode      | Semantics                                               |
   +------------------------------------------+---------------+---------------------------------------------------------+
   | CREATE TABLE ... AS SELECT               | ErrorIfExists | If data/table exists, show error.                       |
   |                                          |               | (or if allowNonEmptyLocationInCTAS is set,              |
   |                                          |               |  ignore error & overwrite anyway)                       |
   |                                          |               |                                                         |
   | CREATE TABLE IF NOT EXISTS ... AS SELECT | Ignore        | If data/table exists, ignore error & stop.              |
   |                                          |               |                                                         |
   | CREATE OR REPLACE TABLE ... AS SELECT    | Overwrite     | If data/table exists, ignore error & overwrite anyway.  |
   |                                          |               | (or shows "The feature is not supported" exception)     |
   +------------------------------------------+---------------+---------------------------------------------------------+
   ```
   
   My issue was that `Ignore` was following the same code path as `Overwrite` in this situation: the contents of the storage location were not checked (as implemented in assertEmptyRootPath), and as a consequence the caller considered it safe to overwrite them.
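
   A compact restatement of the mapping in the table above, as a sketch (the helper name is made up):

   ```scala
   import org.apache.spark.sql.SaveMode

   // Sketch only: how the CTAS syntax variants line up with SaveMode.
   def ctasSaveMode(ifNotExists: Boolean, orReplace: Boolean): SaveMode = {
     if (orReplace) SaveMode.Overwrite        // CREATE OR REPLACE TABLE ... AS SELECT
     else if (ifNotExists) SaveMode.Ignore    // CREATE TABLE IF NOT EXISTS ... AS SELECT
     else SaveMode.ErrorIfExists              // CREATE TABLE ... AS SELECT
   }
   ```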






Re: [PR] [SPARK-45599][CORE] Use object equality in OpenHashSet [spark]

2024-02-07 Thread via GitHub


peter-toth commented on code in PR #45036:
URL: https://github.com/apache/spark/pull/45036#discussion_r1481726694


##
core/src/test/scala/org/apache/spark/util/collection/OpenHashSetSuite.scala:
##
@@ -269,4 +269,35 @@ class OpenHashSetSuite extends SparkFunSuite with Matchers {
   assert(pos1 == pos2)
 }
   }
+
+  test("SPARK-45599: 0.0 and -0.0 are equal but not the same") {

Review Comment:
   Yes, it probably makes sense to just fix this particular issue, as `OpenHashSet` is used in many other places.






Re: [PR] [SPARK-45599][CORE] Use object equality in OpenHashSet [spark]

2024-02-07 Thread via GitHub


peter-toth commented on PR #45036:
URL: https://github.com/apache/spark/pull/45036#issuecomment-1932381997

   > I can hold cutting the tag of RC1 for this.
   
   Thanks @HeartSaVioR. BTW I don't think this should be a blocker for 3.5.1, as this is not a regression, but if we can find a quick fix for this issue it would be good to include it in the release.





[PR] [SPARK-46993] Fix constant folding for session variables [spark]

2024-02-07 Thread via GitHub


srielau opened a new pull request, #45059:
URL: https://github.com/apache/spark/pull/45059

   
   
   ### What changes were proposed in this pull request?
   
   Remove the unconditional Alias node generation when resolving a variable 
reference.
   
   ### Why are the changes needed?
   
   An Alias that is not at the top level of an expression blocks constant 
folding.
   Constant folding in turn is a requirement for variables to be usable as an 
argument to numerous functions, such as from_json().
   It also has performance implications.
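   
   A minimal sketch of the scenario (illustrative, not the exact regression test): `from_json` requires a constant schema argument, so the variable reference must fold to a literal.
   
   ```scala
   // Sketch: a session variable used as the schema argument of from_json().
   // Without constant folding of the variable reference, this call is rejected.
   spark.sql("DECLARE VARIABLE schema_var STRING DEFAULT 'a INT, b STRING'")
   spark.sql("""SELECT from_json('{"a": 1, "b": "x"}', schema_var) AS parsed""").show()
   ```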
   
   ### Does this PR introduce _any_ user-facing change?
   
   No
   
   ### How was this patch tested?
   
   Existing regression tests in sql-session-variables.sql, added a test to 
validate the fix.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   No





[PR] [SPARK-47000][CORE] Use `getTotalMemorySize` in `WorkerArguments` [spark]

2024-02-07 Thread via GitHub


dongjoon-hyun opened a new pull request, #45060:
URL: https://github.com/apache/spark/pull/45060

   
   
   ### What changes were proposed in this pull request?
   
   
   
   ### Why are the changes needed?
   
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   
   
   ### How was this patch tested?
   
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   





Re: [PR] [SPARK-46689][SPARK-46690][PYTHON][CONNECT] Support v2 profiling in group/cogroup applyInPandas [spark]

2024-02-07 Thread via GitHub


ueshin commented on code in PR #45050:
URL: https://github.com/apache/spark/pull/45050#discussion_r1481939580


##
python/pyspark/sql/tests/test_udf_profiler.py:
##
@@ -333,6 +333,69 @@ def filter_func(iterator):
 
 self.assertEqual(0, len(self.profile_results), str(self.profile_results.keys()))
 
+@unittest.skipIf(
+not have_pandas or not have_pyarrow,
+cast(str, pandas_requirement_message or pyarrow_requirement_message),
+)
+def test_perf_profiler_group_apply_in_pandas(self):

Review Comment:
   Shall we have a test for `applyInArrow` as well?



##
python/pyspark/sql/tests/test_udf_profiler.py:
##
@@ -333,6 +333,69 @@ def filter_func(iterator):
 
 self.assertEqual(0, len(self.profile_results), str(self.profile_results.keys()))
 
+@unittest.skipIf(
+not have_pandas or not have_pyarrow,
+cast(str, pandas_requirement_message or pyarrow_requirement_message),
+)
+def test_perf_profiler_group_apply_in_pandas(self):
+# FlatMapGroupsInBatchExec
+df = self.spark.createDataFrame(
+[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v")
+)
+
+def normalize(pdf):
+v = pdf.v
+return pdf.assign(v=(v - v.mean()) / v.std())
+
+with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
+df.groupby("id").applyInPandas(normalize, schema="id long, v double").show()
+
+self.assertEqual(1, len(self.profile_results), str(self.profile_results.keys()))
+
+for id in self.profile_results:
+with self.trap_stdout() as io:
+self.spark.showPerfProfiles(id)
+
+self.assertIn(f"Profile of UDF", io.getvalue())
+self.assertRegex(
+io.getvalue(), f"2.*{os.path.basename(inspect.getfile(_do_computation))}"
+)
+
+@unittest.skipIf(
+not have_pandas or not have_pyarrow,
+cast(str, pandas_requirement_message or pyarrow_requirement_message),
+)
+def test_perf_profiler_cogroup_apply_in_pandas(self):

Review Comment:
   ditto.






Re: [PR] [SPARK-46966][Python] Add UDTF API for 'analyze' method to indicate subset of input table columns to select [spark]

2024-02-07 Thread via GitHub


ueshin commented on code in PR #45007:
URL: https://github.com/apache/spark/pull/45007#discussion_r1481953697


##
python/pyspark/sql/worker/analyze_udtf.py:
##
@@ -31,7 +31,7 @@
 write_with_length,
 SpecialLengths,
 )
-from pyspark.sql.functions import PartitioningColumn
+from pyspark.sql.functions import OrderingColumn, PartitioningColumn, SelectedColumn

Review Comment:
   ```suggestion
   from pyspark.sql.functions import PartitioningColumn, SelectedColumn
   ```






Re: [PR] [SPARK-46966][Python] Add UDTF API for 'analyze' method to indicate subset of input table columns to select [spark]

2024-02-07 Thread via GitHub


ueshin commented on code in PR #45007:
URL: https://github.com/apache/spark/pull/45007#discussion_r1481956296


##
python/pyspark/sql/worker/analyze_udtf.py:
##
@@ -31,7 +31,7 @@
 write_with_length,
 SpecialLengths,
 )
-from pyspark.sql.functions import PartitioningColumn
+from pyspark.sql.functions import OrderingColumn, PartitioningColumn, SelectedColumn

Review Comment:
   Or we may want a similar check for `OrderingColumn`, which can be done in a separate PR.






Re: [PR] [SPARK-46865][SS] Add Batch Support for TransformWithState Operator [spark]

2024-02-07 Thread via GitHub


ericm-db commented on PR #44884:
URL: https://github.com/apache/spark/pull/44884#issuecomment-1932703827

   Addressed feedback, cc @HeartSaVioR 





Re: [PR] [SPARK-46961][SS] Using ProcessorContext to store and retrieve handle [spark]

2024-02-07 Thread via GitHub


anishshri-db commented on code in PR #45002:
URL: https://github.com/apache/spark/pull/45002#discussion_r1481969778


##
sql/api/src/main/scala/org/apache/spark/sql/streaming/StatefulProcessor.scala:
##
@@ -27,19 +27,16 @@ import org.apache.spark.annotation.{Evolving, Experimental}
  */
 @Experimental
 @Evolving
-private[sql] trait StatefulProcessor[K, I, O] extends Serializable {
+private[sql] abstract class StatefulProcessor[K, I, O] extends Serializable {

Review Comment:
   Do we need to change this ?






Re: [PR] [SPARK-46961][SS] Using ProcessorContext to store and retrieve handle [spark]

2024-02-07 Thread via GitHub


anishshri-db commented on code in PR #45002:
URL: https://github.com/apache/spark/pull/45002#discussion_r1481970354


##
sql/api/src/main/scala/org/apache/spark/sql/streaming/StatefulProcessor.scala:
##
@@ -60,4 +57,21 @@ private[sql] trait StatefulProcessor[K, I, O] extends Serializable {
* any cleanup or teardown operations.
*/
   def close (): Unit
+
+  /**
+   * Function to set the stateful processor handle that will be used to interact with the state
+   * store and other stateful processor related operations.
+   * @param handle - instance of StatefulProcessorHandle
+   */
+  final def setStatefulProcessorHandle(handle: StatefulProcessorHandle): Unit = {

Review Comment:
   Maybe just say `setHandle` and `getHandle` for simplicity ? 
`StatefulProcessor` is kind of implied here ?






Re: [PR] [SPARK-46961][SS] Using ProcessorContext to store and retrieve handle [spark]

2024-02-07 Thread via GitHub


anishshri-db commented on code in PR #45002:
URL: https://github.com/apache/spark/pull/45002#discussion_r1481970354


##
sql/api/src/main/scala/org/apache/spark/sql/streaming/StatefulProcessor.scala:
##
@@ -60,4 +57,21 @@ private[sql] trait StatefulProcessor[K, I, O] extends Serializable {
* any cleanup or teardown operations.
*/
   def close (): Unit
+
+  /**
+   * Function to set the stateful processor handle that will be used to interact with the state
+   * store and other stateful processor related operations.
+   * @param handle - instance of StatefulProcessorHandle
+   */
+  final def setStatefulProcessorHandle(handle: StatefulProcessorHandle): Unit = {

Review Comment:
   Maybe just say `setHandle` and `getHandle` for simplicity ?






Re: [PR] [SPARK-46961][SS] Using ProcessorContext to store and retrieve handle [spark]

2024-02-07 Thread via GitHub


anishshri-db commented on code in PR #45002:
URL: https://github.com/apache/spark/pull/45002#discussion_r1481972279


##
sql/api/src/main/scala/org/apache/spark/sql/streaming/StatefulProcessor.scala:
##
@@ -60,4 +57,21 @@ private[sql] trait StatefulProcessor[K, I, O] extends Serializable {
* any cleanup or teardown operations.
*/
   def close (): Unit

Review Comment:
   Can you also just add default impl for this as empty ?






Re: [PR] [SPARK-46961][SS] Using ProcessorContext to store and retrieve handle [spark]

2024-02-07 Thread via GitHub


anishshri-db commented on code in PR #45002:
URL: https://github.com/apache/spark/pull/45002#discussion_r1481971683


##
sql/api/src/main/scala/org/apache/spark/sql/streaming/StatefulProcessor.scala:
##
@@ -60,4 +57,21 @@ private[sql] trait StatefulProcessor[K, I, O] extends Serializable {
* any cleanup or teardown operations.
*/
   def close (): Unit
+
+  /**
+   * Function to set the stateful processor handle that will be used to interact with the state
+   * store and other stateful processor related operations.
+   * @param handle - instance of StatefulProcessorHandle
+   */
+  final def setStatefulProcessorHandle(handle: StatefulProcessorHandle): Unit = {
+statefulProcessorHandle = handle
+  }
+
+  /**
+   * Function to get the stateful processor handle that will be used to interact with the state
+   * @return handle - instance of StatefulProcessorHandle
+   */
+  final def getStatefulProcessorHandle: StatefulProcessorHandle = {
+statefulProcessorHandle

Review Comment:
   Let's set the initial value to null. And if it's null, then let's raise an exception/error to say that the handle is not available?
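
   Something along these lines, as a sketch (the class and method names are placeholders, not the final API; it assumes `StatefulProcessorHandle` lives in the same `org.apache.spark.sql.streaming` package as the file above):

   ```scala
   // Placeholder sketch of the suggested shape for the handle accessors.
   abstract class StatefulProcessorSketch[K, I, O] extends Serializable {
     @transient private var statefulProcessorHandle: StatefulProcessorHandle = null

     final def setHandle(handle: StatefulProcessorHandle): Unit = {
       statefulProcessorHandle = handle
     }

     final def getHandle: StatefulProcessorHandle = {
       if (statefulProcessorHandle == null) {
         throw new IllegalStateException(
           "Stateful processor handle is not initialized; it is only available from init() onwards.")
       }
       statefulProcessorHandle
     }
   }
   ```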






Re: [PR] [SPARK-46961][SS] Using ProcessorContext to store and retrieve handle [spark]

2024-02-07 Thread via GitHub


ericm-db commented on code in PR #45002:
URL: https://github.com/apache/spark/pull/45002#discussion_r1481973732


##
sql/api/src/main/scala/org/apache/spark/sql/streaming/StatefulProcessor.scala:
##
@@ -27,19 +27,16 @@ import org.apache.spark.annotation.{Evolving, Experimental}
  */
 @Experimental
 @Evolving
-private[sql] trait StatefulProcessor[K, I, O] extends Serializable {
+private[sql] abstract class StatefulProcessor[K, I, O] extends Serializable {

Review Comment:
   We want to provide implementation for the getters and setters, so it can no 
longer be a trait, right?






Re: [PR] [SPARK-46961][SS] Using ProcessorContext to store and retrieve handle [spark]

2024-02-07 Thread via GitHub


anishshri-db commented on code in PR #45002:
URL: https://github.com/apache/spark/pull/45002#discussion_r1481976306


##
sql/api/src/main/scala/org/apache/spark/sql/streaming/StatefulProcessor.scala:
##
@@ -27,19 +27,16 @@ import org.apache.spark.annotation.{Evolving, Experimental}
  */
 @Experimental
 @Evolving
-private[sql] trait StatefulProcessor[K, I, O] extends Serializable {
+private[sql] abstract class StatefulProcessor[K, I, O] extends Serializable {

Review Comment:
   I believe a trait can have default/concrete implementations too?






Re: [PR] [SPARK-46966][Python] Add UDTF API for 'analyze' method to indicate subset of input table columns to select [spark]

2024-02-07 Thread via GitHub


dtenedor commented on code in PR #45007:
URL: https://github.com/apache/spark/pull/45007#discussion_r1481977258


##
python/pyspark/sql/worker/analyze_udtf.py:
##
@@ -31,7 +31,7 @@
 write_with_length,
 SpecialLengths,
 )
-from pyspark.sql.functions import PartitioningColumn
+from pyspark.sql.functions import OrderingColumn, PartitioningColumn, SelectedColumn

Review Comment:
   Fixed this for now by removing `OrderingColumn`. I will add a similar check 
for that in a separate PR to help separate concerns.






Re: [PR] [SPARK-46961][SS] Using ProcessorContext to store and retrieve handle [spark]

2024-02-07 Thread via GitHub


anishshri-db commented on code in PR #45002:
URL: https://github.com/apache/spark/pull/45002#discussion_r1481977985


##
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala:
##
@@ -31,14 +31,9 @@ object TransformWithStateSuiteUtils {
 class RunningCountStatefulProcessor extends StatefulProcessor[String, String, (String, String)]
   with Logging {
   @transient private var _countState: ValueState[Long] = _
-  @transient var _processorHandle: StatefulProcessorHandle = _
-
-  override def init(
-  handle: StatefulProcessorHandle,
-  outputMode: OutputMode) : Unit = {
-_processorHandle = handle
-assert(handle.getQueryInfo().getBatchId >= 0)
-_countState = _processorHandle.getValueState[Long]("countState")
+
+  override def init(outputMode: OutputMode): Unit = {

Review Comment:
   Can we also add a test verifying that the context is not available if it's not initialized? Maybe within `ValueStateSuite`?
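
   Something like the following, as a sketch (assuming the accessor ends up being called `getHandle` and throws when uninitialized; names, error type, and placement are placeholders):

   ```scala
   // Placeholder sketch of the suggested negative test, inside a suite such as ValueStateSuite.
   test("handle should not be available before init") {
     val processor = new RunningCountStatefulProcessor()
     val ex = intercept[IllegalStateException] {
       processor.getHandle
     }
     assert(ex.getMessage.contains("not initialized"))
   }
   ```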






Re: [PR] [SPARK-46998][SQL] Deprecate the SQL config `spark.sql.legacy.allowZeroIndexInFormatString` [spark]

2024-02-07 Thread via GitHub


dongjoon-hyun commented on code in PR #45057:
URL: https://github.com/apache/spark/pull/45057#discussion_r1481997397


##
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##
@@ -2339,7 +2339,8 @@ object SQLConf {
   .doc("When false, the `strfmt` in `format_string(strfmt, obj, ...)` and 
" +
 "`printf(strfmt, obj, ...)` will no longer support to use \"0$\" to 
specify the first " +
 "argument, the first argument should always reference by \"1$\" when 
use argument index " +
-"to indicating the position of the argument in the argument list.")
+"to indicating the position of the argument in the argument list. " +
+"This config will be removed in the future releases.")

Review Comment:
   +1 for the deprecation proposal. Maybe, could you add a proper warning to 
the SQL migration guide, too?






Re: [PR] [SPARK-47000][CORE] Use `getTotalMemorySize` in `WorkerArguments` [spark]

2024-02-07 Thread via GitHub


dongjoon-hyun commented on PR #45060:
URL: https://github.com/apache/spark/pull/45060#issuecomment-1932751890

   `core` passed. 
   (screenshot: https://github.com/apache/spark/assets/9700541/8c84c5cf-1ecf-4397-9aa8-9f748424607d)
   
   Could you review this when you have some time, @sunchao ?





Re: [PR] [SPARK-46998][SQL] Deprecate the SQL config `spark.sql.legacy.allowZeroIndexInFormatString` [spark]

2024-02-07 Thread via GitHub


MaxGekk commented on code in PR #45057:
URL: https://github.com/apache/spark/pull/45057#discussion_r1482042611


##
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##
@@ -2339,7 +2339,8 @@ object SQLConf {
   .doc("When false, the `strfmt` in `format_string(strfmt, obj, ...)` and 
" +
 "`printf(strfmt, obj, ...)` will no longer support to use \"0$\" to 
specify the first " +
 "argument, the first argument should always reference by \"1$\" when 
use argument index " +
-"to indicating the position of the argument in the argument list.")
+"to indicating the position of the argument in the argument list. " +
+"This config will be removed in the future releases.")

Review Comment:
   I have updated the SQL migration guide.






Re: [PR] [SPARK-45599][CORE] Use object equality in OpenHashSet [spark]

2024-02-07 Thread via GitHub


nchammas commented on code in PR #45036:
URL: https://github.com/apache/spark/pull/45036#discussion_r1482085088


##
core/src/test/scala/org/apache/spark/util/collection/OpenHashSetSuite.scala:
##
@@ -269,4 +269,35 @@ class OpenHashSetSuite extends SparkFunSuite with Matchers {
   assert(pos1 == pos2)
 }
   }
+
+  test("SPARK-45599: 0.0 and -0.0 are equal but not the same") {

Review Comment:
   > This is a bit tricky and it's better if we can find a reference system 
that defines this semantic.
   
   ```scala
   scala> import java.util.HashSet
   import java.util.HashSet
   
   scala> val h = new HashSet[Double]()
   val h: java.util.HashSet[Double] = []
   
   scala> h.add(0.0)
   val res0: Boolean = true
   
   scala> h.add(-0.0)
   val res1: Boolean = true
   
   scala> h.size()
   val res2: Int = 2
   ```
   
   The doc for [HashSet.add][1] states:
   
   > More formally, adds the specified element e to this set if this set 
contains no element e2 such that Objects.equals(e, e2). If this set already 
contains the element, the call leaves the set unchanged and returns false.
   
   In other words, `java.util.HashSet` uses `equals` and not `==`, and 
therefore it considers `0.0` and `-0.0` distinct elements.
   
   So this PR brings `OpenHashSet` more in line with the semantics of 
`java.util.HashSet`.
   
   [1]: 
https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/util/HashSet.html#add(E)
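
   For completeness, the same distinction at the language level (primitive `==` vs. boxed `equals`):

   ```scala
   scala> 0.0 == -0.0                     // primitive comparison (IEEE 754 equality)
   val res0: Boolean = true

   scala> java.lang.Double.valueOf(0.0).equals(java.lang.Double.valueOf(-0.0))
   val res1: Boolean = false

   scala> java.lang.Double.doubleToLongBits(0.0) == java.lang.Double.doubleToLongBits(-0.0)
   val res2: Boolean = false
   ```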






Re: [PR] [SPARK-45599][CORE] Use object equality in OpenHashSet [spark]

2024-02-07 Thread via GitHub


nchammas commented on code in PR #45036:
URL: https://github.com/apache/spark/pull/45036#discussion_r1482089432


##
core/src/test/scala/org/apache/spark/util/collection/OpenHashSetSuite.scala:
##
@@ -269,4 +269,35 @@ class OpenHashSetSuite extends SparkFunSuite with Matchers {
   assert(pos1 == pos2)
 }
   }
+
+  test("SPARK-45599: 0.0 and -0.0 are equal but not the same") {

Review Comment:
   >  In Spark, 0.0 == -0.0, and in GROUP BY, 0.0 and -0.0 are considered to be 
in the same group and normalized to 0.0.
   
   This PR does not change this behavior. I noticed, however, that we do not 
have any tests currently to check that -0.0 is normalized and grouped as you 
describe, so I went ahead and added such a test in 
2bfc60548db2c41f4c64b63d40a2652cb22732ab.
   
   Does this address your concern? Or are you suggesting that we should 
normalize -0.0 to 0.0 across the board?






Re: [PR] [MINOR][DOCS] Show sort order of NaN relative to infinity [spark]

2024-02-07 Thread via GitHub


nchammas commented on PR #45047:
URL: https://github.com/apache/spark/pull/45047#issuecomment-1932909805

   I don't think we have an actual unit test to check this sort behavior, so I 
added one as part of #45036 (which also verifies some other behavior we are 
discussing there).





Re: [PR] [SPARK-46961][SS] Using ProcessorContext to store and retrieve handle [spark]

2024-02-07 Thread via GitHub


ericm-db commented on code in PR #45002:
URL: https://github.com/apache/spark/pull/45002#discussion_r1482110798


##
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala:
##
@@ -31,14 +31,9 @@ object TransformWithStateSuiteUtils {
 class RunningCountStatefulProcessor extends StatefulProcessor[String, String, (String, String)]
   with Logging {
   @transient private var _countState: ValueState[Long] = _
-  @transient var _processorHandle: StatefulProcessorHandle = _
-
-  override def init(
-  handle: StatefulProcessorHandle,
-  outputMode: OutputMode) : Unit = {
-_processorHandle = handle
-assert(handle.getQueryInfo().getBatchId >= 0)
-_countState = _processorHandle.getValueState[Long]("countState")
+
+  override def init(outputMode: OutputMode): Unit = {

Review Comment:
   Added






[PR] [SPARK-47003][K8S] Detect and fail on invalid volume sizes (< 1KiB) in K8s [spark]

2024-02-07 Thread via GitHub


dongjoon-hyun opened a new pull request, #45061:
URL: https://github.com/apache/spark/pull/45061

   …
   
   
   
   ### What changes were proposed in this pull request?
   
   
   
   ### Why are the changes needed?
   
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   
   
   ### How was this patch tested?
   
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   





Re: [PR] [SPARK-45599][CORE] Use object equality in OpenHashSet [spark]

2024-02-07 Thread via GitHub


nchammas commented on code in PR #45036:
URL: https://github.com/apache/spark/pull/45036#discussion_r1482123330


##
core/src/test/scala/org/apache/spark/util/collection/OpenHashSetSuite.scala:
##
@@ -269,4 +269,35 @@ class OpenHashSetSuite extends SparkFunSuite with Matchers {
   assert(pos1 == pos2)
 }
   }
+
+  test("SPARK-45599: 0.0 and -0.0 are equal but not the same") {

Review Comment:
   Consider another interesting case where `java.util.HashSet` and 
`OpenHashSet` differ:
   
   ```scala
   scala> import java.util.HashSet
   import java.util.HashSet
   
   scala> val h = new HashSet[Double]()
   val h: java.util.HashSet[Double] = []
   
   scala> h.add(Double.NaN)
   val res9: Boolean = true
   
   scala> h.add(Double.NaN)
   val res10: Boolean = false
   
   scala> h.size()
   val res11: Int = 1
   ```
   
   On `master`, `OpenHashSet` does IMO the wrong thing:
   
   ```scala
   import org.apache.spark.util.collection.OpenHashSet
   
   val set = new OpenHashSet[Double]()
   set.add(Double.NaN)
   set.add(Double.NaN)
   set.size  // returns 2
   ```
   
   This could possibly lead to a bug like the one reported in SPARK-45599 but 
in reverse, where a new NaN row is added rather than dropped. I will see if I 
can construct such a scenario as a demonstration. But regardless, I think this 
behavior is incorrect by itself.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-45599][CORE] Use object equality in OpenHashSet [spark]

2024-02-07 Thread via GitHub


nchammas commented on code in PR #45036:
URL: https://github.com/apache/spark/pull/45036#discussion_r1482125925


##
core/src/test/scala/org/apache/spark/util/collection/OpenHashSetSuite.scala:
##
@@ -269,4 +269,35 @@ class OpenHashSetSuite extends SparkFunSuite with Matchers 
{
   assert(pos1 == pos2)
 }
   }
+
+  test("SPARK-45599: 0.0 and -0.0 are equal but not the same") {

Review Comment:
   Note also that the docstring for `OpenHashSet` seems to imply that it is 
meant to be a faster but semantically equivalent alternative to 
`java.util.HashSet`:
   
   
https://github.com/apache/spark/blob/0d5c2ce427e06adfebf47bc446ff6646513ae750/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala#L31-L32
   
   If that's true, then we should perhaps add property-based tests to ensure 
alignment between the two implementations, but I'll leave that as a potential 
future improvement; a rough sketch of what such a test could look like follows below.
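   Something like the following could serve as a starting point: a minimal sketch, assuming ScalaCheck is on the test classpath, that the suite lives under `org.apache.spark.util.collection` so the `private[spark]` class is visible, and that the object-equality semantics from this PR are in place (on `master` the property is expected to fail):
   
   ```scala
   package org.apache.spark.util.collection
   
   import java.util.{HashSet => JHashSet}
   
   import org.scalacheck.{Gen, Properties}
   import org.scalacheck.Prop.forAll
   
   object OpenHashSetAlignmentCheck extends Properties("OpenHashSet") {
     // Bias the generator toward the tricky values: NaN, 0.0 and -0.0.
     private val doubles: Gen[Double] = Gen.oneOf(
       Gen.const(Double.NaN), Gen.const(0.0), Gen.const(-0.0), Gen.choose(-1e6, 1e6))
   
     property("size and contains match java.util.HashSet") =
       forAll(Gen.listOf(doubles)) { xs =>
         val open = new OpenHashSet[Double]()
         val java = new JHashSet[Double]()
         xs.foreach { x => open.add(x); java.add(x) }
         open.size == java.size() && xs.forall(x => open.contains(x) == java.contains(x))
       }
   }
   ```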



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47000][CORE] Use `getTotalMemorySize` in `WorkerArguments` [spark]

2024-02-07 Thread via GitHub


dongjoon-hyun commented on PR #45060:
URL: https://github.com/apache/spark/pull/45060#issuecomment-1932978043

   Thank you, @sunchao !


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46966][Python] Add UDTF API for 'analyze' method to indicate subset of input table columns to select [spark]

2024-02-07 Thread via GitHub


ueshin commented on PR #45007:
URL: https://github.com/apache/spark/pull/45007#issuecomment-1932985032

   The failed tests seem unrelated to the last commit. The previous build 
passed except for the linter, which is fixed in the last commit.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46966][Python] Add UDTF API for 'analyze' method to indicate subset of input table columns to select [spark]

2024-02-07 Thread via GitHub


ueshin commented on PR #45007:
URL: https://github.com/apache/spark/pull/45007#issuecomment-1932985366

   Thanks! merging to master.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46966][Python] Add UDTF API for 'analyze' method to indicate subset of input table columns to select [spark]

2024-02-07 Thread via GitHub


ueshin closed pull request #45007: [SPARK-46966][Python] Add UDTF API for 
'analyze' method to indicate subset of input table columns to select
URL: https://github.com/apache/spark/pull/45007


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46688][SPARK-46691][PYTHON][CONNECT] Support v2 profiling in aggregate Pandas UDFs [spark]

2024-02-07 Thread via GitHub


ueshin commented on PR #45035:
URL: https://github.com/apache/spark/pull/45035#issuecomment-1932995660

   Thanks! merging to master.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46688][SPARK-46691][PYTHON][CONNECT] Support v2 profiling in aggregate Pandas UDFs [spark]

2024-02-07 Thread via GitHub


ueshin closed pull request #45035: [SPARK-46688][SPARK-46691][PYTHON][CONNECT] 
Support v2 profiling in aggregate Pandas UDFs
URL: https://github.com/apache/spark/pull/45035


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-45599][CORE] Use object equality in OpenHashSet [spark]

2024-02-07 Thread via GitHub


nchammas commented on code in PR #45036:
URL: https://github.com/apache/spark/pull/45036#discussion_r1482123330


##
core/src/test/scala/org/apache/spark/util/collection/OpenHashSetSuite.scala:
##
@@ -269,4 +269,35 @@ class OpenHashSetSuite extends SparkFunSuite with Matchers 
{
   assert(pos1 == pos2)
 }
   }
+
+  test("SPARK-45599: 0.0 and -0.0 are equal but not the same") {

Review Comment:
   Consider another interesting case where `java.util.HashSet` and 
`OpenHashSet` differ:
   
   ```scala
   scala> import java.util.HashSet
   import java.util.HashSet
   
   scala> val h = new HashSet[Double]()
   val h: java.util.HashSet[Double] = []
   
   scala> h.add(Double.NaN)
   val res9: Boolean = true
   
   scala> h.add(Double.NaN)
   val res10: Boolean = false
   
   scala> h.contains(Double.NaN)
   val res11: Boolean = true
   
   scala> h.size()
   val res12: Int = 1
   ```
   
   On `master`, `OpenHashSet` does something obviously wrong:
   
   ```scala
   import org.apache.spark.util.collection.OpenHashSet
   
   val set = new OpenHashSet[Double]()
   set.add(Double.NaN)
   set.add(Double.NaN)
   set.size  // returns 2
   set.contains(Double.NaN)  // returns false
   ```
   
   This could possibly lead to a bug like the one reported in SPARK-45599 but 
in reverse, where a new NaN row is added rather than dropped. I will see if I 
can construct such a scenario as a demonstration. But regardless, I think this 
behavior is incorrect by itself.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46689][SPARK-46690][PYTHON][CONNECT] Support v2 profiling in group/cogroup applyInPandas/applyInArrow [spark]

2024-02-07 Thread via GitHub


xinrong-meng commented on code in PR #45050:
URL: https://github.com/apache/spark/pull/45050#discussion_r1482146994


##
python/pyspark/sql/tests/test_udf_profiler.py:
##
@@ -333,6 +333,69 @@ def filter_func(iterator):
 
 self.assertEqual(0, len(self.profile_results), 
str(self.profile_results.keys()))
 
+@unittest.skipIf(
+not have_pandas or not have_pyarrow,
+cast(str, pandas_requirement_message or pyarrow_requirement_message),
+)
+def test_perf_profiler_group_apply_in_pandas(self):

Review Comment:
   Good catch!



##
python/pyspark/sql/tests/test_udf_profiler.py:
##
@@ -333,6 +333,69 @@ def filter_func(iterator):
 
 self.assertEqual(0, len(self.profile_results), 
str(self.profile_results.keys()))
 
+@unittest.skipIf(
+not have_pandas or not have_pyarrow,
+cast(str, pandas_requirement_message or pyarrow_requirement_message),
+)
+def test_perf_profiler_group_apply_in_pandas(self):
+# FlatMapGroupsInBatchExec
+df = self.spark.createDataFrame(
+[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v")
+)
+
+def normalize(pdf):
+v = pdf.v
+return pdf.assign(v=(v - v.mean()) / v.std())
+
+with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
+df.groupby("id").applyInPandas(normalize, schema="id long, v 
double").show()
+
+self.assertEqual(1, len(self.profile_results), 
str(self.profile_results.keys()))
+
+for id in self.profile_results:
+with self.trap_stdout() as io:
+self.spark.showPerfProfiles(id)
+
+self.assertIn(f"Profile of UDF", io.getvalue())
+self.assertRegex(
+io.getvalue(), 
f"2.*{os.path.basename(inspect.getfile(_do_computation))}"
+)
+
+@unittest.skipIf(
+not have_pandas or not have_pyarrow,
+cast(str, pandas_requirement_message or pyarrow_requirement_message),
+)
+def test_perf_profiler_cogroup_apply_in_pandas(self):

Review Comment:
   Sounds good.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[PR] [SPARK-47002][Python] Return better error message if UDTF 'analyze' method 'orderBy' field accidentally returns a list of strings [spark]

2024-02-07 Thread via GitHub


dtenedor opened a new pull request, #45062:
URL: https://github.com/apache/spark/pull/45062

   
   
   ### What changes were proposed in this pull request?
   
   
   
   ### Why are the changes needed?
   
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   
   
   ### How was this patch tested?
   
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[PR] [SPARK-47004] Added more tests to ClientStreamingQuerySuite to increase Scala client test coverage [spark]

2024-02-07 Thread via GitHub


bogao007 opened a new pull request, #45063:
URL: https://github.com/apache/spark/pull/45063

   
   
   ### What changes were proposed in this pull request?
   
   Added more tests to ClientStreamingQuerySuite to increase Scala client test 
coverage
   
   ### Why are the changes needed?
   
   To increase test coverage for Streaming Spark Connect Scala client
   
   ### Does this PR introduce _any_ user-facing change?
   
   No
   
   ### How was this patch tested?
   
   This is a test-only change.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   No


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[PR] [SPARK-46832][SQL] Introducing Collate and Collation expressions [spark]

2024-02-07 Thread via GitHub


dbatomic opened a new pull request, #45064:
URL: https://github.com/apache/spark/pull/45064

   
   
   ### What changes were proposed in this pull request?
   
   This PR adds E2E support for `collate` and `collation` expressions.
   Following changes were made to get us there:
   1) Set the right ordering for `PhysicalStringType` based on `collationId`.
   2) UTF8String is now just a data holder class - it no longer implements the `Comparable` interface. All comparisons must be done through `CollationFactory`.
   3) `collate` and `collation` expressions are added. Special syntax for `collate` is enabled - `'hello world' COLLATE 'target_collation'`.
   4) First set of tests is added that covers both core expression and E2E 
collation tests.
   
   ### Why are the changes needed?
   
   This PR is part of larger collation track. For more details please refer to 
design doc attached in parent JIRA ticket.
   
   ### Does this PR introduce _any_ user-facing change?
   
   This PR adds two new expressions and opens up new syntax (sketched below).
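   For reference, a minimal sketch of how the new syntax and expressions described above could be exercised from Scala; `'target_collation'` is just the placeholder name used in this description, not a shipped collation:
   
   ```scala
   // Assumes a SparkSession `spark` with this PR applied; the collation name is a placeholder.
   spark.sql("SELECT 'hello world' COLLATE 'target_collation'").show()
   spark.sql("SELECT collation('hello world' COLLATE 'target_collation')").show()
   ```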
   
   ### How was this patch tested?
   
   Basic tests are added. In follow-up PRs we will add support for more 
advanced operators and keep adding tests alongside new feature support.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   Yes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47000][CORE] Use `getTotalMemorySize` in `WorkerArguments` [spark]

2024-02-07 Thread via GitHub


dongjoon-hyun closed pull request #45060: [SPARK-47000][CORE] Use 
`getTotalMemorySize` in `WorkerArguments`
URL: https://github.com/apache/spark/pull/45060


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47000][CORE] Use `getTotalMemorySize` in `WorkerArguments` [spark]

2024-02-07 Thread via GitHub


dongjoon-hyun commented on PR #45060:
URL: https://github.com/apache/spark/pull/45060#issuecomment-1933047669

   All tests passed. Merged to master for Apache Spark 4.0.0.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47003][K8S] Detect and fail on invalid volume sizes (< 1KiB) in K8s [spark]

2024-02-07 Thread via GitHub


dongjoon-hyun commented on PR #45061:
URL: https://github.com/apache/spark/pull/45061#issuecomment-1933048949

   All tests passed. Could you review this PR, @viirya ?
   
   [Screenshot of the passing CI run: https://github.com/apache/spark/assets/9700541/8b1bde03-7eed-46b9-a4cf-e63327c39fa5]
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47003][K8S] Detect and fail on invalid volume sizes (< 1KiB) in K8s [spark]

2024-02-07 Thread via GitHub


viirya commented on code in PR #45061:
URL: https://github.com/apache/spark/pull/45061#discussion_r1482181478


##
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala:
##
@@ -105,4 +109,13 @@ private[spark] object KubernetesVolumeUtils {
   throw new NoSuchElementException(key + s" is required for $msg")
 }
   }
+
+  private def verifySize(size: Option[String]): Unit = {
+size.foreach { v =>
+  if (v.forall(_.isDigit) && parseLong(v) < 1024) {
+throw new IllegalArgumentException(
+  s"Volume size `$v` is smaller than 1KiB. Missing units?")
+  }
+}

Review Comment:
   Is 1KB the minimum size we can specify for volume?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47003][K8S] Detect and fail on invalid volume sizes (< 1KiB) in K8s [spark]

2024-02-07 Thread via GitHub


viirya commented on code in PR #45061:
URL: https://github.com/apache/spark/pull/45061#discussion_r1482182370


##
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala:
##
@@ -105,4 +109,13 @@ private[spark] object KubernetesVolumeUtils {
   throw new NoSuchElementException(key + s" is required for $msg")
 }
   }
+
+  private def verifySize(size: Option[String]): Unit = {
+size.foreach { v =>
+  if (v.forall(_.isDigit) && parseLong(v) < 1024) {
+throw new IllegalArgumentException(
+  s"Volume size `$v` is smaller than 1KiB. Missing units?")
+  }
+}

Review Comment:
   Should we mention in some place (any doc?) that the minimum volume size is 
1KB?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47003][K8S] Detect and fail on invalid volume sizes (< 1KiB) in K8s [spark]

2024-02-07 Thread via GitHub


dongjoon-hyun commented on code in PR #45061:
URL: https://github.com/apache/spark/pull/45061#discussion_r1482182475


##
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala:
##
@@ -105,4 +109,13 @@ private[spark] object KubernetesVolumeUtils {
   throw new NoSuchElementException(key + s" is required for $msg")
 }
   }
+
+  private def verifySize(size: Option[String]): Unit = {
+size.foreach { v =>
+  if (v.forall(_.isDigit) && parseLong(v) < 1024) {
+throw new IllegalArgumentException(
+  s"Volume size `$v` is smaller than 1KiB. Missing units?")
+  }
+}

Review Comment:
   No~ In this case, this value is ignored. Instead, the created volume size is 
determined by the underlying K8s configuration.
   
   For example, 8GiB on some K8s systems.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47003][K8S] Detect and fail on invalid volume sizes (< 1KiB) in K8s [spark]

2024-02-07 Thread via GitHub


dongjoon-hyun commented on code in PR #45061:
URL: https://github.com/apache/spark/pull/45061#discussion_r1482183784


##
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala:
##
@@ -105,4 +109,13 @@ private[spark] object KubernetesVolumeUtils {
   throw new NoSuchElementException(key + s" is required for $msg")
 }
   }
+
+  private def verifySize(size: Option[String]): Unit = {
+size.foreach { v =>
+  if (v.forall(_.isDigit) && parseLong(v) < 1024) {
+throw new IllegalArgumentException(
+  s"Volume size `$v` is smaller than 1KiB. Missing units?")
+  }
+}

Review Comment:
   Of course, a user can still make a mistake like 10240, but this PR aims to handle 
the most common cases where we are sure it is a mistake.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47003][K8S] Detect and fail on invalid volume sizes (< 1KiB) in K8s [spark]

2024-02-07 Thread via GitHub


viirya commented on code in PR #45061:
URL: https://github.com/apache/spark/pull/45061#discussion_r1482183654


##
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala:
##
@@ -105,4 +109,13 @@ private[spark] object KubernetesVolumeUtils {
   throw new NoSuchElementException(key + s" is required for $msg")
 }
   }
+
+  private def verifySize(size: Option[String]): Unit = {
+size.foreach { v =>
+  if (v.forall(_.isDigit) && parseLong(v) < 1024) {
+throw new IllegalArgumentException(
+  s"Volume size `$v` is smaller than 1KiB. Missing units?")
+  }
+}

Review Comment:
   Oh, you mean if `verifySize` fails, 8GB (or other default value) will be 
used?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47003][K8S] Detect and fail on invalid volume sizes (< 1KiB) in K8s [spark]

2024-02-07 Thread via GitHub


dongjoon-hyun commented on code in PR #45061:
URL: https://github.com/apache/spark/pull/45061#discussion_r1482184736


##
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala:
##
@@ -105,4 +109,13 @@ private[spark] object KubernetesVolumeUtils {
   throw new NoSuchElementException(key + s" is required for $msg")
 }
   }
+
+  private def verifySize(size: Option[String]): Unit = {
+size.foreach { v =>
+  if (v.forall(_.isDigit) && parseLong(v) < 1024) {
+throw new IllegalArgumentException(
+  s"Volume size `$v` is smaller than 1KiB. Missing units?")
+  }
+}

Review Comment:
   Oh, no~ This PR aims to fail the job from the beginning.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47003][K8S] Detect and fail on invalid volume sizes (< 1KiB) in K8s [spark]

2024-02-07 Thread via GitHub


dongjoon-hyun commented on code in PR #45061:
URL: https://github.com/apache/spark/pull/45061#discussion_r1482185167


##
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala:
##
@@ -105,4 +109,13 @@ private[spark] object KubernetesVolumeUtils {
   throw new NoSuchElementException(key + s" is required for $msg")
 }
   }
+
+  private def verifySize(size: Option[String]): Unit = {
+size.foreach { v =>
+  if (v.forall(_.isDigit) && parseLong(v) < 1024) {
+throw new IllegalArgumentException(
+  s"Volume size `$v` is smaller than 1KiB. Missing units?")
+  }
+}

Review Comment:
   What I meant is that the previous behavior is 8GiB (or another default value).



##
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala:
##
@@ -105,4 +109,13 @@ private[spark] object KubernetesVolumeUtils {
   throw new NoSuchElementException(key + s" is required for $msg")
 }
   }
+
+  private def verifySize(size: Option[String]): Unit = {
+size.foreach { v =>
+  if (v.forall(_.isDigit) && parseLong(v) < 1024) {
+throw new IllegalArgumentException(
+  s"Volume size `$v` is smaller than 1KiB. Missing units?")
+  }
+}

Review Comment:
   What I meant was that the previous behavior is 8GiB (or another default value).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47003][K8S] Detect and fail on invalid volume sizes (< 1KiB) in K8s [spark]

2024-02-07 Thread via GitHub


viirya commented on code in PR #45061:
URL: https://github.com/apache/spark/pull/45061#discussion_r1482186467


##
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala:
##
@@ -105,4 +109,13 @@ private[spark] object KubernetesVolumeUtils {
   throw new NoSuchElementException(key + s" is required for $msg")
 }
   }
+
+  private def verifySize(size: Option[String]): Unit = {
+size.foreach { v =>
+  if (v.forall(_.isDigit) && parseLong(v) < 1024) {
+throw new IllegalArgumentException(
+  s"Volume size `$v` is smaller than 1KiB. Missing units?")
+  }
+}

Review Comment:
   Oh, you mean, previously when a user specified a value like 1KB, 8GB (or 
another default value) would be used.
   
   But now the PR makes it fail immediately.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47003][K8S] Detect and fail on invalid volume sizes (< 1KiB) in K8s [spark]

2024-02-07 Thread via GitHub


viirya commented on code in PR #45061:
URL: https://github.com/apache/spark/pull/45061#discussion_r1482186467


##
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala:
##
@@ -105,4 +109,13 @@ private[spark] object KubernetesVolumeUtils {
   throw new NoSuchElementException(key + s" is required for $msg")
 }
   }
+
+  private def verifySize(size: Option[String]): Unit = {
+size.foreach { v =>
+  if (v.forall(_.isDigit) && parseLong(v) < 1024) {
+throw new IllegalArgumentException(
+  s"Volume size `$v` is smaller than 1KiB. Missing units?")
+  }
+}

Review Comment:
   Oh, you mean, previously when a user specified a value less than 1KB, 8GB (or 
another default value) would be used.
   
   But now the PR makes it fail immediately.



##
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala:
##
@@ -105,4 +109,13 @@ private[spark] object KubernetesVolumeUtils {
   throw new NoSuchElementException(key + s" is required for $msg")
 }
   }
+
+  private def verifySize(size: Option[String]): Unit = {
+size.foreach { v =>
+  if (v.forall(_.isDigit) && parseLong(v) < 1024) {
+throw new IllegalArgumentException(
+  s"Volume size `$v` is smaller than 1KiB. Missing units?")
+  }
+}

Review Comment:
   Oh, you mean, previously when a user specified a value like 1KB, 8GB (or 
another default value) would be used.
   
   But now the PR makes it fail immediately.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47003][K8S] Detect and fail on invalid volume sizes (< 1KiB) in K8s [spark]

2024-02-07 Thread via GitHub


dongjoon-hyun commented on code in PR #45061:
URL: https://github.com/apache/spark/pull/45061#discussion_r1482187670


##
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala:
##
@@ -105,4 +109,13 @@ private[spark] object KubernetesVolumeUtils {
   throw new NoSuchElementException(key + s" is required for $msg")
 }
   }
+
+  private def verifySize(size: Option[String]): Unit = {
+size.foreach { v =>
+  if (v.forall(_.isDigit) && parseLong(v) < 1024) {
+throw new IllegalArgumentException(
+  s"Volume size `$v` is smaller than 1KiB. Missing units?")
+  }
+}

Review Comment:
   Yes, right~



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47003][K8S] Detect and fail on invalid volume sizes (< 1KiB) in K8s [spark]

2024-02-07 Thread via GitHub


dongjoon-hyun commented on code in PR #45061:
URL: https://github.com/apache/spark/pull/45061#discussion_r1482188354


##
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala:
##
@@ -105,4 +109,13 @@ private[spark] object KubernetesVolumeUtils {
   throw new NoSuchElementException(key + s" is required for $msg")
 }
   }
+
+  private def verifySize(size: Option[String]): Unit = {
+size.foreach { v =>
+  if (v.forall(_.isDigit) && parseLong(v) < 1024) {
+throw new IllegalArgumentException(
+  s"Volume size `$v` is smaller than 1KiB. Missing units?")
+  }
+}

Review Comment:
   The previous behavior was not what the user intended at all. And the job 
fails after many executor losses, as described in the PR description.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47003][K8S] Detect and fail on invalid volume sizes (< 1KiB) in K8s [spark]

2024-02-07 Thread via GitHub


viirya commented on code in PR #45061:
URL: https://github.com/apache/spark/pull/45061#discussion_r1482189060


##
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala:
##
@@ -105,4 +109,13 @@ private[spark] object KubernetesVolumeUtils {
   throw new NoSuchElementException(key + s" is required for $msg")
 }
   }
+
+  private def verifySize(size: Option[String]): Unit = {
+size.foreach { v =>
+  if (v.forall(_.isDigit) && parseLong(v) < 1024) {
+throw new IllegalArgumentException(
+  s"Volume size `$v` is smaller than 1KiB. Missing units?")
+  }
+}

Review Comment:
   Hmm, if the goal is to limit the value to be larger than the default value, 
why don't we check against that instead of a value like 1KB? What will happen if a user 
specifies 10KB, for example? Will it be used, or will 8GB (the default value) be used?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[PR] [REFERENCE][DO-NOT-MERGE] Initial implementation of python streaminng data source [spark]

2024-02-07 Thread via GitHub


chaoqin-li1123 opened a new pull request, #45065:
URL: https://github.com/apache/spark/pull/45065

   
   
   ### What changes were proposed in this pull request?
   This is a reference PR that contains all changes needed to support the streaming 
Python data source API.
   
   
   ### Why are the changes needed?
   Implement python streaming data source
   
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   
   ### How was this patch tested?
   Unit test and integration test.
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   No.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47003][K8S] Detect and fail on invalid volume sizes (< 1KiB) in K8s [spark]

2024-02-07 Thread via GitHub


dongjoon-hyun commented on code in PR #45061:
URL: https://github.com/apache/spark/pull/45061#discussion_r1482191800


##
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala:
##
@@ -105,4 +109,13 @@ private[spark] object KubernetesVolumeUtils {
   throw new NoSuchElementException(key + s" is required for $msg")
 }
   }
+
+  private def verifySize(size: Option[String]): Unit = {
+size.foreach { v =>
+  if (v.forall(_.isDigit) && parseLong(v) < 1024) {
+throw new IllegalArgumentException(
+  s"Volume size `$v` is smaller than 1KiB. Missing units?")
+  }
+}

Review Comment:
   > Hmm, if the goal is to limit the value to be larger than the default value, why don't we check against that instead of a value like 1KB?
   
   Spark doesn't know the default values; they are controlled by the K8s system admin.
   
   > What will happen if a user specifies 10KB, for example? Will it be used, or will 8GB (the default value) be used?
   
   Yes, if the user provides a `unit` like KB, this PR will not block it, so 10KB will be used.
   
   And, as I mentioned above in
   https://github.com/apache/spark/pull/45061#discussion_r1482183784, if a user 
writes 10240, we will not complain because a value that large is not clearly a mistake.
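   To make the cut-off concrete, here is a small standalone sketch (not Spark's actual helper, just the same predicate as in the diff above) showing which `sizeLimit` strings are now rejected:
   
   ```scala
   // Mirrors the check above: a purely numeric value below 1024 is treated as a
   // missing-unit mistake and rejected; anything else passes through unchanged.
   def isRejected(size: String): Boolean =
     size.forall(_.isDigit) && java.lang.Long.parseLong(size) < 1024
   
   Seq("500", "1023").map(isRejected)            // List(true, true)   -> fail fast at submission
   Seq("500Gi", "1024", "10240").map(isRejected) // List(false, false, false) -> accepted as before
   ```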



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47003][K8S] Detect and fail on invalid volume sizes (< 1KiB) in K8s [spark]

2024-02-07 Thread via GitHub


dongjoon-hyun commented on PR #45061:
URL: https://github.com/apache/spark/pull/45061#issuecomment-1933068990

   Thank you, @viirya !


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47003][K8S] Detect and fail on invalid volume sizes (< 1KiB) in K8s [spark]

2024-02-07 Thread via GitHub


dongjoon-hyun closed pull request #45061: [SPARK-47003][K8S] Detect and fail on 
invalid volume sizes (< 1KiB) in K8s
URL: https://github.com/apache/spark/pull/45061


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47003][K8S] Detect and fail on invalid volume sizes (< 1KiB) in K8s [spark]

2024-02-07 Thread via GitHub


dongjoon-hyun commented on PR #45061:
URL: https://github.com/apache/spark/pull/45061#issuecomment-1933073565

   Merged to master for Apache Spark 4.0.0.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46962][SS][PYTHON] Implement python worker to run python streaming data source [spark]

2024-02-07 Thread via GitHub


chaoqin-li1123 commented on code in PR #45023:
URL: https://github.com/apache/spark/pull/45023#discussion_r1482216213


##
sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonStreamingSourceRunner.scala:
##
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.spark.sql.execution.python
+
+import java.io.{BufferedInputStream, BufferedOutputStream, DataInputStream, 
DataOutputStream}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.jdk.CollectionConverters._
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.api.python.{PythonFunction, PythonWorker, 
PythonWorkerFactory, PythonWorkerUtils, SpecialLengths}
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.BUFFER_SIZE
+import org.apache.spark.internal.config.Python.{PYTHON_AUTH_SOCKET_TIMEOUT, 
PYTHON_USE_DAEMON}
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+
+object PythonStreamingSourceRunner {
+  val partitionsFuncId = 886
+  val latestOffsetsFuncId = 887

Review Comment:
   They are sent to the Python process to invoke the corresponding function. I also 
wonder whether there is a better alternative than hardcoding the numbers in both 
the Scala and Python code?
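   One hypothetical alternative (not something this PR does): keep the IDs in a single Scala object with descriptive names and mirror them on the Python side, with a cross-referencing comment as the only sync mechanism. A rough sketch:
   
   ```scala
   // Hypothetical sketch only. The protocol IDs live in one Scala object; the Python
   // worker module would define the same names and values, and the comment below is
   // what keeps the two sides in sync.
   object PythonStreamingSourceFunctionIds {
     // Keep in sync with python/pyspark/sql/streaming/python_streaming_source_runner.py
     val PARTITIONS_FUNC_ID: Int = 886
     val LATEST_OFFSETS_FUNC_ID: Int = 887
   }
   ```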



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46962][SS][PYTHON] Implement python worker to run python streaming data source [spark]

2024-02-07 Thread via GitHub


chaoqin-li1123 commented on code in PR #45023:
URL: https://github.com/apache/spark/pull/45023#discussion_r1482217627


##
sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonStreamingSourceRunner.scala:
##
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.spark.sql.execution.python
+
+import java.io.{BufferedInputStream, BufferedOutputStream, DataInputStream, 
DataOutputStream}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.jdk.CollectionConverters._
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.api.python.{PythonFunction, PythonWorker, 
PythonWorkerFactory, PythonWorkerUtils, SpecialLengths}
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.BUFFER_SIZE
+import org.apache.spark.internal.config.Python.{PYTHON_AUTH_SOCKET_TIMEOUT, 
PYTHON_USE_DAEMON}
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+
+object PythonStreamingSourceRunner {
+  val partitionsFuncId = 886
+  val latestOffsetsFuncId = 887
+}
+
+class PythonStreamingSourceRunner(
+func: PythonFunction,
+outputSchema: StructType) extends Logging  {
+  val workerModule = "pyspark.sql.streaming.python_streaming_source_runner"
+
+  private val conf = SparkEnv.get.conf
+  protected val bufferSize: Int = conf.get(BUFFER_SIZE)
+  protected val authSocketTimeout = conf.get(PYTHON_AUTH_SOCKET_TIMEOUT)
+
+  private val envVars: java.util.Map[String, String] = func.envVars
+  private val pythonExec: String = func.pythonExec
+  private var pythonWorker: Option[PythonWorker] = None
+  private var pythonWorkerFactory: Option[PythonWorkerFactory] = None
+  protected val pythonVer: String = func.pythonVer
+
+  private var dataOut: DataOutputStream = null
+  private var dataIn: DataInputStream = null
+
+  import PythonStreamingSourceRunner._
+
+  /**
+   * Initializes the Python worker for streaming functions. Sets up Spark 
Connect session

Review Comment:
   Sorry, the code was copied from elsewhere and I forgot to change this line 
of the comment. Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46962][SS][PYTHON] Implement python worker to run python streaming data source [spark]

2024-02-07 Thread via GitHub


chaoqin-li1123 commented on code in PR #45023:
URL: https://github.com/apache/spark/pull/45023#discussion_r1482219079


##
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/PythonScan.scala:
##
@@ -45,13 +63,50 @@ class PythonScan(
 new PythonPartitionReaderFactory(
   ds.source, readerFunc, outputSchema, jobArtifactUUID)
   }
+}
 
-  override def toBatch: Batch = this
+case class PythonStreamingSourceOffset(json: String) extends Offset
 
-  override def description: String = "(Python)"
+case class PythonStreamingSourcePartition(partition: Array[Byte]) extends 
InputPartition
 
-  override def readSchema(): StructType = outputSchema
+class PythonMicroBatchStream(

Review Comment:
   Moved to a separate file.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46962][SS][PYTHON] Implement python worker to run python streaming data source [spark]

2024-02-07 Thread via GitHub


chaoqin-li1123 commented on code in PR #45023:
URL: https://github.com/apache/spark/pull/45023#discussion_r1482218173


##
sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonStreamingSourceRunner.scala:
##
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.spark.sql.execution.python
+
+import java.io.{BufferedInputStream, BufferedOutputStream, DataInputStream, 
DataOutputStream}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.jdk.CollectionConverters._
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.api.python.{PythonFunction, PythonWorker, 
PythonWorkerFactory, PythonWorkerUtils, SpecialLengths}
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.BUFFER_SIZE
+import org.apache.spark.internal.config.Python.{PYTHON_AUTH_SOCKET_TIMEOUT, 
PYTHON_USE_DAEMON}
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+
+object PythonStreamingSourceRunner {

Review Comment:
   Comments added.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46962][SS][PYTHON] Implement python worker to run python streaming data source [spark]

2024-02-07 Thread via GitHub


chaoqin-li1123 commented on PR #45023:
URL: https://github.com/apache/spark/pull/45023#issuecomment-1933104236

   As suggested by @HeartSaVioR, I brought up a reference PR that contains the 
full change and an end-to-end example of a Python streaming data source: 
https://github.com/apache/spark/pull/45065/files#diff-aac0c53cf10992b58e7862115c9f72b1cb5b086a39b4e5b11543ac99d234f761R122


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-45599][CORE] Use object equality in OpenHashSet [spark]

2024-02-07 Thread via GitHub


nchammas commented on code in PR #45036:
URL: https://github.com/apache/spark/pull/45036#discussion_r1482237064


##
core/src/test/scala/org/apache/spark/util/collection/OpenHashMapSuite.scala:
##
@@ -249,4 +249,34 @@ class OpenHashMapSuite extends SparkFunSuite with Matchers 
{
 map(null) = null
 assert(map.get(null) === Some(null))
   }
+
+  test("SPARK-45599: 0.0 and -0.0 should count distinctly") {
+// Exactly these elements provided in roughly this order trigger a 
condition where lookups of
+// 0.0 and -0.0 in the bitset happen to collide, causing their counts to 
be merged incorrectly
+// and inconsistently if `==` is used to check for key equality.
+val spark45599Repro = Seq(
+  Double.NaN,
+  2.0,
+  168.0,
+  Double.NaN,
+  Double.NaN,
+  -0.0,
+  153.0,
+  0.0
+)
+
+val map1 = new OpenHashMap[Double, Int]()
+spark45599Repro.foreach(map1.changeValue(_, 1, {_ + 1}))
+map1.iterator.foreach(println)
+assert(map1(0.0) == 1)
+assert(map1(-0.0) == 1)
+
+val map2 = new OpenHashMap[Double, Int]()
+// Simply changing the order in which the elements are added to the map 
should not change the
+// counts for 0.0 and -0.0.
+spark45599Repro.reverse.foreach(map2.changeValue(_, 1, {_ + 1}))
+map2.iterator.foreach(println)
+assert(map2(0.0) == 1)
+assert(map2(-0.0) == 1)

Review Comment:
   This is another expression of the same bug that this PR addresses. If you 
run this test on `master`, you will see that the counts for 0.0 and -0.0 depend 
on the order in which the elements from `spark45599Repro` are added to the map.



##
core/src/test/scala/org/apache/spark/util/collection/OpenHashSetSuite.scala:
##
@@ -269,4 +269,42 @@ class OpenHashSetSuite extends SparkFunSuite with Matchers 
{
   assert(pos1 == pos2)
 }
   }
+
+  test("SPARK-45599: 0.0 and -0.0 are equal but not the same") {
+// Therefore, 0.0 and -0.0 should get separate entries in the hash set.
+//
+// Exactly these elements provided in roughly this order will trigger the 
following scenario:
+// When probing the bitset in `getPos(-0.0)`, the loop will happen upon 
the entry for 0.0.
+// In the old logic pre-SPARK-45599, the loop will find that the bit is 
set and, because
+// -0.0 == 0.0, it will think that's the position of -0.0. But in reality 
this is the position
+// of 0.0. So -0.0 and 0.0 will be stored at different positions, but 
`getPos()` will return
+// the same position for them. This can cause users of OpenHashSet, like 
OpenHashMap, to
+// return the wrong value for a key based on whether or not this bitset 
lookup collision
+// happens.

Review Comment:
   It really is amazing that @revans2 found this bug, because it depends on the 
set being a specific size and the 0.0 and -0.0 being inserted and then looked 
up in just the right order so that they happen to collide in the bitset.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47002][Python] Return better error message if UDTF 'analyze' method 'orderBy' field accidentally returns a list of strings [spark]

2024-02-07 Thread via GitHub


dtenedor commented on PR #45062:
URL: https://github.com/apache/spark/pull/45062#issuecomment-1933137062

   cc @ueshin here is the follow-up PR to add more checks for `analyze` result 
ordering fields.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46962][SS][PYTHON] Implement python worker to run python streaming data source [spark]

2024-02-07 Thread via GitHub


chaoqin-li1123 commented on code in PR #45023:
URL: https://github.com/apache/spark/pull/45023#discussion_r1482258013


##
python/pyspark/sql/datasource.py:
##
@@ -298,6 +320,97 @@ def read(self, partition: InputPartition) -> 
Iterator[Union[Tuple, Row]]:
 ...
 
 
+class DataSourceStreamReader(ABC):

Review Comment:
   Great suggestion, I have copied corresponding method comments from scala API.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-42304][FOLLOWUP][SQL] Add test for `GET_TABLES_BY_TYPE_UNSUPPORTED_BY_HIVE_VERSION` [spark]

2024-02-07 Thread via GitHub


github-actions[bot] commented on PR #42884:
URL: https://github.com/apache/spark/pull/42884#issuecomment-1933156917

   We're closing this PR because it hasn't been updated in a while. This isn't 
a judgement on the merit of the PR in any way. It's just a way of keeping the 
PR queue manageable.
   If you'd like to revive this PR, please reopen it and ask a committer to 
remove the Stale tag!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-45599][CORE] Use object equality in OpenHashSet [spark]

2024-02-07 Thread via GitHub


revans2 commented on code in PR #45036:
URL: https://github.com/apache/spark/pull/45036#discussion_r1482265980


##
core/src/test/scala/org/apache/spark/util/collection/OpenHashSetSuite.scala:
##
@@ -269,4 +269,42 @@ class OpenHashSetSuite extends SparkFunSuite with Matchers 
{
   assert(pos1 == pos2)
 }
   }
+
+  test("SPARK-45599: 0.0 and -0.0 are equal but not the same") {
+// Therefore, 0.0 and -0.0 should get separate entries in the hash set.
+//
+// Exactly these elements provided in roughly this order will trigger the 
following scenario:
+// When probing the bitset in `getPos(-0.0)`, the loop will happen upon 
the entry for 0.0.
+// In the old logic pre-SPARK-45599, the loop will find that the bit is 
set and, because
+// -0.0 == 0.0, it will think that's the position of -0.0. But in reality 
this is the position
+// of 0.0. So -0.0 and 0.0 will be stored at different positions, but 
`getPos()` will return
+// the same position for them. This can cause users of OpenHashSet, like 
OpenHashMap, to
+// return the wrong value for a key based on whether or not this bitset 
lookup collision
+// happens.

Review Comment:
   I found it because I was essentially running fuzz testing comparing 
https://github.com/NVIDIA/spark-rapids to the gold-standard Apache Spark. Most 
failures that we find end up being issues with the RAPIDS Accelerator for 
Apache Spark, but every so often I find something that ends up being wrong with 
Spark, like in this case. So yes, it was just pure luck that we found it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46993] Fix constant folding for session variables [spark]

2024-02-07 Thread via GitHub


cloud-fan commented on code in PR #45059:
URL: https://github.com/apache/spark/pull/45059#discussion_r1482291388


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala:
##
@@ -312,14 +312,39 @@ trait ColumnResolutionHelper extends Logging with 
DataTypeErrorsBase {
   }.map(e => Alias(e, nameParts.last)())
 }
 
-e.transformWithPruning(_.containsAnyPattern(UNRESOLVED_ATTRIBUTE, 
TEMP_RESOLVED_COLUMN)) {
-  case u: UnresolvedAttribute =>
-resolve(u.nameParts).getOrElse(u)
-  // Re-resolves `TempResolvedColumn` as variable references if it has 
tried to be resolved with
-  // Aggregate but failed.
-  case t: TempResolvedColumn if t.hasTried =>
-resolve(t.nameParts).getOrElse(t)
+def innerResolve(e: Expression, isTopLevel: Boolean): Expression = 
withOrigin(e.origin) {
+  if (e.resolved) return e

Review Comment:
   to be consistent with the previous code, we should add one more shortcut:
   ```
   if (!e.containsAnyPattern(UNRESOLVED_ATTRIBUTE, TEMP_RESOLVED_COLUMN)) 
return e
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46993] Fix constant folding for session variables [spark]

2024-02-07 Thread via GitHub


cloud-fan commented on code in PR #45059:
URL: https://github.com/apache/spark/pull/45059#discussion_r1482291771


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala:
##
@@ -312,14 +312,39 @@ trait ColumnResolutionHelper extends Logging with 
DataTypeErrorsBase {
   }.map(e => Alias(e, nameParts.last)())
 }
 
-e.transformWithPruning(_.containsAnyPattern(UNRESOLVED_ATTRIBUTE, 
TEMP_RESOLVED_COLUMN)) {
-  case u: UnresolvedAttribute =>
-resolve(u.nameParts).getOrElse(u)
-  // Re-resolves `TempResolvedColumn` as variable references if it has 
tried to be resolved with
-  // Aggregate but failed.
-  case t: TempResolvedColumn if t.hasTried =>
-resolve(t.nameParts).getOrElse(t)
+def innerResolve(e: Expression, isTopLevel: Boolean): Expression = 
withOrigin(e.origin) {
+  if (e.resolved) return e
+  val resolved = e match {
+case u @ UnresolvedAttribute(nameParts) =>
+  val result = withPosition(u) {
+resolve(nameParts).getOrElse(u) match {
+  // We trim unnecessary alias here. Note that, we cannot trim the 
alias at top-level,
+  // as we should resolve `UnresolvedAttribute` to a named 
expression. The caller side

Review Comment:
   nit: update the comment to not mention UnresolvedAttribute here



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46993] Fix constant folding for session variables [spark]

2024-02-07 Thread via GitHub


cloud-fan commented on code in PR #45059:
URL: https://github.com/apache/spark/pull/45059#discussion_r1482291904


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala:
##
@@ -312,14 +312,39 @@ trait ColumnResolutionHelper extends Logging with 
DataTypeErrorsBase {
   }.map(e => Alias(e, nameParts.last)())
 }
 
-e.transformWithPruning(_.containsAnyPattern(UNRESOLVED_ATTRIBUTE, 
TEMP_RESOLVED_COLUMN)) {
-  case u: UnresolvedAttribute =>
-resolve(u.nameParts).getOrElse(u)
-  // Re-resolves `TempResolvedColumn` as variable references if it has 
tried to be resolved with
-  // Aggregate but failed.
-  case t: TempResolvedColumn if t.hasTried =>
-resolve(t.nameParts).getOrElse(t)
+def innerResolve(e: Expression, isTopLevel: Boolean): Expression = 
withOrigin(e.origin) {
+  if (e.resolved) return e
+  val resolved = e match {
+case u @ UnresolvedAttribute(nameParts) =>
+  val result = withPosition(u) {
+resolve(nameParts).getOrElse(u) match {
+  // We trim unnecessary alias here. Note that, we cannot trim the 
alias at top-level,
+  // as we should resolve `UnresolvedAttribute` to a named 
expression. The caller side
+  // can trim the top-level alias if it's safe to do so. Since we 
will call
+  // CleanupAliases later in Analyzer, trim non top-level 
unnecessary alias is safe.
+  case Alias(child, _) if !isTopLevel => child
+  case other => other
+}
+  }
+  result
+
+// Re-resolves `TempResolvedColumn` if it has tried to be resolved 
with Aggregate

Review Comment:
   nit: let's keep the previous comment



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-45527][CORE] Use fraction to do the resource calculation [spark]

2024-02-07 Thread via GitHub


wbo4958 commented on code in PR #44690:
URL: https://github.com/apache/spark/pull/44690#discussion_r1482298562


##
core/src/main/scala/org/apache/spark/resource/ResourceAllocator.scala:
##
@@ -20,6 +20,49 @@ package org.apache.spark.resource
 import scala.collection.mutable
 
 import org.apache.spark.SparkException
+import org.apache.spark.resource.ResourceAmountUtils.ONE_ENTIRE_RESOURCE
+
+private[spark] object ResourceAmountUtils {
+  /**
+   * Using "double" to do the resource calculation may encounter a problem of 
precision loss. Eg
+   *
+   * scala> val taskAmount = 1.0 / 9
+   * taskAmount: Double = 0.
+   *
+   * scala> var total = 1.0
+   * total: Double = 1.0
+   *
+   * scala> for (i <- 1 to 9 ) {
+   * |   if (total >= taskAmount) {
+   * |   total -= taskAmount
+   * |   println(s"assign $taskAmount for task $i, total left: $total")
+   * |   } else {
+   * |   println(s"ERROR Can't assign $taskAmount for task $i, total 
left: $total")
+   * |   }
+   * | }
+   * assign 0. for task 1, total left: 0.
+   * assign 0. for task 2, total left: 0.
+   * assign 0. for task 3, total left: 0.6665
+   * assign 0. for task 4, total left: 0.5554
+   * assign 0. for task 5, total left: 0.44425
+   * assign 0. for task 6, total left: 0.33315
+   * assign 0. for task 7, total left: 0.22204
+   * assign 0. for task 8, total left: 0.11094
+   * ERROR Can't assign 0. for task 9, total left: 
0.11094
+   *
+   * So we multiply ONE_ENTIRE_RESOURCE to convert the double to long to avoid 
this limitation.
+   * Double can display up to 16 decimal places, so we set the factor to
+   * 10, 000, 000, 000, 000, 000L.
+   */
+  final val ONE_ENTIRE_RESOURCE: Long = 1L

Review Comment:
   Hi @srowen. Actually, this PR converts the taskAmount, which is a double/float
value, to a Long by multiplying it by `1E16.toLong`, and the following
calculation is then based on Long instead of double/float. You can see that all
the APIs of
[ExecutorResourcesAmount](https://github.com/apache/spark/pull/44690/files#diff-91f559d0e4aeff4052cd7c6575d8622d911ecad0852040f64ccc1d3abfd605faR41)
use Long. Even the assigned resource of a task is still kept as a Long; you can
refer to
[this line](https://github.com/apache/spark/pull/44690/files#diff-381b69941735d860cddcbb3b565abfef591a890aadecbc235283f3a57de9ba61R62).

   Yeah, if you think we should use BigDecimal, I'm okay with that. I can make a
PR for the master branch first, and then cherry-pick it to the 3.5 branch.
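
   To make this concrete, here is a minimal self-contained sketch of the
Long-scaling idea for the 1/9 example from the scaladoc above. The constant name
and the 1e16 factor are taken from that scaladoc; the rest is illustrative and
not the PR's actual `ResourceAllocator` code:

   ```scala
   // Scale the fractional amount into an integral number of "slices" so that
   // repeated subtraction stays exact, unlike the Double loop quoted above.
   val ONE_ENTIRE_RESOURCE: Long = 10000000000000000L  // 1e16, per the scaladoc

   // Mirrors the described conversion: double taskAmount * 1e16, truncated.
   val taskAmount: Long = (1.0 / 9 * ONE_ENTIRE_RESOURCE).toLong  // 1111111111111111

   var total: Long = ONE_ENTIRE_RESOURCE
   for (i <- 1 to 9) {
     if (total >= taskAmount) {
       total -= taskAmount
       println(s"assign $taskAmount for task $i, total left: $total")
     } else {
       println(s"ERROR Can't assign $taskAmount for task $i, total left: $total")
     }
   }
   // All 9 tasks get assigned; only 1 slice (1e-16 of the resource) is left over
   // from the truncation, instead of the 9th assignment failing outright.
   ```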



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-45527][CORE] Use fraction to do the resource calculation [spark]

2024-02-07 Thread via GitHub


srowen commented on code in PR #44690:
URL: https://github.com/apache/spark/pull/44690#discussion_r1482301112


##
core/src/main/scala/org/apache/spark/resource/ResourceAllocator.scala:
##
@@ -20,6 +20,49 @@ package org.apache.spark.resource
 import scala.collection.mutable
 
 import org.apache.spark.SparkException
+import org.apache.spark.resource.ResourceAmountUtils.ONE_ENTIRE_RESOURCE
+
+private[spark] object ResourceAmountUtils {
+  /**
+   * Using "double" to do the resource calculation may encounter a problem of 
precision loss. Eg
+   *
+   * scala> val taskAmount = 1.0 / 9
+   * taskAmount: Double = 0.
+   *
+   * scala> var total = 1.0
+   * total: Double = 1.0
+   *
+   * scala> for (i <- 1 to 9 ) {
+   * |   if (total >= taskAmount) {
+   * |   total -= taskAmount
+   * |   println(s"assign $taskAmount for task $i, total left: $total")
+   * |   } else {
+   * |   println(s"ERROR Can't assign $taskAmount for task $i, total 
left: $total")
+   * |   }
+   * | }
+   * assign 0. for task 1, total left: 0.
+   * assign 0. for task 2, total left: 0.
+   * assign 0. for task 3, total left: 0.6665
+   * assign 0. for task 4, total left: 0.5554
+   * assign 0. for task 5, total left: 0.44425
+   * assign 0. for task 6, total left: 0.33315
+   * assign 0. for task 7, total left: 0.22204
+   * assign 0. for task 8, total left: 0.11094
+   * ERROR Can't assign 0. for task 9, total left: 
0.11094
+   *
+   * So we multiply ONE_ENTIRE_RESOURCE to convert the double to long to avoid 
this limitation.
+   * Double can display up to 16 decimal places, so we set the factor to
+   * 10, 000, 000, 000, 000, 000L.
+   */
+  final val ONE_ENTIRE_RESOURCE: Long = 1L

Review Comment:
   I know. Above I give an example where multiplying by long works. I'm 
referring to your example of BigDecimal above, which does not use (only) 
BigDecimal. Please just try it.



##
core/src/main/scala/org/apache/spark/resource/ResourceAllocator.scala:
##
@@ -20,6 +20,49 @@ package org.apache.spark.resource
 import scala.collection.mutable
 
 import org.apache.spark.SparkException
+import org.apache.spark.resource.ResourceAmountUtils.ONE_ENTIRE_RESOURCE
+
+private[spark] object ResourceAmountUtils {
+  /**
+   * Using "double" to do the resource calculation may encounter a problem of 
precision loss. Eg
+   *
+   * scala> val taskAmount = 1.0 / 9
+   * taskAmount: Double = 0.
+   *
+   * scala> var total = 1.0
+   * total: Double = 1.0
+   *
+   * scala> for (i <- 1 to 9 ) {
+   * |   if (total >= taskAmount) {
+   * |   total -= taskAmount
+   * |   println(s"assign $taskAmount for task $i, total left: $total")
+   * |   } else {
+   * |   println(s"ERROR Can't assign $taskAmount for task $i, total 
left: $total")
+   * |   }
+   * | }
+   * assign 0. for task 1, total left: 0.
+   * assign 0. for task 2, total left: 0.
+   * assign 0. for task 3, total left: 0.6665
+   * assign 0. for task 4, total left: 0.5554
+   * assign 0. for task 5, total left: 0.44425
+   * assign 0. for task 6, total left: 0.33315
+   * assign 0. for task 7, total left: 0.22204
+   * assign 0. for task 8, total left: 0.11094
+   * ERROR Can't assign 0. for task 9, total left: 
0.11094
+   *
+   * So we multiply ONE_ENTIRE_RESOURCE to convert the double to long to avoid 
this limitation.
+   * Double can display up to 16 decimal places, so we set the factor to
+   * 10, 000, 000, 000, 000, 000L.
+   */
+  final val ONE_ENTIRE_RESOURCE: Long = 1L

Review Comment:
   I know. Above I give an example where multiplying by long doesn't work. I'm 
referring to your example of BigDecimal above, which does not use (only) 
BigDecimal. Please just try it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-45527][CORE] Use fraction to do the resource calculation [spark]

2024-02-07 Thread via GitHub


wbo4958 commented on code in PR #44690:
URL: https://github.com/apache/spark/pull/44690#discussion_r1482309634


##
core/src/main/scala/org/apache/spark/resource/ResourceAllocator.scala:
##
@@ -20,6 +20,49 @@ package org.apache.spark.resource
 import scala.collection.mutable
 
 import org.apache.spark.SparkException
+import org.apache.spark.resource.ResourceAmountUtils.ONE_ENTIRE_RESOURCE
+
+private[spark] object ResourceAmountUtils {
+  /**
+   * Using "double" to do the resource calculation may encounter a problem of 
precision loss. Eg
+   *
+   * scala> val taskAmount = 1.0 / 9
+   * taskAmount: Double = 0.
+   *
+   * scala> var total = 1.0
+   * total: Double = 1.0
+   *
+   * scala> for (i <- 1 to 9 ) {
+   * |   if (total >= taskAmount) {
+   * |   total -= taskAmount
+   * |   println(s"assign $taskAmount for task $i, total left: $total")
+   * |   } else {
+   * |   println(s"ERROR Can't assign $taskAmount for task $i, total 
left: $total")
+   * |   }
+   * | }
+   * assign 0. for task 1, total left: 0.
+   * assign 0. for task 2, total left: 0.
+   * assign 0. for task 3, total left: 0.6665
+   * assign 0. for task 4, total left: 0.5554
+   * assign 0. for task 5, total left: 0.44425
+   * assign 0. for task 6, total left: 0.33315
+   * assign 0. for task 7, total left: 0.22204
+   * assign 0. for task 8, total left: 0.11094
+   * ERROR Can't assign 0. for task 9, total left: 
0.11094
+   *
+   * So we multiply ONE_ENTIRE_RESOURCE to convert the double to long to avoid 
this limitation.
+   * Double can display up to 16 decimal places, so we set the factor to
+   * 10, 000, 000, 000, 000, 000L.
+   */
+  final val ONE_ENTIRE_RESOURCE: Long = 1L

Review Comment:
   Hi @srowen. Sure, let me open a PR using BigDecimal for the master branch
first. Thanks.
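
   For reference, a pure-BigDecimal version of the 1/9 walkthrough (just a
sketch of the direction being discussed here, not the follow-up PR itself) also
assigns all nine tasks:

   ```scala
   // Scala's BigDecimal defaults to MathContext.DECIMAL128 (34 significant
   // digits), so the repeated subtraction below never comes up a whole task short.
   val taskAmount = BigDecimal(1) / BigDecimal(9)
   var total = BigDecimal(1)

   for (i <- 1 to 9) {
     if (total >= taskAmount) {
       total -= taskAmount
       println(s"assign $taskAmount for task $i, total left: $total")
     } else {
       println(s"ERROR Can't assign $taskAmount for task $i, total left: $total")
     }
   }
   // All nine assignments succeed; total ends at 1E-34 rather than running out
   // one task early the way the Double loop in the scaladoc does.
   ```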



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46689][SPARK-46690][PYTHON][CONNECT] Support v2 profiling in group/cogroup applyInPandas/applyInArrow [spark]

2024-02-07 Thread via GitHub


ueshin commented on code in PR #45050:
URL: https://github.com/apache/spark/pull/45050#discussion_r1482312940


##
python/pyspark/tests/test_memory_profiler.py:
##
@@ -441,6 +441,129 @@ def min_udf(v: pd.Series) -> float:
 io.getvalue(), 
f"Filename.*{os.path.basename(inspect.getfile(_do_computation))}"
 )
 
+@unittest.skipIf(
+not have_pandas or not have_pyarrow,
+cast(str, pandas_requirement_message or pyarrow_requirement_message),
+)
+def test_memory_profiler_group_apply_in_pandas(self):
+# FlatMapGroupsInBatchExec
+df = self.spark.createDataFrame(
+[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v")
+)
+
+def normalize(pdf):
+v = pdf.v
+return pdf.assign(v=(v - v.mean()) / v.std())
+
+with self.sql_conf({"spark.sql.pyspark.udf.profiler": "memory"}):
+df.groupby("id").applyInPandas(normalize, schema="id long, v 
double").show()
+
+self.assertEqual(1, len(self.profile_results), 
str(self.profile_results.keys()))
+
+for id in self.profile_results:
+with self.trap_stdout() as io:
+self.spark.showMemoryProfiles(id)
+
+self.assertIn(f"Profile of UDF", io.getvalue())
+self.assertRegex(
+io.getvalue(), 
f"Filename.*{os.path.basename(inspect.getfile(_do_computation))}"
+)
+
+@unittest.skipIf(
+not have_pandas or not have_pyarrow,
+cast(str, pandas_requirement_message or pyarrow_requirement_message),
+)
+def test_memory_profiler_cogroup_apply_in_pandas(self):
+# FlatMapCoGroupsInBatchExec
+import pandas as pd
+
+df1 = self.spark.createDataFrame(
+[(2101, 1, 1.0), (2101, 2, 2.0), (2102, 1, 3.0), 
(2102, 2, 4.0)],
+("time", "id", "v1"),
+)
+df2 = self.spark.createDataFrame(
+[(2101, 1, "x"), (2101, 2, "y")], ("time", "id", "v2")
+)
+
+def asof_join(left, right):
+return pd.merge_asof(left, right, on="time", by="id")
+
+with self.sql_conf({"spark.sql.pyspark.udf.profiler": "memory"}):
+df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
+asof_join, schema="time int, id int, v1 double, v2 string"
+).show()
+
+self.assertEqual(1, len(self.profile_results), 
str(self.profile_results.keys()))
+
+for id in self.profile_results:
+with self.trap_stdout() as io:
+self.spark.showMemoryProfiles(id)
+
+self.assertIn(f"Profile of UDF", io.getvalue())
+self.assertRegex(
+io.getvalue(), 
f"Filename.*{os.path.basename(inspect.getfile(_do_computation))}"
+)
+
+@unittest.skipIf(
+not have_pandas or not have_pyarrow,
+cast(str, pandas_requirement_message or pyarrow_requirement_message),
+)
+def test_memory_profiler_group_apply_in_arrow(self):
+# FlatMapGroupsInBatchExec
+import pyarrow.compute as pc
+
+df = self.spark.createDataFrame(
+[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v")
+)
+
+def normalize(table):
+v = table.column("v")
+norm = pc.divide(pc.subtract(v, pc.mean(v)), pc.stddev(v, ddof=1))
+return table.set_column(1, "v", norm)
+
+with self.sql_conf({"spark.sql.pyspark.udf.profiler": "memory"}):
+df.groupby("id").applyInArrow(normalize, schema="id long, v 
double").show()
+
+self.assertEqual(1, len(self.profile_results), 
str(self.profile_results.keys()))
+
+for id in self.profile_results:
+with self.trap_stdout() as io:
+self.spark.showMemoryProfiles(id)
+
+self.assertIn(f"Profile of UDF", io.getvalue())
+self.assertRegex(
+io.getvalue(), 
f"Filename.*{os.path.basename(inspect.getfile(_do_computation))}"
+)
+
+@unittest.skipIf(
+not have_pandas or not have_pyarrow,
+cast(str, pandas_requirement_message or pyarrow_requirement_message),
+)
+def test_memory_profiler_cogroup_apply_in_arrow(self):
+import pyarrow as pa
+
+df1 = self.spark.createDataFrame([(1, 1.0), (2, 2.0), (1, 3.0), (2, 
4.0)], ("id", "v1"))
+df2 = self.spark.createDataFrame([(1, "x"), (2, "y")], ("id", "v2"))
+
+def summarize(l, r):

Review Comment:
   ditto.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Re: [PR] [SPARK-46689][SPARK-46690][PYTHON][CONNECT] Support v2 profiling in group/cogroup applyInPandas/applyInArrow [spark]

2024-02-07 Thread via GitHub


ueshin commented on code in PR #45050:
URL: https://github.com/apache/spark/pull/45050#discussion_r1482312796


##
python/pyspark/sql/tests/test_udf_profiler.py:
##
@@ -394,6 +394,129 @@ def min_udf(v: pd.Series) -> float:
 io.getvalue(), 
f"2.*{os.path.basename(inspect.getfile(_do_computation))}"
 )
 
+@unittest.skipIf(
+not have_pandas or not have_pyarrow,
+cast(str, pandas_requirement_message or pyarrow_requirement_message),
+)
+def test_perf_profiler_group_apply_in_pandas(self):
+# FlatMapGroupsInBatchExec
+df = self.spark.createDataFrame(
+[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v")
+)
+
+def normalize(pdf):
+v = pdf.v
+return pdf.assign(v=(v - v.mean()) / v.std())
+
+with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
+df.groupby("id").applyInPandas(normalize, schema="id long, v 
double").show()
+
+self.assertEqual(1, len(self.profile_results), 
str(self.profile_results.keys()))
+
+for id in self.profile_results:
+with self.trap_stdout() as io:
+self.spark.showPerfProfiles(id)
+
+self.assertIn(f"Profile of UDF", io.getvalue())
+self.assertRegex(
+io.getvalue(), 
f"2.*{os.path.basename(inspect.getfile(_do_computation))}"
+)
+
+@unittest.skipIf(
+not have_pandas or not have_pyarrow,
+cast(str, pandas_requirement_message or pyarrow_requirement_message),
+)
+def test_perf_profiler_cogroup_apply_in_pandas(self):
+# FlatMapCoGroupsInBatchExec
+import pandas as pd
+
+df1 = self.spark.createDataFrame(
+[(2101, 1, 1.0), (2101, 2, 2.0), (2102, 1, 3.0), 
(2102, 2, 4.0)],
+("time", "id", "v1"),
+)
+df2 = self.spark.createDataFrame(
+[(2101, 1, "x"), (2101, 2, "y")], ("time", "id", "v2")
+)
+
+def asof_join(left, right):
+return pd.merge_asof(left, right, on="time", by="id")
+
+with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
+df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
+asof_join, schema="time int, id int, v1 double, v2 string"
+).show()
+
+self.assertEqual(1, len(self.profile_results), 
str(self.profile_results.keys()))
+
+for id in self.profile_results:
+with self.trap_stdout() as io:
+self.spark.showPerfProfiles(id)
+
+self.assertIn(f"Profile of UDF", io.getvalue())
+self.assertRegex(
+io.getvalue(), 
f"2.*{os.path.basename(inspect.getfile(_do_computation))}"
+)
+
+@unittest.skipIf(
+not have_pandas or not have_pyarrow,
+cast(str, pandas_requirement_message or pyarrow_requirement_message),
+)
+def test_perf_profiler_group_apply_in_arrow(self):
+# FlatMapGroupsInBatchExec
+import pyarrow.compute as pc
+
+df = self.spark.createDataFrame(
+[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v")
+)
+
+def normalize(table):
+v = table.column("v")
+norm = pc.divide(pc.subtract(v, pc.mean(v)), pc.stddev(v, ddof=1))
+return table.set_column(1, "v", norm)
+
+with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
+df.groupby("id").applyInArrow(normalize, schema="id long, v 
double").show()
+
+self.assertEqual(1, len(self.profile_results), 
str(self.profile_results.keys()))
+
+for id in self.profile_results:
+with self.trap_stdout() as io:
+self.spark.showPerfProfiles(id)
+
+self.assertIn(f"Profile of UDF", io.getvalue())
+self.assertRegex(
+io.getvalue(), 
f"2.*{os.path.basename(inspect.getfile(_do_computation))}"
+)
+
+@unittest.skipIf(
+not have_pandas or not have_pyarrow,
+cast(str, pandas_requirement_message or pyarrow_requirement_message),
+)
+def test_perf_profiler_cogroup_apply_in_arrow(self):
+import pyarrow as pa
+
+df1 = self.spark.createDataFrame([(1, 1.0), (2, 2.0), (1, 3.0), (2, 
4.0)], ("id", "v1"))
+df2 = self.spark.createDataFrame([(1, "x"), (2, "y")], ("id", "v2"))
+
+def summarize(l, r):

Review Comment:
   Shall we rename to `left` and `right` according to the lint error?



##
python/pyspark/sql/tests/test_udf_profiler.py:
##
@@ -394,6 +394,129 @@ def min_udf(v: pd.Series) -> float:
 io.getvalue(), 
f"2.*{os.path.basename(inspect.getfile(_do_computation))}"
 )
 
+@unittest.skipIf(
+not have_pandas or not have_pyarrow,
+cast(str, pandas_requirement_message or pyarrow_requirement_message),
+)
+def test_perf_profiler_

Re: [PR] [SPARK-46689][SPARK-46690][PYTHON][CONNECT] Support v2 profiling in group/cogroup applyInPandas/applyInArrow [spark]

2024-02-07 Thread via GitHub


xinrong-meng commented on code in PR #45050:
URL: https://github.com/apache/spark/pull/45050#discussion_r1482315739


##
python/pyspark/sql/tests/test_udf_profiler.py:
##
@@ -394,6 +394,129 @@ def min_udf(v: pd.Series) -> float:
 io.getvalue(), 
f"2.*{os.path.basename(inspect.getfile(_do_computation))}"
 )
 
+@unittest.skipIf(
+not have_pandas or not have_pyarrow,
+cast(str, pandas_requirement_message or pyarrow_requirement_message),
+)
+def test_perf_profiler_group_apply_in_pandas(self):
+# FlatMapGroupsInBatchExec
+df = self.spark.createDataFrame(
+[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v")
+)
+
+def normalize(pdf):
+v = pdf.v
+return pdf.assign(v=(v - v.mean()) / v.std())
+
+with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
+df.groupby("id").applyInPandas(normalize, schema="id long, v 
double").show()
+
+self.assertEqual(1, len(self.profile_results), 
str(self.profile_results.keys()))
+
+for id in self.profile_results:
+with self.trap_stdout() as io:
+self.spark.showPerfProfiles(id)
+
+self.assertIn(f"Profile of UDF", io.getvalue())
+self.assertRegex(
+io.getvalue(), 
f"2.*{os.path.basename(inspect.getfile(_do_computation))}"
+)
+
+@unittest.skipIf(
+not have_pandas or not have_pyarrow,
+cast(str, pandas_requirement_message or pyarrow_requirement_message),
+)
+def test_perf_profiler_cogroup_apply_in_pandas(self):
+# FlatMapCoGroupsInBatchExec
+import pandas as pd
+
+df1 = self.spark.createDataFrame(
+[(2101, 1, 1.0), (2101, 2, 2.0), (2102, 1, 3.0), 
(2102, 2, 4.0)],
+("time", "id", "v1"),
+)
+df2 = self.spark.createDataFrame(
+[(2101, 1, "x"), (2101, 2, "y")], ("time", "id", "v2")
+)
+
+def asof_join(left, right):
+return pd.merge_asof(left, right, on="time", by="id")
+
+with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
+df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
+asof_join, schema="time int, id int, v1 double, v2 string"
+).show()
+
+self.assertEqual(1, len(self.profile_results), 
str(self.profile_results.keys()))
+
+for id in self.profile_results:
+with self.trap_stdout() as io:
+self.spark.showPerfProfiles(id)
+
+self.assertIn(f"Profile of UDF", io.getvalue())
+self.assertRegex(
+io.getvalue(), 
f"2.*{os.path.basename(inspect.getfile(_do_computation))}"
+)
+
+@unittest.skipIf(
+not have_pandas or not have_pyarrow,
+cast(str, pandas_requirement_message or pyarrow_requirement_message),
+)
+def test_perf_profiler_group_apply_in_arrow(self):
+# FlatMapGroupsInBatchExec
+import pyarrow.compute as pc
+
+df = self.spark.createDataFrame(
+[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v")
+)
+
+def normalize(table):
+v = table.column("v")
+norm = pc.divide(pc.subtract(v, pc.mean(v)), pc.stddev(v, ddof=1))
+return table.set_column(1, "v", norm)
+
+with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
+df.groupby("id").applyInArrow(normalize, schema="id long, v 
double").show()
+
+self.assertEqual(1, len(self.profile_results), 
str(self.profile_results.keys()))
+
+for id in self.profile_results:
+with self.trap_stdout() as io:
+self.spark.showPerfProfiles(id)
+
+self.assertIn(f"Profile of UDF", io.getvalue())
+self.assertRegex(
+io.getvalue(), 
f"2.*{os.path.basename(inspect.getfile(_do_computation))}"
+)
+
+@unittest.skipIf(
+not have_pandas or not have_pyarrow,
+cast(str, pandas_requirement_message or pyarrow_requirement_message),
+)
+def test_perf_profiler_cogroup_apply_in_arrow(self):
+import pyarrow as pa
+
+df1 = self.spark.createDataFrame([(1, 1.0), (2, 2.0), (1, 3.0), (2, 
4.0)], ("id", "v1"))
+df2 = self.spark.createDataFrame([(1, "x"), (2, "y")], ("id", "v2"))
+
+def summarize(l, r):

Review Comment:
   I was wondering about the reason and found: "In some fonts, these characters are 
indistinguishable from the numerals one and zero. When tempted to use 'l', use 
'L' instead." That's good to learn :p 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Re: [PR] [SPARK-46812][SQL][PYTHON] Make mapInPandas / mapInArrow support ResourceProfile [spark]

2024-02-07 Thread via GitHub


wbo4958 commented on PR #44852:
URL: https://github.com/apache/spark/pull/44852#issuecomment-1933236068

   Hi @HyukjinKwon, thanks for your review and comments. The CI on the newest
commit has passed; could you help review it again? Thanks very much.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-43117][CONNECT] Make `ProtoUtils.abbreviate` support repeated fields [spark]

2024-02-07 Thread via GitHub


zhengruifeng commented on code in PR #45056:
URL: https://github.com/apache/spark/pull/45056#discussion_r1482317218


##
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/ProtoUtils.scala:
##
@@ -42,7 +42,17 @@ private[connect] object ProtoUtils {
 val size = string.length
 val threshold = thresholds.getOrElse(STRING, MAX_STRING_SIZE)
 if (size > threshold) {
-  builder.setField(field, createString(string.take(threshold), size))
+  builder.setField(field, truncateString(string, threshold))
+}
+
+  case (field: FieldDescriptor, strings: java.lang.Iterable[_])
+  if field.getJavaType == FieldDescriptor.JavaType.STRING && 
field.isRepeated
+&& strings != null =>
+val threshold = thresholds.getOrElse(STRING, MAX_STRING_SIZE)
+strings.iterator().asScala.zipWithIndex.foreach {
+  case (string: String, i) if string != null && string.length > 
threshold =>

Review Comment:
   I guess so, but I'm not very sure, so I added `string != null` to protect
against an NPE.
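
   As a side note, here is a toy sketch of the per-element handling outside
protobuf (the names and the truncation marker are made up for illustration). It
also shows that the explicit null guard is mostly belt-and-braces, since a Scala
type pattern such as `case s: String` already skips null elements:

   ```scala
   val threshold = 8
   val strings: Seq[String] = Seq("short", null, "x" * 20)

   val truncated = strings.map {
     // A typed pattern never matches null, so nulls fall through to `other`
     // even without the extra guard; the guard just makes the intent explicit.
     case s: String if s != null && s.length > threshold =>
       s.take(threshold) + s"... (${s.length} chars)"
     case other => other
   }
   println(truncated)  // List(short, null, xxxxxxxx... (20 chars))
   ```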



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-45599][CORE] Use object equality in OpenHashSet [spark]

2024-02-07 Thread via GitHub


cloud-fan commented on code in PR #45036:
URL: https://github.com/apache/spark/pull/45036#discussion_r1482324222


##
core/src/test/scala/org/apache/spark/util/collection/OpenHashSetSuite.scala:
##
@@ -269,4 +269,35 @@ class OpenHashSetSuite extends SparkFunSuite with Matchers 
{
   assert(pos1 == pos2)
 }
   }
+
+  test("SPARK-45599: 0.0 and -0.0 are equal but not the same") {

Review Comment:
   Spark's `OpenHashSet` does not have to match `java.util.HashSet`. What
matters is the SQL semantics. Can you highlight which functions/operators use
this `OpenHashSet`, and what the impact of this change is on the SQL semantics?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


