Re: [PR] [SPARK-43117][CONNECT] Make `ProtoUtils.abbreviate` support repeated fields [spark]
zhengruifeng commented on PR #45056:
URL: https://github.com/apache/spark/pull/45056#issuecomment-1933492153

thank you guys, merged to master
Re: [PR] [SPARK-43117][CONNECT] Make `ProtoUtils.abbreviate` support repeated fields [spark]
zhengruifeng closed pull request #45056: [SPARK-43117][CONNECT] Make `ProtoUtils.abbreviate` support repeated fields
URL: https://github.com/apache/spark/pull/45056
Re: [PR] [SPARK-46615][CONNECT] Support s.c.immutable.ArraySeq in ArrowDeserializers [spark]
LuciferYang commented on PR #44618:
URL: https://github.com/apache/spark/pull/44618#issuecomment-1933461858

Merged into master for Spark 4.0. Thanks @panbingkun @srowen
Re: [PR] [SPARK-46615][CONNECT] Support s.c.immutable.ArraySeq in ArrowDeserializers [spark]
LuciferYang closed pull request #44618: [SPARK-46615][CONNECT] Support s.c.immutable.ArraySeq in ArrowDeserializers
URL: https://github.com/apache/spark/pull/44618
Re: [PR] [SPARK-46971][SQL] When the `compression` is null, a `NullPointException` should not be thrown [spark]
LuciferYang commented on code in PR #45015:
URL: https://github.com/apache/spark/pull/45015#discussion_r1482479360

## connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala:
@@ -118,7 +119,11 @@
    * taken into account. If the former one is not set too, the `snappy` codec is used by default.
    */
   val compression: String = {
-    parameters.get(COMPRESSION).getOrElse(SQLConf.get.avroCompressionCodec)
+    val v = parameters.get(COMPRESSION).getOrElse(SQLConf.get.avroCompressionCodec)
+    if (v == null) {

Review Comment:
Would there be compatibility issues if null values are prohibited in `CaseInsensitiveMap`?
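[Editor's note] For readers following the thread, a minimal sketch of the guard under discussion. The hunk above is truncated before the body of the `if`, so the exception type and message below are illustrative placeholders, not the PR's actual error; `COMPRESSION`, `parameters`, and `SQLConf.get.avroCompressionCodec` come from the surrounding `AvroOptions` code.

```scala
// Hedged sketch: surface a null codec as a clear error instead of letting a
// NullPointerException escape from downstream codec lookup.
val compression: String = {
  val v = parameters.get(COMPRESSION).getOrElse(SQLConf.get.avroCompressionCodec)
  if (v == null) {
    // Placeholder error; the real PR presumably raises a proper Spark error class.
    throw new IllegalArgumentException(s"The value of '$COMPRESSION' must not be null")
  }
  v
}
```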
Re: [PR] [SPARK-46997][CORE] Enable `spark.worker.cleanup.enabled` by default [spark]
dongjoon-hyun commented on PR #45055:
URL: https://github.com/apache/spark/pull/45055#issuecomment-1933455410

Merged to master for Apache Spark 4.0.0.
Re: [PR] [SPARK-46997][CORE] Enable `spark.worker.cleanup.enabled` by default [spark]
dongjoon-hyun closed pull request #45055: [SPARK-46997][CORE] Enable `spark.worker.cleanup.enabled` by default
URL: https://github.com/apache/spark/pull/45055
Re: [PR] [SPARK-46997][CORE] Enable `spark.worker.cleanup.enabled` by default [spark]
dongjoon-hyun commented on PR #45055:
URL: https://github.com/apache/spark/pull/45055#issuecomment-1933452966

Thank you, @LuciferYang !
Re: [PR] [SPARK-46400][CORE][SQL][3.5] When there are corrupted files in the local maven repo, skip this cache and try again [spark]
LuciferYang commented on PR #45017:
URL: https://github.com/apache/spark/pull/45017#issuecomment-1933447367

Merged into branch-3.5. Thanks @panbingkun
Re: [PR] [SPARK-46400][CORE][SQL][3.5] When there are corrupted files in the local maven repo, skip this cache and try again [spark]
LuciferYang closed pull request #45017: [SPARK-46400][CORE][SQL][3.5] When there are corrupted files in the local maven repo, skip this cache and try again
URL: https://github.com/apache/spark/pull/45017
Re: [PR] [MINOR][PYTHON][SQL][TESTS] Don't load Python Data Source when Python executable is not available even for testing [spark]
HyukjinKwon commented on PR #45067:
URL: https://github.com/apache/spark/pull/45067#issuecomment-1933434170

Thanks guys!
Re: [PR] [MINOR][PYTHON][SQL][TESTS] Don't load Python Data Source when Python executable is not available even for testing [spark]
LuciferYang commented on PR #45067:
URL: https://github.com/apache/spark/pull/45067#issuecomment-1933429173

Merged into master for Spark 4.0. Thanks @HyukjinKwon @WeichenXu123
Re: [PR] [MINOR][PYTHON][SQL][TESTS] Don't load Python Data Source when Python executable is not available even for testing [spark]
LuciferYang closed pull request #45067: [MINOR][PYTHON][SQL][TESTS] Don't load Python Data Source when Python executable is not available even for testing
URL: https://github.com/apache/spark/pull/45067
Re: [PR] [MINOR][PYTHON][SQL][TESTS] Don't load Python Data Source when Python executable is not available even for testing [spark]
LuciferYang commented on PR #45067:
URL: https://github.com/apache/spark/pull/45067#issuecomment-1933428090

![image](https://github.com/apache/spark/assets/1475305/8083ab3f-5466-4636-9bf9-a2c3ed5ae9c1)

All tests passed, I will merge this one.
Re: [PR] [SPARK-47005][PYTHON][DOCS] Refine docstring of `asc_nulls_first/asc_nulls_last/desc_nulls_first/desc_nulls_last` [spark]
LuciferYang commented on PR #45066:
URL: https://github.com/apache/spark/pull/45066#issuecomment-1933420406

Merged into master for Spark 4.0. Thanks @dongjoon-hyun
Re: [PR] [SPARK-47005][PYTHON][DOCS] Refine docstring of `asc_nulls_first/asc_nulls_last/desc_nulls_first/desc_nulls_last` [spark]
LuciferYang closed pull request #45066: [SPARK-47005][PYTHON][DOCS] Refine docstring of `asc_nulls_first/asc_nulls_last/desc_nulls_first/desc_nulls_last`
URL: https://github.com/apache/spark/pull/45066
Re: [PR] [SPARK-46993][SQL] Fix constant folding for session variables [spark]
cloud-fan commented on PR #45059:
URL: https://github.com/apache/spark/pull/45059#issuecomment-1933410098

seems the code style check failed
Re: [PR] [SPARK-46993][SQL] Fix constant folding for session variables [spark]
srielau commented on code in PR #45059:
URL: https://github.com/apache/spark/pull/45059#discussion_r1482418715

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala:
@@ -312,14 +312,39 @@ trait ColumnResolutionHelper extends Logging with DataTypeErrorsBase {
       }.map(e => Alias(e, nameParts.last)())
     }

-    e.transformWithPruning(_.containsAnyPattern(UNRESOLVED_ATTRIBUTE, TEMP_RESOLVED_COLUMN)) {
-      case u: UnresolvedAttribute =>
-        resolve(u.nameParts).getOrElse(u)
-      // Re-resolves `TempResolvedColumn` as variable references if it has tried to be resolved
-      // with Aggregate but failed.
-      case t: TempResolvedColumn if t.hasTried =>
-        resolve(t.nameParts).getOrElse(t)
+    def innerResolve(e: Expression, isTopLevel: Boolean): Expression = withOrigin(e.origin) {
+      if (e.resolved) return e
+      val resolved = e match {
+        case u @ UnresolvedAttribute(nameParts) =>
+          val result = withPosition(u) {
+            resolve(nameParts).getOrElse(u) match {
+              // We trim unnecessary alias here. Note that, we cannot trim the alias at top-level,
+              // as we should resolve `UnresolvedAttribute` to a named expression. The caller side

Review Comment:
Why not? It is an UnresolvedAttribute.
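[Editor's note] For context on why an alias is involved at all, a hedged repro of the session-variable case this PR targets; the variable name and literal are illustrative, and a SparkSession `spark` is assumed.

```scala
// A session variable reference resolves to a foldable literal wrapped in an
// Alias carrying the variable's name. At the top level the alias must survive
// because SELECT needs a named output column; nested occurrences can drop it
// so that ConstantFolding sees a plain literal.
spark.sql("DECLARE VARIABLE v INT DEFAULT 7")
spark.sql("SELECT v + 1 AS r").show()  // v folds to 7, so r is the constant 8
```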
[PR] [MINOR][PYTHON][SQL][TESTS] Don't load Python Data Source when Python executable is not available even for testing [spark]
HyukjinKwon opened a new pull request, #45067:
URL: https://github.com/apache/spark/pull/45067

### What changes were proposed in this pull request?
This PR proposes to not load Python Data Sources when the Python executable is not available, even for testing.

### Why are the changes needed?
Whether we're in a test or not, loading Python Data Sources can't work anyway.

### Does this PR introduce _any_ user-facing change?
No, dev-only.

### How was this patch tested?
Manually tested.

### Was this patch authored or co-authored using generative AI tooling?
No.
Re: [PR] [SPARK-46997][CORE] Enable `spark.worker.cleanup.enabled` by default [spark]
dongjoon-hyun commented on PR #45055:
URL: https://github.com/apache/spark/pull/45055#issuecomment-1933307259

Could you review this PR, @LuciferYang ?
Re: [PR] [SPARK-43117][CONNECT] Make `ProtoUtils.abbreviate` support repeated fields [spark]
zhengruifeng commented on code in PR #45056:
URL: https://github.com/apache/spark/pull/45056#discussion_r1482368948

## connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/ProtoUtils.scala:
@@ -42,7 +42,17 @@ private[connect] object ProtoUtils {
         val size = string.length
         val threshold = thresholds.getOrElse(STRING, MAX_STRING_SIZE)
         if (size > threshold) {
-          builder.setField(field, createString(string.take(threshold), size))
+          builder.setField(field, truncateString(string, threshold))
+        }
+
+      case (field: FieldDescriptor, strings: java.lang.Iterable[_])
+          if field.getJavaType == FieldDescriptor.JavaType.STRING && field.isRepeated
+            && strings != null =>
+        val threshold = thresholds.getOrElse(STRING, MAX_STRING_SIZE)
+        strings.iterator().asScala.zipWithIndex.foreach {
+          case (string: String, i) if string != null && string.length > threshold =>

Review Comment:
thanks for checking this!
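[Editor's note] A hedged sketch of how this branch is exercised from the caller side. `proto.Drop`, `addAllColumnNames`, and `sql` follow the test snippets quoted later in this thread; the per-type threshold-map overload of `abbreviate` is an assumption, not a confirmed signature.

```scala
import scala.jdk.CollectionConverters._

// Build a message whose repeated string field holds oversized elements, then
// abbreviate it so each element is truncated to the configured threshold.
val names = Seq.range(0, 10).map(i => i.toString * 1024)
val drop = proto.Drop.newBuilder().setInput(sql).addAllColumnNames(names.asJava).build()
val truncated = ProtoUtils.abbreviate(drop, Map("STRING" -> 128))
// Each columnNames element longer than 128 chars is rewritten in place via
// the repeated-field branch shown in the hunk above.
```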
[PR] [SPARK-47005][PYTHON][DOCS] Refine docstring of `asc_nulls_first/asc_nulls_last/desc_nulls_first/desc_nulls_last` [spark]
LuciferYang opened a new pull request, #45066:
URL: https://github.com/apache/spark/pull/45066

### What changes were proposed in this pull request?
This PR refines the docstrings of `asc_nulls_first/asc_nulls_last/desc_nulls_first/desc_nulls_last` and adds some new examples.

### Why are the changes needed?
To improve the PySpark documentation.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Pass GitHub Actions.

### Was this patch authored or co-authored using generative AI tooling?
No.
Re: [PR] [SPARK-46994][PYTHON] Refactor PythonWrite to prepare for supporting python data source streaming write [spark]
HeartSaVioR closed pull request #45049: [SPARK-46994][PYTHON] Refactor PythonWrite to prepare for supporting python data source streaming write
URL: https://github.com/apache/spark/pull/45049
Re: [PR] [SPARK-46994][PYTHON] Refactor PythonWrite to prepare for supporting python data source streaming write [spark]
HeartSaVioR commented on PR #45049:
URL: https://github.com/apache/spark/pull/45049#issuecomment-1933299295

Thanks, merging to master!
Re: [PR] [SPARK-46865][SS] Add Batch Support for TransformWithState Operator [spark]
HeartSaVioR closed pull request #44884: [SPARK-46865][SS] Add Batch Support for TransformWithState Operator
URL: https://github.com/apache/spark/pull/44884
Re: [PR] [SPARK-46865][SS] Add Batch Support for TransformWithState Operator [spark]
HeartSaVioR commented on PR #44884:
URL: https://github.com/apache/spark/pull/44884#issuecomment-1933298214

Thanks, merging to master!
Re: [PR] [SPARK-43117][CONNECT] Make `ProtoUtils.abbreviate` support repeated fields [spark]
LuciferYang commented on code in PR #45056:
URL: https://github.com/apache/spark/pull/45056#discussion_r1482339139

## connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/ProtoUtils.scala:
@@ -42,7 +42,17 @@ private[connect] object ProtoUtils {
         val size = string.length
         val threshold = thresholds.getOrElse(STRING, MAX_STRING_SIZE)
         if (size > threshold) {
-          builder.setField(field, createString(string.take(threshold), size))
+          builder.setField(field, truncateString(string, threshold))
+        }
+
+      case (field: FieldDescriptor, strings: java.lang.Iterable[_])
+          if field.getJavaType == FieldDescriptor.JavaType.STRING && field.isRepeated
+            && strings != null =>
+        val threshold = thresholds.getOrElse(STRING, MAX_STRING_SIZE)
+        strings.iterator().asScala.zipWithIndex.foreach {
+          case (string: String, i) if string != null && string.length > threshold =>

Review Comment:
In fact, for repeated fields, null checks exist whether adding a single element or adding a collection of elements, for example:

```java
public Builder addColumnNames(java.lang.String value) {
  if (value == null) {
    throw new NullPointerException();
  }
  ensureColumnNamesIsMutable();
  columnNames_.add(value);
  bitField0_ |= 0x00000004;
  onChanged();
  return this;
}

// com.google.protobuf.AbstractMessageLite.Builder#addAll(java.lang.Iterable, java.util.List)
protected static <T> void addAll(final Iterable<T> values, final List<? super T> list) {
  checkNotNull(values);
  if (values instanceof LazyStringList) {
    // For StringOrByteStringLists, check the underlying elements to avoid
    // forcing conversions of ByteStrings to Strings.
    // TODO: Could we just prohibit nulls in all protobuf lists and get rid of this? Is
    // if even possible to hit this condition as all protobuf methods check for null first,
    // right?
    List<?> lazyValues = ((LazyStringList) values).getUnderlyingElements();
    LazyStringList lazyList = (LazyStringList) list;
```

also, I have performed the following checks:

- adding a collection of elements with null

```scala
val names = Seq.range(0, 10).map(i => if (i == 3) null else i.toString * 1024)
val drop = proto.Drop.newBuilder().setInput(sql).addAllColumnNames(names.asJava).build()
```

```
[info] - truncate repeated strings with nulls *** FAILED *** (3 milliseconds)
[info]   java.lang.NullPointerException: Element at index 3 is null.
[info]   at com.google.protobuf.AbstractMessageLite$Builder.addAllCheckingNulls(AbstractMessageLite.java:359)
[info]   at com.google.protobuf.AbstractMessageLite$Builder.addAll(AbstractMessageLite.java:414)
[info]   at org.apache.spark.connect.proto.Drop$Builder.addAllColumnNames(Drop.java:1240)
```

- add a null element

```scala
val drop = proto.Drop.newBuilder().setInput(sql).addColumnNames(null).build()
```

```
[info] - truncate repeated strings with nulls *** FAILED *** (4 milliseconds)
[info]   java.lang.NullPointerException:
[info]   at org.apache.spark.connect.proto.Drop$Builder.addColumnNames(Drop.java:1221)
```

As you can see, under normal circumstances, it is impossible to add a null element to a repeated field. So personally, I don't think this null check is necessary, but I don't object to adding it.
Re: [PR] [SPARK-46998][SQL] Deprecate the SQL config `spark.sql.legacy.allowZeroIndexInFormatString` [spark]
dongjoon-hyun commented on PR #45057:
URL: https://github.com/apache/spark/pull/45057#issuecomment-1933262616

Merged to master for Apache Spark 4.0.0. Thank you, @MaxGekk and @cloud-fan .
Re: [PR] [SPARK-46998][SQL] Deprecate the SQL config `spark.sql.legacy.allowZeroIndexInFormatString` [spark]
dongjoon-hyun closed pull request #45057: [SPARK-46998][SQL] Deprecate the SQL config `spark.sql.legacy.allowZeroIndexInFormatString`
URL: https://github.com/apache/spark/pull/45057
Re: [PR] [SPARK-39910][SQL] Delegate path qualification to filesystem during DataSource file path globbing [spark]
cloud-fan commented on code in PR #43463:
URL: https://github.com/apache/spark/pull/43463#discussion_r1482326766

## sql/core/src/test/resources/test-data/test-archive.har/_index:
@@ -0,0 +1,7 @@
+%2F dir 1697722622766+493+tigrulya+hadoop 0 0 test.txt test.orc test.parquet test.json test.csv

Review Comment:
can we clean up the test files a bit? We only test csv now.
Re: [PR] [SPARK-39910][SQL] Delegate path qualification to filesystem during DataSource file path globbing [spark]
cloud-fan commented on code in PR #43463:
URL: https://github.com/apache/spark/pull/43463#discussion_r1482325524

## sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceSuite.scala:
@@ -214,4 +216,6 @@ class MockFileSystem extends RawLocalFileSystem {
   override def globStatus(pathPattern: Path): Array[FileStatus] = {
     mockGlobResults.getOrElse(pathPattern, Array())
   }
+
+  override def getUri: URI = URI.create("mockFs://mockFs/")

Review Comment:
is this change needed?
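[Editor's note] A possible reason the override matters, sketched with the standard Hadoop API: once path qualification is delegated to the filesystem, bare paths are resolved against the filesystem's URI, so a mock filesystem needs a stable one. The asserted value below just follows from the URI in the hunk above.

```scala
import java.net.URI
import org.apache.hadoop.fs.Path

// Path.makeQualified(defaultUri, workingDir) prefixes an absolute path with
// the filesystem's scheme and authority.
val uri = URI.create("mockFs://mockFs/")
val qualified = new Path("/data/part-0.csv").makeQualified(uri, new Path("/"))
assert(qualified.toString == "mockFs://mockFs/data/part-0.csv")
```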
Re: [PR] [SPARK-46617][SQL] Create-table-if-not-exists should not silently overwrite existing data-files [spark]
cloud-fan commented on code in PR #44622:
URL: https://github.com/apache/spark/pull/44622#discussion_r1482324935

## sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala:
@@ -102,14 +102,17 @@ object DataWritingCommand {
   }

   /**
    * When execute CTAS operators, and the location is not empty, throw [[AnalysisException]].
-   * For CTAS, the SaveMode is always [[ErrorIfExists]]
+   * For CTAS, the SaveMode is always [[ErrorIfExists]].
+   * For Create-Table-If-Not-Exists, the SaveMode is [[Ignore]].
    *
    * @param tablePath Table location.
    * @param saveMode Save mode of the table.
    * @param hadoopConf Configuration.
    */
   def assertEmptyRootPath(tablePath: URI, saveMode: SaveMode, hadoopConf: Configuration): Unit = {

Review Comment:
how about the Scala/Python APIs? `DataFrameWriter.mode(...).saveAsTable`
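[Editor's note] To make the question concrete, a hedged sketch of the Scala API path being asked about; whether `assertEmptyRootPath` fires on these writes is exactly what is under discussion, and a SparkSession `spark` plus table name `t` are assumed.

```scala
import org.apache.spark.sql.SaveMode

val df = spark.range(5).toDF("id")
// SaveMode.ErrorIfExists mirrors plain CTAS: fail if the table already exists.
df.write.mode(SaveMode.ErrorIfExists).saveAsTable("t")
// SaveMode.Ignore mirrors CREATE TABLE IF NOT EXISTS: a silent no-op if it exists.
df.write.mode(SaveMode.Ignore).saveAsTable("t")
```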
Re: [PR] [SPARK-45599][CORE] Use object equality in OpenHashSet [spark]
cloud-fan commented on code in PR #45036:
URL: https://github.com/apache/spark/pull/45036#discussion_r1482324222

## core/src/test/scala/org/apache/spark/util/collection/OpenHashSetSuite.scala:
@@ -269,4 +269,35 @@ class OpenHashSetSuite extends SparkFunSuite with Matchers {
       assert(pos1 == pos2)
     }
   }
+
+  test("SPARK-45599: 0.0 and -0.0 are equal but not the same") {

Review Comment:
Spark's `OpenHashSet` does not have to match `java.util.HashSet`. What matters is the SQL semantic. Can you highlight which functions/operators are using this `OpenHashSet` and what is the impact of this change to the SQL semantic?
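[Editor's note] For background on why the two zeros can collide in a hash set at all, a small self-contained illustration in plain Scala (no Spark classes involved):

```scala
// 0.0 and -0.0 are == as primitives, but their bit patterns differ, and the
// boxed java.lang.Double.equals (which compares doubleToLongBits) treats them
// as distinct. Hence "equal but not the same".
val pos = 0.0
val neg = -0.0
assert(pos == neg)
assert(java.lang.Double.doubleToRawLongBits(pos) !=
  java.lang.Double.doubleToRawLongBits(neg))
assert(!java.lang.Double.valueOf(pos).equals(java.lang.Double.valueOf(neg)))
```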
Re: [PR] [SPARK-43117][CONNECT] Make `ProtoUtils.abbreviate` support repeated fields [spark]
zhengruifeng commented on code in PR #45056:
URL: https://github.com/apache/spark/pull/45056#discussion_r1482317218

## connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/ProtoUtils.scala:
@@ -42,7 +42,17 @@ private[connect] object ProtoUtils {
         val size = string.length
         val threshold = thresholds.getOrElse(STRING, MAX_STRING_SIZE)
         if (size > threshold) {
-          builder.setField(field, createString(string.take(threshold), size))
+          builder.setField(field, truncateString(string, threshold))
+        }
+
+      case (field: FieldDescriptor, strings: java.lang.Iterable[_])
+          if field.getJavaType == FieldDescriptor.JavaType.STRING && field.isRepeated
+            && strings != null =>
+        val threshold = thresholds.getOrElse(STRING, MAX_STRING_SIZE)
+        strings.iterator().asScala.zipWithIndex.foreach {
+          case (string: String, i) if string != null && string.length > threshold =>

Review Comment:
I guess so, but not very sure. So I added `string != null` to protect from NPE
Re: [PR] [SPARK-46812][SQL][PYTHON] Make mapInPandas / mapInArrow support ResourceProfile [spark]
wbo4958 commented on PR #44852:
URL: https://github.com/apache/spark/pull/44852#issuecomment-1933236068

Hi @HyukjinKwon, thx for your review and comment. The CI on the newest commit got passed, could you help review it again? Thx very much.
Re: [PR] [SPARK-46689][SPARK-46690][PYTHON][CONNECT] Support v2 profiling in group/cogroup applyInPandas/applyInArrow [spark]
xinrong-meng commented on code in PR #45050:
URL: https://github.com/apache/spark/pull/45050#discussion_r1482315739

## python/pyspark/sql/tests/test_udf_profiler.py:
@@ -394,6 +394,129 @@ def min_udf(v: pd.Series) -> float:
                 io.getvalue(), f"2.*{os.path.basename(inspect.getfile(_do_computation))}"
             )

+    @unittest.skipIf(
+        not have_pandas or not have_pyarrow,
+        cast(str, pandas_requirement_message or pyarrow_requirement_message),
+    )
+    def test_perf_profiler_group_apply_in_pandas(self):
+        # FlatMapGroupsInBatchExec
+        df = self.spark.createDataFrame(
+            [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v")
+        )
+
+        def normalize(pdf):
+            v = pdf.v
+            return pdf.assign(v=(v - v.mean()) / v.std())
+
+        with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
+            df.groupby("id").applyInPandas(normalize, schema="id long, v double").show()
+
+        self.assertEqual(1, len(self.profile_results), str(self.profile_results.keys()))
+
+        for id in self.profile_results:
+            with self.trap_stdout() as io:
+                self.spark.showPerfProfiles(id)
+
+            self.assertIn(f"Profile of UDF<{id}>", io.getvalue())
+            self.assertRegex(
+                io.getvalue(), f"2.*{os.path.basename(inspect.getfile(_do_computation))}"
+            )
+
+    @unittest.skipIf(
+        not have_pandas or not have_pyarrow,
+        cast(str, pandas_requirement_message or pyarrow_requirement_message),
+    )
+    def test_perf_profiler_cogroup_apply_in_pandas(self):
+        # FlatMapCoGroupsInBatchExec
+        import pandas as pd
+
+        df1 = self.spark.createDataFrame(
+            [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
+            ("time", "id", "v1"),
+        )
+        df2 = self.spark.createDataFrame(
+            [(20000101, 1, "x"), (20000101, 2, "y")], ("time", "id", "v2")
+        )
+
+        def asof_join(left, right):
+            return pd.merge_asof(left, right, on="time", by="id")
+
+        with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
+            df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
+                asof_join, schema="time int, id int, v1 double, v2 string"
+            ).show()
+
+        self.assertEqual(1, len(self.profile_results), str(self.profile_results.keys()))
+
+        for id in self.profile_results:
+            with self.trap_stdout() as io:
+                self.spark.showPerfProfiles(id)
+
+            self.assertIn(f"Profile of UDF<{id}>", io.getvalue())
+            self.assertRegex(
+                io.getvalue(), f"2.*{os.path.basename(inspect.getfile(_do_computation))}"
+            )
+
+    @unittest.skipIf(
+        not have_pandas or not have_pyarrow,
+        cast(str, pandas_requirement_message or pyarrow_requirement_message),
+    )
+    def test_perf_profiler_group_apply_in_arrow(self):
+        # FlatMapGroupsInBatchExec
+        import pyarrow.compute as pc
+
+        df = self.spark.createDataFrame(
+            [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v")
+        )
+
+        def normalize(table):
+            v = table.column("v")
+            norm = pc.divide(pc.subtract(v, pc.mean(v)), pc.stddev(v, ddof=1))
+            return table.set_column(1, "v", norm)
+
+        with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
+            df.groupby("id").applyInArrow(normalize, schema="id long, v double").show()
+
+        self.assertEqual(1, len(self.profile_results), str(self.profile_results.keys()))
+
+        for id in self.profile_results:
+            with self.trap_stdout() as io:
+                self.spark.showPerfProfiles(id)
+
+            self.assertIn(f"Profile of UDF<{id}>", io.getvalue())
+            self.assertRegex(
+                io.getvalue(), f"2.*{os.path.basename(inspect.getfile(_do_computation))}"
+            )
+
+    @unittest.skipIf(
+        not have_pandas or not have_pyarrow,
+        cast(str, pandas_requirement_message or pyarrow_requirement_message),
+    )
+    def test_perf_profiler_cogroup_apply_in_arrow(self):
+        import pyarrow as pa
+
+        df1 = self.spark.createDataFrame([(1, 1.0), (2, 2.0), (1, 3.0), (2, 4.0)], ("id", "v1"))
+        df2 = self.spark.createDataFrame([(1, "x"), (2, "y")], ("id", "v2"))
+
+        def summarize(l, r):

Review Comment:
I was wondering the reason and saw "In some fonts, these characters are indistinguishable from the numerals one and zero. When tempted to use 'l', use 'L' instead." That's good to learn :p
Re: [PR] [SPARK-46689][SPARK-46690][PYTHON][CONNECT] Support v2 profiling in group/cogroup applyInPandas/applyInArrow [spark]
ueshin commented on code in PR #45050:
URL: https://github.com/apache/spark/pull/45050#discussion_r1482312796

## python/pyspark/sql/tests/test_udf_profiler.py:
@@ -394,6 +394,129 @@ def min_udf(v: pd.Series) -> float:
                 io.getvalue(), f"2.*{os.path.basename(inspect.getfile(_do_computation))}"
             )

+    @unittest.skipIf(
+        not have_pandas or not have_pyarrow,
+        cast(str, pandas_requirement_message or pyarrow_requirement_message),
+    )
+    def test_perf_profiler_group_apply_in_pandas(self):
+        # FlatMapGroupsInBatchExec
+        df = self.spark.createDataFrame(
+            [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v")
+        )
+
+        def normalize(pdf):
+            v = pdf.v
+            return pdf.assign(v=(v - v.mean()) / v.std())
+
+        with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
+            df.groupby("id").applyInPandas(normalize, schema="id long, v double").show()
+
+        self.assertEqual(1, len(self.profile_results), str(self.profile_results.keys()))
+
+        for id in self.profile_results:
+            with self.trap_stdout() as io:
+                self.spark.showPerfProfiles(id)
+
+            self.assertIn(f"Profile of UDF<{id}>", io.getvalue())
+            self.assertRegex(
+                io.getvalue(), f"2.*{os.path.basename(inspect.getfile(_do_computation))}"
+            )
+
+    @unittest.skipIf(
+        not have_pandas or not have_pyarrow,
+        cast(str, pandas_requirement_message or pyarrow_requirement_message),
+    )
+    def test_perf_profiler_cogroup_apply_in_pandas(self):
+        # FlatMapCoGroupsInBatchExec
+        import pandas as pd
+
+        df1 = self.spark.createDataFrame(
+            [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
+            ("time", "id", "v1"),
+        )
+        df2 = self.spark.createDataFrame(
+            [(20000101, 1, "x"), (20000101, 2, "y")], ("time", "id", "v2")
+        )
+
+        def asof_join(left, right):
+            return pd.merge_asof(left, right, on="time", by="id")
+
+        with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
+            df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
+                asof_join, schema="time int, id int, v1 double, v2 string"
+            ).show()
+
+        self.assertEqual(1, len(self.profile_results), str(self.profile_results.keys()))
+
+        for id in self.profile_results:
+            with self.trap_stdout() as io:
+                self.spark.showPerfProfiles(id)
+
+            self.assertIn(f"Profile of UDF<{id}>", io.getvalue())
+            self.assertRegex(
+                io.getvalue(), f"2.*{os.path.basename(inspect.getfile(_do_computation))}"
+            )
+
+    @unittest.skipIf(
+        not have_pandas or not have_pyarrow,
+        cast(str, pandas_requirement_message or pyarrow_requirement_message),
+    )
+    def test_perf_profiler_group_apply_in_arrow(self):
+        # FlatMapGroupsInBatchExec
+        import pyarrow.compute as pc
+
+        df = self.spark.createDataFrame(
+            [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v")
+        )
+
+        def normalize(table):
+            v = table.column("v")
+            norm = pc.divide(pc.subtract(v, pc.mean(v)), pc.stddev(v, ddof=1))
+            return table.set_column(1, "v", norm)
+
+        with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}):
+            df.groupby("id").applyInArrow(normalize, schema="id long, v double").show()
+
+        self.assertEqual(1, len(self.profile_results), str(self.profile_results.keys()))
+
+        for id in self.profile_results:
+            with self.trap_stdout() as io:
+                self.spark.showPerfProfiles(id)
+
+            self.assertIn(f"Profile of UDF<{id}>", io.getvalue())
+            self.assertRegex(
+                io.getvalue(), f"2.*{os.path.basename(inspect.getfile(_do_computation))}"
+            )
+
+    @unittest.skipIf(
+        not have_pandas or not have_pyarrow,
+        cast(str, pandas_requirement_message or pyarrow_requirement_message),
+    )
+    def test_perf_profiler_cogroup_apply_in_arrow(self):
+        import pyarrow as pa
+
+        df1 = self.spark.createDataFrame([(1, 1.0), (2, 2.0), (1, 3.0), (2, 4.0)], ("id", "v1"))
+        df2 = self.spark.createDataFrame([(1, "x"), (2, "y")], ("id", "v2"))
+
+        def summarize(l, r):

Review Comment:
Shall we rename to `left` and `right` according to the lint error?

## python/pyspark/sql/tests/test_udf_profiler.py:
@@ -394,6 +394,129 @@ def min_udf(v: pd.Series) -> float:
                 io.getvalue(), f"2.*{os.path.basename(inspect.getfile(_do_computation))}"
             )

+    @unittest.skipIf(
+        not have_pandas or not have_pyarrow,
+        cast(str, pandas_requirement_message or pyarrow_requirement_message),
+    )
+    def
Re: [PR] [SPARK-46689][SPARK-46690][PYTHON][CONNECT] Support v2 profiling in group/cogroup applyInPandas/applyInArrow [spark]
ueshin commented on code in PR #45050:
URL: https://github.com/apache/spark/pull/45050#discussion_r1482312940

## python/pyspark/tests/test_memory_profiler.py:
@@ -441,6 +441,129 @@ def min_udf(v: pd.Series) -> float:
                 io.getvalue(), f"Filename.*{os.path.basename(inspect.getfile(_do_computation))}"
             )

+    @unittest.skipIf(
+        not have_pandas or not have_pyarrow,
+        cast(str, pandas_requirement_message or pyarrow_requirement_message),
+    )
+    def test_memory_profiler_group_apply_in_pandas(self):
+        # FlatMapGroupsInBatchExec
+        df = self.spark.createDataFrame(
+            [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v")
+        )
+
+        def normalize(pdf):
+            v = pdf.v
+            return pdf.assign(v=(v - v.mean()) / v.std())
+
+        with self.sql_conf({"spark.sql.pyspark.udf.profiler": "memory"}):
+            df.groupby("id").applyInPandas(normalize, schema="id long, v double").show()
+
+        self.assertEqual(1, len(self.profile_results), str(self.profile_results.keys()))
+
+        for id in self.profile_results:
+            with self.trap_stdout() as io:
+                self.spark.showMemoryProfiles(id)
+
+            self.assertIn(f"Profile of UDF<{id}>", io.getvalue())
+            self.assertRegex(
+                io.getvalue(), f"Filename.*{os.path.basename(inspect.getfile(_do_computation))}"
+            )
+
+    @unittest.skipIf(
+        not have_pandas or not have_pyarrow,
+        cast(str, pandas_requirement_message or pyarrow_requirement_message),
+    )
+    def test_memory_profiler_cogroup_apply_in_pandas(self):
+        # FlatMapCoGroupsInBatchExec
+        import pandas as pd
+
+        df1 = self.spark.createDataFrame(
+            [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
+            ("time", "id", "v1"),
+        )
+        df2 = self.spark.createDataFrame(
+            [(20000101, 1, "x"), (20000101, 2, "y")], ("time", "id", "v2")
+        )
+
+        def asof_join(left, right):
+            return pd.merge_asof(left, right, on="time", by="id")
+
+        with self.sql_conf({"spark.sql.pyspark.udf.profiler": "memory"}):
+            df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
+                asof_join, schema="time int, id int, v1 double, v2 string"
+            ).show()
+
+        self.assertEqual(1, len(self.profile_results), str(self.profile_results.keys()))
+
+        for id in self.profile_results:
+            with self.trap_stdout() as io:
+                self.spark.showMemoryProfiles(id)
+
+            self.assertIn(f"Profile of UDF<{id}>", io.getvalue())
+            self.assertRegex(
+                io.getvalue(), f"Filename.*{os.path.basename(inspect.getfile(_do_computation))}"
+            )
+
+    @unittest.skipIf(
+        not have_pandas or not have_pyarrow,
+        cast(str, pandas_requirement_message or pyarrow_requirement_message),
+    )
+    def test_memory_profiler_group_apply_in_arrow(self):
+        # FlatMapGroupsInBatchExec
+        import pyarrow.compute as pc
+
+        df = self.spark.createDataFrame(
+            [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v")
+        )
+
+        def normalize(table):
+            v = table.column("v")
+            norm = pc.divide(pc.subtract(v, pc.mean(v)), pc.stddev(v, ddof=1))
+            return table.set_column(1, "v", norm)
+
+        with self.sql_conf({"spark.sql.pyspark.udf.profiler": "memory"}):
+            df.groupby("id").applyInArrow(normalize, schema="id long, v double").show()
+
+        self.assertEqual(1, len(self.profile_results), str(self.profile_results.keys()))
+
+        for id in self.profile_results:
+            with self.trap_stdout() as io:
+                self.spark.showMemoryProfiles(id)
+
+            self.assertIn(f"Profile of UDF<{id}>", io.getvalue())
+            self.assertRegex(
+                io.getvalue(), f"Filename.*{os.path.basename(inspect.getfile(_do_computation))}"
+            )
+
+    @unittest.skipIf(
+        not have_pandas or not have_pyarrow,
+        cast(str, pandas_requirement_message or pyarrow_requirement_message),
+    )
+    def test_memory_profiler_cogroup_apply_in_arrow(self):
+        import pyarrow as pa
+
+        df1 = self.spark.createDataFrame([(1, 1.0), (2, 2.0), (1, 3.0), (2, 4.0)], ("id", "v1"))
+        df2 = self.spark.createDataFrame([(1, "x"), (2, "y")], ("id", "v2"))
+
+        def summarize(l, r):

Review Comment:
ditto.
Re: [PR] [SPARK-45527][CORE] Use fraction to do the resource calculation [spark]
wbo4958 commented on code in PR #44690:
URL: https://github.com/apache/spark/pull/44690#discussion_r1482309634

## core/src/main/scala/org/apache/spark/resource/ResourceAllocator.scala:
@@ -20,6 +20,49 @@ package org.apache.spark.resource

 import scala.collection.mutable

 import org.apache.spark.SparkException
+import org.apache.spark.resource.ResourceAmountUtils.ONE_ENTIRE_RESOURCE
+
+private[spark] object ResourceAmountUtils {
+  /**
+   * Using "double" to do the resource calculation may encounter a problem of precision loss. Eg
+   *
+   * scala> val taskAmount = 1.0 / 9
+   * taskAmount: Double = 0.1111111111111111
+   *
+   * scala> var total = 1.0
+   * total: Double = 1.0
+   *
+   * scala> for (i <- 1 to 9 ) {
+   *      |   if (total >= taskAmount) {
+   *      |     total -= taskAmount
+   *      |     println(s"assign $taskAmount for task $i, total left: $total")
+   *      |   } else {
+   *      |     println(s"ERROR Can't assign $taskAmount for task $i, total left: $total")
+   *      |   }
+   *      | }
+   * assign 0.1111111111111111 for task 1, total left: 0.8888888888888889
+   * assign 0.1111111111111111 for task 2, total left: 0.7777777777777778
+   * assign 0.1111111111111111 for task 3, total left: 0.6666666666666665
+   * assign 0.1111111111111111 for task 4, total left: 0.5555555555555554
+   * assign 0.1111111111111111 for task 5, total left: 0.44444444444444425
+   * assign 0.1111111111111111 for task 6, total left: 0.33333333333333315
+   * assign 0.1111111111111111 for task 7, total left: 0.22222222222222204
+   * assign 0.1111111111111111 for task 8, total left: 0.11111111111111094
+   * ERROR Can't assign 0.1111111111111111 for task 9, total left: 0.11111111111111094
+   *
+   * So we multiply ONE_ENTIRE_RESOURCE to convert the double to long to avoid this limitation.
+   * Double can display up to 16 decimal places, so we set the factor to
+   * 10,000,000,000,000,000L.
+   */
+  final val ONE_ENTIRE_RESOURCE: Long = 10000000000000000L

Review Comment:
Hi @srowen. Sure, let me have a PR using BD for master branch. Thx
Re: [PR] [SPARK-45527][CORE] Use fraction to do the resource calculation [spark]
srowen commented on code in PR #44690:
URL: https://github.com/apache/spark/pull/44690#discussion_r1482301112

## core/src/main/scala/org/apache/spark/resource/ResourceAllocator.scala:
@@ -20,6 +20,49 @@ package org.apache.spark.resource

 import scala.collection.mutable

 import org.apache.spark.SparkException
+import org.apache.spark.resource.ResourceAmountUtils.ONE_ENTIRE_RESOURCE
+
+private[spark] object ResourceAmountUtils {
+  /**
+   * Using "double" to do the resource calculation may encounter a problem of precision loss. Eg
+   *
+   * scala> val taskAmount = 1.0 / 9
+   * taskAmount: Double = 0.1111111111111111
+   *
+   * scala> var total = 1.0
+   * total: Double = 1.0
+   *
+   * scala> for (i <- 1 to 9 ) {
+   *      |   if (total >= taskAmount) {
+   *      |     total -= taskAmount
+   *      |     println(s"assign $taskAmount for task $i, total left: $total")
+   *      |   } else {
+   *      |     println(s"ERROR Can't assign $taskAmount for task $i, total left: $total")
+   *      |   }
+   *      | }
+   * assign 0.1111111111111111 for task 1, total left: 0.8888888888888889
+   * assign 0.1111111111111111 for task 2, total left: 0.7777777777777778
+   * assign 0.1111111111111111 for task 3, total left: 0.6666666666666665
+   * assign 0.1111111111111111 for task 4, total left: 0.5555555555555554
+   * assign 0.1111111111111111 for task 5, total left: 0.44444444444444425
+   * assign 0.1111111111111111 for task 6, total left: 0.33333333333333315
+   * assign 0.1111111111111111 for task 7, total left: 0.22222222222222204
+   * assign 0.1111111111111111 for task 8, total left: 0.11111111111111094
+   * ERROR Can't assign 0.1111111111111111 for task 9, total left: 0.11111111111111094
+   *
+   * So we multiply ONE_ENTIRE_RESOURCE to convert the double to long to avoid this limitation.
+   * Double can display up to 16 decimal places, so we set the factor to
+   * 10,000,000,000,000,000L.
+   */
+  final val ONE_ENTIRE_RESOURCE: Long = 10000000000000000L

Review Comment:
I know. Above I give an example where multiplying by long doesn't work. I'm referring to your example of BigDecimal above, which does not use (only) BigDecimal. Please just try it.
Re: [PR] [SPARK-45527][CORE] Use fraction to do the resource calculation [spark]
wbo4958 commented on code in PR #44690:
URL: https://github.com/apache/spark/pull/44690#discussion_r1482298562

## core/src/main/scala/org/apache/spark/resource/ResourceAllocator.scala:
@@ -20,6 +20,49 @@ package org.apache.spark.resource

 import scala.collection.mutable

 import org.apache.spark.SparkException
+import org.apache.spark.resource.ResourceAmountUtils.ONE_ENTIRE_RESOURCE
+
+private[spark] object ResourceAmountUtils {
+  /**
+   * Using "double" to do the resource calculation may encounter a problem of precision loss. Eg
+   *
+   * scala> val taskAmount = 1.0 / 9
+   * taskAmount: Double = 0.1111111111111111
+   *
+   * scala> var total = 1.0
+   * total: Double = 1.0
+   *
+   * scala> for (i <- 1 to 9 ) {
+   *      |   if (total >= taskAmount) {
+   *      |     total -= taskAmount
+   *      |     println(s"assign $taskAmount for task $i, total left: $total")
+   *      |   } else {
+   *      |     println(s"ERROR Can't assign $taskAmount for task $i, total left: $total")
+   *      |   }
+   *      | }
+   * assign 0.1111111111111111 for task 1, total left: 0.8888888888888889
+   * assign 0.1111111111111111 for task 2, total left: 0.7777777777777778
+   * assign 0.1111111111111111 for task 3, total left: 0.6666666666666665
+   * assign 0.1111111111111111 for task 4, total left: 0.5555555555555554
+   * assign 0.1111111111111111 for task 5, total left: 0.44444444444444425
+   * assign 0.1111111111111111 for task 6, total left: 0.33333333333333315
+   * assign 0.1111111111111111 for task 7, total left: 0.22222222222222204
+   * assign 0.1111111111111111 for task 8, total left: 0.11111111111111094
+   * ERROR Can't assign 0.1111111111111111 for task 9, total left: 0.11111111111111094
+   *
+   * So we multiply ONE_ENTIRE_RESOURCE to convert the double to long to avoid this limitation.
+   * Double can display up to 16 decimal places, so we set the factor to
+   * 10,000,000,000,000,000L.
+   */
+  final val ONE_ENTIRE_RESOURCE: Long = 10000000000000000L

Review Comment:
Hi @srowen. Actually, this PR converts the taskAmount, which is a double/float value, to a Long by multiplying by `1E16.toLong`, and then the following calculation is based on Long instead of double/float. You can see that all the APIs of [ExecutorResourcesAmount](https://github.com/apache/spark/pull/44690/files#diff-91f559d0e4aeff4052cd7c6575d8622d911ecad0852040f64ccc1d3abfd605faR41) are using Long. Even the assigned resource of a task is still kept as a Long, see [there](https://github.com/apache/spark/pull/44690/files#diff-381b69941735d860cddcbb3b565abfef591a890aadecbc235283f3a57de9ba61R62). Yeah, but since you think we should use BigDecimal, I'm okay with that; I can make a PR for the master branch first, and then cherry-pick it to the 3.5 branch.
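[Editor's note] A minimal standalone sketch of the fixed-point idea described above, using the 1e16 factor from the hunk; this is not the actual ResourceAllocator code, just the 1/9 case from the doc comment redone with integer arithmetic.

```scala
// Convert the fractional task amount to a Long once, then do all bookkeeping
// with exact integer arithmetic, so nine 1/9 tasks fit into one resource.
val ONE_ENTIRE_RESOURCE: Long = 10000000000000000L              // 1e16
val taskAmount: Long = (ONE_ENTIRE_RESOURCE * (1.0 / 9)).toLong // 1111111111111111
var total: Long = ONE_ENTIRE_RESOURCE
for (i <- 1 to 9) {
  assert(total >= taskAmount) // unlike the double version, all nine succeed
  total -= taskAmount
}
// total == 1e16 - 9 * 1111111111111111 == 1, a negligible remainder
```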
Re: [PR] [SPARK-46993] Fix constant folding for session variables [spark]
cloud-fan commented on code in PR #45059:
URL: https://github.com/apache/spark/pull/45059#discussion_r1482291904

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala:
@@ -312,14 +312,39 @@ trait ColumnResolutionHelper extends Logging with DataTypeErrorsBase {
       }.map(e => Alias(e, nameParts.last)())
     }

-    e.transformWithPruning(_.containsAnyPattern(UNRESOLVED_ATTRIBUTE, TEMP_RESOLVED_COLUMN)) {
-      case u: UnresolvedAttribute =>
-        resolve(u.nameParts).getOrElse(u)
-      // Re-resolves `TempResolvedColumn` as variable references if it has tried to be resolved
-      // with Aggregate but failed.
-      case t: TempResolvedColumn if t.hasTried =>
-        resolve(t.nameParts).getOrElse(t)
+    def innerResolve(e: Expression, isTopLevel: Boolean): Expression = withOrigin(e.origin) {
+      if (e.resolved) return e
+      val resolved = e match {
+        case u @ UnresolvedAttribute(nameParts) =>
+          val result = withPosition(u) {
+            resolve(nameParts).getOrElse(u) match {
+              // We trim unnecessary alias here. Note that, we cannot trim the alias at top-level,
+              // as we should resolve `UnresolvedAttribute` to a named expression. The caller side
+              // can trim the top-level alias if it's safe to do so. Since we will call
+              // CleanupAliases later in Analyzer, trim non top-level unnecessary alias is safe.
+              case Alias(child, _) if !isTopLevel => child
+              case other => other
+            }
+          }
+          result
+
+        // Re-resolves `TempResolvedColumn` if it has tried to be resolved with Aggregate

Review Comment:
nit: let's keep the previous comment
Re: [PR] [SPARK-46993] Fix constant folding for session variables [spark]
cloud-fan commented on code in PR #45059: URL: https://github.com/apache/spark/pull/45059#discussion_r1482291771 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala: ## @@ -312,14 +312,39 @@ trait ColumnResolutionHelper extends Logging with DataTypeErrorsBase { }.map(e => Alias(e, nameParts.last)()) } -e.transformWithPruning(_.containsAnyPattern(UNRESOLVED_ATTRIBUTE, TEMP_RESOLVED_COLUMN)) { - case u: UnresolvedAttribute => -resolve(u.nameParts).getOrElse(u) - // Re-resolves `TempResolvedColumn` as variable references if it has tried to be resolved with - // Aggregate but failed. - case t: TempResolvedColumn if t.hasTried => -resolve(t.nameParts).getOrElse(t) +def innerResolve(e: Expression, isTopLevel: Boolean): Expression = withOrigin(e.origin) { + if (e.resolved) return e + val resolved = e match { +case u @ UnresolvedAttribute(nameParts) => + val result = withPosition(u) { +resolve(nameParts).getOrElse(u) match { + // We trim unnecessary alias here. Note that, we cannot trim the alias at top-level, + // as we should resolve `UnresolvedAttribute` to a named expression. The caller side Review Comment: nit: update the comment to not mention UnresolvedAttribute here -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-46993] Fix constant folding for session variables [spark]
cloud-fan commented on code in PR #45059: URL: https://github.com/apache/spark/pull/45059#discussion_r1482291388 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala: ## @@ -312,14 +312,39 @@ trait ColumnResolutionHelper extends Logging with DataTypeErrorsBase { }.map(e => Alias(e, nameParts.last)()) } -e.transformWithPruning(_.containsAnyPattern(UNRESOLVED_ATTRIBUTE, TEMP_RESOLVED_COLUMN)) { - case u: UnresolvedAttribute => -resolve(u.nameParts).getOrElse(u) - // Re-resolves `TempResolvedColumn` as variable references if it has tried to be resolved with - // Aggregate but failed. - case t: TempResolvedColumn if t.hasTried => -resolve(t.nameParts).getOrElse(t) +def innerResolve(e: Expression, isTopLevel: Boolean): Expression = withOrigin(e.origin) { + if (e.resolved) return e Review Comment: to be consistent with the previous code, we should add one more shortcut: ``` if (!e.containsAnyPattern(UNRESOLVED_ATTRIBUTE, TEMP_RESOLVED_COLUMN)) return e ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-45599][CORE] Use object equality in OpenHashSet [spark]
revans2 commented on code in PR #45036: URL: https://github.com/apache/spark/pull/45036#discussion_r1482265980 ## core/src/test/scala/org/apache/spark/util/collection/OpenHashSetSuite.scala: ## @@ -269,4 +269,42 @@ class OpenHashSetSuite extends SparkFunSuite with Matchers { assert(pos1 == pos2) } } + + test("SPARK-45599: 0.0 and -0.0 are equal but not the same") { +// Therefore, 0.0 and -0.0 should get separate entries in the hash set. +// +// Exactly these elements provided in roughly this order will trigger the following scenario: +// When probing the bitset in `getPos(-0.0)`, the loop will happen upon the entry for 0.0. +// In the old logic pre-SPARK-45599, the loop will find that the bit is set and, because +// -0.0 == 0.0, it will think that's the position of -0.0. But in reality this is the position +// of 0.0. So -0.0 and 0.0 will be stored at different positions, but `getPos()` will return +// the same position for them. This can cause users of OpenHashSet, like OpenHashMap, to +// return the wrong value for a key based on whether or not this bitset lookup collision +// happens. Review Comment: I found it because I was essentially running fuzz testing comparing https://github.com/NVIDIA/spark-rapids to the gold-standard Apache Spark. Most failures that we find end up being issues with the RAPIDS Accelerator for Apache Spark, but every so often I find something that ends up being wrong with Spark, like in this case. So yes, it was just pure luck that we found it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-42304][FOLLOWUP][SQL] Add test for `GET_TABLES_BY_TYPE_UNSUPPORTED_BY_HIVE_VERSION` [spark]
github-actions[bot] commented on PR #42884: URL: https://github.com/apache/spark/pull/42884#issuecomment-1933156917 We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable. If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-46962][SS][PYTHON] Implement python worker to run python streaming data source [spark]
chaoqin-li1123 commented on code in PR #45023: URL: https://github.com/apache/spark/pull/45023#discussion_r1482258013 ## python/pyspark/sql/datasource.py: ## @@ -298,6 +320,97 @@ def read(self, partition: InputPartition) -> Iterator[Union[Tuple, Row]]: ... +class DataSourceStreamReader(ABC): Review Comment: Great suggestion. I have copied the corresponding method comments from the Scala API. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47002][Python] Return better error message if UDTF 'analyze' method 'orderBy' field accidentally returns a list of strings [spark]
dtenedor commented on PR #45062: URL: https://github.com/apache/spark/pull/45062#issuecomment-1933137062 cc @ueshin, here is the follow-up PR to add more checks for `analyze` result ordering fields. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-45599][CORE] Use object equality in OpenHashSet [spark]
nchammas commented on code in PR #45036: URL: https://github.com/apache/spark/pull/45036#discussion_r1482237064 ## core/src/test/scala/org/apache/spark/util/collection/OpenHashMapSuite.scala: ## @@ -249,4 +249,34 @@ class OpenHashMapSuite extends SparkFunSuite with Matchers { map(null) = null assert(map.get(null) === Some(null)) } + + test("SPARK-45599: 0.0 and -0.0 should count distinctly") { +// Exactly these elements provided in roughly this order trigger a condition where lookups of +// 0.0 and -0.0 in the bitset happen to collide, causing their counts to be merged incorrectly +// and inconsistently if `==` is used to check for key equality. +val spark45599Repro = Seq( + Double.NaN, + 2.0, + 168.0, + Double.NaN, + Double.NaN, + -0.0, + 153.0, + 0.0 +) + +val map1 = new OpenHashMap[Double, Int]() +spark45599Repro.foreach(map1.changeValue(_, 1, {_ + 1})) +map1.iterator.foreach(println) +assert(map1(0.0) == 1) +assert(map1(-0.0) == 1) + +val map2 = new OpenHashMap[Double, Int]() +// Simply changing the order in which the elements are added to the map should not change the +// counts for 0.0 and -0.0. +spark45599Repro.reverse.foreach(map2.changeValue(_, 1, {_ + 1})) +map2.iterator.foreach(println) +assert(map2(0.0) == 1) +assert(map2(-0.0) == 1) Review Comment: This is another expression of the same bug that this PR addresses. If you run this test on `master`, you will see that the counts for 0.0 and -0.0 depend on the order in which the elements from `spark45599Repro` are added to the map. ## core/src/test/scala/org/apache/spark/util/collection/OpenHashSetSuite.scala: ## @@ -269,4 +269,42 @@ class OpenHashSetSuite extends SparkFunSuite with Matchers { assert(pos1 == pos2) } } + + test("SPARK-45599: 0.0 and -0.0 are equal but not the same") { +// Therefore, 0.0 and -0.0 should get separate entries in the hash set. +// +// Exactly these elements provided in roughly this order will trigger the following scenario: +// When probing the bitset in `getPos(-0.0)`, the loop will happen upon the entry for 0.0. +// In the old logic pre-SPARK-45599, the loop will find that the bit is set and, because +// -0.0 == 0.0, it will think that's the position of -0.0. But in reality this is the position +// of 0.0. So -0.0 and 0.0 will be stored at different positions, but `getPos()` will return +// the same position for them. This can cause users of OpenHashSet, like OpenHashMap, to +// return the wrong value for a key based on whether or not this bitset lookup collision +// happens. Review Comment: It really is amazing that @revans2 found this bug, because it depends on the set being a specific size and the 0.0 and -0.0 being inserted and then looked up in just the right order so that they happen to collide in the bitset. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
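[Editor's illustration] As a side note for readers, the two special cases in this thread come down to `==` (primitive comparison) and object equality disagreeing for Double. A minimal standalone sketch, not taken from the PR:

```scala
// Primitive == follows IEEE 754: 0.0 == -0.0, and NaN != NaN.
// Object equality (java.lang.Double.equals) compares bit patterns instead:
// 0.0 and -0.0 differ, and NaN equals NaN. java.util.HashSet uses the latter,
// which is the behavior this PR adopts for OpenHashSet.
object DoubleEqualityDemo {
  def main(args: Array[String]): Unit = {
    println(0.0 == -0.0)                                             // true  (primitive ==)
    println(java.lang.Double.valueOf(0.0).equals(-0.0))              // false (object equality)

    println(Double.NaN == Double.NaN)                                // false (primitive ==)
    println(java.lang.Double.valueOf(Double.NaN).equals(Double.NaN)) // true  (object equality)
  }
}
```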
Re: [PR] [SPARK-46962][SS][PYTHON] Implement python worker to run python streaming data source [spark]
chaoqin-li1123 commented on PR #45023: URL: https://github.com/apache/spark/pull/45023#issuecomment-1933104236 As suggested by @HeartSaVioR, I brought up a reference PR that contains the full change and an end-to-end example of the Python streaming data source: https://github.com/apache/spark/pull/45065/files#diff-aac0c53cf10992b58e7862115c9f72b1cb5b086a39b4e5b11543ac99d234f761R122 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-46962][SS][PYTHON] Implement python worker to run python streaming data source [spark]
chaoqin-li1123 commented on code in PR #45023: URL: https://github.com/apache/spark/pull/45023#discussion_r1482218173 ## sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonStreamingSourceRunner.scala: ## @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.spark.sql.execution.python + +import java.io.{BufferedInputStream, BufferedOutputStream, DataInputStream, DataOutputStream} + +import scala.collection.mutable.ArrayBuffer +import scala.jdk.CollectionConverters._ + +import org.apache.spark.SparkEnv +import org.apache.spark.api.python.{PythonFunction, PythonWorker, PythonWorkerFactory, PythonWorkerUtils, SpecialLengths} +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.BUFFER_SIZE +import org.apache.spark.internal.config.Python.{PYTHON_AUTH_SOCKET_TIMEOUT, PYTHON_USE_DAEMON} +import org.apache.spark.sql.errors.QueryCompilationErrors +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.StructType + +object PythonStreamingSourceRunner { Review Comment: Comments added. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-46962][SS][PYTHON] Implement python worker to run python streaming data source [spark]
chaoqin-li1123 commented on code in PR #45023: URL: https://github.com/apache/spark/pull/45023#discussion_r1482219079 ## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/PythonScan.scala: ## @@ -45,13 +63,50 @@ class PythonScan( new PythonPartitionReaderFactory( ds.source, readerFunc, outputSchema, jobArtifactUUID) } +} - override def toBatch: Batch = this +case class PythonStreamingSourceOffset(json: String) extends Offset - override def description: String = "(Python)" +case class PythonStreamingSourcePartition(partition: Array[Byte]) extends InputPartition - override def readSchema(): StructType = outputSchema +class PythonMicroBatchStream( Review Comment: Moved to a separate file. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-46962][SS][PYTHON] Implement python worker to run python streaming data source [spark]
chaoqin-li1123 commented on code in PR #45023: URL: https://github.com/apache/spark/pull/45023#discussion_r1482217627 ## sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonStreamingSourceRunner.scala: ## @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.spark.sql.execution.python + +import java.io.{BufferedInputStream, BufferedOutputStream, DataInputStream, DataOutputStream} + +import scala.collection.mutable.ArrayBuffer +import scala.jdk.CollectionConverters._ + +import org.apache.spark.SparkEnv +import org.apache.spark.api.python.{PythonFunction, PythonWorker, PythonWorkerFactory, PythonWorkerUtils, SpecialLengths} +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.BUFFER_SIZE +import org.apache.spark.internal.config.Python.{PYTHON_AUTH_SOCKET_TIMEOUT, PYTHON_USE_DAEMON} +import org.apache.spark.sql.errors.QueryCompilationErrors +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.StructType + +object PythonStreamingSourceRunner { + val partitionsFuncId = 886 + val latestOffsetsFuncId = 887 +} + +class PythonStreamingSourceRunner( +func: PythonFunction, +outputSchema: StructType) extends Logging { + val workerModule = "pyspark.sql.streaming.python_streaming_source_runner" + + private val conf = SparkEnv.get.conf + protected val bufferSize: Int = conf.get(BUFFER_SIZE) + protected val authSocketTimeout = conf.get(PYTHON_AUTH_SOCKET_TIMEOUT) + + private val envVars: java.util.Map[String, String] = func.envVars + private val pythonExec: String = func.pythonExec + private var pythonWorker: Option[PythonWorker] = None + private var pythonWorkerFactory: Option[PythonWorkerFactory] = None + protected val pythonVer: String = func.pythonVer + + private var dataOut: DataOutputStream = null + private var dataIn: DataInputStream = null + + import PythonStreamingSourceRunner._ + + /** + * Initializes the Python worker for streaming functions. Sets up Spark Connect session Review Comment: Sorry, the code was copied from elsewhere and I forgot to change this line of the comment. Fixed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-46962][SS][PYTHON] Implement python worker to run python streaming data source [spark]
chaoqin-li1123 commented on code in PR #45023: URL: https://github.com/apache/spark/pull/45023#discussion_r1482216213 ## sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonStreamingSourceRunner.scala: ## @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.spark.sql.execution.python + +import java.io.{BufferedInputStream, BufferedOutputStream, DataInputStream, DataOutputStream} + +import scala.collection.mutable.ArrayBuffer +import scala.jdk.CollectionConverters._ + +import org.apache.spark.SparkEnv +import org.apache.spark.api.python.{PythonFunction, PythonWorker, PythonWorkerFactory, PythonWorkerUtils, SpecialLengths} +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.BUFFER_SIZE +import org.apache.spark.internal.config.Python.{PYTHON_AUTH_SOCKET_TIMEOUT, PYTHON_USE_DAEMON} +import org.apache.spark.sql.errors.QueryCompilationErrors +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.StructType + +object PythonStreamingSourceRunner { + val partitionsFuncId = 886 + val latestOffsetsFuncId = 887 Review Comment: They are sent to the Python process to invoke the corresponding function. I also wonder whether there is a better alternative to hardcoding the numbers in both the Scala and Python code? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
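[Editor's illustration] To answer the "how are they used" part for anyone following along, here is a rough sketch of the request/response shape being discussed. The helper class and wire format below are invented for illustration and are not the PR's actual protocol:

```scala
import java.io.{DataInputStream, DataOutputStream}
import java.nio.charset.StandardCharsets

// The IDs from the quoted diff, mirrored on the Python side so the worker
// knows which reader method (partitions vs. latestOffset) to invoke.
object FuncIds {
  val PartitionsFuncId = 886
  val LatestOffsetsFuncId = 887
}

// Hypothetical runner-side call: write the function ID, flush, read the reply.
class RunnerSketch(dataOut: DataOutputStream, dataIn: DataInputStream) {
  def latestOffset(): String = {
    dataOut.writeInt(FuncIds.LatestOffsetsFuncId) // dispatch key for the Python worker
    dataOut.flush()
    val len = dataIn.readInt()                    // assume a length-prefixed UTF-8 reply
    val buf = new Array[Byte](len)
    dataIn.readFully(buf)
    new String(buf, StandardCharsets.UTF_8)       // offset JSON produced by the worker
  }
}
```

A shared constants file would remove the duplication, but since the two sides are in different languages, the IDs still have to be kept in sync by hand somewhere.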
Re: [PR] [SPARK-47003][K8S] Detect and fail on invalid volume sizes (< 1KiB) in K8s [spark]
dongjoon-hyun commented on PR #45061: URL: https://github.com/apache/spark/pull/45061#issuecomment-1933073565 Merged to master for Apache Spark 4.0.0. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47003][K8S] Detect and fail on invalid volume sizes (< 1KiB) in K8s [spark]
dongjoon-hyun closed pull request #45061: [SPARK-47003][K8S] Detect and fail on invalid volume sizes (< 1KiB) in K8s URL: https://github.com/apache/spark/pull/45061 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47003][K8S] Detect and fail on invalid volume sizes (< 1KiB) in K8s [spark]
dongjoon-hyun commented on PR #45061: URL: https://github.com/apache/spark/pull/45061#issuecomment-1933068990 Thank you, @viirya ! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47003][K8S] Detect and fail on invalid volume sizes (< 1KiB) in K8s [spark]
dongjoon-hyun commented on code in PR #45061: URL: https://github.com/apache/spark/pull/45061#discussion_r1482191800 ## resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala: ## @@ -105,4 +109,13 @@ private[spark] object KubernetesVolumeUtils { throw new NoSuchElementException(key + s" is required for $msg") } } + + private def verifySize(size: Option[String]): Unit = { +size.foreach { v => + if (v.forall(_.isDigit) && parseLong(v) < 1024) { +throw new IllegalArgumentException( + s"Volume size `$v` is smaller than 1KiB. Missing units?") + } +} Review Comment: Spark doesn't know the default values; they're controlled by the K8s system admin. > Hmm, if the goal is to limit the value to be larger than the default value, why don't we check against that instead of a value like 1KB? Yes, if the user provides a `unit` like KB, this PR will not block it. > What will happen if a user specifies 10KB, for example? Will it be used, or will 8GB (the default value) be used? And, as I mentioned above in https://github.com/apache/spark/pull/45061#discussion_r1482183784, if a user uses 10240, we will not complain, because it's above the threshold we treat as a likely mistake. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[PR] [REFERENCE][DO-NOT-MERGE] Initial implementation of Python streaming data source [spark]
chaoqin-li1123 opened a new pull request, #45065: URL: https://github.com/apache/spark/pull/45065 ### What changes were proposed in this pull request? This is a reference PR that contains all changes for supporting the streaming Python data source API. ### Why are the changes needed? To implement the Python streaming data source. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Unit tests and integration tests. ### Was this patch authored or co-authored using generative AI tooling? No. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47003][K8S] Detect and fail on invalid volume sizes (< 1KiB) in K8s [spark]
viirya commented on code in PR #45061: URL: https://github.com/apache/spark/pull/45061#discussion_r1482189060 ## resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala: ## @@ -105,4 +109,13 @@ private[spark] object KubernetesVolumeUtils { throw new NoSuchElementException(key + s" is required for $msg") } } + + private def verifySize(size: Option[String]): Unit = { +size.foreach { v => + if (v.forall(_.isDigit) && parseLong(v) < 1024) { +throw new IllegalArgumentException( + s"Volume size `$v` is smaller than 1KiB. Missing units?") + } +} Review Comment: Hmm, if the goal is to limit the value to be larger than the default value, why don't we check against that instead of a value like 1KB? What will happen if a user specifies 10KB, for example? Will it be used, or will 8GB (the default value) be used? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47003][K8S] Detect and fail on invalid volume sizes (< 1KiB) in K8s [spark]
dongjoon-hyun commented on code in PR #45061: URL: https://github.com/apache/spark/pull/45061#discussion_r1482188354 ## resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala: ## @@ -105,4 +109,13 @@ private[spark] object KubernetesVolumeUtils { throw new NoSuchElementException(key + s" is required for $msg") } } + + private def verifySize(size: Option[String]): Unit = { +size.foreach { v => + if (v.forall(_.isDigit) && parseLong(v) < 1024) { +throw new IllegalArgumentException( + s"Volume size `$v` is smaller than 1KiB. Missing units?") + } +} Review Comment: The previous behavior was not the user's intention at all, and the job fails after many executor losses, as described in the PR description. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47003][K8S] Detect and fail on invalid volume sizes (< 1KiB) in K8s [spark]
dongjoon-hyun commented on code in PR #45061: URL: https://github.com/apache/spark/pull/45061#discussion_r1482187670 ## resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala: ## @@ -105,4 +109,13 @@ private[spark] object KubernetesVolumeUtils { throw new NoSuchElementException(key + s" is required for $msg") } } + + private def verifySize(size: Option[String]): Unit = { +size.foreach { v => + if (v.forall(_.isDigit) && parseLong(v) < 1024) { +throw new IllegalArgumentException( + s"Volume size `$v` is smaller than 1KiB. Missing units?") + } +} Review Comment: Yes, right~ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47003][K8S] Detect and fail on invalid volume sizes (< 1KiB) in K8s [spark]
viirya commented on code in PR #45061: URL: https://github.com/apache/spark/pull/45061#discussion_r1482186467 ## resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala: ## @@ -105,4 +109,13 @@ private[spark] object KubernetesVolumeUtils { throw new NoSuchElementException(key + s" is required for $msg") } } + + private def verifySize(size: Option[String]): Unit = { +size.foreach { v => + if (v.forall(_.isDigit) && parseLong(v) < 1024) { +throw new IllegalArgumentException( + s"Volume size `$v` is smaller than 1KiB. Missing units?") + } +} Review Comment: Oh, you mean, previously once a user specified a value less than 1KB, 8GB (or another default value) would be used, but now this PR makes it fail immediately. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47003][K8S] Detect and fail on invalid volume sizes (< 1KiB) in K8s [spark]
dongjoon-hyun commented on code in PR #45061: URL: https://github.com/apache/spark/pull/45061#discussion_r1482185167 ## resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala: ## @@ -105,4 +109,13 @@ private[spark] object KubernetesVolumeUtils { throw new NoSuchElementException(key + s" is required for $msg") } } + + private def verifySize(size: Option[String]): Unit = { +size.foreach { v => + if (v.forall(_.isDigit) && parseLong(v) < 1024) { +throw new IllegalArgumentException( + s"Volume size `$v` is smaller than 1KiB. Missing units?") + } +} Review Comment: What I meant was that the previous behavior is 8GiB (or another default value). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47003][K8S] Detect and fail on invalid volume sizes (< 1KiB) in K8s [spark]
dongjoon-hyun commented on code in PR #45061: URL: https://github.com/apache/spark/pull/45061#discussion_r1482184736 ## resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala: ## @@ -105,4 +109,13 @@ private[spark] object KubernetesVolumeUtils { throw new NoSuchElementException(key + s" is required for $msg") } } + + private def verifySize(size: Option[String]): Unit = { +size.foreach { v => + if (v.forall(_.isDigit) && parseLong(v) < 1024) { +throw new IllegalArgumentException( + s"Volume size `$v` is smaller than 1KiB. Missing units?") + } +} Review Comment: Oh, no~ This PR aims to fail the job from the beginning. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47003][K8S] Detect and fail on invalid volume sizes (< 1KiB) in K8s [spark]
viirya commented on code in PR #45061: URL: https://github.com/apache/spark/pull/45061#discussion_r1482183654 ## resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala: ## @@ -105,4 +109,13 @@ private[spark] object KubernetesVolumeUtils { throw new NoSuchElementException(key + s" is required for $msg") } } + + private def verifySize(size: Option[String]): Unit = { +size.foreach { v => + if (v.forall(_.isDigit) && parseLong(v) < 1024) { +throw new IllegalArgumentException( + s"Volume size `$v` is smaller than 1KiB. Missing units?") + } +} Review Comment: Oh, you mean if `verifySize` fails, 8GB (or other default value) will be used? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47003][K8S] Detect and fail on invalid volume sizes (< 1KiB) in K8s [spark]
dongjoon-hyun commented on code in PR #45061: URL: https://github.com/apache/spark/pull/45061#discussion_r1482183784 ## resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala: ## @@ -105,4 +109,13 @@ private[spark] object KubernetesVolumeUtils { throw new NoSuchElementException(key + s" is required for $msg") } } + + private def verifySize(size: Option[String]): Unit = { +size.foreach { v => + if (v.forall(_.isDigit) && parseLong(v) < 1024) { +throw new IllegalArgumentException( + s"Volume size `$v` is smaller than 1KiB. Missing units?") + } +} Review Comment: Of course, a user can make a mistake like 10240, but this PR aims to handle the most common cases, where we are sure it is a mistake. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
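[Editor's illustration] To make the heuristic concrete, here is how the quoted `verifySize` logic behaves on a few inputs. This is a standalone re-statement of the diff for illustration, with `parseLong` spelled out:

```scala
// A bare all-digit value below 1024 is rejected as a likely missing-unit
// mistake; values with unit suffixes or large bare numbers pass through.
def verifySize(size: Option[String]): Unit = size.foreach { v =>
  if (v.forall(_.isDigit) && java.lang.Long.parseLong(v) < 1024) {
    throw new IllegalArgumentException(
      s"Volume size `$v` is smaller than 1KiB. Missing units?")
  }
}

verifySize(Some("10Gi"))  // passes: has a unit suffix, so not all digits
verifySize(Some("10240")) // passes: bare number, but >= 1024
verifySize(Some("10"))    // throws: almost certainly meant 10Gi or similar
```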
Re: [PR] [SPARK-47003][K8S] Detect and fail on invalid volume sizes (< 1KiB) in K8s [spark]
dongjoon-hyun commented on code in PR #45061: URL: https://github.com/apache/spark/pull/45061#discussion_r1482182475 ## resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala: ## @@ -105,4 +109,13 @@ private[spark] object KubernetesVolumeUtils { throw new NoSuchElementException(key + s" is required for $msg") } } + + private def verifySize(size: Option[String]): Unit = { +size.foreach { v => + if (v.forall(_.isDigit) && parseLong(v) < 1024) { +throw new IllegalArgumentException( + s"Volume size `$v` is smaller than 1KiB. Missing units?") + } +} Review Comment: No~ In this case, the value is ignored. Instead, the created volume size is determined by the underlying K8s configuration; for example, 8GiB on some K8s systems. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47003][K8S] Detect and fail on invalid volume sizes (< 1KiB) in K8s [spark]
viirya commented on code in PR #45061: URL: https://github.com/apache/spark/pull/45061#discussion_r1482182370 ## resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala: ## @@ -105,4 +109,13 @@ private[spark] object KubernetesVolumeUtils { throw new NoSuchElementException(key + s" is required for $msg") } } + + private def verifySize(size: Option[String]): Unit = { +size.foreach { v => + if (v.forall(_.isDigit) && parseLong(v) < 1024) { +throw new IllegalArgumentException( + s"Volume size `$v` is smaller than 1KiB. Missing units?") + } +} Review Comment: Should we mention somewhere (in any doc?) that the minimum volume size is 1KB? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47003][K8S] Detect and fail on invalid volume sizes (< 1KiB) in K8s [spark]
viirya commented on code in PR #45061: URL: https://github.com/apache/spark/pull/45061#discussion_r1482181478 ## resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala: ## @@ -105,4 +109,13 @@ private[spark] object KubernetesVolumeUtils { throw new NoSuchElementException(key + s" is required for $msg") } } + + private def verifySize(size: Option[String]): Unit = { +size.foreach { v => + if (v.forall(_.isDigit) && parseLong(v) < 1024) { +throw new IllegalArgumentException( + s"Volume size `$v` is smaller than 1KiB. Missing units?") + } +} Review Comment: Is 1KB the minimum size we can specify for a volume? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47003][K8S] Detect and fail on invalid volume sizes (< 1KiB) in K8s [spark]
dongjoon-hyun commented on PR #45061: URL: https://github.com/apache/spark/pull/45061#issuecomment-1933048949 All tests passed. Could you review this PR, @viirya? [image: https://github.com/apache/spark/assets/9700541/8b1bde03-7eed-46b9-a4cf-e63327c39fa5] -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47000][CORE] Use `getTotalMemorySize` in `WorkerArguments` [spark]
dongjoon-hyun commented on PR #45060: URL: https://github.com/apache/spark/pull/45060#issuecomment-1933047669 All tests passed. Merged to master for Apache Spark 4.0.0. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47000][CORE] Use `getTotalMemorySize` in `WorkerArguments` [spark]
dongjoon-hyun closed pull request #45060: [SPARK-47000][CORE] Use `getTotalMemorySize` in `WorkerArguments` URL: https://github.com/apache/spark/pull/45060 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[PR] [SPARK-46832][SQL] Introducing Collate and Collation expressions [spark]
dbatomic opened a new pull request, #45064: URL: https://github.com/apache/spark/pull/45064 ### What changes were proposed in this pull request? This PR adds E2E support for `collate` and `collation` expressions. The following changes were made to get us there: 1) Set the right ordering for `PhysicalStringType` based on `collationId`. 2) UTF8String is now just a data holder class - it no longer implements the `Comparable` interface. All comparisons must be done through `CollationFactory`. 3) `collate` and `collation` expressions are added. Special syntax for `collate` is enabled - `'hello world' COLLATE 'target_collation'`. 4) A first set of tests is added that covers both core expression tests and E2E collation tests. ### Why are the changes needed? This PR is part of the larger collation track. For more details, please refer to the design doc attached to the parent JIRA ticket. ### Does this PR introduce _any_ user-facing change? This PR adds two new expressions and opens up new syntax. ### How was this patch tested? Basic tests are added. In follow-up PRs we will add support for more advanced operators and keep adding tests alongside new feature support. ### Was this patch authored or co-authored using generative AI tooling? Yes. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
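[Editor's illustration] For a feel of the syntax item 3 enables, here is a hedged sketch of usage via `spark.sql`; the collation name `'ucs_basic'` is a placeholder and may not match what this PR actually ships:

```scala
// Assumes a SparkSession `spark` with this change applied; the COLLATE clause
// attaches a collation to the string literal, and collation() reads it back.
val df = spark.sql(
  """SELECT
    |  'hello world' COLLATE 'ucs_basic' AS collated_value,
    |  collation('hello world' COLLATE 'ucs_basic') AS collation_name
    |""".stripMargin)
df.show()
```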
[PR] [SPARK-47004] Added more tests to ClientStreamingQuerySuite to increase Scala client test coverage [spark]
bogao007 opened a new pull request, #45063: URL: https://github.com/apache/spark/pull/45063 ### What changes were proposed in this pull request? Added more tests to ClientStreamingQuerySuite to increase Scala client test coverage ### Why are the changes needed? To increase test coverage for Streaming Spark Connect Scala client ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? This is a test-only change. ### Was this patch authored or co-authored using generative AI tooling? No -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[PR] [SPARK-47002][Python] Return better error message if UDTF 'analyze' method 'orderBy' field accidentally returns a list of strings [spark]
dtenedor opened a new pull request, #45062: URL: https://github.com/apache/spark/pull/45062 ### What changes were proposed in this pull request? ### Why are the changes needed? ### Does this PR introduce _any_ user-facing change? ### How was this patch tested? ### Was this patch authored or co-authored using generative AI tooling? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-46689][SPARK-46690][PYTHON][CONNECT] Support v2 profiling in group/cogroup applyInPandas/applyInArrow [spark]
xinrong-meng commented on code in PR #45050: URL: https://github.com/apache/spark/pull/45050#discussion_r1482146994 ## python/pyspark/sql/tests/test_udf_profiler.py: ## @@ -333,6 +333,69 @@ def filter_func(iterator): self.assertEqual(0, len(self.profile_results), str(self.profile_results.keys())) +@unittest.skipIf( +not have_pandas or not have_pyarrow, +cast(str, pandas_requirement_message or pyarrow_requirement_message), +) +def test_perf_profiler_group_apply_in_pandas(self): Review Comment: Good catch! ## python/pyspark/sql/tests/test_udf_profiler.py: ## @@ -333,6 +333,69 @@ def filter_func(iterator): self.assertEqual(0, len(self.profile_results), str(self.profile_results.keys())) +@unittest.skipIf( +not have_pandas or not have_pyarrow, +cast(str, pandas_requirement_message or pyarrow_requirement_message), +) +def test_perf_profiler_group_apply_in_pandas(self): +# FlatMapGroupsInBatchExec +df = self.spark.createDataFrame( +[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v") +) + +def normalize(pdf): +v = pdf.v +return pdf.assign(v=(v - v.mean()) / v.std()) + +with self.sql_conf({"spark.sql.pyspark.udf.profiler": "perf"}): +df.groupby("id").applyInPandas(normalize, schema="id long, v double").show() + +self.assertEqual(1, len(self.profile_results), str(self.profile_results.keys())) + +for id in self.profile_results: +with self.trap_stdout() as io: +self.spark.showPerfProfiles(id) + +self.assertIn(f"Profile of UDF", io.getvalue()) +self.assertRegex( +io.getvalue(), f"2.*{os.path.basename(inspect.getfile(_do_computation))}" +) + +@unittest.skipIf( +not have_pandas or not have_pyarrow, +cast(str, pandas_requirement_message or pyarrow_requirement_message), +) +def test_perf_profiler_cogroup_apply_in_pandas(self): Review Comment: Sounds good. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-45599][CORE] Use object equality in OpenHashSet [spark]
nchammas commented on code in PR #45036: URL: https://github.com/apache/spark/pull/45036#discussion_r1482123330 ## core/src/test/scala/org/apache/spark/util/collection/OpenHashSetSuite.scala: ## @@ -269,4 +269,35 @@ class OpenHashSetSuite extends SparkFunSuite with Matchers { assert(pos1 == pos2) } } + + test("SPARK-45599: 0.0 and -0.0 are equal but not the same") { Review Comment: Consider another interesting case where `java.util.HashSet` and `OpenHashSet` differ: ```scala scala> val h = new HashSet[Double]() val h: java.util.HashSet[Double] = [] scala> h.add(Double.NaN) val res9: Boolean = true scala> h.add(Double.NaN) val res10: Boolean = false scala> h.contains(Double.NaN) val res11: Boolean = true scala> h.size() val res12: Int = 1 ``` On `master`, `OpenHashSet` does something obviously wrong: ```scala val set = new OpenHashSet[Double]() set.add(Double.NaN) set.add(Double.NaN) set.size // returns 2 set.contains(Double.NaN) // returns false ``` This could possibly lead to a bug like the one reported in SPARK-45599 but in reverse, where a new NaN row is added rather than dropped. I will see if I can construct such a scenario as a demonstration. But regardless, I think this behavior is incorrect by itself. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-46688][SPARK-46691][PYTHON][CONNECT] Support v2 profiling in aggregate Pandas UDFs [spark]
ueshin closed pull request #45035: [SPARK-46688][SPARK-46691][PYTHON][CONNECT] Support v2 profiling in aggregate Pandas UDFs URL: https://github.com/apache/spark/pull/45035 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-46688][SPARK-46691][PYTHON][CONNECT] Support v2 profiling in aggregate Pandas UDFs [spark]
ueshin commented on PR #45035: URL: https://github.com/apache/spark/pull/45035#issuecomment-1932995660 Thanks! merging to master. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-46966][Python] Add UDTF API for 'analyze' method to indicate subset of input table columns to select [spark]
ueshin closed pull request #45007: [SPARK-46966][Python] Add UDTF API for 'analyze' method to indicate subset of input table columns to select URL: https://github.com/apache/spark/pull/45007 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-46966][Python] Add UDTF API for 'analyze' method to indicate subset of input table columns to select [spark]
ueshin commented on PR #45007: URL: https://github.com/apache/spark/pull/45007#issuecomment-1932985366 Thanks! merging to master. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-46966][Python] Add UDTF API for 'analyze' method to indicate subset of input table columns to select [spark]
ueshin commented on PR #45007: URL: https://github.com/apache/spark/pull/45007#issuecomment-1932985032 The failed tests seem unrelated to the last commit. The previous build passed except for the linter, which is fixed in the last commit. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47000][CORE] Use `getTotalMemorySize` in `WorkerArguments` [spark]
dongjoon-hyun commented on PR #45060: URL: https://github.com/apache/spark/pull/45060#issuecomment-1932978043 Thank you, @sunchao! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-45599][CORE] Use object equality in OpenHashSet [spark]
nchammas commented on code in PR #45036: URL: https://github.com/apache/spark/pull/45036#discussion_r1482125925

## core/src/test/scala/org/apache/spark/util/collection/OpenHashSetSuite.scala:
```
@@ -269,4 +269,35 @@ class OpenHashSetSuite extends SparkFunSuite with Matchers {
     assert(pos1 == pos2)
   }
 }
+
+  test("SPARK-45599: 0.0 and -0.0 are equal but not the same") {
```

Review Comment:
Note also that the docstring for `OpenHashSet` seems to imply that it is meant to be a faster but semantically equivalent alternative to `java.util.HashSet`: https://github.com/apache/spark/blob/0d5c2ce427e06adfebf47bc446ff6646513ae750/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala#L31-L32

If that's true, then we should perhaps add property-based tests to ensure alignment between the two implementations, but I'll leave that as a potential future improvement.
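A property-based comparison along those lines could look roughly like the following ScalaCheck sketch (illustrative only, not from the PR; the spec name and generator are assumptions, and ScalaCheck is assumed to be on the test classpath):

```scala
import java.util.{HashSet => JHashSet}

import org.scalacheck.{Gen, Prop, Properties}

import org.apache.spark.util.collection.OpenHashSet

object OpenHashSetAgreementSpec extends Properties("OpenHashSet") {
  // Bias the generator toward the tricky values under discussion.
  private val doubles: Gen[Double] = Gen.frequency(
    3 -> Gen.choose(-1e9, 1e9),
    1 -> Gen.oneOf(0.0, -0.0, Double.NaN))

  // For any sequence of inserts, both sets should end up the same size.
  property("size agrees with java.util.HashSet") =
    Prop.forAll(Gen.listOf(doubles)) { xs =>
      val open = new OpenHashSet[Double]()
      val ref = new JHashSet[Double]()
      xs.foreach { x => open.add(x); ref.add(x) }
      open.size == ref.size()
    }
}
```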
Re: [PR] [SPARK-45599][CORE] Use object equality in OpenHashSet [spark]
nchammas commented on code in PR #45036: URL: https://github.com/apache/spark/pull/45036#discussion_r1482123330

## core/src/test/scala/org/apache/spark/util/collection/OpenHashSetSuite.scala:
```
@@ -269,4 +269,35 @@ class OpenHashSetSuite extends SparkFunSuite with Matchers {
     assert(pos1 == pos2)
   }
 }
+
+  test("SPARK-45599: 0.0 and -0.0 are equal but not the same") {
```

Review Comment:
Consider another interesting case where `java.util.HashSet` and `OpenHashSet` differ:

```scala
scala> val h = new HashSet[Double]()
val h: java.util.HashSet[Double] = []

scala> h.add(Double.NaN)
val res9: Boolean = true

scala> h.add(Double.NaN)
val res10: Boolean = false

scala> h.size()
val res11: Int = 1
```

On `master`, `OpenHashSet` does IMO the wrong thing:

```scala
val set = new OpenHashSet[Double]()
set.add(Double.NaN)
set.add(Double.NaN)
set.size  // returns 2
```

This could possibly lead to a bug like the one reported in SPARK-45599 but in reverse, where a new NaN row is added rather than dropped. I will see if I can construct such a scenario as a demonstration. But regardless, I think this behavior is incorrect by itself.
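The divergence comes down to primitive `==` versus boxed `equals` on `Double` (standard JVM semantics, shown here for context rather than taken from the thread):

```scala
scala> Double.NaN == Double.NaN       // IEEE 754 primitive comparison
val res0: Boolean = false

scala> Double.NaN.equals(Double.NaN)  // boxed comparison via doubleToLongBits
val res1: Boolean = true

scala> 0.0 == -0.0
val res2: Boolean = true

scala> (0.0).equals(-0.0)             // boxed: 0.0 and -0.0 have different bit patterns
val res3: Boolean = false
```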
[PR] [SPARK-47003][K8S] Detect and fail on invalid volume sizes (< 1KiB) in K8s [spark]
dongjoon-hyun opened a new pull request, #45061: URL: https://github.com/apache/spark/pull/45061

…

### What changes were proposed in this pull request?

### Why are the changes needed?

### Does this PR introduce _any_ user-facing change?

### How was this patch tested?

### Was this patch authored or co-authored using generative AI tooling?
Re: [PR] [SPARK-46961][SS] Using ProcessorContext to store and retrieve handle [spark]
ericm-db commented on code in PR #45002: URL: https://github.com/apache/spark/pull/45002#discussion_r1482110798

## sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala:
```
@@ -31,14 +31,9 @@ object TransformWithStateSuiteUtils {
 class RunningCountStatefulProcessor extends StatefulProcessor[String, String, (String, String)]
   with Logging {
   @transient private var _countState: ValueState[Long] = _
-  @transient var _processorHandle: StatefulProcessorHandle = _
-
-  override def init(
-      handle: StatefulProcessorHandle,
-      outputMode: OutputMode) : Unit = {
-    _processorHandle = handle
-    assert(handle.getQueryInfo().getBatchId >= 0)
-    _countState = _processorHandle.getValueState[Long]("countState")
+
+  override def init(outputMode: OutputMode): Unit = {
```

Review Comment:
Added
Re: [PR] [MINOR][DOCS] Show sort order of NaN relative to infinity [spark]
nchammas commented on PR #45047: URL: https://github.com/apache/spark/pull/45047#issuecomment-1932909805

I don't think we have an actual unit test to check this sort behavior, so I added one as part of #45036 (which also verifies some other behavior we are discussing there).
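Spark documents NaN as ordering larger than any other numeric value, including positive infinity. A quick illustration of that documented behavior (a sketch assuming a `SparkSession` named `spark`):

```scala
import spark.implicits._

Seq(Double.NaN, Double.PositiveInfinity, 1.0, Double.NegativeInfinity)
  .toDF("v")
  .orderBy("v")
  .show()
// +---------+
// |        v|
// +---------+
// |-Infinity|
// |      1.0|
// | Infinity|
// |      NaN|
// +---------+
```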
Re: [PR] [SPARK-45599][CORE] Use object equality in OpenHashSet [spark]
nchammas commented on code in PR #45036: URL: https://github.com/apache/spark/pull/45036#discussion_r1482089432

## core/src/test/scala/org/apache/spark/util/collection/OpenHashSetSuite.scala:
```
@@ -269,4 +269,35 @@ class OpenHashSetSuite extends SparkFunSuite with Matchers {
     assert(pos1 == pos2)
   }
 }
+
+  test("SPARK-45599: 0.0 and -0.0 are equal but not the same") {
```

Review Comment:
> In Spark, 0.0 == -0.0, and in GROUP BY, 0.0 and -0.0 are considered to be in the same group and normalized to 0.0. This PR does not change this behavior.

I noticed, however, that we do not have any tests currently to check that -0.0 is normalized and grouped as you describe, so I went ahead and added such a test in 2bfc60548db2c41f4c64b63d40a2652cb22732ab. Does this address your concern? Or are you suggesting that we should normalize -0.0 to 0.0 across the board?
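The grouping behavior quoted above is easy to observe directly (a sketch assuming a `SparkSession` named `spark`; Spark normalizes -0.0 to 0.0 before grouping):

```scala
import spark.implicits._

// Both values land in a single group keyed by 0.0.
Seq(0.0, -0.0).toDF("v").groupBy("v").count().show()
// +---+-----+
// |  v|count|
// +---+-----+
// |0.0|    2|
// +---+-----+
```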
Re: [PR] [SPARK-45599][CORE] Use object equality in OpenHashSet [spark]
nchammas commented on code in PR #45036: URL: https://github.com/apache/spark/pull/45036#discussion_r1482085088

## core/src/test/scala/org/apache/spark/util/collection/OpenHashSetSuite.scala:
```
@@ -269,4 +269,35 @@ class OpenHashSetSuite extends SparkFunSuite with Matchers {
     assert(pos1 == pos2)
   }
 }
+
+  test("SPARK-45599: 0.0 and -0.0 are equal but not the same") {
```

Review Comment:
> This is a bit tricky and it's better if we can find a reference system that defines this semantic.

```scala
scala> import java.util.HashSet
import java.util.HashSet

scala> val h = new HashSet[Double]()
val h: java.util.HashSet[Double] = []

scala> h.add(0.0)
val res0: Boolean = true

scala> h.add(-0.0)
val res1: Boolean = true

scala> h.size()
val res2: Int = 2
```

The doc for [HashSet.add][1] states:

> More formally, adds the specified element e to this set if this set contains no element e2 such that Objects.equals(e, e2). If this set already contains the element, the call leaves the set unchanged and returns false.

In other words, `java.util.HashSet` uses `equals` and not `==`, and therefore it considers `0.0` and `-0.0` distinct elements. So this PR brings `OpenHashSet` more in line with the semantics of `java.util.HashSet`.

[1]: https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/util/HashSet.html#add(E)
Re: [PR] [SPARK-46998][SQL] Deprecate the SQL config `spark.sql.legacy.allowZeroIndexInFormatString` [spark]
MaxGekk commented on code in PR #45057: URL: https://github.com/apache/spark/pull/45057#discussion_r1482042611

## sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
```
@@ -2339,7 +2339,8 @@ object SQLConf {
       .doc("When false, the `strfmt` in `format_string(strfmt, obj, ...)` and " +
         "`printf(strfmt, obj, ...)` will no longer support to use \"0$\" to specify the first " +
         "argument, the first argument should always reference by \"1$\" when use argument index " +
-        "to indicating the position of the argument in the argument list.")
+        "to indicating the position of the argument in the argument list. " +
+        "This config will be removed in the future releases.")
```

Review Comment:
I have updated the SQL migration guide.
Re: [PR] [SPARK-47000][CORE] Use `getTotalMemorySize` in `WorkerArguments` [spark]
dongjoon-hyun commented on PR #45060: URL: https://github.com/apache/spark/pull/45060#issuecomment-1932751890

`core` passed.

[screenshot: https://github.com/apache/spark/assets/9700541/8c84c5cf-1ecf-4397-9aa8-9f748424607d]

Could you review this when you have some time, @sunchao?
Re: [PR] [SPARK-46998][SQL] Deprecate the SQL config `spark.sql.legacy.allowZeroIndexInFormatString` [spark]
dongjoon-hyun commented on code in PR #45057: URL: https://github.com/apache/spark/pull/45057#discussion_r1481997397

## sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
```
@@ -2339,7 +2339,8 @@ object SQLConf {
       .doc("When false, the `strfmt` in `format_string(strfmt, obj, ...)` and " +
         "`printf(strfmt, obj, ...)` will no longer support to use \"0$\" to specify the first " +
         "argument, the first argument should always reference by \"1$\" when use argument index " +
-        "to indicating the position of the argument in the argument list.")
+        "to indicating the position of the argument in the argument list. " +
+        "This config will be removed in the future releases.")
```

Review Comment:
+1 for the deprecation proposal. Maybe, could you add a proper warning to the SQL migration guide, too?
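To make concrete what the flag being deprecated controls, per the doc text in the diff above (a sketch assuming a `SparkSession` named `spark`):

```scala
// Default (spark.sql.legacy.allowZeroIndexInFormatString=false):
// argument indexes in format strings must start at 1$.
spark.sql("SELECT format_string('%1$s %2$s', 'Hello', 'Spark')").show()
// Hello Spark

// format_string('%0$s', 'Hello') raises an error under the default;
// setting the legacy config to true restores the old behavior where
// "0$" also referred to the first argument.
```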
Re: [PR] [SPARK-46961][SS] Using ProcessorContext to store and retrieve handle [spark]
anishshri-db commented on code in PR #45002: URL: https://github.com/apache/spark/pull/45002#discussion_r1481977985

## sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala:
```
@@ -31,14 +31,9 @@ object TransformWithStateSuiteUtils {
 class RunningCountStatefulProcessor extends StatefulProcessor[String, String, (String, String)]
   with Logging {
   @transient private var _countState: ValueState[Long] = _
-  @transient var _processorHandle: StatefulProcessorHandle = _
-
-  override def init(
-      handle: StatefulProcessorHandle,
-      outputMode: OutputMode) : Unit = {
-    _processorHandle = handle
-    assert(handle.getQueryInfo().getBatchId >= 0)
-    _countState = _processorHandle.getValueState[Long]("countState")
+
+  override def init(outputMode: OutputMode): Unit = {
```

Review Comment:
Can we also add a test verifying that the context is not available if it's not initialized? Maybe within `ValueStateSuite`?
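Such a test might look roughly like the following sketch (the exception type and message are assumptions; see also the getter discussion later in this thread):

```scala
test("stateful processor handle is unavailable before init") {
  val processor = new RunningCountStatefulProcessor()
  // The handle is only set by the engine before init() runs, so asking
  // for it on a fresh processor should fail rather than return null.
  val ex = intercept[Exception] {
    processor.getStatefulProcessorHandle
  }
  assert(ex.getMessage.contains("handle"))
}
```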
Re: [PR] [SPARK-46966][Python] Add UDTF API for 'analyze' method to indicate subset of input table columns to select [spark]
dtenedor commented on code in PR #45007: URL: https://github.com/apache/spark/pull/45007#discussion_r1481977258

## python/pyspark/sql/worker/analyze_udtf.py:
```
@@ -31,7 +31,7 @@
     write_with_length,
     SpecialLengths,
 )
-from pyspark.sql.functions import PartitioningColumn
+from pyspark.sql.functions import OrderingColumn, PartitioningColumn, SelectedColumn
```

Review Comment:
Fixed this for now by removing `OrderingColumn`. I will add a similar check for that in a separate PR to help separate concerns.
Re: [PR] [SPARK-46961][SS] Using ProcessorContext to store and retrieve handle [spark]
anishshri-db commented on code in PR #45002: URL: https://github.com/apache/spark/pull/45002#discussion_r1481976306

## sql/api/src/main/scala/org/apache/spark/sql/streaming/StatefulProcessor.scala:
```
@@ -27,19 +27,16 @@ import org.apache.spark.annotation.{Evolving, Experimental}
  */
 @Experimental
 @Evolving
-private[sql] trait StatefulProcessor[K, I, O] extends Serializable {
+private[sql] abstract class StatefulProcessor[K, I, O] extends Serializable {
```

Review Comment:
I believe a trait can have default/concrete implementations too?
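For reference, Scala traits can indeed carry both state and concrete members, so the trait-versus-abstract-class choice usually hinges on other factors such as Java interoperability rather than capability. A minimal illustration (not from the PR):

```scala
trait HandleHolder {
  // Concrete, private mutable state is legal inside a trait.
  private var handle: String = null

  // Concrete methods with default implementations are legal too.
  def setHandle(h: String): Unit = { handle = h }
  def getHandle: String = handle
}
```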
Re: [PR] [SPARK-46961][SS] Using ProcessorContext to store and retrieve handle [spark]
ericm-db commented on code in PR #45002: URL: https://github.com/apache/spark/pull/45002#discussion_r1481973732

## sql/api/src/main/scala/org/apache/spark/sql/streaming/StatefulProcessor.scala:
```
@@ -27,19 +27,16 @@ import org.apache.spark.annotation.{Evolving, Experimental}
  */
 @Experimental
 @Evolving
-private[sql] trait StatefulProcessor[K, I, O] extends Serializable {
+private[sql] abstract class StatefulProcessor[K, I, O] extends Serializable {
```

Review Comment:
We want to provide implementations for the getters and setters, so it can no longer be a trait, right?
Re: [PR] [SPARK-46961][SS] Using ProcessorContext to store and retrieve handle [spark]
anishshri-db commented on code in PR #45002: URL: https://github.com/apache/spark/pull/45002#discussion_r1481971683

## sql/api/src/main/scala/org/apache/spark/sql/streaming/StatefulProcessor.scala:
```
@@ -60,4 +57,21 @@ private[sql] trait StatefulProcessor[K, I, O] extends Serializable {
    * any cleanup or teardown operations.
    */
   def close (): Unit
+
+  /**
+   * Function to set the stateful processor handle that will be used to interact with the state
+   * store and other stateful processor related operations.
+   * @param handle - instance of StatefulProcessorHandle
+   */
+  final def setStatefulProcessorHandle(handle: StatefulProcessorHandle): Unit = {
+    statefulProcessorHandle = handle
+  }
+
+  /**
+   * Function to get the stateful processor handle that will be used to interact with the state
+   * @return handle - instance of StatefulProcessorHandle
+   */
+  final def getStatefulProcessorHandle: StatefulProcessorHandle = {
+    statefulProcessorHandle
```

Review Comment:
Let's set the initial value to null. And if it's null, then let's raise an exception/error to say that the handle is not available?
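A sketch of that suggestion, following the names in the diff above (the exact exception type and wording are assumptions):

```scala
@transient private var statefulProcessorHandle: StatefulProcessorHandle = null

final def setStatefulProcessorHandle(handle: StatefulProcessorHandle): Unit = {
  statefulProcessorHandle = handle
}

final def getStatefulProcessorHandle: StatefulProcessorHandle = {
  if (statefulProcessorHandle == null) {
    // Fail fast if user code asks for the handle before the engine has set it.
    throw new IllegalStateException(
      "Stateful processor handle is not initialized; it is only available from init() onwards.")
  }
  statefulProcessorHandle
}
```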