Re: [PR] [SPARK-43117][CONNECT] Make `ProtoUtils.abbreviate` support repeated fields [spark]

2024-02-07 Thread via GitHub


zhengruifeng commented on PR #45056:
URL: https://github.com/apache/spark/pull/45056#issuecomment-1933492153

   Thank you guys, merged to master.





Re: [PR] [SPARK-43117][CONNECT] Make `ProtoUtils.abbreviate` support repeated fields [spark]

2024-02-07 Thread via GitHub


zhengruifeng closed pull request #45056: [SPARK-43117][CONNECT] Make 
`ProtoUtils.abbreviate` support repeated fields
URL: https://github.com/apache/spark/pull/45056





Re: [PR] [SPARK-46615][CONNECT] Support s.c.immutable.ArraySeq in ArrowDeserializers [spark]

2024-02-07 Thread via GitHub


LuciferYang commented on PR #44618:
URL: https://github.com/apache/spark/pull/44618#issuecomment-1933461858

   Merged into master for Spark 4.0. Thanks @panbingkun @srowen 





Re: [PR] [SPARK-46615][CONNECT] Support s.c.immutable.ArraySeq in ArrowDeserializers [spark]

2024-02-07 Thread via GitHub


LuciferYang closed pull request #44618: [SPARK-46615][CONNECT] Support 
s.c.immutable.ArraySeq in ArrowDeserializers
URL: https://github.com/apache/spark/pull/44618





Re: [PR] [SPARK-46971][SQL] When the `compression` is null, a `NullPointException` should not be thrown [spark]

2024-02-07 Thread via GitHub


LuciferYang commented on code in PR #45015:
URL: https://github.com/apache/spark/pull/45015#discussion_r1482479360


##
connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala:
##
@@ -118,7 +119,11 @@ private[sql] class AvroOptions(
   * taken into account. If the former one is not set too, the `snappy` codec is used by default.
   */
  val compression: String = {
-    parameters.get(COMPRESSION).getOrElse(SQLConf.get.avroCompressionCodec)
+    val v = parameters.get(COMPRESSION).getOrElse(SQLConf.get.avroCompressionCodec)
+    if (v == null) {

Review Comment:
   Would there be compatibility issues if null values are prohibited in `CaseInsensitiveMap`?
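
   (For illustration, a minimal self-contained sketch of the null fallback being discussed; `options` and `defaultCodec` are hypothetical stand-ins, not the actual AvroOptions internals.)

   ```scala
   object CompressionFallback {
     // Mirrors the pattern in the diff above: getOrElse only covers a missing key,
     // so an explicitly-stored null value must be handled separately.
     def resolveCompression(options: Map[String, String], defaultCodec: String): String = {
       val v = options.getOrElse("compression", defaultCodec)
       if (v == null) defaultCodec else v
     }

     def main(args: Array[String]): Unit = {
       println(resolveCompression(Map("compression" -> null), "snappy"))       // snappy
       println(resolveCompression(Map.empty, "snappy"))                        // snappy
       println(resolveCompression(Map("compression" -> "zstandard"), "snappy")) // zstandard
     }
   }
   ```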






Re: [PR] [SPARK-46997][CORE] Enable `spark.worker.cleanup.enabled` by default [spark]

2024-02-07 Thread via GitHub


dongjoon-hyun commented on PR #45055:
URL: https://github.com/apache/spark/pull/45055#issuecomment-1933455410

   Merged to master for Apache Spark 4.0.0.





Re: [PR] [SPARK-46997][CORE] Enable `spark.worker.cleanup.enabled` by default [spark]

2024-02-07 Thread via GitHub


dongjoon-hyun closed pull request #45055: [SPARK-46997][CORE] Enable 
`spark.worker.cleanup.enabled` by default
URL: https://github.com/apache/spark/pull/45055





Re: [PR] [SPARK-46997][CORE] Enable `spark.worker.cleanup.enabled` by default [spark]

2024-02-07 Thread via GitHub


dongjoon-hyun commented on PR #45055:
URL: https://github.com/apache/spark/pull/45055#issuecomment-1933452966

   Thank you, @LuciferYang !





Re: [PR] [SPARK-46400][CORE][SQL][3.5] When there are corrupted files in the local maven repo, skip this cache and try again [spark]

2024-02-07 Thread via GitHub


LuciferYang commented on PR #45017:
URL: https://github.com/apache/spark/pull/45017#issuecomment-1933447367

   Merged into branch-3.5. Thanks @panbingkun 





Re: [PR] [SPARK-46400][CORE][SQL][3.5] When there are corrupted files in the local maven repo, skip this cache and try again [spark]

2024-02-07 Thread via GitHub


LuciferYang closed pull request #45017: [SPARK-46400][CORE][SQL][3.5] When 
there are corrupted files in the local maven repo, skip this cache and try again
URL: https://github.com/apache/spark/pull/45017





Re: [PR] [MINOR][PYTHON][SQL][TESTS] Don't load Python Data Source when Python executable is not available even for testing [spark]

2024-02-07 Thread via GitHub


HyukjinKwon commented on PR #45067:
URL: https://github.com/apache/spark/pull/45067#issuecomment-1933434170

   Thanks guys!





Re: [PR] [MINOR][PYTHON][SQL][TESTS] Don't load Python Data Source when Python executable is not available even for testing [spark]

2024-02-07 Thread via GitHub


LuciferYang commented on PR #45067:
URL: https://github.com/apache/spark/pull/45067#issuecomment-1933429173

   Merged into master for Spark 4.0. Thanks @HyukjinKwon @WeichenXu123 





Re: [PR] [MINOR][PYTHON][SQL][TESTS] Don't load Python Data Source when Python executable is not available even for testing [spark]

2024-02-07 Thread via GitHub


LuciferYang closed pull request #45067: [MINOR][PYTHON][SQL][TESTS] Don't load 
Python Data Source when Python executable is not available even for testing
URL: https://github.com/apache/spark/pull/45067





Re: [PR] [MINOR][PYTHON][SQL][TESTS] Don't load Python Data Source when Python executable is not available even for testing [spark]

2024-02-07 Thread via GitHub


LuciferYang commented on PR #45067:
URL: https://github.com/apache/spark/pull/45067#issuecomment-1933428090

   
![image](https://github.com/apache/spark/assets/1475305/8083ab3f-5466-4636-9bf9-a2c3ed5ae9c1)
   
   All tests passed, I will merge this one.





Re: [PR] [SPARK-47005][PYTHON][DOCS] Refine docstring of `asc_nulls_first/asc_nulls_last/desc_nulls_first/desc_nulls_last` [spark]

2024-02-07 Thread via GitHub


LuciferYang commented on PR #45066:
URL: https://github.com/apache/spark/pull/45066#issuecomment-1933420406

   Merged into master for Spark 4.0. Thanks @dongjoon-hyun 





Re: [PR] [SPARK-47005][PYTHON][DOCS] Refine docstring of `asc_nulls_first/asc_nulls_last/desc_nulls_first/desc_nulls_last` [spark]

2024-02-07 Thread via GitHub


LuciferYang closed pull request #45066: [SPARK-47005][PYTHON][DOCS] Refine 
docstring of `asc_nulls_first/asc_nulls_last/desc_nulls_first/desc_nulls_last`
URL: https://github.com/apache/spark/pull/45066





Re: [PR] [SPARK-46993][SQL] Fix constant folding for session variables [spark]

2024-02-07 Thread via GitHub


cloud-fan commented on PR #45059:
URL: https://github.com/apache/spark/pull/45059#issuecomment-1933410098

   seems the code style check failed





Re: [PR] [SPARK-46993][SQL] Fix constant folding for session variables [spark]

2024-02-07 Thread via GitHub


srielau commented on code in PR #45059:
URL: https://github.com/apache/spark/pull/45059#discussion_r1482418715


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala:
##
@@ -312,14 +312,39 @@ trait ColumnResolutionHelper extends Logging with DataTypeErrorsBase {
      }.map(e => Alias(e, nameParts.last)())
    }

-    e.transformWithPruning(_.containsAnyPattern(UNRESOLVED_ATTRIBUTE, TEMP_RESOLVED_COLUMN)) {
-      case u: UnresolvedAttribute =>
-        resolve(u.nameParts).getOrElse(u)
-      // Re-resolves `TempResolvedColumn` as variable references if it has tried to be resolved with
-      // Aggregate but failed.
-      case t: TempResolvedColumn if t.hasTried =>
-        resolve(t.nameParts).getOrElse(t)
+    def innerResolve(e: Expression, isTopLevel: Boolean): Expression = withOrigin(e.origin) {
+      if (e.resolved) return e
+      val resolved = e match {
+        case u @ UnresolvedAttribute(nameParts) =>
+          val result = withPosition(u) {
+            resolve(nameParts).getOrElse(u) match {
+              // We trim unnecessary alias here. Note that, we cannot trim the alias at top-level,
+              // as we should resolve `UnresolvedAttribute` to a named expression. The caller side

Review Comment:
   Why not? It is an UnresolvedAttribute.






[PR] [MINOR][PYTHON][SQL][TESTS] Don't load Python Data Source when Python executable is not available even for testing [spark]

2024-02-07 Thread via GitHub


HyukjinKwon opened a new pull request, #45067:
URL: https://github.com/apache/spark/pull/45067

   ### What changes were proposed in this pull request?
   
   This PR proposes to not load Python Data Sources when the Python executable is not available, even for testing.
   
   ### Why are the changes needed?
   
   Whether we're in a test or not, loading Python Data Sources can't work anyway.
   
   ### Does this PR introduce _any_ user-facing change?
   
   No, dev-only.
   
   ### How was this patch tested?
   
   Manually tested.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   No.





Re: [PR] [SPARK-46997][CORE] Enable `spark.worker.cleanup.enabled` by default [spark]

2024-02-07 Thread via GitHub


dongjoon-hyun commented on PR #45055:
URL: https://github.com/apache/spark/pull/45055#issuecomment-1933307259

   Could you review this PR, @LuciferYang ?





Re: [PR] [SPARK-43117][CONNECT] Make `ProtoUtils.abbreviate` support repeated fields [spark]

2024-02-07 Thread via GitHub


zhengruifeng commented on code in PR #45056:
URL: https://github.com/apache/spark/pull/45056#discussion_r1482368948


##
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/ProtoUtils.scala:
##
@@ -42,7 +42,17 @@ private[connect] object ProtoUtils {
         val size = string.length
         val threshold = thresholds.getOrElse(STRING, MAX_STRING_SIZE)
         if (size > threshold) {
-          builder.setField(field, createString(string.take(threshold), size))
+          builder.setField(field, truncateString(string, threshold))
+        }
+
+      case (field: FieldDescriptor, strings: java.lang.Iterable[_])
+          if field.getJavaType == FieldDescriptor.JavaType.STRING && field.isRepeated
+            && strings != null =>
+        val threshold = thresholds.getOrElse(STRING, MAX_STRING_SIZE)
+        strings.iterator().asScala.zipWithIndex.foreach {
+          case (string: String, i) if string != null && string.length > threshold =>

Review Comment:
   thanks for checking this!






[PR] [SPARK-47005][PYTHON][DOCS] Refine docstring of `asc_nulls_first/asc_nulls_last/desc_nulls_first/desc_nulls_last` [spark]

2024-02-07 Thread via GitHub


LuciferYang opened a new pull request, #45066:
URL: https://github.com/apache/spark/pull/45066

   ### What changes were proposed in this pull request?
   This PR refines the docstring of `asc_nulls_first/asc_nulls_last/desc_nulls_first/desc_nulls_last` and adds some new examples.
   
   ### Why are the changes needed?
   To improve PySpark documentation
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   ### How was this patch tested?
   Pass Github Actions
   
   ### Was this patch authored or co-authored using generative AI tooling?
   No
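
   (For illustration only: a small self-contained example of the null-ordering behavior these functions document, written against the Scala DataFrame API rather than PySpark; the local SparkSession setup is an assumption for running it standalone.)

   ```scala
   import org.apache.spark.sql.SparkSession
   import org.apache.spark.sql.functions.{asc_nulls_first, desc_nulls_last}

   object NullOrderingExample {
     def main(args: Array[String]): Unit = {
       val spark = SparkSession.builder().master("local[1]").appName("null-ordering").getOrCreate()
       import spark.implicits._

       val df = Seq(Some(3), None, Some(1)).toDF("v")

       // asc_nulls_first: NULLs come before all non-null values.
       df.orderBy(asc_nulls_first("v")).show()
       // desc_nulls_last: NULLs come after all non-null values.
       df.orderBy(desc_nulls_last("v")).show()

       spark.stop()
     }
   }
   ```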





Re: [PR] [SPARK-46994][PYTHON] Refactor PythonWrite to prepare for supporting python data source streaming write [spark]

2024-02-07 Thread via GitHub


HeartSaVioR closed pull request #45049: [SPARK-46994][PYTHON] Refactor 
PythonWrite to prepare for supporting python data source streaming write
URL: https://github.com/apache/spark/pull/45049





Re: [PR] [SPARK-46994][PYTHON] Refactor PythonWrite to prepare for supporting python data source streaming write [spark]

2024-02-07 Thread via GitHub


HeartSaVioR commented on PR #45049:
URL: https://github.com/apache/spark/pull/45049#issuecomment-1933299295

   Thanks, merging to master!





Re: [PR] [SPARK-46865][SS] Add Batch Support for TransformWithState Operator [spark]

2024-02-07 Thread via GitHub


HeartSaVioR closed pull request #44884: [SPARK-46865][SS] Add Batch Support for 
TransformWithState Operator
URL: https://github.com/apache/spark/pull/44884





Re: [PR] [SPARK-46865][SS] Add Batch Support for TransformWithState Operator [spark]

2024-02-07 Thread via GitHub


HeartSaVioR commented on PR #44884:
URL: https://github.com/apache/spark/pull/44884#issuecomment-1933298214

   Thanks, merging to master!





Re: [PR] [SPARK-43117][CONNECT] Make `ProtoUtils.abbreviate` support repeated fields [spark]

2024-02-07 Thread via GitHub


LuciferYang commented on code in PR #45056:
URL: https://github.com/apache/spark/pull/45056#discussion_r1482339139


##
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/ProtoUtils.scala:
##
@@ -42,7 +42,17 @@ private[connect] object ProtoUtils {
         val size = string.length
         val threshold = thresholds.getOrElse(STRING, MAX_STRING_SIZE)
         if (size > threshold) {
-          builder.setField(field, createString(string.take(threshold), size))
+          builder.setField(field, truncateString(string, threshold))
+        }
+
+      case (field: FieldDescriptor, strings: java.lang.Iterable[_])
+          if field.getJavaType == FieldDescriptor.JavaType.STRING && field.isRepeated
+            && strings != null =>
+        val threshold = thresholds.getOrElse(STRING, MAX_STRING_SIZE)
+        strings.iterator().asScala.zipWithIndex.foreach {
+          case (string: String, i) if string != null && string.length > threshold =>

Review Comment:
   In fact, for repeated fields, null checks exist whether adding a single element or adding a collection of elements, for example:
   
   ```java
   public Builder addColumnNames(
       java.lang.String value) {
     if (value == null) { throw new NullPointerException(); }
     ensureColumnNamesIsMutable();
     columnNames_.add(value);
     bitField0_ |= 0x00000004;
     onChanged();
     return this;
   }
   // com.google.protobuf.AbstractMessageLite.Builder#addAll(java.lang.Iterable, java.util.List)
   protected static <T> void addAll(final Iterable<T> values, final List<? super T> list) {
     checkNotNull(values);
     if (values instanceof LazyStringList) {
       // For StringOrByteStringLists, check the underlying elements to avoid
       // forcing conversions of ByteStrings to Strings.
       // TODO: Could we just prohibit nulls in all protobuf lists and get rid of this? Is
       // it even possible to hit this condition as all protobuf methods check for null first,
       // right?
       List<?> lazyValues = ((LazyStringList) values).getUnderlyingElements();
       LazyStringList lazyList = (LazyStringList) list;
   
   ```
   
   Also, I have performed the following checks:
   
   - adding a collection of elements with null

   ```
   val names = Seq.range(0, 10).map(i => if (i == 3) null else i.toString * 1024)
   val drop = proto.Drop.newBuilder().setInput(sql).addAllColumnNames(names.asJava).build()
   ```
   
   ```
   [info] - truncate repeated strings with nulls *** FAILED *** (3 milliseconds)
   [info]   java.lang.NullPointerException: Element at index 3 is null.
   [info]   at com.google.protobuf.AbstractMessageLite$Builder.addAllCheckingNulls(AbstractMessageLite.java:359)
   [info]   at com.google.protobuf.AbstractMessageLite$Builder.addAll(AbstractMessageLite.java:414)
   [info]   at org.apache.spark.connect.proto.Drop$Builder.addAllColumnNames(Drop.java:1240)
   ```
   
   - adding a null element
   ```
   val drop = proto.Drop.newBuilder().setInput(sql).addColumnNames(null).build()
   ```
   
   ```
   [info] - truncate repeated strings with nulls *** FAILED *** (4 milliseconds)
   [info]   java.lang.NullPointerException:
   [info]   at org.apache.spark.connect.proto.Drop$Builder.addColumnNames(Drop.java:1221)
   ```
   
   As you can see, under normal circumstances, it is impossible to add a null element to a repeated field. So personally, I don't think this null check is necessary, but I don't object to adding it.
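
   (A generic, hedged sketch of the guarded per-element truncation being reviewed here; `truncate` is a hypothetical helper over a plain `Seq`, not Spark Connect's actual `ProtoUtils` code.)

   ```scala
   object TruncateRepeatedStrings {
     // Truncate only the non-null elements whose length exceeds the threshold,
     // leaving nulls and short strings untouched.
     def truncate(strings: Seq[String], threshold: Int): Seq[String] =
       strings.map {
         case s if s != null && s.length > threshold =>
           s.take(threshold) + s"[truncated, size=${s.length}]"
         case s => s
       }

     def main(args: Array[String]): Unit = {
       println(truncate(Seq("short", null, "x" * 20), threshold = 8))
     }
   }
   ```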






Re: [PR] [SPARK-46998][SQL] Deprecate the SQL config `spark.sql.legacy.allowZeroIndexInFormatString` [spark]

2024-02-07 Thread via GitHub


dongjoon-hyun commented on PR #45057:
URL: https://github.com/apache/spark/pull/45057#issuecomment-1933262616

   Merged to master for Apache Spark 4.0.0.
   Thank you, @MaxGekk and @cloud-fan .





Re: [PR] [SPARK-46998][SQL] Deprecate the SQL config `spark.sql.legacy.allowZeroIndexInFormatString` [spark]

2024-02-07 Thread via GitHub


dongjoon-hyun closed pull request #45057: [SPARK-46998][SQL] Deprecate the SQL 
config `spark.sql.legacy.allowZeroIndexInFormatString`
URL: https://github.com/apache/spark/pull/45057





Re: [PR] [SPARK-39910][SQL] Delegate path qualification to filesystem during DataSource file path globbing [spark]

2024-02-07 Thread via GitHub


cloud-fan commented on code in PR #43463:
URL: https://github.com/apache/spark/pull/43463#discussion_r1482326766


##
sql/core/src/test/resources/test-data/test-archive.har/_index:
##
@@ -0,0 +1,7 @@
+%2F dir 1697722622766+493+tigrulya+hadoop 0 0 test.txt test.orc test.parquet 
test.json test.csv 

Review Comment:
   can we clean up the test files a bit? We only test csv now.






Re: [PR] [SPARK-39910][SQL] Delegate path qualification to filesystem during DataSource file path globbing [spark]

2024-02-07 Thread via GitHub


cloud-fan commented on code in PR #43463:
URL: https://github.com/apache/spark/pull/43463#discussion_r1482325524


##
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala:
##
@@ -214,4 +216,6 @@ class MockFileSystem extends RawLocalFileSystem {
   override def globStatus(pathPattern: Path): Array[FileStatus] = {
 mockGlobResults.getOrElse(pathPattern, Array())
   }
+
+  override def getUri: URI = URI.create("mockFs://mockFs/")

Review Comment:
   is this change needed?






Re: [PR] [SPARK-46617][SQL] Create-table-if-not-exists should not silently overwrite existing data-files [spark]

2024-02-07 Thread via GitHub


cloud-fan commented on code in PR #44622:
URL: https://github.com/apache/spark/pull/44622#discussion_r1482324935


##
sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala:
##
@@ -102,14 +102,17 @@ object DataWritingCommand {
   }
   /**
    * When execute CTAS operators, and the location is not empty, throw [[AnalysisException]].
-   * For CTAS, the SaveMode is always [[ErrorIfExists]]
+   * For CTAS, the SaveMode is always [[ErrorIfExists]].
+   * For Create-Table-If-Not-Exists, the SaveMode is [[Ignore]].
    *
    * @param tablePath Table location.
    * @param saveMode  Save mode of the table.
    * @param hadoopConf Configuration.
    */
   def assertEmptyRootPath(tablePath: URI, saveMode: SaveMode, hadoopConf: Configuration): Unit = {

Review Comment:
   how about the Scala/Python APIs? `DataFrameWriter.mode(...).saveAsTable`
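
   (A minimal sketch of the DataFrameWriter path being asked about; the table name and data are made up, and the local SparkSession is only there to make it runnable.)

   ```scala
   import org.apache.spark.sql.{SaveMode, SparkSession}

   object SaveAsTableIgnoreExample {
     def main(args: Array[String]): Unit = {
       val spark = SparkSession.builder().master("local[1]").appName("ctas-if-not-exists").getOrCreate()
       import spark.implicits._

       // First write creates the table.
       Seq((1, "a")).toDF("id", "name").write.mode(SaveMode.Ignore).saveAsTable("t")

       // SaveMode.Ignore corresponds to IF NOT EXISTS semantics: if `t` already
       // exists, this second write is a no-op rather than an error or overwrite.
       Seq((2, "b")).toDF("id", "name").write.mode(SaveMode.Ignore).saveAsTable("t")

       spark.stop()
     }
   }
   ```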






Re: [PR] [SPARK-45599][CORE] Use object equality in OpenHashSet [spark]

2024-02-07 Thread via GitHub


cloud-fan commented on code in PR #45036:
URL: https://github.com/apache/spark/pull/45036#discussion_r1482324222


##
core/src/test/scala/org/apache/spark/util/collection/OpenHashSetSuite.scala:
##
@@ -269,4 +269,35 @@ class OpenHashSetSuite extends SparkFunSuite with Matchers {
   assert(pos1 == pos2)
 }
   }
+
+  test("SPARK-45599: 0.0 and -0.0 are equal but not the same") {

Review Comment:
   Spark's `OpenHashSet` does not have to match `java.util.HashSet`. What 
matters is the SQL semantic. Can you highlight which functions/operators are 
using this `OpenHashSet` and what is the impact of this change to the SQL 
semantic?
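
   (A small standalone illustration of the distinction in the test name above: 0.0 and -0.0 compare equal as primitive doubles but are distinguishable once boxed; plain Scala/JDK, independent of Spark's OpenHashSet.)

   ```scala
   object SignedZeroExample {
     def main(args: Array[String]): Unit = {
       // As primitives, the two zeros are equal.
       println(0.0 == -0.0) // true

       // Boxed, java.lang.Double.equals looks at the raw bits, so they differ.
       val posZero = java.lang.Double.valueOf(0.0)
       val negZero = java.lang.Double.valueOf(-0.0)
       println(posZero.equals(negZero)) // false

       // A hash set keyed on boxed doubles therefore keeps both values.
       val set = new java.util.HashSet[java.lang.Double]()
       set.add(posZero)
       set.add(negZero)
       println(set.size()) // 2
     }
   }
   ```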






Re: [PR] [SPARK-43117][CONNECT] Make `ProtoUtils.abbreviate` support repeated fields [spark]

2024-02-07 Thread via GitHub


zhengruifeng commented on code in PR #45056:
URL: https://github.com/apache/spark/pull/45056#discussion_r1482317218


##
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/ProtoUtils.scala:
##
@@ -42,7 +42,17 @@ private[connect] object ProtoUtils {
         val size = string.length
         val threshold = thresholds.getOrElse(STRING, MAX_STRING_SIZE)
         if (size > threshold) {
-          builder.setField(field, createString(string.take(threshold), size))
+          builder.setField(field, truncateString(string, threshold))
+        }
+
+      case (field: FieldDescriptor, strings: java.lang.Iterable[_])
+          if field.getJavaType == FieldDescriptor.JavaType.STRING && field.isRepeated
+            && strings != null =>
+        val threshold = thresholds.getOrElse(STRING, MAX_STRING_SIZE)
+        strings.iterator().asScala.zipWithIndex.foreach {
+          case (string: String, i) if string != null && string.length > threshold =>

Review Comment:
   I guess so, but not very sure.
   So I added `string != null` to protect from NPE






Re: [PR] [SPARK-46812][SQL][PYTHON] Make mapInPandas / mapInArrow support ResourceProfile [spark]

2024-02-07 Thread via GitHub


wbo4958 commented on PR #44852:
URL: https://github.com/apache/spark/pull/44852#issuecomment-1933236068

   Hi @HyukjinKwon, thanks for your review and comments. The CI on the newest commit passed; could you help review it again? Thanks very much.





Re: [PR] [SPARK-46689][SPARK-46690][PYTHON][CONNECT] Support v2 profiling in group/cogroup applyInPandas/applyInArrow [spark]

2024-02-07 Thread via GitHub


xinrong-meng commented on code in PR #45050:
URL: https://github.com/apache/spark/pull/45050#discussion_r1482315739


##
python/pyspark/sql/tests/test_udf_profiler.py:
##
@@ -394,6 +394,129 @@ def min_udf(v: pd.Series) -> float:
 io.getvalue(), 
f"2.*{os.path.basename(inspect.getfile(_do_computation))}"
 )
 
+@unittest.skipIf(
+not have_pandas or not have_pyarrow,
+cast(str, pandas_requirement_message or pyarrow_requirement_message),
+)
+def test_perf_profiler_group_apply_in_pandas(self):
+# FlatMapGroupsInBatchExec
+df = self.spark.createDataFrame(
+[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v")
+)
+
+def normalize(pdf):
+v = pdf.v
+return pdf.assign(v=(v - v.mean()) / v.std())
+
+with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
+df.groupby("id").applyInPandas(normalize, schema="id long, v 
double").show()
+
+self.assertEqual(1, len(self.profile_results), 
str(self.profile_results.keys()))
+
+for id in self.profile_results:
+with self.trap_stdout() as io:
+self.spark.showPerfProfiles(id)
+
+self.assertIn(f"Profile of UDF", io.getvalue())
+self.assertRegex(
+io.getvalue(), 
f"2.*{os.path.basename(inspect.getfile(_do_computation))}"
+)
+
+@unittest.skipIf(
+not have_pandas or not have_pyarrow,
+cast(str, pandas_requirement_message or pyarrow_requirement_message),
+)
+def test_perf_profiler_cogroup_apply_in_pandas(self):
+# FlatMapCoGroupsInBatchExec
+import pandas as pd
+
+df1 = self.spark.createDataFrame(
+[(2101, 1, 1.0), (2101, 2, 2.0), (2102, 1, 3.0), 
(2102, 2, 4.0)],
+("time", "id", "v1"),
+)
+df2 = self.spark.createDataFrame(
+[(2101, 1, "x"), (2101, 2, "y")], ("time", "id", "v2")
+)
+
+def asof_join(left, right):
+return pd.merge_asof(left, right, on="time", by="id")
+
+with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
+df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
+asof_join, schema="time int, id int, v1 double, v2 string"
+).show()
+
+self.assertEqual(1, len(self.profile_results), 
str(self.profile_results.keys()))
+
+for id in self.profile_results:
+with self.trap_stdout() as io:
+self.spark.showPerfProfiles(id)
+
+self.assertIn(f"Profile of UDF", io.getvalue())
+self.assertRegex(
+io.getvalue(), 
f"2.*{os.path.basename(inspect.getfile(_do_computation))}"
+)
+
+@unittest.skipIf(
+not have_pandas or not have_pyarrow,
+cast(str, pandas_requirement_message or pyarrow_requirement_message),
+)
+def test_perf_profiler_group_apply_in_arrow(self):
+# FlatMapGroupsInBatchExec
+import pyarrow.compute as pc
+
+df = self.spark.createDataFrame(
+[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v")
+)
+
+def normalize(table):
+v = table.column("v")
+norm = pc.divide(pc.subtract(v, pc.mean(v)), pc.stddev(v, ddof=1))
+return table.set_column(1, "v", norm)
+
+with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
+df.groupby("id").applyInArrow(normalize, schema="id long, v 
double").show()
+
+self.assertEqual(1, len(self.profile_results), 
str(self.profile_results.keys()))
+
+for id in self.profile_results:
+with self.trap_stdout() as io:
+self.spark.showPerfProfiles(id)
+
+self.assertIn(f"Profile of UDF", io.getvalue())
+self.assertRegex(
+io.getvalue(), 
f"2.*{os.path.basename(inspect.getfile(_do_computation))}"
+)
+
+@unittest.skipIf(
+not have_pandas or not have_pyarrow,
+cast(str, pandas_requirement_message or pyarrow_requirement_message),
+)
+def test_perf_profiler_cogroup_apply_in_arrow(self):
+import pyarrow as pa
+
+df1 = self.spark.createDataFrame([(1, 1.0), (2, 2.0), (1, 3.0), (2, 
4.0)], ("id", "v1"))
+df2 = self.spark.createDataFrame([(1, "x"), (2, "y")], ("id", "v2"))
+
+def summarize(l, r):

Review Comment:
   I was wondering the reason and saw "In some fonts, these characters are 
indistinguishable from the numerals one and zero. When tempted to use 'l', use 
'L' instead." That's good to learn :p 




Re: [PR] [SPARK-46689][SPARK-46690][PYTHON][CONNECT] Support v2 profiling in group/cogroup applyInPandas/applyInArrow [spark]

2024-02-07 Thread via GitHub


ueshin commented on code in PR #45050:
URL: https://github.com/apache/spark/pull/45050#discussion_r1482312796


##
python/pyspark/sql/tests/test_udf_profiler.py:
##
@@ -394,6 +394,129 @@ def min_udf(v: pd.Series) -> float:
 io.getvalue(), 
f"2.*{os.path.basename(inspect.getfile(_do_computation))}"
 )
 
+@unittest.skipIf(
+not have_pandas or not have_pyarrow,
+cast(str, pandas_requirement_message or pyarrow_requirement_message),
+)
+def test_perf_profiler_group_apply_in_pandas(self):
+# FlatMapGroupsInBatchExec
+df = self.spark.createDataFrame(
+[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v")
+)
+
+def normalize(pdf):
+v = pdf.v
+return pdf.assign(v=(v - v.mean()) / v.std())
+
+with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
+df.groupby("id").applyInPandas(normalize, schema="id long, v 
double").show()
+
+self.assertEqual(1, len(self.profile_results), 
str(self.profile_results.keys()))
+
+for id in self.profile_results:
+with self.trap_stdout() as io:
+self.spark.showPerfProfiles(id)
+
+self.assertIn(f"Profile of UDF", io.getvalue())
+self.assertRegex(
+io.getvalue(), 
f"2.*{os.path.basename(inspect.getfile(_do_computation))}"
+)
+
+@unittest.skipIf(
+not have_pandas or not have_pyarrow,
+cast(str, pandas_requirement_message or pyarrow_requirement_message),
+)
+def test_perf_profiler_cogroup_apply_in_pandas(self):
+# FlatMapCoGroupsInBatchExec
+import pandas as pd
+
+df1 = self.spark.createDataFrame(
+[(2101, 1, 1.0), (2101, 2, 2.0), (2102, 1, 3.0), 
(2102, 2, 4.0)],
+("time", "id", "v1"),
+)
+df2 = self.spark.createDataFrame(
+[(2101, 1, "x"), (2101, 2, "y")], ("time", "id", "v2")
+)
+
+def asof_join(left, right):
+return pd.merge_asof(left, right, on="time", by="id")
+
+with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
+df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
+asof_join, schema="time int, id int, v1 double, v2 string"
+).show()
+
+self.assertEqual(1, len(self.profile_results), 
str(self.profile_results.keys()))
+
+for id in self.profile_results:
+with self.trap_stdout() as io:
+self.spark.showPerfProfiles(id)
+
+self.assertIn(f"Profile of UDF", io.getvalue())
+self.assertRegex(
+io.getvalue(), 
f"2.*{os.path.basename(inspect.getfile(_do_computation))}"
+)
+
+@unittest.skipIf(
+not have_pandas or not have_pyarrow,
+cast(str, pandas_requirement_message or pyarrow_requirement_message),
+)
+def test_perf_profiler_group_apply_in_arrow(self):
+# FlatMapGroupsInBatchExec
+import pyarrow.compute as pc
+
+df = self.spark.createDataFrame(
+[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v")
+)
+
+def normalize(table):
+v = table.column("v")
+norm = pc.divide(pc.subtract(v, pc.mean(v)), pc.stddev(v, ddof=1))
+return table.set_column(1, "v", norm)
+
+with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
+df.groupby("id").applyInArrow(normalize, schema="id long, v 
double").show()
+
+self.assertEqual(1, len(self.profile_results), 
str(self.profile_results.keys()))
+
+for id in self.profile_results:
+with self.trap_stdout() as io:
+self.spark.showPerfProfiles(id)
+
+self.assertIn(f"Profile of UDF", io.getvalue())
+self.assertRegex(
+io.getvalue(), 
f"2.*{os.path.basename(inspect.getfile(_do_computation))}"
+)
+
+@unittest.skipIf(
+not have_pandas or not have_pyarrow,
+cast(str, pandas_requirement_message or pyarrow_requirement_message),
+)
+def test_perf_profiler_cogroup_apply_in_arrow(self):
+import pyarrow as pa
+
+df1 = self.spark.createDataFrame([(1, 1.0), (2, 2.0), (1, 3.0), (2, 
4.0)], ("id", "v1"))
+df2 = self.spark.createDataFrame([(1, "x"), (2, "y")], ("id", "v2"))
+
+def summarize(l, r):

Review Comment:
   Shall we rename to `left` and `right` according to the lint error?



##
python/pyspark/sql/tests/test_udf_profiler.py:
##
@@ -394,6 +394,129 @@ def min_udf(v: pd.Series) -> float:
 io.getvalue(), 
f"2.*{os.path.basename(inspect.getfile(_do_computation))}"
 )
 
+@unittest.skipIf(
+not have_pandas or not have_pyarrow,
+cast(str, pandas_requirement_message or pyarrow_requirement_message),
+)
+def 

Re: [PR] [SPARK-46689][SPARK-46690][PYTHON][CONNECT] Support v2 profiling in group/cogroup applyInPandas/applyInArrow [spark]

2024-02-07 Thread via GitHub


ueshin commented on code in PR #45050:
URL: https://github.com/apache/spark/pull/45050#discussion_r1482312940


##
python/pyspark/tests/test_memory_profiler.py:
##
@@ -441,6 +441,129 @@ def min_udf(v: pd.Series) -> float:
 io.getvalue(), 
f"Filename.*{os.path.basename(inspect.getfile(_do_computation))}"
 )
 
+@unittest.skipIf(
+not have_pandas or not have_pyarrow,
+cast(str, pandas_requirement_message or pyarrow_requirement_message),
+)
+def test_memory_profiler_group_apply_in_pandas(self):
+# FlatMapGroupsInBatchExec
+df = self.spark.createDataFrame(
+[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v")
+)
+
+def normalize(pdf):
+v = pdf.v
+return pdf.assign(v=(v - v.mean()) / v.std())
+
+with self.sql_conf({"spark.sql.pyspark.udf.profiler": "memory"}):
+df.groupby("id").applyInPandas(normalize, schema="id long, v 
double").show()
+
+self.assertEqual(1, len(self.profile_results), 
str(self.profile_results.keys()))
+
+for id in self.profile_results:
+with self.trap_stdout() as io:
+self.spark.showMemoryProfiles(id)
+
+self.assertIn(f"Profile of UDF", io.getvalue())
+self.assertRegex(
+io.getvalue(), 
f"Filename.*{os.path.basename(inspect.getfile(_do_computation))}"
+)
+
+@unittest.skipIf(
+not have_pandas or not have_pyarrow,
+cast(str, pandas_requirement_message or pyarrow_requirement_message),
+)
+def test_memory_profiler_cogroup_apply_in_pandas(self):
+# FlatMapCoGroupsInBatchExec
+import pandas as pd
+
+df1 = self.spark.createDataFrame(
+[(2101, 1, 1.0), (2101, 2, 2.0), (2102, 1, 3.0), 
(2102, 2, 4.0)],
+("time", "id", "v1"),
+)
+df2 = self.spark.createDataFrame(
+[(2101, 1, "x"), (2101, 2, "y")], ("time", "id", "v2")
+)
+
+def asof_join(left, right):
+return pd.merge_asof(left, right, on="time", by="id")
+
+with self.sql_conf({"spark.sql.pyspark.udf.profiler": "memory"}):
+df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
+asof_join, schema="time int, id int, v1 double, v2 string"
+).show()
+
+self.assertEqual(1, len(self.profile_results), 
str(self.profile_results.keys()))
+
+for id in self.profile_results:
+with self.trap_stdout() as io:
+self.spark.showMemoryProfiles(id)
+
+self.assertIn(f"Profile of UDF", io.getvalue())
+self.assertRegex(
+io.getvalue(), 
f"Filename.*{os.path.basename(inspect.getfile(_do_computation))}"
+)
+
+@unittest.skipIf(
+not have_pandas or not have_pyarrow,
+cast(str, pandas_requirement_message or pyarrow_requirement_message),
+)
+def test_memory_profiler_group_apply_in_arrow(self):
+# FlatMapGroupsInBatchExec
+import pyarrow.compute as pc
+
+df = self.spark.createDataFrame(
+[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v")
+)
+
+def normalize(table):
+v = table.column("v")
+norm = pc.divide(pc.subtract(v, pc.mean(v)), pc.stddev(v, ddof=1))
+return table.set_column(1, "v", norm)
+
+with self.sql_conf({"spark.sql.pyspark.udf.profiler": "memory"}):
+df.groupby("id").applyInArrow(normalize, schema="id long, v 
double").show()
+
+self.assertEqual(1, len(self.profile_results), 
str(self.profile_results.keys()))
+
+for id in self.profile_results:
+with self.trap_stdout() as io:
+self.spark.showMemoryProfiles(id)
+
+self.assertIn(f"Profile of UDF", io.getvalue())
+self.assertRegex(
+io.getvalue(), 
f"Filename.*{os.path.basename(inspect.getfile(_do_computation))}"
+)
+
+@unittest.skipIf(
+not have_pandas or not have_pyarrow,
+cast(str, pandas_requirement_message or pyarrow_requirement_message),
+)
+def test_memory_profiler_cogroup_apply_in_arrow(self):
+import pyarrow as pa
+
+df1 = self.spark.createDataFrame([(1, 1.0), (2, 2.0), (1, 3.0), (2, 
4.0)], ("id", "v1"))
+df2 = self.spark.createDataFrame([(1, "x"), (2, "y")], ("id", "v2"))
+
+def summarize(l, r):

Review Comment:
   ditto.




Re: [PR] [SPARK-45527][CORE] Use fraction to do the resource calculation [spark]

2024-02-07 Thread via GitHub


wbo4958 commented on code in PR #44690:
URL: https://github.com/apache/spark/pull/44690#discussion_r1482309634


##
core/src/main/scala/org/apache/spark/resource/ResourceAllocator.scala:
##
@@ -20,6 +20,49 @@ package org.apache.spark.resource
 import scala.collection.mutable
 
 import org.apache.spark.SparkException
+import org.apache.spark.resource.ResourceAmountUtils.ONE_ENTIRE_RESOURCE
+
+private[spark] object ResourceAmountUtils {
+  /**
+   * Using "double" to do the resource calculation may encounter a problem of precision loss. Eg
+   *
+   * scala> val taskAmount = 1.0 / 9
+   * taskAmount: Double = 0.1111111111111111
+   *
+   * scala> var total = 1.0
+   * total: Double = 1.0
+   *
+   * scala> for (i <- 1 to 9 ) {
+   *      |   if (total >= taskAmount) {
+   *      |     total -= taskAmount
+   *      |     println(s"assign $taskAmount for task $i, total left: $total")
+   *      |   } else {
+   *      |     println(s"ERROR Can't assign $taskAmount for task $i, total left: $total")
+   *      |   }
+   *      | }
+   * assign 0.1111111111111111 for task 1, total left: 0.8888888888888888
+   * assign 0.1111111111111111 for task 2, total left: 0.7777777777777777
+   * assign 0.1111111111111111 for task 3, total left: 0.6666666666666665
+   * assign 0.1111111111111111 for task 4, total left: 0.5555555555555554
+   * assign 0.1111111111111111 for task 5, total left: 0.44444444444444425
+   * assign 0.1111111111111111 for task 6, total left: 0.33333333333333315
+   * assign 0.1111111111111111 for task 7, total left: 0.22222222222222204
+   * assign 0.1111111111111111 for task 8, total left: 0.11111111111111094
+   * ERROR Can't assign 0.1111111111111111 for task 9, total left: 0.11111111111111094
+   *
+   * So we multiply ONE_ENTIRE_RESOURCE to convert the double to long to avoid this limitation.
+   * Double can display up to 16 decimal places, so we set the factor to
+   * 10, 000, 000, 000, 000, 000L.
+   */
+  final val ONE_ENTIRE_RESOURCE: Long = 10000000000000000L

Review Comment:
   Hi @srowen. Sure, let me have a PR using BD for master branch. Thx






Re: [PR] [SPARK-45527][CORE] Use fraction to do the resource calculation [spark]

2024-02-07 Thread via GitHub


srowen commented on code in PR #44690:
URL: https://github.com/apache/spark/pull/44690#discussion_r1482301112


##
core/src/main/scala/org/apache/spark/resource/ResourceAllocator.scala:
##
@@ -20,6 +20,49 @@ package org.apache.spark.resource
 import scala.collection.mutable
 
 import org.apache.spark.SparkException
+import org.apache.spark.resource.ResourceAmountUtils.ONE_ENTIRE_RESOURCE
+
+private[spark] object ResourceAmountUtils {
+  /**
+   * Using "double" to do the resource calculation may encounter a problem of precision loss. Eg
+   *
+   * scala> val taskAmount = 1.0 / 9
+   * taskAmount: Double = 0.1111111111111111
+   *
+   * scala> var total = 1.0
+   * total: Double = 1.0
+   *
+   * scala> for (i <- 1 to 9 ) {
+   *      |   if (total >= taskAmount) {
+   *      |     total -= taskAmount
+   *      |     println(s"assign $taskAmount for task $i, total left: $total")
+   *      |   } else {
+   *      |     println(s"ERROR Can't assign $taskAmount for task $i, total left: $total")
+   *      |   }
+   *      | }
+   * assign 0.1111111111111111 for task 1, total left: 0.8888888888888888
+   * assign 0.1111111111111111 for task 2, total left: 0.7777777777777777
+   * assign 0.1111111111111111 for task 3, total left: 0.6666666666666665
+   * assign 0.1111111111111111 for task 4, total left: 0.5555555555555554
+   * assign 0.1111111111111111 for task 5, total left: 0.44444444444444425
+   * assign 0.1111111111111111 for task 6, total left: 0.33333333333333315
+   * assign 0.1111111111111111 for task 7, total left: 0.22222222222222204
+   * assign 0.1111111111111111 for task 8, total left: 0.11111111111111094
+   * ERROR Can't assign 0.1111111111111111 for task 9, total left: 0.11111111111111094
+   *
+   * So we multiply ONE_ENTIRE_RESOURCE to convert the double to long to avoid this limitation.
+   * Double can display up to 16 decimal places, so we set the factor to
+   * 10, 000, 000, 000, 000, 000L.
+   */
+  final val ONE_ENTIRE_RESOURCE: Long = 10000000000000000L

Review Comment:
   I know. Above I give an example where multiplying by long works. I'm 
referring to your example of BigDecimal above, which does not use (only) 
BigDecimal. Please just try it.



##
core/src/main/scala/org/apache/spark/resource/ResourceAllocator.scala:
##
@@ -20,6 +20,49 @@ package org.apache.spark.resource
 import scala.collection.mutable
 
 import org.apache.spark.SparkException
+import org.apache.spark.resource.ResourceAmountUtils.ONE_ENTIRE_RESOURCE
+
+private[spark] object ResourceAmountUtils {
+  /**
+   * Using "double" to do the resource calculation may encounter a problem of precision loss. Eg
+   *
+   * scala> val taskAmount = 1.0 / 9
+   * taskAmount: Double = 0.1111111111111111
+   *
+   * scala> var total = 1.0
+   * total: Double = 1.0
+   *
+   * scala> for (i <- 1 to 9 ) {
+   *      |   if (total >= taskAmount) {
+   *      |     total -= taskAmount
+   *      |     println(s"assign $taskAmount for task $i, total left: $total")
+   *      |   } else {
+   *      |     println(s"ERROR Can't assign $taskAmount for task $i, total left: $total")
+   *      |   }
+   *      | }
+   * assign 0.1111111111111111 for task 1, total left: 0.8888888888888888
+   * assign 0.1111111111111111 for task 2, total left: 0.7777777777777777
+   * assign 0.1111111111111111 for task 3, total left: 0.6666666666666665
+   * assign 0.1111111111111111 for task 4, total left: 0.5555555555555554
+   * assign 0.1111111111111111 for task 5, total left: 0.44444444444444425
+   * assign 0.1111111111111111 for task 6, total left: 0.33333333333333315
+   * assign 0.1111111111111111 for task 7, total left: 0.22222222222222204
+   * assign 0.1111111111111111 for task 8, total left: 0.11111111111111094
+   * ERROR Can't assign 0.1111111111111111 for task 9, total left: 0.11111111111111094
+   *
+   * So we multiply ONE_ENTIRE_RESOURCE to convert the double to long to avoid this limitation.
+   * Double can display up to 16 decimal places, so we set the factor to
+   * 10, 000, 000, 000, 000, 000L.
+   */
+  final val ONE_ENTIRE_RESOURCE: Long = 10000000000000000L

Review Comment:
   I know. Above I give an example where multiplying by long doesn't work. I'm 
referring to your example of BigDecimal above, which does not use (only) 
BigDecimal. Please just try it.






Re: [PR] [SPARK-45527][CORE] Use fraction to do the resource calculation [spark]

2024-02-07 Thread via GitHub


wbo4958 commented on code in PR #44690:
URL: https://github.com/apache/spark/pull/44690#discussion_r1482298562


##
core/src/main/scala/org/apache/spark/resource/ResourceAllocator.scala:
##
@@ -20,6 +20,49 @@ package org.apache.spark.resource
 import scala.collection.mutable
 
 import org.apache.spark.SparkException
+import org.apache.spark.resource.ResourceAmountUtils.ONE_ENTIRE_RESOURCE
+
+private[spark] object ResourceAmountUtils {
+  /**
+   * Using "double" to do the resource calculation may encounter a problem of precision loss. Eg
+   *
+   * scala> val taskAmount = 1.0 / 9
+   * taskAmount: Double = 0.1111111111111111
+   *
+   * scala> var total = 1.0
+   * total: Double = 1.0
+   *
+   * scala> for (i <- 1 to 9 ) {
+   *      |   if (total >= taskAmount) {
+   *      |     total -= taskAmount
+   *      |     println(s"assign $taskAmount for task $i, total left: $total")
+   *      |   } else {
+   *      |     println(s"ERROR Can't assign $taskAmount for task $i, total left: $total")
+   *      |   }
+   *      | }
+   * assign 0.1111111111111111 for task 1, total left: 0.8888888888888888
+   * assign 0.1111111111111111 for task 2, total left: 0.7777777777777777
+   * assign 0.1111111111111111 for task 3, total left: 0.6666666666666665
+   * assign 0.1111111111111111 for task 4, total left: 0.5555555555555554
+   * assign 0.1111111111111111 for task 5, total left: 0.44444444444444425
+   * assign 0.1111111111111111 for task 6, total left: 0.33333333333333315
+   * assign 0.1111111111111111 for task 7, total left: 0.22222222222222204
+   * assign 0.1111111111111111 for task 8, total left: 0.11111111111111094
+   * ERROR Can't assign 0.1111111111111111 for task 9, total left: 0.11111111111111094
+   *
+   * So we multiply ONE_ENTIRE_RESOURCE to convert the double to long to avoid this limitation.
+   * Double can display up to 16 decimal places, so we set the factor to
+   * 10, 000, 000, 000, 000, 000L.
+   */
+  final val ONE_ENTIRE_RESOURCE: Long = 10000000000000000L

Review Comment:
   Hi @srowen. Actually, this PR converts the taskAmount, which is a double/float value, to a Long by multiplying it by `1E16.toLong`, and the subsequent calculation is then based on Long instead of double/float. As you can see, all the APIs of
   [ExecutorResourcesAmount](https://github.com/apache/spark/pull/44690/files#diff-91f559d0e4aeff4052cd7c6575d8622d911ecad0852040f64ccc1d3abfd605faR41) use Long. Even the resource assigned to a task is still kept as a Long; see
   [here](https://github.com/apache/spark/pull/44690/files#diff-381b69941735d860cddcbb3b565abfef591a890aadecbc235283f3a57de9ba61R62).
   
   Yeah, if you think we should use BigDecimal, I'm okay with that. I can make a PR for the master branch first and then cherry-pick it to the 3.5 branch.
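   
   For illustration, here is a minimal, self-contained sketch of the Long-based bookkeeping described above (a hypothetical sketch, not the PR code; the names are made up):
   
   ```scala
   // Hypothetical sketch: keep fractional resource amounts as Longs scaled by 1E16
   // so that repeated subtraction stays exact.
   object FractionAsLongSketch {
     val ONE_ENTIRE_RESOURCE: Long = 1e16.toLong
   
     def main(args: Array[String]): Unit = {
       // user-facing double amount converted to a Long once, up front
       val taskAmount: Long = (ONE_ENTIRE_RESOURCE * (1.0 / 9)).toLong
       var total: Long = ONE_ENTIRE_RESOURCE
       (1 to 9).foreach { i =>
         assert(total >= taskAmount, s"Can't assign task $i")
         total -= taskAmount  // exact Long arithmetic, no floating-point drift
         println(s"assigned task $i, total left: $total")
       }
     }
   }
   ```
   
   With plain doubles the 9th assignment fails, as shown in the comment quoted above; with the Long representation all 9 assignments succeed.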



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46993] Fix constant folding for session variables [spark]

2024-02-07 Thread via GitHub


cloud-fan commented on code in PR #45059:
URL: https://github.com/apache/spark/pull/45059#discussion_r1482291904


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala:
##
@@ -312,14 +312,39 @@ trait ColumnResolutionHelper extends Logging with 
DataTypeErrorsBase {
   }.map(e => Alias(e, nameParts.last)())
 }
 
-e.transformWithPruning(_.containsAnyPattern(UNRESOLVED_ATTRIBUTE, 
TEMP_RESOLVED_COLUMN)) {
-  case u: UnresolvedAttribute =>
-resolve(u.nameParts).getOrElse(u)
-  // Re-resolves `TempResolvedColumn` as variable references if it has 
tried to be resolved with
-  // Aggregate but failed.
-  case t: TempResolvedColumn if t.hasTried =>
-resolve(t.nameParts).getOrElse(t)
+def innerResolve(e: Expression, isTopLevel: Boolean): Expression = 
withOrigin(e.origin) {
+  if (e.resolved) return e
+  val resolved = e match {
+case u @ UnresolvedAttribute(nameParts) =>
+  val result = withPosition(u) {
+resolve(nameParts).getOrElse(u) match {
+  // We trim unnecessary alias here. Note that, we cannot trim the 
alias at top-level,
+  // as we should resolve `UnresolvedAttribute` to a named 
expression. The caller side
+  // can trim the top-level alias if it's safe to do so. Since we 
will call
+  // CleanupAliases later in Analyzer, trim non top-level 
unnecessary alias is safe.
+  case Alias(child, _) if !isTopLevel => child
+  case other => other
+}
+  }
+  result
+
+// Re-resolves `TempResolvedColumn` if it has tried to be resolved 
with Aggregate

Review Comment:
   nit: let's keep the previous comment



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46993] Fix constant folding for session variables [spark]

2024-02-07 Thread via GitHub


cloud-fan commented on code in PR #45059:
URL: https://github.com/apache/spark/pull/45059#discussion_r1482291771


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala:
##
@@ -312,14 +312,39 @@ trait ColumnResolutionHelper extends Logging with 
DataTypeErrorsBase {
   }.map(e => Alias(e, nameParts.last)())
 }
 
-e.transformWithPruning(_.containsAnyPattern(UNRESOLVED_ATTRIBUTE, 
TEMP_RESOLVED_COLUMN)) {
-  case u: UnresolvedAttribute =>
-resolve(u.nameParts).getOrElse(u)
-  // Re-resolves `TempResolvedColumn` as variable references if it has 
tried to be resolved with
-  // Aggregate but failed.
-  case t: TempResolvedColumn if t.hasTried =>
-resolve(t.nameParts).getOrElse(t)
+def innerResolve(e: Expression, isTopLevel: Boolean): Expression = 
withOrigin(e.origin) {
+  if (e.resolved) return e
+  val resolved = e match {
+case u @ UnresolvedAttribute(nameParts) =>
+  val result = withPosition(u) {
+resolve(nameParts).getOrElse(u) match {
+  // We trim unnecessary alias here. Note that, we cannot trim the 
alias at top-level,
+  // as we should resolve `UnresolvedAttribute` to a named 
expression. The caller side

Review Comment:
   nit: update the comment to not mention UnresolvedAttribute here



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46993] Fix constant folding for session variables [spark]

2024-02-07 Thread via GitHub


cloud-fan commented on code in PR #45059:
URL: https://github.com/apache/spark/pull/45059#discussion_r1482291388


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala:
##
@@ -312,14 +312,39 @@ trait ColumnResolutionHelper extends Logging with 
DataTypeErrorsBase {
   }.map(e => Alias(e, nameParts.last)())
 }
 
-e.transformWithPruning(_.containsAnyPattern(UNRESOLVED_ATTRIBUTE, 
TEMP_RESOLVED_COLUMN)) {
-  case u: UnresolvedAttribute =>
-resolve(u.nameParts).getOrElse(u)
-  // Re-resolves `TempResolvedColumn` as variable references if it has 
tried to be resolved with
-  // Aggregate but failed.
-  case t: TempResolvedColumn if t.hasTried =>
-resolve(t.nameParts).getOrElse(t)
+def innerResolve(e: Expression, isTopLevel: Boolean): Expression = 
withOrigin(e.origin) {
+  if (e.resolved) return e

Review Comment:
   to be consistent with the previous code, we should add one more shortcut:
   ```
   if (!e.containsAnyPattern(UNRESOLVED_ATTRIBUTE, TEMP_RESOLVED_COLUMN)) return e
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-45599][CORE] Use object equality in OpenHashSet [spark]

2024-02-07 Thread via GitHub


revans2 commented on code in PR #45036:
URL: https://github.com/apache/spark/pull/45036#discussion_r1482265980


##
core/src/test/scala/org/apache/spark/util/collection/OpenHashSetSuite.scala:
##
@@ -269,4 +269,42 @@ class OpenHashSetSuite extends SparkFunSuite with Matchers 
{
   assert(pos1 == pos2)
 }
   }
+
+  test("SPARK-45599: 0.0 and -0.0 are equal but not the same") {
+// Therefore, 0.0 and -0.0 should get separate entries in the hash set.
+//
+// Exactly these elements provided in roughly this order will trigger the 
following scenario:
+// When probing the bitset in `getPos(-0.0)`, the loop will happen upon 
the entry for 0.0.
+// In the old logic pre-SPARK-45599, the loop will find that the bit is 
set and, because
+// -0.0 == 0.0, it will think that's the position of -0.0. But in reality 
this is the position
+// of 0.0. So -0.0 and 0.0 will be stored at different positions, but 
`getPos()` will return
+// the same position for them. This can cause users of OpenHashSet, like 
OpenHashMap, to
+// return the wrong value for a key based on whether or not this bitset 
lookup collision
+// happens.

Review Comment:
   I found it because I was essentially running fuzz testing comparing https://github.com/NVIDIA/spark-rapids to the gold standard, Apache Spark. Most failures that we find end up being issues with the RAPIDS Accelerator for Apache Spark, but every so often I find something that ends up being wrong with Spark, like in this case. So yes, it was just pure luck that we found it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-42304][FOLLOWUP][SQL] Add test for `GET_TABLES_BY_TYPE_UNSUPPORTED_BY_HIVE_VERSION` [spark]

2024-02-07 Thread via GitHub


github-actions[bot] commented on PR #42884:
URL: https://github.com/apache/spark/pull/42884#issuecomment-1933156917

   We're closing this PR because it hasn't been updated in a while. This isn't 
a judgement on the merit of the PR in any way. It's just a way of keeping the 
PR queue manageable.
   If you'd like to revive this PR, please reopen it and ask a committer to 
remove the Stale tag!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46962][SS][PYTHON] Implement python worker to run python streaming data source [spark]

2024-02-07 Thread via GitHub


chaoqin-li1123 commented on code in PR #45023:
URL: https://github.com/apache/spark/pull/45023#discussion_r1482258013


##
python/pyspark/sql/datasource.py:
##
@@ -298,6 +320,97 @@ def read(self, partition: InputPartition) -> 
Iterator[Union[Tuple, Row]]:
 ...
 
 
+class DataSourceStreamReader(ABC):

Review Comment:
   Great suggestion; I have copied the corresponding method comments from the Scala API.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47002][Python] Return better error message if UDTF 'analyze' method 'orderBy' field accidentally returns a list of strings [spark]

2024-02-07 Thread via GitHub


dtenedor commented on PR #45062:
URL: https://github.com/apache/spark/pull/45062#issuecomment-1933137062

   cc @ueshin here is the follow-up PR to add more checks for `analyze` result 
ordering fields.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-45599][CORE] Use object equality in OpenHashSet [spark]

2024-02-07 Thread via GitHub


nchammas commented on code in PR #45036:
URL: https://github.com/apache/spark/pull/45036#discussion_r1482237064


##
core/src/test/scala/org/apache/spark/util/collection/OpenHashMapSuite.scala:
##
@@ -249,4 +249,34 @@ class OpenHashMapSuite extends SparkFunSuite with Matchers 
{
 map(null) = null
 assert(map.get(null) === Some(null))
   }
+
+  test("SPARK-45599: 0.0 and -0.0 should count distinctly") {
+// Exactly these elements provided in roughly this order trigger a 
condition where lookups of
+// 0.0 and -0.0 in the bitset happen to collide, causing their counts to 
be merged incorrectly
+// and inconsistently if `==` is used to check for key equality.
+val spark45599Repro = Seq(
+  Double.NaN,
+  2.0,
+  168.0,
+  Double.NaN,
+  Double.NaN,
+  -0.0,
+  153.0,
+  0.0
+)
+
+val map1 = new OpenHashMap[Double, Int]()
+spark45599Repro.foreach(map1.changeValue(_, 1, {_ + 1}))
+map1.iterator.foreach(println)
+assert(map1(0.0) == 1)
+assert(map1(-0.0) == 1)
+
+val map2 = new OpenHashMap[Double, Int]()
+// Simply changing the order in which the elements are added to the map 
should not change the
+// counts for 0.0 and -0.0.
+spark45599Repro.reverse.foreach(map2.changeValue(_, 1, {_ + 1}))
+map2.iterator.foreach(println)
+assert(map2(0.0) == 1)
+assert(map2(-0.0) == 1)

Review Comment:
   This is another expression of the same bug that this PR addresses. If you 
run this test on `master`, you will see that the counts for 0.0 and -0.0 depend 
on the order in which the elements from `spark45599Repro` are added to the map.



##
core/src/test/scala/org/apache/spark/util/collection/OpenHashSetSuite.scala:
##
@@ -269,4 +269,42 @@ class OpenHashSetSuite extends SparkFunSuite with Matchers 
{
   assert(pos1 == pos2)
 }
   }
+
+  test("SPARK-45599: 0.0 and -0.0 are equal but not the same") {
+// Therefore, 0.0 and -0.0 should get separate entries in the hash set.
+//
+// Exactly these elements provided in roughly this order will trigger the 
following scenario:
+// When probing the bitset in `getPos(-0.0)`, the loop will happen upon 
the entry for 0.0.
+// In the old logic pre-SPARK-45599, the loop will find that the bit is 
set and, because
+// -0.0 == 0.0, it will think that's the position of -0.0. But in reality 
this is the position
+// of 0.0. So -0.0 and 0.0 will be stored at different positions, but 
`getPos()` will return
+// the same position for them. This can cause users of OpenHashSet, like 
OpenHashMap, to
+// return the wrong value for a key based on whether or not this bitset 
lookup collision
+// happens.

Review Comment:
   It really is amazing that @revans2 found this bug, because it depends on the 
set being a specific size and the 0.0 and -0.0 being inserted and then looked 
up in just the right order so that they happen to collide in the bitset.
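   
   For anyone skimming this thread, the distinction the fix relies on can be reproduced in a few lines of plain Scala in the REPL (an illustration, not code from this PR):
   
   ```scala
   // 0.0 and -0.0 are equal under ==, but distinct under object equality, which is
   // what java.util.HashSet uses and what OpenHashSet uses after this change.
   val a: Any = 0.0
   val b: Any = -0.0
   println(0.0 == -0.0)   // true  (numeric comparison)
   println(a.equals(b))   // false (java.lang.Double.equals distinguishes the two zeros)
   
   val s = new java.util.HashSet[Double]()
   s.add(0.0)
   s.add(-0.0)
   println(s.size())      // 2: the two zeros get separate entries
   ```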



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46962][SS][PYTHON] Implement python worker to run python streaming data source [spark]

2024-02-07 Thread via GitHub


chaoqin-li1123 commented on PR #45023:
URL: https://github.com/apache/spark/pull/45023#issuecomment-1933104236

   As suggested by @HeartSaVioR, I have put up a reference PR that contains the full change and an end-to-end example of the Python streaming data source:
   https://github.com/apache/spark/pull/45065/files#diff-aac0c53cf10992b58e7862115c9f72b1cb5b086a39b4e5b11543ac99d234f761R122


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46962][SS][PYTHON] Implement python worker to run python streaming data source [spark]

2024-02-07 Thread via GitHub


chaoqin-li1123 commented on code in PR #45023:
URL: https://github.com/apache/spark/pull/45023#discussion_r1482218173


##
sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonStreamingSourceRunner.scala:
##
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.spark.sql.execution.python
+
+import java.io.{BufferedInputStream, BufferedOutputStream, DataInputStream, 
DataOutputStream}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.jdk.CollectionConverters._
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.api.python.{PythonFunction, PythonWorker, 
PythonWorkerFactory, PythonWorkerUtils, SpecialLengths}
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.BUFFER_SIZE
+import org.apache.spark.internal.config.Python.{PYTHON_AUTH_SOCKET_TIMEOUT, 
PYTHON_USE_DAEMON}
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+
+object PythonStreamingSourceRunner {

Review Comment:
   Comments added.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46962][SS][PYTHON] Implement python worker to run python streaming data source [spark]

2024-02-07 Thread via GitHub


chaoqin-li1123 commented on code in PR #45023:
URL: https://github.com/apache/spark/pull/45023#discussion_r1482219079


##
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/PythonScan.scala:
##
@@ -45,13 +63,50 @@ class PythonScan(
 new PythonPartitionReaderFactory(
   ds.source, readerFunc, outputSchema, jobArtifactUUID)
   }
+}
 
-  override def toBatch: Batch = this
+case class PythonStreamingSourceOffset(json: String) extends Offset
 
-  override def description: String = "(Python)"
+case class PythonStreamingSourcePartition(partition: Array[Byte]) extends 
InputPartition
 
-  override def readSchema(): StructType = outputSchema
+class PythonMicroBatchStream(

Review Comment:
   Moved to a separate file.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46962][SS][PYTHON] Implement python worker to run python streaming data source [spark]

2024-02-07 Thread via GitHub


chaoqin-li1123 commented on code in PR #45023:
URL: https://github.com/apache/spark/pull/45023#discussion_r1482217627


##
sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonStreamingSourceRunner.scala:
##
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.spark.sql.execution.python
+
+import java.io.{BufferedInputStream, BufferedOutputStream, DataInputStream, 
DataOutputStream}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.jdk.CollectionConverters._
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.api.python.{PythonFunction, PythonWorker, 
PythonWorkerFactory, PythonWorkerUtils, SpecialLengths}
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.BUFFER_SIZE
+import org.apache.spark.internal.config.Python.{PYTHON_AUTH_SOCKET_TIMEOUT, 
PYTHON_USE_DAEMON}
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+
+object PythonStreamingSourceRunner {
+  val partitionsFuncId = 886
+  val latestOffsetsFuncId = 887
+}
+
+class PythonStreamingSourceRunner(
+func: PythonFunction,
+outputSchema: StructType) extends Logging  {
+  val workerModule = "pyspark.sql.streaming.python_streaming_source_runner"
+
+  private val conf = SparkEnv.get.conf
+  protected val bufferSize: Int = conf.get(BUFFER_SIZE)
+  protected val authSocketTimeout = conf.get(PYTHON_AUTH_SOCKET_TIMEOUT)
+
+  private val envVars: java.util.Map[String, String] = func.envVars
+  private val pythonExec: String = func.pythonExec
+  private var pythonWorker: Option[PythonWorker] = None
+  private var pythonWorkerFactory: Option[PythonWorkerFactory] = None
+  protected val pythonVer: String = func.pythonVer
+
+  private var dataOut: DataOutputStream = null
+  private var dataIn: DataInputStream = null
+
+  import PythonStreamingSourceRunner._
+
+  /**
+   * Initializes the Python worker for streaming functions. Sets up Spark 
Connect session

Review Comment:
   Sorry, the code was copied from elsewhere and I forgot to change this line of the comment; fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46962][SS][PYTHON] Implement python worker to run python streaming data source [spark]

2024-02-07 Thread via GitHub


chaoqin-li1123 commented on code in PR #45023:
URL: https://github.com/apache/spark/pull/45023#discussion_r1482216213


##
sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonStreamingSourceRunner.scala:
##
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.spark.sql.execution.python
+
+import java.io.{BufferedInputStream, BufferedOutputStream, DataInputStream, 
DataOutputStream}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.jdk.CollectionConverters._
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.api.python.{PythonFunction, PythonWorker, 
PythonWorkerFactory, PythonWorkerUtils, SpecialLengths}
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.BUFFER_SIZE
+import org.apache.spark.internal.config.Python.{PYTHON_AUTH_SOCKET_TIMEOUT, 
PYTHON_USE_DAEMON}
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+
+object PythonStreamingSourceRunner {
+  val partitionsFuncId = 886
+  val latestOffsetsFuncId = 887

Review Comment:
   They are sent to the Python process to invoke the corresponding function. I also wonder whether there is a better alternative than hardcoding the numbers in both the Scala and Python code?
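   
   For context, the IDs are just a tiny dispatch protocol between the JVM and the Python worker. A rough sketch of the idea (illustrative only; the length-prefixed framing below is an assumption, not the exact wire format of this PR):
   
   ```scala
   import java.io.{DataInputStream, DataOutputStream}
   import java.nio.charset.StandardCharsets
   
   // The JVM side writes a function ID, the Python worker switches on it and replies
   // with a UTF-8 payload (e.g. the latest offset serialized as JSON).
   def callPythonFunc(funcId: Int, dataOut: DataOutputStream, dataIn: DataInputStream): String = {
     dataOut.writeInt(funcId)            // e.g. 887 for latestOffsetsFuncId
     dataOut.flush()
     val len = dataIn.readInt()          // assumed length-prefixed reply
     val bytes = new Array[Byte](len)
     dataIn.readFully(bytes)
     new String(bytes, StandardCharsets.UTF_8)
   }
   ```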



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47003][K8S] Detect and fail on invalid volume sizes (< 1KiB) in K8s [spark]

2024-02-07 Thread via GitHub


dongjoon-hyun commented on PR #45061:
URL: https://github.com/apache/spark/pull/45061#issuecomment-1933073565

   Merged to master for Apache Spark 4.0.0.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47003][K8S] Detect and fail on invalid volume sizes (< 1KiB) in K8s [spark]

2024-02-07 Thread via GitHub


dongjoon-hyun closed pull request #45061: [SPARK-47003][K8S] Detect and fail on 
invalid volume sizes (< 1KiB) in K8s
URL: https://github.com/apache/spark/pull/45061


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47003][K8S] Detect and fail on invalid volume sizes (< 1KiB) in K8s [spark]

2024-02-07 Thread via GitHub


dongjoon-hyun commented on PR #45061:
URL: https://github.com/apache/spark/pull/45061#issuecomment-1933068990

   Thank you, @viirya !


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47003][K8S] Detect and fail on invalid volume sizes (< 1KiB) in K8s [spark]

2024-02-07 Thread via GitHub


dongjoon-hyun commented on code in PR #45061:
URL: https://github.com/apache/spark/pull/45061#discussion_r1482191800


##
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala:
##
@@ -105,4 +109,13 @@ private[spark] object KubernetesVolumeUtils {
   throw new NoSuchElementException(key + s" is required for $msg")
 }
   }
+
+  private def verifySize(size: Option[String]): Unit = {
+size.foreach { v =>
+  if (v.forall(_.isDigit) && parseLong(v) < 1024) {
+throw new IllegalArgumentException(
+  s"Volume size `$v` is smaller than 1KiB. Missing units?")
+  }
+}

Review Comment:
   Spark doesn't know the default values; they are controlled by the K8s system admin.
   > Hmm, if the goal is to limit the value to be larger than the default value, why don't we check against it rather than against a value like 1KB?
   
   Yes, if the user provides a `unit` like KB, this PR will not block it.
   > What will happen if a user specifies 10KB, for example? Will it be used, or will 8GB (the default value) be used?
   
   And, as I mentioned above in
   https://github.com/apache/spark/pull/45061#discussion_r1482183784, if a user uses 10240 we will not complain, because a value that large may not be a mistake.
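   
   To make the new behavior concrete, here is a small sketch mirroring the `verifySize` logic quoted above (illustrative only; the listed values are examples, not taken from the PR):
   
   ```scala
   // Unit-less values below 1024 are rejected as an obvious mistake; values with a
   // unit suffix, or unit-less values >= 1024, pass through unchanged.
   def wouldReject(v: String): Boolean =
     v.forall(_.isDigit) && java.lang.Long.parseLong(v) < 1024
   
   Seq("500", "1023").foreach(v => assert(wouldReject(v)))            // now fail fast
   Seq("500Gi", "1Ki", "10240").foreach(v => assert(!wouldReject(v))) // still accepted
   ```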



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[PR] [REFERENCE][DO-NOT-MERGE] Initial implementation of python streaming data source [spark]

2024-02-07 Thread via GitHub


chaoqin-li1123 opened a new pull request, #45065:
URL: https://github.com/apache/spark/pull/45065

   
   
   ### What changes were proposed in this pull request?
   This is a reference PR that contains all the changes for supporting the streaming Python data source API.
   
   
   ### Why are the changes needed?
   Implement the Python streaming data source.
   
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   
   ### How was this patch tested?
   Unit test and integration test.
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   No.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47003][K8S] Detect and fail on invalid volume sizes (< 1KiB) in K8s [spark]

2024-02-07 Thread via GitHub


viirya commented on code in PR #45061:
URL: https://github.com/apache/spark/pull/45061#discussion_r1482189060


##
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala:
##
@@ -105,4 +109,13 @@ private[spark] object KubernetesVolumeUtils {
   throw new NoSuchElementException(key + s" is required for $msg")
 }
   }
+
+  private def verifySize(size: Option[String]): Unit = {
+size.foreach { v =>
+  if (v.forall(_.isDigit) && parseLong(v) < 1024) {
+throw new IllegalArgumentException(
+  s"Volume size `$v` is smaller than 1KiB. Missing units?")
+  }
+}

Review Comment:
   Hmm, if the goal is to limit the value to be larger than the default value, why don't we check against it rather than against a value like 1KB? What will happen if a user specifies 10KB, for example? Will it be used, or will 8GB (the default value) be used?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47003][K8S] Detect and fail on invalid volume sizes (< 1KiB) in K8s [spark]

2024-02-07 Thread via GitHub


dongjoon-hyun commented on code in PR #45061:
URL: https://github.com/apache/spark/pull/45061#discussion_r1482188354


##
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala:
##
@@ -105,4 +109,13 @@ private[spark] object KubernetesVolumeUtils {
   throw new NoSuchElementException(key + s" is required for $msg")
 }
   }
+
+  private def verifySize(size: Option[String]): Unit = {
+size.foreach { v =>
+  if (v.forall(_.isDigit) && parseLong(v) < 1024) {
+throw new IllegalArgumentException(
+  s"Volume size `$v` is smaller than 1KiB. Missing units?")
+  }
+}

Review Comment:
   The previous behavior was not what the user intended at all. And the job fails after many executor losses, as described in the PR description.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47003][K8S] Detect and fail on invalid volume sizes (< 1KiB) in K8s [spark]

2024-02-07 Thread via GitHub


dongjoon-hyun commented on code in PR #45061:
URL: https://github.com/apache/spark/pull/45061#discussion_r1482187670


##
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala:
##
@@ -105,4 +109,13 @@ private[spark] object KubernetesVolumeUtils {
   throw new NoSuchElementException(key + s" is required for $msg")
 }
   }
+
+  private def verifySize(size: Option[String]): Unit = {
+size.foreach { v =>
+  if (v.forall(_.isDigit) && parseLong(v) < 1024) {
+throw new IllegalArgumentException(
+  s"Volume size `$v` is smaller than 1KiB. Missing units?")
+  }
+}

Review Comment:
   Yes, right~



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47003][K8S] Detect and fail on invalid volume sizes (< 1KiB) in K8s [spark]

2024-02-07 Thread via GitHub


viirya commented on code in PR #45061:
URL: https://github.com/apache/spark/pull/45061#discussion_r1482186467


##
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala:
##
@@ -105,4 +109,13 @@ private[spark] object KubernetesVolumeUtils {
   throw new NoSuchElementException(key + s" is required for $msg")
 }
   }
+
+  private def verifySize(size: Option[String]): Unit = {
+size.foreach { v =>
+  if (v.forall(_.isDigit) && parseLong(v) < 1024) {
+throw new IllegalArgumentException(
+  s"Volume size `$v` is smaller than 1KiB. Missing units?")
+  }
+}

Review Comment:
   Oh, you mean that previously, once a user specified a value less than 1KB, 8GB (or another default value) would be used.
   
   But now the PR makes it fail immediately.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47003][K8S] Detect and fail on invalid volume sizes (< 1KiB) in K8s [spark]

2024-02-07 Thread via GitHub


dongjoon-hyun commented on code in PR #45061:
URL: https://github.com/apache/spark/pull/45061#discussion_r1482185167


##
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala:
##
@@ -105,4 +109,13 @@ private[spark] object KubernetesVolumeUtils {
   throw new NoSuchElementException(key + s" is required for $msg")
 }
   }
+
+  private def verifySize(size: Option[String]): Unit = {
+size.foreach { v =>
+  if (v.forall(_.isDigit) && parseLong(v) < 1024) {
+throw new IllegalArgumentException(
+  s"Volume size `$v` is smaller than 1KiB. Missing units?")
+  }
+}

Review Comment:
   What I meant was that the previous behavior is 8GiB (or another default value).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47003][K8S] Detect and fail on invalid volume sizes (< 1KiB) in K8s [spark]

2024-02-07 Thread via GitHub


dongjoon-hyun commented on code in PR #45061:
URL: https://github.com/apache/spark/pull/45061#discussion_r1482184736


##
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala:
##
@@ -105,4 +109,13 @@ private[spark] object KubernetesVolumeUtils {
   throw new NoSuchElementException(key + s" is required for $msg")
 }
   }
+
+  private def verifySize(size: Option[String]): Unit = {
+size.foreach { v =>
+  if (v.forall(_.isDigit) && parseLong(v) < 1024) {
+throw new IllegalArgumentException(
+  s"Volume size `$v` is smaller than 1KiB. Missing units?")
+  }
+}

Review Comment:
   Oh, no~ This PR aims to fail the job from the beginning.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47003][K8S] Detect and fail on invalid volume sizes (< 1KiB) in K8s [spark]

2024-02-07 Thread via GitHub


viirya commented on code in PR #45061:
URL: https://github.com/apache/spark/pull/45061#discussion_r1482183654


##
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala:
##
@@ -105,4 +109,13 @@ private[spark] object KubernetesVolumeUtils {
   throw new NoSuchElementException(key + s" is required for $msg")
 }
   }
+
+  private def verifySize(size: Option[String]): Unit = {
+size.foreach { v =>
+  if (v.forall(_.isDigit) && parseLong(v) < 1024) {
+throw new IllegalArgumentException(
+  s"Volume size `$v` is smaller than 1KiB. Missing units?")
+  }
+}

Review Comment:
   Oh, you mean if `verifySize` fails, 8GB (or other default value) will be 
used?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47003][K8S] Detect and fail on invalid volume sizes (< 1KiB) in K8s [spark]

2024-02-07 Thread via GitHub


dongjoon-hyun commented on code in PR #45061:
URL: https://github.com/apache/spark/pull/45061#discussion_r1482183784


##
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala:
##
@@ -105,4 +109,13 @@ private[spark] object KubernetesVolumeUtils {
   throw new NoSuchElementException(key + s" is required for $msg")
 }
   }
+
+  private def verifySize(size: Option[String]): Unit = {
+size.foreach { v =>
+  if (v.forall(_.isDigit) && parseLong(v) < 1024) {
+throw new IllegalArgumentException(
+  s"Volume size `$v` is smaller than 1KiB. Missing units?")
+  }
+}

Review Comment:
   Of course, a user can make a mistake like 10240, but this PR aims to handle the most common cases where we are sure it is a mistake.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47003][K8S] Detect and fail on invalid volume sizes (< 1KiB) in K8s [spark]

2024-02-07 Thread via GitHub


dongjoon-hyun commented on code in PR #45061:
URL: https://github.com/apache/spark/pull/45061#discussion_r1482182475


##
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala:
##
@@ -105,4 +109,13 @@ private[spark] object KubernetesVolumeUtils {
   throw new NoSuchElementException(key + s" is required for $msg")
 }
   }
+
+  private def verifySize(size: Option[String]): Unit = {
+size.foreach { v =>
+  if (v.forall(_.isDigit) && parseLong(v) < 1024) {
+throw new IllegalArgumentException(
+  s"Volume size `$v` is smaller than 1KiB. Missing units?")
+  }
+}

Review Comment:
   No~ In this case, this value is ignored. Instead, the created volume size is determined by the underlying K8s configuration.
   
   For example, 8GiB on some K8s systems.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47003][K8S] Detect and fail on invalid volume sizes (< 1KiB) in K8s [spark]

2024-02-07 Thread via GitHub


viirya commented on code in PR #45061:
URL: https://github.com/apache/spark/pull/45061#discussion_r1482182370


##
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala:
##
@@ -105,4 +109,13 @@ private[spark] object KubernetesVolumeUtils {
   throw new NoSuchElementException(key + s" is required for $msg")
 }
   }
+
+  private def verifySize(size: Option[String]): Unit = {
+size.foreach { v =>
+  if (v.forall(_.isDigit) && parseLong(v) < 1024) {
+throw new IllegalArgumentException(
+  s"Volume size `$v` is smaller than 1KiB. Missing units?")
+  }
+}

Review Comment:
   Should we mention in some place (any doc?) that the minimum volume size is 
1KB?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47003][K8S] Detect and fail on invalid volume sizes (< 1KiB) in K8s [spark]

2024-02-07 Thread via GitHub


viirya commented on code in PR #45061:
URL: https://github.com/apache/spark/pull/45061#discussion_r1482181478


##
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala:
##
@@ -105,4 +109,13 @@ private[spark] object KubernetesVolumeUtils {
   throw new NoSuchElementException(key + s" is required for $msg")
 }
   }
+
+  private def verifySize(size: Option[String]): Unit = {
+size.foreach { v =>
+  if (v.forall(_.isDigit) && parseLong(v) < 1024) {
+throw new IllegalArgumentException(
+  s"Volume size `$v` is smaller than 1KiB. Missing units?")
+  }
+}

Review Comment:
   Is 1KB the minimum size we can specify for volume?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47003][K8S] Detect and fail on invalid volume sizes (< 1KiB) in K8s [spark]

2024-02-07 Thread via GitHub


dongjoon-hyun commented on PR #45061:
URL: https://github.com/apache/spark/pull/45061#issuecomment-1933048949

   All tests passed. Could you review this PR, @viirya ?
   
   https://github.com/apache/spark/assets/9700541/8b1bde03-7eed-46b9-a4cf-e63327c39fa5
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47000][CORE] Use `getTotalMemorySize` in `WorkerArguments` [spark]

2024-02-07 Thread via GitHub


dongjoon-hyun commented on PR #45060:
URL: https://github.com/apache/spark/pull/45060#issuecomment-1933047669

   All tests passed. Merged to master for Apache Spark 4.0.0.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47000][CORE] Use `getTotalMemorySize` in `WorkerArguments` [spark]

2024-02-07 Thread via GitHub


dongjoon-hyun closed pull request #45060: [SPARK-47000][CORE] Use 
`getTotalMemorySize` in `WorkerArguments`
URL: https://github.com/apache/spark/pull/45060


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[PR] [SPARK-46832][SQL] Introducing Collate and Collation expressions [spark]

2024-02-07 Thread via GitHub


dbatomic opened a new pull request, #45064:
URL: https://github.com/apache/spark/pull/45064

   
   
   ### What changes were proposed in this pull request?
   
   This PR adds E2E support for the `collate` and `collation` expressions.
   The following changes were made to get us there:
   1) Set the right ordering for `PhysicalStringType` based on `collationId`.
   2) UTF8String is now just a data holder class: it no longer implements the `Comparable` interface. All comparisons must be done through `CollationFactory`.
   3) The `collate` and `collation` expressions are added, and special syntax for `collate` is enabled: `'hello world' COLLATE 'target_collation'` (see the illustration after this list).
   4) A first set of tests is added, covering both core expression and E2E collation tests.
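   
   A rough illustration of the new syntax (a sketch only; it assumes a SparkSession `spark` with this PR applied, and `some_collation` is a placeholder collation name, not necessarily one shipped with this change):
   
   ```scala
   // Apply a collation to a string literal and read it back with the collation() expression.
   val df = spark.sql("SELECT collation('hello world' COLLATE 'some_collation') AS c")
   df.show()  // expected to report the collation applied by the COLLATE expression
   ```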
   
   ### Why are the changes needed?
   
   This PR is part of larger collation track. For more details please refer to 
design doc attached in parent JIRA ticket.
   
   ### Does this PR introduce _any_ user-facing change?
   
   This PR adds two new expressions and opens up new syntax.
   
   ### How was this patch tested?
   
   Basic tests are added. In follow-up PRs we will add support for more advanced operators and keep adding tests alongside new feature support.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   Yes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[PR] [SPARK-47004] Added more tests to ClientStreamingQuerySuite to increase Scala client test coverage [spark]

2024-02-07 Thread via GitHub


bogao007 opened a new pull request, #45063:
URL: https://github.com/apache/spark/pull/45063

   
   
   ### What changes were proposed in this pull request?
   
   Added more tests to ClientStreamingQuerySuite to increase Scala client test 
coverage
   
   ### Why are the changes needed?
   
   To increase test coverage for Streaming Spark Connect Scala client
   
   ### Does this PR introduce _any_ user-facing change?
   
   No
   
   ### How was this patch tested?
   
   This is a test-only change.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   No


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[PR] [SPARK-47002][Python] Return better error message if UDTF 'analyze' method 'orderBy' field accidentally returns a list of strings [spark]

2024-02-07 Thread via GitHub


dtenedor opened a new pull request, #45062:
URL: https://github.com/apache/spark/pull/45062

   
   
   ### What changes were proposed in this pull request?
   
   
   
   ### Why are the changes needed?
   
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   
   
   ### How was this patch tested?
   
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46689][SPARK-46690][PYTHON][CONNECT] Support v2 profiling in group/cogroup applyInPandas/applyInArrow [spark]

2024-02-07 Thread via GitHub


xinrong-meng commented on code in PR #45050:
URL: https://github.com/apache/spark/pull/45050#discussion_r1482146994


##
python/pyspark/sql/tests/test_udf_profiler.py:
##
@@ -333,6 +333,69 @@ def filter_func(iterator):
 
 self.assertEqual(0, len(self.profile_results), 
str(self.profile_results.keys()))
 
+@unittest.skipIf(
+not have_pandas or not have_pyarrow,
+cast(str, pandas_requirement_message or pyarrow_requirement_message),
+)
+def test_perf_profiler_group_apply_in_pandas(self):

Review Comment:
   Good catch!



##
python/pyspark/sql/tests/test_udf_profiler.py:
##
@@ -333,6 +333,69 @@ def filter_func(iterator):
 
 self.assertEqual(0, len(self.profile_results), 
str(self.profile_results.keys()))
 
+@unittest.skipIf(
+not have_pandas or not have_pyarrow,
+cast(str, pandas_requirement_message or pyarrow_requirement_message),
+)
+def test_perf_profiler_group_apply_in_pandas(self):
+# FlatMapGroupsInBatchExec
+df = self.spark.createDataFrame(
+[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v")
+)
+
+def normalize(pdf):
+v = pdf.v
+return pdf.assign(v=(v - v.mean()) / v.std())
+
+with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
+df.groupby("id").applyInPandas(normalize, schema="id long, v 
double").show()
+
+self.assertEqual(1, len(self.profile_results), 
str(self.profile_results.keys()))
+
+for id in self.profile_results:
+with self.trap_stdout() as io:
+self.spark.showPerfProfiles(id)
+
+self.assertIn(f"Profile of UDF", io.getvalue())
+self.assertRegex(
+io.getvalue(), 
f"2.*{os.path.basename(inspect.getfile(_do_computation))}"
+)
+
+@unittest.skipIf(
+not have_pandas or not have_pyarrow,
+cast(str, pandas_requirement_message or pyarrow_requirement_message),
+)
+def test_perf_profiler_cogroup_apply_in_pandas(self):

Review Comment:
   Sounds good.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-45599][CORE] Use object equality in OpenHashSet [spark]

2024-02-07 Thread via GitHub


nchammas commented on code in PR #45036:
URL: https://github.com/apache/spark/pull/45036#discussion_r1482123330


##
core/src/test/scala/org/apache/spark/util/collection/OpenHashSetSuite.scala:
##
@@ -269,4 +269,35 @@ class OpenHashSetSuite extends SparkFunSuite with Matchers 
{
   assert(pos1 == pos2)
 }
   }
+
+  test("SPARK-45599: 0.0 and -0.0 are equal but not the same") {

Review Comment:
   Consider another interesting case where `java.util.HashSet` and 
`OpenHashSet` differ:
   
   ```scala
   scala> val h = new HashSet[Double]()
   val h: java.util.HashSet[Double] = []
   
   scala> h.add(Double.NaN)
   val res9: Boolean = true
   
   scala> h.add(Double.NaN)
   val res10: Boolean = false
   
   scala> h.contains(Double.NaN)
   val res11: Boolean = true
   
   scala> h.size()
   val res12: Int = 1
   ```
   
   On `master`, `OpenHashSet` does something obviously wrong:
   
   ```scala
   val set = new OpenHashSet[Double]()
   set.add(Double.NaN)
   set.add(Double.NaN)
   set.size  // returns 2
   set.contains(Double.NaN)  // returns false
   ```
   
   This could possibly lead to a bug like the one reported in SPARK-45599 but 
in reverse, where a new NaN row is added rather than dropped. I will see if I 
can construct such a scenario as a demonstration. But regardless, I think this 
behavior is incorrect by itself.
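   
   For anyone following along, the underlying difference is between primitive IEEE-754
   comparison and boxed object equality. A minimal, Spark-free Scala sketch of that
   distinction (standard language semantics, nothing specific to this PR):
   
   ```scala
   // Primitive IEEE-754 comparison never matches NaN, so a `==`-based probe can
   // neither find nor deduplicate NaN keys; boxed equality treats NaN as equal to itself.
   val nan: Double = Double.NaN
   
   println(nan == nan)                                 // false: primitive IEEE-754 comparison
   println(java.lang.Double.valueOf(nan).equals(nan))  // true: Double#equals compares bit patterns
   println(java.lang.Double.hashCode(nan))             // a single, stable hash code for NaN
   ```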



Re: [PR] [SPARK-46688][SPARK-46691][PYTHON][CONNECT] Support v2 profiling in aggregate Pandas UDFs [spark]

2024-02-07 Thread via GitHub


ueshin closed pull request #45035: [SPARK-46688][SPARK-46691][PYTHON][CONNECT] 
Support v2 profiling in aggregate Pandas UDFs
URL: https://github.com/apache/spark/pull/45035


Re: [PR] [SPARK-46688][SPARK-46691][PYTHON][CONNECT] Support v2 profiling in aggregate Pandas UDFs [spark]

2024-02-07 Thread via GitHub


ueshin commented on PR #45035:
URL: https://github.com/apache/spark/pull/45035#issuecomment-1932995660

   Thanks! merging to master.


Re: [PR] [SPARK-46966][Python] Add UDTF API for 'analyze' method to indicate subset of input table columns to select [spark]

2024-02-07 Thread via GitHub


ueshin closed pull request #45007: [SPARK-46966][Python] Add UDTF API for 
'analyze' method to indicate subset of input table columns to select
URL: https://github.com/apache/spark/pull/45007


Re: [PR] [SPARK-46966][Python] Add UDTF API for 'analyze' method to indicate subset of input table columns to select [spark]

2024-02-07 Thread via GitHub


ueshin commented on PR #45007:
URL: https://github.com/apache/spark/pull/45007#issuecomment-1932985366

   Thanks! merging to master.


Re: [PR] [SPARK-46966][Python] Add UDTF API for 'analyze' method to indicate subset of input table columns to select [spark]

2024-02-07 Thread via GitHub


ueshin commented on PR #45007:
URL: https://github.com/apache/spark/pull/45007#issuecomment-1932985032

   The failed tests don't seem related to the last commit. The previous build passed
except for the linter issue, which is fixed in the last commit.


Re: [PR] [SPARK-47000][CORE] Use `getTotalMemorySize` in `WorkerArguments` [spark]

2024-02-07 Thread via GitHub


dongjoon-hyun commented on PR #45060:
URL: https://github.com/apache/spark/pull/45060#issuecomment-1932978043

   Thank you, @sunchao!


Re: [PR] [SPARK-45599][CORE] Use object equality in OpenHashSet [spark]

2024-02-07 Thread via GitHub


nchammas commented on code in PR #45036:
URL: https://github.com/apache/spark/pull/45036#discussion_r1482125925


##
core/src/test/scala/org/apache/spark/util/collection/OpenHashSetSuite.scala:
##
@@ -269,4 +269,35 @@ class OpenHashSetSuite extends SparkFunSuite with Matchers 
{
   assert(pos1 == pos2)
 }
   }
+
+  test("SPARK-45599: 0.0 and -0.0 are equal but not the same") {

Review Comment:
   Note also that the docstring for `OpenHashSet` seems to imply that it is 
meant to be a faster but semantically equivalent alternative to 
`java.util.HashSet`:
   
   
https://github.com/apache/spark/blob/0d5c2ce427e06adfebf47bc446ff6646513ae750/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala#L31-L32
   
   If that's true, then we should perhaps add property-based tests to ensure
alignment between the two implementations, but I'll leave that as a potential
future improvement.
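   
   A rough sketch of what such a property could look like, assuming ScalaCheck is
   available on the test classpath (the suite name, generators, and values below are
   illustrative only and not part of this PR):
   
   ```scala
   import org.scalacheck.Gen
   import org.scalatestplus.scalacheck.ScalaCheckDrivenPropertyChecks
   
   import org.apache.spark.SparkFunSuite
   import org.apache.spark.util.collection.OpenHashSet
   
   // Sketch: feed both sets the same doubles (including the tricky values) and
   // check that they agree on size and membership.
   class OpenHashSetPropertySketch extends SparkFunSuite with ScalaCheckDrivenPropertyChecks {
   
     private val trickyDouble: Gen[Double] = Gen.oneOf(
       Gen.oneOf(0.0, -0.0, Double.NaN, Double.PositiveInfinity),
       Gen.choose(-10.0, 10.0))
   
     test("OpenHashSet tracks java.util.HashSet semantics (sketch)") {
       forAll(Gen.listOf(trickyDouble)) { (values: List[Double]) =>
         val openSet = new OpenHashSet[Double]()
         val javaSet = new java.util.HashSet[Double]()
         values.foreach { v =>
           openSet.add(v)
           javaSet.add(v)
         }
         assert(openSet.size === javaSet.size())
         values.foreach { v =>
           assert(openSet.contains(v) === javaSet.contains(v))
         }
       }
     }
   }
   ```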



Re: [PR] [SPARK-45599][CORE] Use object equality in OpenHashSet [spark]

2024-02-07 Thread via GitHub


nchammas commented on code in PR #45036:
URL: https://github.com/apache/spark/pull/45036#discussion_r1482123330


##
core/src/test/scala/org/apache/spark/util/collection/OpenHashSetSuite.scala:
##
@@ -269,4 +269,35 @@ class OpenHashSetSuite extends SparkFunSuite with Matchers 
{
   assert(pos1 == pos2)
 }
   }
+
+  test("SPARK-45599: 0.0 and -0.0 are equal but not the same") {

Review Comment:
   Consider another interesting case where `java.util.HashSet` and 
`OpenHashSet` differ:
   
   ```scala
   scala> val h = new HashSet[Double]()
   val h: java.util.HashSet[Double] = []
   
   scala> h.add(Double.NaN)
   val res9: Boolean = true
   
   scala> h.add(Double.NaN)
   val res10: Boolean = false
   
   scala> h.size()
   val res11: Int = 1
   ```
   
   On `master`, `OpenHashSet` does what I consider the wrong thing:
   
   ```scala
   val set = new OpenHashSet[Double]()
   set.add(Double.NaN)
   set.add(Double.NaN)
   set.size  // returns 2
   ```
   
   This could possibly lead to a bug like the one reported in SPARK-45599 but 
in reverse, where a new NaN row is added rather than dropped. I will see if I 
can construct such a scenario as a demonstration. But regardless, I think this 
behavior is incorrect by itself.



[PR] [SPARK-47003][K8S] Detect and fail on invalid volume sizes (< 1KiB) in K8s [spark]

2024-02-07 Thread via GitHub


dongjoon-hyun opened a new pull request, #45061:
URL: https://github.com/apache/spark/pull/45061

   …
   
   
   
   ### What changes were proposed in this pull request?
   
   
   
   ### Why are the changes needed?
   
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   
   
   ### How was this patch tested?
   
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   


Re: [PR] [SPARK-46961][SS] Using ProcessorContext to store and retrieve handle [spark]

2024-02-07 Thread via GitHub


ericm-db commented on code in PR #45002:
URL: https://github.com/apache/spark/pull/45002#discussion_r1482110798


##
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala:
##
@@ -31,14 +31,9 @@ object TransformWithStateSuiteUtils {
 class RunningCountStatefulProcessor extends StatefulProcessor[String, String, 
(String, String)]
   with Logging {
   @transient private var _countState: ValueState[Long] = _
-  @transient var _processorHandle: StatefulProcessorHandle = _
-
-  override def init(
-  handle: StatefulProcessorHandle,
-  outputMode: OutputMode) : Unit = {
-_processorHandle = handle
-assert(handle.getQueryInfo().getBatchId >= 0)
-_countState = _processorHandle.getValueState[Long]("countState")
+
+  override def init(outputMode: OutputMode): Unit = {

Review Comment:
   Added



Re: [PR] [MINOR][DOCS] Show sort order of NaN relative to infinity [spark]

2024-02-07 Thread via GitHub


nchammas commented on PR #45047:
URL: https://github.com/apache/spark/pull/45047#issuecomment-1932909805

   I don't think we have an actual unit test to check this sort behavior, so I 
added one as part of #45036 (which also verifies some other behavior we are 
discussing there).
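   
   For reference, the documented ordering can be reproduced with a couple of lines in
   spark-shell (a rough sketch, not the exact test added in #45036):
   
   ```scala
   // NaN sorts after positive infinity in ascending order.
   import spark.implicits._
   
   Seq(Double.NaN, Double.PositiveInfinity, 1.0, Double.NegativeInfinity)
     .toDF("v")
     .orderBy("v")
     .show()
   // Expected order: -Infinity, 1.0, Infinity, NaN
   ```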


Re: [PR] [SPARK-45599][CORE] Use object equality in OpenHashSet [spark]

2024-02-07 Thread via GitHub


nchammas commented on code in PR #45036:
URL: https://github.com/apache/spark/pull/45036#discussion_r1482089432


##
core/src/test/scala/org/apache/spark/util/collection/OpenHashSetSuite.scala:
##
@@ -269,4 +269,35 @@ class OpenHashSetSuite extends SparkFunSuite with Matchers 
{
   assert(pos1 == pos2)
 }
   }
+
+  test("SPARK-45599: 0.0 and -0.0 are equal but not the same") {

Review Comment:
   >  In Spark, 0.0 == -0.0, and in GROUP BY, 0.0 and -0.0 are considered to be 
in the same group and normalized to 0.0.
   
   This PR does not change this behavior. I noticed, however, that we do not 
have any tests currently to check that -0.0 is normalized and grouped as you 
describe, so I went ahead and added such a test in 
2bfc60548db2c41f4c64b63d40a2652cb22732ab.
   
   Does this address your concern? Or are you suggesting that we should 
normalize -0.0 to 0.0 across the board?
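   
   For context, the grouping behavior in question looks roughly like this in
   spark-shell (a sketch, not the exact test added in that commit):
   
   ```scala
   // -0.0 and 0.0 land in the same group, with the key normalized to 0.0.
   import spark.implicits._
   
   Seq(0.0, -0.0, 0.0).toDF("v").groupBy("v").count().show()
   // Expected: a single row with v = 0.0 and count = 3
   ```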



Re: [PR] [SPARK-45599][CORE] Use object equality in OpenHashSet [spark]

2024-02-07 Thread via GitHub


nchammas commented on code in PR #45036:
URL: https://github.com/apache/spark/pull/45036#discussion_r1482085088


##
core/src/test/scala/org/apache/spark/util/collection/OpenHashSetSuite.scala:
##
@@ -269,4 +269,35 @@ class OpenHashSetSuite extends SparkFunSuite with Matchers 
{
   assert(pos1 == pos2)
 }
   }
+
+  test("SPARK-45599: 0.0 and -0.0 are equal but not the same") {

Review Comment:
   > This is a bit tricky and it's better if we can find a reference system 
that defines this semantic.
   
   ```scala
   scala> import java.util.HashSet
   import java.util.HashSet
   
   scala> val h = new HashSet[Double]()
   val h: java.util.HashSet[Double] = []
   
   scala> h.add(0.0)
   val res0: Boolean = true
   
   scala> h.add(-0.0)
   val res1: Boolean = true
   
   scala> h.size()
   val res2: Int = 2
   ```
   
   The doc for [HashSet.add][1] states:
   
   > More formally, adds the specified element e to this set if this set 
contains no element e2 such that Objects.equals(e, e2). If this set already 
contains the element, the call leaves the set unchanged and returns false.
   
   In other words, `java.util.HashSet` uses `equals` and not `==`, and 
therefore it considers `0.0` and `-0.0` distinct elements.
   
   So this PR brings `OpenHashSet` more in line with the semantics of 
`java.util.HashSet`.
   
   [1]: 
https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/util/HashSet.html#add(E)
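   
   A tiny Scala illustration of why the two sets disagree here (standard Java/Scala
   semantics, nothing PR-specific):
   
   ```scala
   println(0.0 == -0.0)                                 // true: IEEE-754 comparison
   println(java.lang.Double.valueOf(0.0).equals(-0.0))  // false: Double#equals compares bit patterns
   println(java.lang.Double.doubleToLongBits(0.0))      // 0
   println(java.lang.Double.doubleToLongBits(-0.0))     // -9223372036854775808 (sign bit set)
   ```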



Re: [PR] [SPARK-46998][SQL] Deprecate the SQL config `spark.sql.legacy.allowZeroIndexInFormatString` [spark]

2024-02-07 Thread via GitHub


MaxGekk commented on code in PR #45057:
URL: https://github.com/apache/spark/pull/45057#discussion_r1482042611


##
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##
@@ -2339,7 +2339,8 @@ object SQLConf {
   .doc("When false, the `strfmt` in `format_string(strfmt, obj, ...)` and 
" +
 "`printf(strfmt, obj, ...)` will no longer support to use \"0$\" to 
specify the first " +
 "argument, the first argument should always reference by \"1$\" when 
use argument index " +
-"to indicating the position of the argument in the argument list.")
+"to indicating the position of the argument in the argument list. " +
+"This config will be removed in the future releases.")

Review Comment:
   I have updated the SQL migration guide.



Re: [PR] [SPARK-47000][CORE] Use `getTotalMemorySize` in `WorkerArguments` [spark]

2024-02-07 Thread via GitHub


dongjoon-hyun commented on PR #45060:
URL: https://github.com/apache/spark/pull/45060#issuecomment-1932751890

   `core` passed. 
   (screenshot: https://github.com/apache/spark/assets/9700541/8c84c5cf-1ecf-4397-9aa8-9f748424607d)
   
   Could you review this when you have some time, @sunchao?


Re: [PR] [SPARK-46998][SQL] Deprecate the SQL config `spark.sql.legacy.allowZeroIndexInFormatString` [spark]

2024-02-07 Thread via GitHub


dongjoon-hyun commented on code in PR #45057:
URL: https://github.com/apache/spark/pull/45057#discussion_r1481997397


##
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##
@@ -2339,7 +2339,8 @@ object SQLConf {
   .doc("When false, the `strfmt` in `format_string(strfmt, obj, ...)` and 
" +
 "`printf(strfmt, obj, ...)` will no longer support to use \"0$\" to 
specify the first " +
 "argument, the first argument should always reference by \"1$\" when 
use argument index " +
-"to indicating the position of the argument in the argument list.")
+"to indicating the position of the argument in the argument list. " +
+"This config will be removed in the future releases.")

Review Comment:
   +1 for the deprecation proposal. Could you maybe add a proper warning to the SQL
migration guide, too?



Re: [PR] [SPARK-46961][SS] Using ProcessorContext to store and retrieve handle [spark]

2024-02-07 Thread via GitHub


anishshri-db commented on code in PR #45002:
URL: https://github.com/apache/spark/pull/45002#discussion_r1481977985


##
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala:
##
@@ -31,14 +31,9 @@ object TransformWithStateSuiteUtils {
 class RunningCountStatefulProcessor extends StatefulProcessor[String, String, 
(String, String)]
   with Logging {
   @transient private var _countState: ValueState[Long] = _
-  @transient var _processorHandle: StatefulProcessorHandle = _
-
-  override def init(
-  handle: StatefulProcessorHandle,
-  outputMode: OutputMode) : Unit = {
-_processorHandle = handle
-assert(handle.getQueryInfo().getBatchId >= 0)
-_countState = _processorHandle.getValueState[Long]("countState")
+
+  override def init(outputMode: OutputMode): Unit = {

Review Comment:
   Can we also add a test verifying that the context is not available if it's not
initialized? Maybe within `ValueStateSuite`?
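   
   A possible shape for such a test, assuming the handle getter is made to fail fast
   when nothing has been set yet (the processor and method names mirror this PR; the
   expected exception and message are illustrative only):
   
   ```scala
   test("stateful processor handle is not available before initialization") {
     val processor = new RunningCountStatefulProcessor()
     val ex = intercept[Exception] {
       processor.getStatefulProcessorHandle
     }
     assert(ex.getMessage.contains("handle"))
   }
   ```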



Re: [PR] [SPARK-46966][Python] Add UDTF API for 'analyze' method to indicate subset of input table columns to select [spark]

2024-02-07 Thread via GitHub


dtenedor commented on code in PR #45007:
URL: https://github.com/apache/spark/pull/45007#discussion_r1481977258


##
python/pyspark/sql/worker/analyze_udtf.py:
##
@@ -31,7 +31,7 @@
 write_with_length,
 SpecialLengths,
 )
-from pyspark.sql.functions import PartitioningColumn
+from pyspark.sql.functions import OrderingColumn, PartitioningColumn, 
SelectedColumn

Review Comment:
   Fixed this for now by removing `OrderingColumn`. I will add a similar check 
for that in a separate PR to help separate concerns.



Re: [PR] [SPARK-46961][SS] Using ProcessorContext to store and retrieve handle [spark]

2024-02-07 Thread via GitHub


anishshri-db commented on code in PR #45002:
URL: https://github.com/apache/spark/pull/45002#discussion_r1481976306


##
sql/api/src/main/scala/org/apache/spark/sql/streaming/StatefulProcessor.scala:
##
@@ -27,19 +27,16 @@ import org.apache.spark.annotation.{Evolving, Experimental}
  */
 @Experimental
 @Evolving
-private[sql] trait StatefulProcessor[K, I, O] extends Serializable {
+private[sql] abstract class StatefulProcessor[K, I, O] extends Serializable {

Review Comment:
   I believe a trait can have default/concrete implementations too?
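   
   For illustration, a plain Scala trait can carry both state and concrete (even
   final) members; a generic sketch, not the actual `StatefulProcessor` code:
   
   ```scala
   // Generic sketch: a trait holding a handle behind concrete getter/setter methods.
   trait HandleHolder[H] {
     private var handle: Option[H] = None
   
     final def setHandle(h: H): Unit = {
       handle = Some(h)
     }
   
     final def getHandle: H = {
       handle.getOrElse(throw new IllegalStateException("handle has not been initialized"))
     }
   }
   ```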



Re: [PR] [SPARK-46961][SS] Using ProcessorContext to store and retrieve handle [spark]

2024-02-07 Thread via GitHub


ericm-db commented on code in PR #45002:
URL: https://github.com/apache/spark/pull/45002#discussion_r1481973732


##
sql/api/src/main/scala/org/apache/spark/sql/streaming/StatefulProcessor.scala:
##
@@ -27,19 +27,16 @@ import org.apache.spark.annotation.{Evolving, Experimental}
  */
 @Experimental
 @Evolving
-private[sql] trait StatefulProcessor[K, I, O] extends Serializable {
+private[sql] abstract class StatefulProcessor[K, I, O] extends Serializable {

Review Comment:
   We want to provide implementations for the getters and setters, so it can no
longer be a trait, right?



Re: [PR] [SPARK-46961][SS] Using ProcessorContext to store and retrieve handle [spark]

2024-02-07 Thread via GitHub


anishshri-db commented on code in PR #45002:
URL: https://github.com/apache/spark/pull/45002#discussion_r1481971683


##
sql/api/src/main/scala/org/apache/spark/sql/streaming/StatefulProcessor.scala:
##
@@ -60,4 +57,21 @@ private[sql] trait StatefulProcessor[K, I, O] extends 
Serializable {
* any cleanup or teardown operations.
*/
   def close (): Unit
+
+  /**
+   * Function to set the stateful processor handle that will be used to 
interact with the state
+   * store and other stateful processor related operations.
+   * @param handle - instance of StatefulProcessorHandle
+   */
+  final def setStatefulProcessorHandle(handle: StatefulProcessorHandle): Unit 
= {
+statefulProcessorHandle = handle
+  }
+
+  /**
+   * Function to get the stateful processor handle that will be used to 
interact with the state
+   * @return handle - instance of StatefulProcessorHandle
+   */
+  final def getStatefulProcessorHandle: StatefulProcessorHandle = {
+statefulProcessorHandle

Review Comment:
   Let's set the initial value to null. And if it's null, then let's raise an
exception/error to say that the handle is not available?
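   
   Concretely, one possible shape, reusing the names from the diff above (the exact
   exception/error class and message are illustrative only):
   
   ```scala
   @transient private var statefulProcessorHandle: StatefulProcessorHandle = null
   
   final def getStatefulProcessorHandle: StatefulProcessorHandle = {
     if (statefulProcessorHandle == null) {
       // Sketch: fail fast instead of handing back a null handle.
       throw new IllegalStateException("The stateful processor handle has not been initialized.")
     }
     statefulProcessorHandle
   }
   ```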


