Re: [PR] [SPARK-40559][PYTHON] Add applyInArrow to groupBy and cogroup [spark]
EnricoMi commented on code in PR #38624: URL: https://github.com/apache/spark/pull/38624#discussion_r1384512107

## python/pyspark/worker.py:
```diff
@@ -306,6 +308,33 @@ def verify_element(elem):
         )


+def wrap_cogrouped_map_arrow_udf(f, return_type, argspec, runner_conf):
```

Review Comment: Consistent in the sense of naming, or in what it does?

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [SPARK-45798][CONNECT] Assert server-side session ID [spark]
grundprinzip commented on code in PR #43664: URL: https://github.com/apache/spark/pull/43664#discussion_r1384511928

## python/pyspark/sql/connect/client/core.py:
```diff
@@ -1620,6 +1593,42 @@ def cache_artifact(self, blob: bytes) -> str:
             return self._artifact_manager.cache_artifact(blob)
         raise SparkConnectException("Invalid state during retry exception handling.")

+    def _verify_response_integrity(
+        self,
+        response: Union[
+            pb2.ConfigResponse,
+            pb2.ExecutePlanResponse,
+            pb2.InterruptResponse,
+            pb2.ReleaseExecuteResponse,
+            pb2.AddArtifactsResponse,
+            pb2.AnalyzePlanResponse,
+            pb2.FetchErrorDetailsResponse,
+            pb2.ReleaseSessionResponse,
+        ],
+    ) -> None:
+        """
+        Verifies the integrity of the response. This method checks if the session ID and the
+        server-side session ID match. If not, it throws an exception.
+
+        Parameters
+        ----------
+        response - One of the different response types handled by the Spark Connect service
+        """
+        if self._session_id != response.session_id:
+            raise SparkConnectException(
+                "Received incorrect session identifier for request:"
+                f"{response.session_id} != {self._session_id}"
+            )
+        if self._server_session_id is not None:
+            if response.server_side_session_id != self._server_session_id:
+                raise SparkConnectException(
+                    "Received incorrect server side session identifier for request. "
+                    "Please restart Spark Session. ("
```

Review Comment: I modified it to "Please create a new Spark Session to reconnect". But how exactly the new Spark Session is created really depends on the user.
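The integrity check being reviewed reduces to comparing two identifiers on every response and failing fast on any mismatch. A minimal standalone sketch of that logic, using hypothetical `FakeResponse`/`SparkConnectException` stand-ins rather than the real pyspark and protobuf classes:

```python
class SparkConnectException(Exception):
    """Stand-in for pyspark's exception type (hypothetical)."""


class FakeResponse:
    """Hypothetical stand-in for the protobuf response types."""

    def __init__(self, session_id, server_side_session_id=""):
        self.session_id = session_id
        self.server_side_session_id = server_side_session_id


def verify_response_integrity(client_session_id, server_session_id, response):
    # The client-chosen session ID must round-trip unchanged.
    if client_session_id != response.session_id:
        raise SparkConnectException(
            f"Received incorrect session identifier: "
            f"{response.session_id} != {client_session_id}"
        )
    # Once a server-side session ID has been observed, it must stay stable
    # across all subsequent responses.
    if server_session_id is not None:
        if response.server_side_session_id != server_session_id:
            raise SparkConnectException(
                "Server side session changed; "
                "please create a new Spark Session to reconnect."
            )


# Matching identifiers pass silently; a changed server-side ID raises.
verify_response_integrity("abc", "srv-1", FakeResponse("abc", "srv-1"))
try:
    verify_response_integrity("abc", "srv-1", FakeResponse("abc", "srv-2"))
    mismatch_detected = False
except SparkConnectException:
    mismatch_detected = True
```

The key design point under discussion is only the error message: the check itself cannot recover the session, so the text must tell the user to create a fresh session.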
Re: [PR] [SPARK-45013][TEST] Flaky Test with NPE: track allocated resources by taskId [spark]
beliefer commented on PR #43693: URL: https://github.com/apache/spark/pull/43693#issuecomment-1797975709 @yaooqinn Thank you for the fix.
Re: [PR] [SPARK-45013][TEST] Flaky Test with NPE: track allocated resources by taskId [spark]
yaooqinn commented on PR #43693: URL: https://github.com/apache/spark/pull/43693#issuecomment-1797943499 Thanks @dongjoon-hyun
Re: [PR] [SPARK-45808][CONNECT][PYTHON] Better error handling for SQL Exceptions [spark]
grundprinzip commented on code in PR #43667: URL: https://github.com/apache/spark/pull/43667#discussion_r1384460826

## connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectFetchErrorDetailsHandler.scala:
```diff
@@ -46,9 +44,7 @@ class SparkConnectFetchErrorDetailsHandler(
       ErrorUtils.throwableToFetchErrorDetailsResponse(
         st = error,
-        serverStackTraceEnabled = sessionHolder.session.conf.get(
-          Connect.CONNECT_SERVER_STACKTRACE_ENABLED) || sessionHolder.session.conf.get(
```

Review Comment: It's still used, but it only verifies the display behavior rather than the stack trace generation.
Re: [PR] [SPARK-45816][SQL] Return `NULL` when overflowing during casting from timestamp to integers [spark]
dongjoon-hyun commented on code in PR #43694: URL: https://github.com/apache/spark/pull/43694#discussion_r1384458242

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala:
```diff
@@ -785,17 +786,19 @@ case class Cast(
       buildCast[Boolean](_, b => if (b) 1 else 0)
     case DateType =>
       buildCast[Int](_, d => null)
-    case TimestampType if ansiEnabled =>
+    case TimestampType =>
       buildCast[Long](_, t => {
         val longValue = timestampToLong(t)
         if (longValue == longValue.toInt) {
           longValue.toInt
         } else {
-          throw QueryExecutionErrors.castingCauseOverflowError(t, from, IntegerType)
+          if (ansiEnabled) {
+            throw QueryExecutionErrors.castingCauseOverflowError(t, from, IntegerType)
+          } else {
+            null
+          }
```

Review Comment: Got it. Thank you for checking.
Re: [PR] [SPARK-45013][TEST] Flaky Test with NPE: track allocated resources by taskId [spark]
dongjoon-hyun commented on PR #43693: URL: https://github.com/apache/spark/pull/43693#issuecomment-1797916547 According to the `Affected Version` field of the JIRA, I landed this on the master branch only. Please feel free to backport it if you need to.
Re: [PR] [SPARK-45013][TEST] Flaky Test with NPE: track allocated resources by taskId [spark]
dongjoon-hyun closed pull request #43693: [SPARK-45013][TEST] Flaky Test with NPE: track allocated resources by taskId URL: https://github.com/apache/spark/pull/43693
Re: [PR] [SPARK-45816][SQL] Return `NULL` when overflowing during casting from timestamp to integers [spark]
viirya commented on code in PR #43694: URL: https://github.com/apache/spark/pull/43694#discussion_r1384455469

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala:
```diff
@@ -785,17 +786,19 @@ case class Cast(
       buildCast[Boolean](_, b => if (b) 1 else 0)
     case DateType =>
       buildCast[Int](_, d => null)
-    case TimestampType if ansiEnabled =>
+    case TimestampType =>
       buildCast[Long](_, t => {
         val longValue = timestampToLong(t)
         if (longValue == longValue.toInt) {
           longValue.toInt
         } else {
-          throw QueryExecutionErrors.castingCauseOverflowError(t, from, IntegerType)
+          if (ansiEnabled) {
+            throw QueryExecutionErrors.castingCauseOverflowError(t, from, IntegerType)
+          } else {
+            null
+          }
```

Review Comment: As far as I can tell, this has been the behavior from the beginning (commits from at least 8 years ago already show it), which is why I wonder if I am missing anything. But indeed, it might be rare to cast a timestamp to byte/short/int; maybe that is why this was never caught? 🤔
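The patched branch in the hunk above boils down to a fits-in-int check, with the outcome forked on `ansiEnabled`. A minimal Python sketch of that logic (my own simplified helper, not Spark code; `None` stands in for SQL `NULL`):

```python
INT_MIN, INT_MAX = -2**31, 2**31 - 1


def cast_seconds_to_int(long_value, ansi_enabled=False):
    # Equivalent of the Scala `longValue == longValue.toInt` fits-in-int check:
    # the value survives a round trip through 32 bits iff it is in range.
    if INT_MIN <= long_value <= INT_MAX:
        return long_value
    # Overflow: ANSI mode raises; non-ANSI mode now yields NULL (None)
    # instead of a silently wrapped value.
    if ansi_enabled:
        raise OverflowError(f"casting {long_value} to int overflows")
    return None


print(cast_seconds_to_int(1699315200))    # fits in an int, returned unchanged
print(cast_seconds_to_int(-2198208304))   # overflows, so None under non-ANSI
```

Before the patch, the non-ANSI path never reached this check at all and fell through to plain truncation, which is the wrapping behavior viirya is questioning.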
Re: [PR] [SPARK-45013][TEST] Flaky Test with NPE: track allocated resources by taskId [spark]
dongjoon-hyun commented on PR #43693: URL: https://github.com/apache/spark/pull/43693#issuecomment-1797914973 I verified manually.

```
[info] CoarseGrainedExecutorBackendSuite:
[info] - parsing no resources (468 milliseconds)
[info] - parsing one resource (27 milliseconds)
[info] - parsing multiple resources resource profile (11 milliseconds)
[info] - parsing multiple resources (9 milliseconds)
[info] - error checking parsing resources and executor and task configs (18 milliseconds)
[info] - executor resource found less than required resource profile (9 milliseconds)
[info] - executor resource found less than required (8 milliseconds)
[info] - use resource discovery (25 milliseconds)
[info] - use resource discovery and allocated file option with resource profile (19 milliseconds)
[info] - use resource discovery and allocated file option (18 milliseconds)
[info] - track allocated resources by taskId (252 milliseconds)
[info] - SPARK-24203 when bindAddress is not set, it defaults to hostname (1 millisecond)
[info] - SPARK-24203 when bindAddress is different, it does not default to hostname (1 millisecond)
[info] - Tasks launched should always be cancelled. (1 second, 358 milliseconds)
[info] - Tasks not launched should always be cancelled. (175 milliseconds)
[info] - SPARK-40320 Executor should exit when initialization failed for fatal error (2 seconds, 472 milliseconds)
[info] Run completed in 5 seconds, 660 milliseconds.
[info] Total number of tests run: 16
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 16, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
[success] Total time: 71 s (01:11), completed Nov 6, 2023, 10:52:27 PM
```
[PR] [WIP][SQL] Add a SQL config for extra traces in `Origin` [spark]
MaxGekk opened a new pull request, #43695: URL: https://github.com/apache/spark/pull/43695

### What changes were proposed in this pull request?

### Why are the changes needed?

### Does this PR introduce _any_ user-facing change?

### How was this patch tested?

### Was this patch authored or co-authored using generative AI tooling?
Re: [PR] [SPARK-45816][SQL] Return null when overflowing during casting from timestamp to integers [spark]
dongjoon-hyun commented on code in PR #43694: URL: https://github.com/apache/spark/pull/43694#discussion_r1384449877

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala:
```diff
@@ -785,17 +786,19 @@ case class Cast(
       buildCast[Boolean](_, b => if (b) 1 else 0)
     case DateType =>
       buildCast[Int](_, d => null)
-    case TimestampType if ansiEnabled =>
+    case TimestampType =>
       buildCast[Long](_, t => {
         val longValue = timestampToLong(t)
         if (longValue == longValue.toInt) {
           longValue.toInt
         } else {
-          throw QueryExecutionErrors.castingCauseOverflowError(t, from, IntegerType)
+          if (ansiEnabled) {
+            throw QueryExecutionErrors.castingCauseOverflowError(t, from, IntegerType)
+          } else {
+            null
+          }
```

Review Comment: Do you know when the current behavior was introduced, @viirya?
Re: [PR] [SPARK-45431][DOCS] Document new SSL RPC feature [spark]
mridulm commented on code in PR #43240: URL: https://github.com/apache/spark/pull/43240#discussion_r1384439623

## docs/security.md:
```diff
@@ -563,7 +604,52 @@ replaced with one of the above namespaces.
   ${ns}.trustStoreType
   JKS
-  The type of the trust store.
+  The type of the trust store. This setting is not applicable to the `rpc` namespace.

+  ${ns}.openSSLEnabled
+  false
+
+  Whether to use OpenSSL for cryptographic operations instead of the JDK SSL provider.
+  This setting is only applicable to the `rpc` namespace, and also requires the `certChain`
+  and `privateKey` settings to be set.
```

Review Comment: I am referring to (3). If OpenSSL is requested but is not available at runtime, we will try to fall back to JKS. The OpenSSL config for `privateKey`/`certChain` need not be compatible with the `certChain`/`privateKey` for JKS; for example, OpenSSL for rpc vs JKS for the UI.
Re: [PR] [SPARK-45762][CORE] Support shuffle managers defined in user jars by changing startup order [spark]
mridulm commented on PR #43627: URL: https://github.com/apache/spark/pull/43627#issuecomment-1797888003 @tgravescs The SparkEnv related change is what gave me pause ... I am less concerned about the Executor side of things
Re: [PR] [SPARK-45816][SQL] Return null when overflowing during casting from timestamp to integers [spark]
viirya commented on code in PR #43694: URL: https://github.com/apache/spark/pull/43694#discussion_r1384416801

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala:
```diff
@@ -785,17 +786,19 @@ case class Cast(
       buildCast[Boolean](_, b => if (b) 1 else 0)
     case DateType =>
       buildCast[Int](_, d => null)
-    case TimestampType if ansiEnabled =>
+    case TimestampType =>
       buildCast[Long](_, t => {
         val longValue = timestampToLong(t)
         if (longValue == longValue.toInt) {
           longValue.toInt
         } else {
-          throw QueryExecutionErrors.castingCauseOverflowError(t, from, IntegerType)
+          if (ansiEnabled) {
+            throw QueryExecutionErrors.castingCauseOverflowError(t, from, IntegerType)
+          } else {
+            null
+          }
```

Review Comment: While working on related timestamp-casting changes, I took a look at how Spark handles casting a timestamp to an integer. Unless there is a special reason for it, this behavior doesn't look correct to me, and it doesn't match the common overflow handling used when casting other types under non-ANSI mode.
Re: [PR] [SPARK-45816][SQL] Return null when overflowing during casting from timestamp to integers [spark]
viirya commented on code in PR #43694: URL: https://github.com/apache/spark/pull/43694#discussion_r1384424672

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala:
```diff
@@ -785,17 +786,19 @@ case class Cast(
       buildCast[Boolean](_, b => if (b) 1 else 0)
     case DateType =>
       buildCast[Int](_, d => null)
-    case TimestampType if ansiEnabled =>
+    case TimestampType =>
       buildCast[Long](_, t => {
         val longValue = timestampToLong(t)
         if (longValue == longValue.toInt) {
           longValue.toInt
         } else {
-          throw QueryExecutionErrors.castingCauseOverflowError(t, from, IntegerType)
+          if (ansiEnabled) {
+            throw QueryExecutionErrors.castingCauseOverflowError(t, from, IntegerType)
+          } else {
+            null
+          }
```

Review Comment: Let me know if I missed anything here.
Re: [PR] [SPARK-33393][SQL] Support SHOW TABLE EXTENDED in v2 [spark]
panbingkun commented on code in PR #37588: URL: https://github.com/apache/spark/pull/37588#discussion_r1384419375

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala:
```diff
@@ -1090,6 +1090,26 @@ class SessionCatalog(
     dbViews ++ listLocalTempViews(pattern)
   }

+  /**
+   * List all matching temp views in the specified database, including global/local temporary views.
+   */
+  def listTempViews(db: String, pattern: String): Seq[CatalogTable] = {
+    val dbName = format(db)
+    val globalTempViews = if (dbName == globalTempViewManager.database) {
```

Review Comment: Done
Re: [PR] [SPARK-45816][SQL] Return null when overflowing during casting from timestamp to integers [spark]
viirya commented on code in PR #43694: URL: https://github.com/apache/spark/pull/43694#discussion_r1384417670

## sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastWithAnsiOffSuite.scala:
```diff
@@ -514,9 +514,9 @@ class CastWithAnsiOffSuite extends CastSuiteBase {
     val negativeTs = Timestamp.valueOf("1900-05-05 18:34:56.1")
     assert(negativeTs.getTime < 0)
     val expectedSecs = Math.floorDiv(negativeTs.getTime, MILLIS_PER_SECOND)
-    checkEvaluation(cast(negativeTs, ByteType), expectedSecs.toByte)
-    checkEvaluation(cast(negativeTs, ShortType), expectedSecs.toShort)
-    checkEvaluation(cast(negativeTs, IntegerType), expectedSecs.toInt)
```

Review Comment: For example, the long value is -2198208304 (i.e., `negativeTs`) but the int value is 2096758992.
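The pair of numbers quoted above is exactly JVM `Long`-to-`Int` truncation (a two's-complement wrap modulo 2^32), which can be reproduced outside the JVM with a small helper:

```python
def jvm_long_to_int(v):
    # Truncate to the low 32 bits, then reinterpret as a signed 32-bit value,
    # mimicking what Scala/Java's `longValue.toInt` does on overflow.
    v &= 0xFFFFFFFF
    return v - 0x100000000 if v >= 0x80000000 else v


# The seconds value for 1900-05-05 18:34:56.1 wraps to a positive int,
# which is the surprising sign flip the review comment points out.
print(jvm_long_to_int(-2198208304))  # 2096758992
```

This is why the pre-patch non-ANSI cast could turn a pre-epoch timestamp into a large positive integer.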
[PR] [SPARK-45816][SQL] Return null when overflowing during casting from timestamp to integers [spark]
viirya opened a new pull request, #43694: URL: https://github.com/apache/spark/pull/43694

### What changes were proposed in this pull request?

Spark cast works in two modes: ANSI and non-ANSI. When overflowing during a cast, the common behavior under non-ANSI mode is to return null. However, casting from Timestamp to Int/Short/Byte currently returns a wrapped value. Silently overflowing like this doesn't make sense. This patch changes it to the common behavior, i.e., returning null.

### Why are the changes needed?

Returning a wrapped value, e.g., a negative one, when casting Timestamp to Int/Short/Byte could silently lead to misinterpreting the cast result. We should also follow the common overflow-handling behavior.

### Does this PR introduce _any_ user-facing change?

Yes. Overflowing during a cast from Timestamp to Int/Short/Byte under non-ANSI mode now returns null instead of a wrapped value.

### How was this patch tested?

Will add tests, or update existing tests if any fail.

### Was this patch authored or co-authored using generative AI tooling?

No
Re: [PR] [SPARK-45223][PYTHON][DOCS] Refine docstring of `Column.when` [spark]
dongjoon-hyun commented on PR #43688: URL: https://github.com/apache/spark/pull/43688#issuecomment-1797863219 Could you re-trigger the failed pipeline?
[PR] [SPARK-45013][TEST] Flaky Test with NPE: track allocated resources by taskId [spark]
yaooqinn opened a new pull request, #43693: URL: https://github.com/apache/spark/pull/43693

### What changes were proposed in this pull request?

This PR ensures that `runningTasks` is updated before subsequent tasks run, which previously caused an NPE.

### Why are the changes needed?

Fix flaky tests.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

It shall fix:

```
- track allocated resources by taskId *** FAILED *** (36 milliseconds)
[info]   java.lang.NullPointerException: Cannot invoke "org.apache.spark.executor.Executor$TaskRunner.taskDescription()" because the return value of "java.util.concurrent.ConcurrentHashMap.get(Object)" is null
[info]   at org.apache.spark.executor.CoarseGrainedExecutorBackend.statusUpdate(CoarseGrainedExecutorBackend.scala:275)
[info]   at org.apache.spark.executor.CoarseGrainedExecutorBackendSuite.$anonfun$new$22(CoarseGrainedExecutorBackendSuite.scala:351)
```

### Was this patch authored or co-authored using generative AI tooling?

No
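The ordering bug behind this flakiness is a lookup that assumes registration has already happened: `statusUpdate` dereferences the result of a map `get` without checking it. A hypothetical Python sketch of the shape of the bug and the fix (not the actual Scala test code):

```python
running_tasks = {}


def status_update(task_id):
    # Mirrors the failing lookup in CoarseGrainedExecutorBackend.statusUpdate:
    # a missing entry yields None, and dereferencing it blows up
    # (the NullPointerException analogue in this sketch).
    runner = running_tasks.get(task_id)
    if runner is None:
        raise RuntimeError(f"no runner registered for task {task_id}")
    return runner["taskDescription"]


# The fix: make sure the task is registered *before* anything can call
# status_update for it, so the lookup can never race ahead of registration.
running_tasks[42] = {"taskDescription": "task-42"}
print(status_update(42))  # task-42
```

In the real suite the registration and the status update happen on different threads, so the test must synchronize on the registration step rather than rely on timing.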
Re: [PR] [SPARK-45511][SS] State Data Source - Reader [spark]
HeartSaVioR commented on PR #43425: URL: https://github.com/apache/spark/pull/43425#issuecomment-1797817269 cc. @zsxwing @brkyvz @viirya @xuanyuanking Would you mind having a look? Thanks in advance!
Re: [PR] [SPARK-45804][UI] Add spark.ui.threadDump.flamegraphEnabled config to switch flame graph on/off [spark]
yaooqinn commented on PR #43674: URL: https://github.com/apache/spark/pull/43674#issuecomment-1797731478 Thank you, as always, @dongjoon-hyun and @HyukjinKwon.
Re: [PR] [SPARK-45812][BUILD][PYTHON][PS] Upgrade Pandas to 2.1.2 [spark]
dongjoon-hyun commented on PR #43689: URL: https://github.com/apache/spark/pull/43689#issuecomment-1797721159 Merged to master. Thank you, @itholic.
Re: [PR] [SPARK-45812][BUILD][PYTHON][PS] Upgrade Pandas to 2.1.2 [spark]
dongjoon-hyun commented on PR #43689: URL: https://github.com/apache/spark/pull/43689#issuecomment-1797720993 All Python related tests passed.
Re: [PR] [SPARK-45812][BUILD][PYTHON][PS] Upgrade Pandas to 2.1.2 [spark]
dongjoon-hyun closed pull request #43689: [SPARK-45812][BUILD][PYTHON][PS] Upgrade Pandas to 2.1.2 URL: https://github.com/apache/spark/pull/43689
Re: [PR] [SPARK-45814][CONNECT][SQL] Make ArrowConverters.createEmptyArrowBatch call hasNext to avoid memory leak [spark]
dongjoon-hyun commented on PR #43691: URL: https://github.com/apache/spark/pull/43691#issuecomment-1797718785 cc @sunchao, too
Re: [PR] [SPARK-45804][UI] Add spark.ui.threadDump.flamegraphEnabled config to switch flame graph on/off [spark]
dongjoon-hyun commented on PR #43674: URL: https://github.com/apache/spark/pull/43674#issuecomment-1797718287 Merged to master.
Re: [PR] [SPARK-45804][UI] Add spark.ui.threadDump.flamegraphEnabled config to switch flame graph on/off [spark]
dongjoon-hyun closed pull request #43674: [SPARK-45804][UI] Add spark.ui.threadDump.flamegraphEnabled config to switch flame graph on/off URL: https://github.com/apache/spark/pull/43674
Re: [PR] [SPARK-45814][CONNECT][SQL] Make ArrowConverters.createEmptyArrowBatch call hasNext to avoid memory leak [spark]
xieshuaihu commented on PR #43691: URL: https://github.com/apache/spark/pull/43691#issuecomment-1797666888 cc @HyukjinKwon @dongjoon-hyun
[PR] [SPARK-45815][SQL][Streaming] Provide an interface for other Streaming sources to add `_metadata` columns [spark]
Yaohua628 opened a new pull request, #43692: URL: https://github.com/apache/spark/pull/43692

### What changes were proposed in this pull request?

Currently, only the native V1 file-based streaming source can read the `_metadata` column: https://github.com/apache/spark/blob/370870b7a0303e4a2c4b3dea1b479b4fcbc93f8d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala#L63

Our goal is to create an interface that allows other streaming sources to add `_metadata` columns. For instance, we would like the Delta Streaming source, which you can find here: https://github.com/delta-io/delta/blob/master/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaDataSource.scala#L49, to extend this interface and provide the `_metadata` column for its underlying storage format, such as Parquet.

### Why are the changes needed?

A generic interface to enable other streaming sources to expose and add `_metadata` columns.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

N/A

### Was this patch authored or co-authored using generative AI tooling?

No
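The proposed interface itself is not shown in this digest. As an illustration only, here is a plain-Python sketch of the shape such an opt-in interface could take; all names (`StreamSourceWithMetadata`, `metadata_column_schema`, `output_schema`) are hypothetical and are not Spark's actual API.

```python
from abc import ABC, abstractmethod

class StreamSourceWithMetadata(ABC):
    """Hypothetical opt-in interface: a streaming source that exposes `_metadata`."""

    @abstractmethod
    def metadata_column_schema(self) -> dict:
        """Return a field-name -> type mapping for the `_metadata` struct."""

class FileStreamSource(StreamSourceWithMetadata):
    """Stand-in for a file-based source, which already has per-file metadata."""

    def metadata_column_schema(self) -> dict:
        return {
            "file_path": "string",
            "file_size": "long",
            "file_modification_time": "timestamp",
        }

def output_schema(source, data_fields: dict) -> dict:
    # The planner would append `_metadata` only for sources implementing
    # the interface; other sources keep their data schema unchanged.
    schema = dict(data_fields)
    if isinstance(source, StreamSourceWithMetadata):
        schema["_metadata"] = source.metadata_column_schema()
    return schema
```

Under this sketch, a Delta-style source would only need to implement `metadata_column_schema` to have the planner surface its `_metadata` column.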
Re: [PR] [SPARK-45686][INFRA][CORE][SQL][SS][CONNECT][MLLIB][DSTREAM][AVRO][ML][K8S][YARN][PYTHON][R][UI][GRAPHX][PROTOBUF][TESTS][EXAMPLES] Explicitly convert `Array` to `Seq` when function input is
LuciferYang commented on PR #43670: URL: https://github.com/apache/spark/pull/43670#issuecomment-1797423492 [bb5f3d4](https://github.com/apache/spark/pull/43670/commits/bb5f3d4f96c7315d98fc9c75cbad26890dfc) fix examples part
Re: [PR] [SPARK-45639][SQL][PYTHON] Support loading Python data sources in DataFrameReader [spark]
cloud-fan commented on code in PR #43630: URL: https://github.com/apache/spark/pull/43630#discussion_r1384308595 ## sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala: ## @@ -208,10 +209,45 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { throw QueryCompilationErrors.pathOptionNotSetCorrectlyWhenReadingError() } -DataSource.lookupDataSourceV2(source, sparkSession.sessionState.conf).flatMap { provider => - DataSourceV2Utils.loadV2Source(sparkSession, provider, userSpecifiedSchema, extraOptions, -source, paths: _*) -}.getOrElse(loadV1Source(paths: _*)) +val isUserDefinedDataSource = Review Comment: Unfortunately DS v2 `TableProvider` does not support USING yet. That's why the code is a bit messy here as it's not shared with the SQL USING path. We should support it though...
Re: [PR] [SPARK-45813][CONNECT][PYTHON] Return the observed metrics from commands [spark]
beliefer commented on code in PR #43690: URL: https://github.com/apache/spark/pull/43690#discussion_r1384289821 ## connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala: ## @@ -162,6 +162,18 @@ private[connect] class ExecuteThreadRunner(executeHolder: ExecuteHolder) extends s"${executeHolder.request.getPlan.getOpTypeCase} not supported.") } + if (executeHolder.observations.nonEmpty) { +val observedMetrics = executeHolder.observations.map { case (name, observation) => Review Comment: I'm confused here. It seems we already create and send observed metrics response at `SparkConnectPlanExecution.handlePlan`. ## connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala: ## @@ -1065,12 +1065,20 @@ class SparkConnectPlanner( numPartitionsOpt) } - private def transformCollectMetrics(rel: proto.CollectMetrics, planId: Long): LogicalPlan = { Review Comment: We don't need to semantically validate `CollectMetrics`? cc @allisonwang-db
Re: [PR] [SPARK-45798][CONNECT] Assert server-side session ID [spark]
allisonwang-db commented on code in PR #43664: URL: https://github.com/apache/spark/pull/43664#discussion_r1384277531 ## python/pyspark/sql/connect/client/core.py: ## @@ -1620,6 +1593,42 @@ def cache_artifact(self, blob: bytes) -> str: return self._artifact_manager.cache_artifact(blob) raise SparkConnectException("Invalid state during retry exception handling.") +def _verify_response_integrity( +self, +response: Union[ +pb2.ConfigResponse, +pb2.ExecutePlanResponse, +pb2.InterruptResponse, +pb2.ReleaseExecuteResponse, +pb2.AddArtifactsResponse, +pb2.AnalyzePlanResponse, +pb2.FetchErrorDetailsResponse, +pb2.ReleaseSessionResponse, +], +) -> None: +""" +Verifies the integrity of the response. This method checks if the session ID and the +server-side session ID match. If not, it throws an exception. +Parameters +-- +response - One of the different response types handled by the Spark Connect service +""" +if self._session_id != response.session_id: +raise SparkConnectException( +"Received incorrect session identifier for request:" +f"{response.session_id} != {self._session_id}" +) Review Comment: Is this an invalid state that should not occur? If so, could we raise `PySparkAssertionError` instead? ## python/pyspark/sql/connect/client/core.py: ## @@ -1620,6 +1593,42 @@ def cache_artifact(self, blob: bytes) -> str: return self._artifact_manager.cache_artifact(blob) raise SparkConnectException("Invalid state during retry exception handling.") +def _verify_response_integrity( +self, +response: Union[ +pb2.ConfigResponse, +pb2.ExecutePlanResponse, +pb2.InterruptResponse, +pb2.ReleaseExecuteResponse, +pb2.AddArtifactsResponse, +pb2.AnalyzePlanResponse, +pb2.FetchErrorDetailsResponse, +pb2.ReleaseSessionResponse, +], +) -> None: +""" +Verifies the integrity of the response. This method checks if the session ID and the +server-side session ID match. If not, it throws an exception. 
+Parameters +-- +response - One of the different response types handled by the Spark Connect service +""" +if self._session_id != response.session_id: +raise SparkConnectException( +"Received incorrect session identifier for request:" +f"{response.session_id} != {self._session_id}" +) +if self._server_session_id is not None: +if response.server_side_session_id != self._server_session_id: +raise SparkConnectException( +"Received incorrect server side session identifier for request. " +"Please restart Spark Session. (" Review Comment: Can we be more explicit in this error message on how to restart a Spark session?
Re: [PR] [SPARK-45808][CONNECT][PYTHON] Better error handling for SQL Exceptions [spark]
allisonwang-db commented on code in PR #43667: URL: https://github.com/apache/spark/pull/43667#discussion_r1384270902 ## connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectFetchErrorDetailsHandler.scala: ## @@ -46,9 +44,7 @@ class SparkConnectFetchErrorDetailsHandler( ErrorUtils.throwableToFetchErrorDetailsResponse( st = error, - serverStackTraceEnabled = sessionHolder.session.conf.get( -Connect.CONNECT_SERVER_STACKTRACE_ENABLED) || sessionHolder.session.conf.get( Review Comment: Are we deprecating this config `Connect.CONNECT_SERVER_STACKTRACE_ENABLED`?
Re: [PR] [SPARK-45811][PYTHON][DOCS] Refine docstring of `from_xml` [spark]
allisonwang-db commented on code in PR #43680: URL: https://github.com/apache/spark/pull/43680#discussion_r1384266732 ## python/pyspark/sql/functions.py: ## @@ -13549,6 +13549,8 @@ def json_object_keys(col: "ColumnOrName") -> Column: return _invoke_function_over_columns("json_object_keys", col) +# TODO: Fix and add an example for StructType with Spark Connect Review Comment: this function doesn't support spark connect?
Re: [PR] [SPARK-45258][PYTHON][DOCS] Refine docstring of `sum` [spark]
allisonwang-db commented on code in PR #43684: URL: https://github.com/apache/spark/pull/43684#discussion_r1384265181 ## python/pyspark/sql/functions.py: ## @@ -1197,13 +1197,27 @@ def sum(col: "ColumnOrName") -> Column: Examples +Example 1: Calculating the sum of values in a column + +>>> from pyspark.sql import functions as sf >>> df = spark.range(10) ->>> df.select(sum(df["id"])).show() +>>> df.select(sf.sum(df["id"])).show() +---+ |sum(id)| +---+ | 45| +---+ + +Example 2: Using a plus expression together to calculate the sum + +>>> from pyspark.sql import functions as sf +>>> df = spark.createDataFrame([(1, 2), (3, 4)], ["A", "B"]) +>>> df.select(sf.sum(sf.col("A") + sf.col("B"))).show() +++ +|sum((A + B))| +++ +| 10| +++ Review Comment: It would be great to include an example with groupby.
Re: [PR] [SPARK-45259][PYTHON][DOCS] Refine docstring of `count` [spark]
allisonwang-db commented on code in PR #43685: URL: https://github.com/apache/spark/pull/43685#discussion_r1384263186 ## python/pyspark/sql/functions.py: ## @@ -1162,15 +1162,48 @@ def count(col: "ColumnOrName") -> Column: Examples -Count by all columns (start), and by a column that does not count ``None``. +Example 1: Count all rows in a DataFrame +>>> from pyspark.sql import functions as sf >>> df = spark.createDataFrame([(None,), ("a",), ("b",), ("c",)], schema=["alphabets"]) ->>> df.select(count(expr("*")), count(df.alphabets)).show() -+++ -|count(1)|count(alphabets)| -+++ -| 4| 3| -+++ +>>> df.select(sf.count(sf.expr("*"))).show() +++ +|count(1)| +++ +| 4| +++ + +Example 2: Count non-null values in a specific column + +>>> from pyspark.sql import functions as sf +>>> df.select(sf.count(df.alphabets)).show() +++ +|count(alphabets)| +++ +| 3| +++ + +Example 3: Count all rows in a DataFrame with multiple columns + +>>> from pyspark.sql import functions as sf +>>> df = spark.createDataFrame( +... [(1, "apple"), (2, "banana"), (3, None)], schema=["id", "fruit"]) +>>> df.select(sf.count(sf.expr("*"))).show() +++ +|count(1)| +++ +| 3| +++ + +Example 4: Count non-null values in multiple columns + +>>> from pyspark.sql import functions as sf +>>> df.select(sf.count(df.id), sf.count(df.fruit)).show() ++-++ +|count(id)|count(fruit)| ++-++ +|3| 2| ++-++ Review Comment: It would be great to include some examples for groupby count.
[PR] [SPARK-45814][CONNECT][CORE] Make ArrowConverters.createEmptyArrowBatch call hasNext to avoid memory leak [spark]
xieshuaihu opened a new pull request, #43691: URL: https://github.com/apache/spark/pull/43691

### What changes were proposed in this pull request?

Make ArrowConverters.createEmptyArrowBatch call hasNext to avoid a memory leak.

### Why are the changes needed?

`ArrowConverters.createEmptyArrowBatch` doesn't call `super.hasNext`; if `TaskContext.get` returns `None`, the memory allocated in `ArrowBatchIterator` is leaked. In Spark Connect, `createEmptyArrowBatch` is called in [SparkConnectPlanner](https://github.com/apache/spark/blob/master/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala#L2558) and [SparkConnectPlanExecution](https://github.com/apache/spark/blob/master/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/SparkConnectPlanExecution.scala#L224), which causes a long-running driver to consume all of the off-heap memory specified by `-XX:MaxDirectMemorySize`. This is the exception stack:

```
org.apache.arrow.memory.OutOfMemoryException: Failure allocating buffer.
at io.netty.buffer.PooledByteBufAllocatorL.allocate(PooledByteBufAllocatorL.java:67)
at org.apache.arrow.memory.NettyAllocationManager.<init>(NettyAllocationManager.java:77)
at org.apache.arrow.memory.NettyAllocationManager.<init>(NettyAllocationManager.java:84)
at org.apache.arrow.memory.NettyAllocationManager$1.create(NettyAllocationManager.java:34)
at org.apache.arrow.memory.BaseAllocator.newAllocationManager(BaseAllocator.java:354)
at org.apache.arrow.memory.BaseAllocator.newAllocationManager(BaseAllocator.java:349)
at org.apache.arrow.memory.BaseAllocator.bufferWithoutReservation(BaseAllocator.java:337)
at org.apache.arrow.memory.BaseAllocator.buffer(BaseAllocator.java:315)
at org.apache.arrow.memory.BaseAllocator.buffer(BaseAllocator.java:279)
at org.apache.arrow.vector.BaseValueVector.allocFixedDataAndValidityBufs(BaseValueVector.java:192)
at org.apache.arrow.vector.BaseFixedWidthVector.allocateBytes(BaseFixedWidthVector.java:338)
at org.apache.arrow.vector.BaseFixedWidthVector.allocateNew(BaseFixedWidthVector.java:308)
at org.apache.arrow.vector.BaseFixedWidthVector.allocateNew(BaseFixedWidthVector.java:273)
at org.apache.spark.sql.execution.arrow.ArrowWriter$.$anonfun$create$1(ArrowWriter.scala:44)
at scala.collection.StrictOptimizedIterableOps.map(StrictOptimizedIterableOps.scala:100)
at scala.collection.StrictOptimizedIterableOps.map$(StrictOptimizedIterableOps.scala:87)
at scala.collection.convert.JavaCollectionWrappers$JListWrapper.map(JavaCollectionWrappers.scala:103)
at org.apache.spark.sql.execution.arrow.ArrowWriter$.create(ArrowWriter.scala:43)
at org.apache.spark.sql.execution.arrow.ArrowConverters$ArrowBatchIterator.<init>(ArrowConverters.scala:93)
at org.apache.spark.sql.execution.arrow.ArrowConverters$ArrowBatchWithSchemaIterator.<init>(ArrowConverters.scala:138)
at org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$1.<init>(ArrowConverters.scala:231)
at org.apache.spark.sql.execution.arrow.ArrowConverters$.createEmptyArrowBatch(ArrowConverters.scala:229)
at org.apache.spark.sql.connect.planner.SparkConnectPlanner.handleSqlCommand(SparkConnectPlanner.scala:2481)
at org.apache.spark.sql.connect.planner.SparkConnectPlanner.process(SparkConnectPlanner.scala:2426)
at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.handleCommand(ExecuteThreadRunner.scala:202)
at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1(ExecuteThreadRunner.scala:158)
at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1$adapted(ExecuteThreadRunner.scala:132)
at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$2(SessionHolder.scala:189)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$1(SessionHolder.scala:189)
at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withContextClassLoader$1(SessionHolder.scala:176)
at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:178)
at org.apache.spark.sql.connect.service.SessionHolder.withContextClassLoader(SessionHolder.scala:175)
at org.apache.spark.sql.connect.service.SessionHolder.withSession(SessionHolder.scala:188)
at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.executeInternal(ExecuteThreadRunner.scala:132)
at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.org$apache$spark$sql$connect$execution$ExecuteThreadRunner$$execute(ExecuteThreadRunner.scal
```
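To make the leak mechanism concrete, here is a plain-Python sketch of the pattern described above; all names are hypothetical stand-ins, not the actual Spark or Arrow code. The iterator acquires a resource in its constructor, but its cleanup hook (a stand-in for the task-completion listener registered via `TaskContext`) only runs from `has_next`, so a caller that skips `has_next` leaks the allocation.

```python
class Allocator:
    """Toy allocator that counts outstanding buffers."""

    def __init__(self):
        self.open_buffers = 0

    def allocate(self):
        self.open_buffers += 1

    def release(self):
        self.open_buffers -= 1

class BatchIterator:
    def __init__(self, alloc):
        self.alloc = alloc
        alloc.allocate()               # resource acquired eagerly, in the constructor
        self._cleanup_registered = False

    def has_next(self):
        if not self._cleanup_registered:
            self.alloc.release()       # stand-in for registering/running the cleanup hook
            self._cleanup_registered = True
        return False                   # the "empty batch" iterator has no elements

    def next_batch(self):
        return b""                     # produce the empty batch

def create_empty_batch_leaky(alloc):
    # Never calls has_next, so the cleanup path never runs: the buffer leaks.
    return BatchIterator(alloc).next_batch()

def create_empty_batch_fixed(alloc):
    # Mirrors the fix in this PR: force has_next so the cleanup path runs.
    it = BatchIterator(alloc)
    batch = it.next_batch()
    it.has_next()
    return batch
```

Running the leaky variant leaves `open_buffers` at 1 per call, which is exactly how a long-running driver exhausts its direct-memory budget over many requests.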
Re: [PR] [SPARK-45260][PYTHON][DOCS] Refine docstring of `count_distinct` [spark]
allisonwang-db commented on code in PR #43686: URL: https://github.com/apache/spark/pull/43686#discussion_r1384260727 ## python/pyspark/sql/functions.py: ## @@ -4626,26 +4626,38 @@ def count_distinct(col: "ColumnOrName", *cols: "ColumnOrName") -> Column: Examples ->>> from pyspark.sql import types ->>> df1 = spark.createDataFrame([1, 1, 3], types.IntegerType()) ->>> df2 = spark.createDataFrame([1, 2], types.IntegerType()) ->>> df1.join(df2).show() -+-+-+ -|value|value| -+-+-+ -|1|1| -|1|2| -|1|1| -|1|2| -|3|1| -|3|2| -+-+-+ ->>> df1.join(df2).select(count_distinct(df1.value, df2.value)).show() -++ -|count(DISTINCT value, value)| -++ -| 4| -++ +Example 1: Counting distinct values of a single column + +>>> from pyspark.sql import functions as sf +>>> df = spark.createDataFrame([(1,), (1,), (3,)], ["value"]) +>>> df.select(sf.count_distinct(df.value)).show() ++-+ +|count(DISTINCT value)| ++-+ +|2| ++-+ + +Example 2: Counting distinct values of multiple columns + +>>> from pyspark.sql import functions as sf +>>> df1 = spark.createDataFrame([(1, 1), (1, 2)], ["value1", "value2"]) +>>> df1.select(sf.count_distinct(df1.value1, df1.value2)).show() Review Comment: ```suggestion >>> df = spark.createDataFrame([(1, 1), (1, 2)], ["value1", "value2"]) >>> df.select(sf.count_distinct(df.value1, df.value2)).show() ``` ## python/pyspark/sql/functions.py: ## @@ -4626,26 +4626,38 @@ def count_distinct(col: "ColumnOrName", *cols: "ColumnOrName") -> Column: Examples ->>> from pyspark.sql import types ->>> df1 = spark.createDataFrame([1, 1, 3], types.IntegerType()) ->>> df2 = spark.createDataFrame([1, 2], types.IntegerType()) ->>> df1.join(df2).show() -+-+-+ -|value|value| -+-+-+ -|1|1| -|1|2| -|1|1| -|1|2| -|3|1| -|3|2| -+-+-+ ->>> df1.join(df2).select(count_distinct(df1.value, df2.value)).show() -++ -|count(DISTINCT value, value)| -++ -| 4| -++ +Example 1: Counting distinct values of a single column + +>>> from pyspark.sql import functions as sf +>>> df = spark.createDataFrame([(1,), 
(1,), (3,)], ["value"]) +>>> df.select(sf.count_distinct(df.value)).show() ++-+ |count(DISTINCT value)| ++-+ |2| ++-+ + +Example 2: Counting distinct values of multiple columns + +>>> from pyspark.sql import functions as sf +>>> df1 = spark.createDataFrame([(1, 1), (1, 2)], ["value1", "value2"]) +>>> df1.select(sf.count_distinct(df1.value1, df1.value2)).show() ++--+ |count(DISTINCT value1, value2)| ++--+ | 2| ++--+ + +Example 3: Counting distinct values with column names as strings + +>>> from pyspark.sql import functions as sf +>>> df3 = spark.createDataFrame([(1, 1), (1, 2)], ["value1", "value2"]) +>>> df3.select(sf.count_distinct("value1", "value2")).show() Review Comment: ```suggestion >>> df = spark.createDataFrame([(1, 1), (1, 2)], ["value1", "value2"]) >>> df.select(sf.count_distinct("value1", "value2")).show() ```
Re: [PR] [SPARK-45804][UI] Add spark.ui.threadDump.flamegraphEnabled config to switch flame graph on/off [spark]
yaooqinn commented on code in PR #43674: URL: https://github.com/apache/spark/pull/43674#discussion_r1384260744 ## core/src/main/scala/org/apache/spark/internal/config/UI.scala: ## @@ -97,6 +97,12 @@ private[spark] object UI { .booleanConf .createWithDefault(true) + val UI_FLAMEGRAPH_ENABLED = ConfigBuilder("spark.ui.threadDump.flamegraphEnabled") +.doc("Whether to render the Flamegraph for executor thread dumps") Review Comment: Do you mean that we should add a short paragraph explaining that the flame graph is gone because users turned it off? I considered this, but it seems that we don't do such things for other UI elements or switches, i.e., timeline, heap dump.
Re: [PR] [SPARK-45810][Python] Create Python UDTF API to stop consuming rows from the input table [spark]
allisonwang-db commented on code in PR #43682: URL: https://github.com/apache/spark/pull/43682#discussion_r1384255664 ## python/pyspark/worker.py: ## @@ -1057,6 +1059,9 @@ def mapper(_, it): yield from eval(*[a[o] for o in args_kwargs_offsets]) if terminate is not None: yield from terminate() +except StopIteration: +if terminate is not None: +yield from terminate() Review Comment: Ah the exception thrown in eval will be caught by this ``` except Exception as e: raise PySparkRuntimeError( error_class="UDTF_EXEC_ERROR", ``` I wonder if we should just move the whole try ... except ... block to this mapper function instead of checking this for every single input row.
Re: [PR] [SPARK-45796][SQL] Support MODE() WITHIN GROUP (ORDER BY col) [spark]
beliefer commented on PR #43663: URL: https://github.com/apache/spark/pull/43663#issuecomment-1797146203

> We can write order by in windowNameOrSpecification, why do we need ORDER BY sortSpecification after WITHIN GROUP

The scope of `ORDER BY sortSpecification` is different from the order by in `windowNameOrSpecification`.
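To illustrate why the two ORDER BYs have different scopes, here is a plain-Python sketch of the semantics (not Spark code, and Spark's actual tie-breaking behavior may differ): the sort inside `WITHIN GROUP (ORDER BY col)` decides which value wins when several values are tied for the highest frequency, something a window's ORDER BY cannot express.

```python
from collections import Counter

def mode_within_group(values, descending=False):
    """Mode of `values`; ties broken by the WITHIN GROUP sort direction."""
    counts = Counter(values)
    top = max(counts.values())
    # All values tied for the highest frequency.
    candidates = [v for v, c in counts.items() if c == top]
    # WITHIN GROUP (ORDER BY col [DESC]) picks the first candidate in sort order.
    return max(candidates) if descending else min(candidates)
```

With input `[1, 1, 2, 2, 3]`, both 1 and 2 occur twice; the ascending sort picks 1 while the descending sort picks 2, which is exactly the choice the `WITHIN GROUP` clause exists to control.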
Re: [PR] [SPARK-45222][PYTHON][DOCS] Refine docstring of `DataFrameReader.json` [spark]
allisonwang-db commented on code in PR #43687: URL: https://github.com/apache/spark/pull/43687#discussion_r1384245409 ## python/pyspark/sql/readwriter.py: ## @@ -380,22 +380,72 @@ def json( Examples -Write a DataFrame into a JSON file and read it back. +Example 1: Write a DataFrame into a JSON file and read it back. >>> import tempfile >>> with tempfile.TemporaryDirectory() as d: ... # Write a DataFrame into a JSON file ... spark.createDataFrame( -... [{"age": 100, "name": "Hyukjin Kwon"}] +... [{"age": 100, "name": "Hyukjin"}] ... ).write.mode("overwrite").format("json").save(d) ... ... # Read the JSON file as a DataFrame. ... spark.read.json(d).show() -+---++ -|age|name| -+---++ -|100|Hyukjin Kwon| -+---++ ++---+---+ |age| name| ++---+---+ |100|Hyukjin| ++---+---+ + +Example 2: Read JSON from multiple files in a directory + +>>> import tempfile +>>> with tempfile.TemporaryDirectory() as d1, tempfile.TemporaryDirectory() as d2: +... # Write a DataFrame into a JSON file +... spark.createDataFrame( +... [{"age": 30, "name": "Bob"}] +... ).write.mode("overwrite").format("json").save(d1) +... +... # Read the JSON files as a DataFrame. +... spark.createDataFrame( +... [{"age": 25, "name": "Alice"}] +... ).write.mode("overwrite").format("json").save(d2) +... spark.read.json([d1, d2]).show() ++---+-+ |age| name| ++---+-+ | 25|Alice| | 30| Bob| ++---+-+ + +Example 3: Read JSON from an RDD of JSON strings + +>>> json_strings = ["{'name': 'Alice', 'age': 25}", "{'name': 'Bob', 'age': 30}"] +>>> rdd = spark.sparkContext.parallelize(json_strings) # doctest: +SKIP +>>> df = spark.read.json(rdd) # doctest: +SKIP +>>> df.show() # doctest: +SKIP Review Comment: Yea my concern is that if users are using Spark Connect, this example won't work for them...
Re: [PR] [SPARK-45810][Python] Create Python UDTF API to stop consuming rows from the input table [spark]
allisonwang-db commented on code in PR #43682: URL: https://github.com/apache/spark/pull/43682#discussion_r1384244315 ## python/pyspark/worker.py: ## @@ -1057,6 +1059,9 @@ def mapper(_, it): yield from eval(*[a[o] for o in args_kwargs_offsets]) if terminate is not None: yield from terminate() +except StopIteration: +if terminate is not None: +yield from terminate() Review Comment: If we are catching the `StopIteration` exception in mapper, do we still need the try ... catch block inside the `def func` and `def evaluate` below? ## python/pyspark/worker.py: ## @@ -995,6 +995,8 @@ def verify_result(result): def func(*a: Any) -> Any: try: return f(*a) +except StopIteration: +raise Review Comment: Make sense!
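As an illustration of the behavior under discussion, here is a simplified plain-Python sketch (the class and runner names are hypothetical, not pyspark's actual worker code): a UDTF-style class whose `eval` raises `StopIteration` to stop consuming input rows, with a runner that catches it and still calls `terminate` exactly once.

```python
class EchoThenStop:
    """Echoes up to `limit` rows, then signals it is done with the input."""

    def __init__(self, limit):
        self.limit = limit
        self.seen = 0

    def eval(self, row):
        if self.seen >= self.limit:
            # Signal: skip all remaining input rows. Note eval is a plain
            # function here; raising StopIteration inside a generator body
            # would become a RuntimeError under PEP 479.
            raise StopIteration
        self.seen += 1
        return [row]

    def terminate(self):
        return [("total", self.seen)]

def run_udtf(udtf, rows):
    out = []
    try:
        for row in rows:
            out.extend(udtf.eval(row))
    except StopIteration:
        pass                       # stop consuming input, fall through to terminate
    out.extend(udtf.terminate())   # terminate runs on both the normal and early-stop paths
    return out
```

The review comment above asks whether catching `StopIteration` once in the mapper (as `run_udtf` does) makes the per-call handling inside `eval` wrappers redundant; this sketch shows the single catch is sufficient for the early-stop case.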
Re: [PR] [SPARK-45813][CONNECT][PYTHON] Return the observed metrics from commands [spark]
ueshin commented on PR #43690: URL: https://github.com/apache/spark/pull/43690#issuecomment-1797143420 cc @HyukjinKwon @zhengruifeng @beliefer
[PR] [SPARK-45813][CONNECT][PYTHON] Return the observed metrics from commands [spark]
ueshin opened a new pull request, #43690:
URL: https://github.com/apache/spark/pull/43690

### What changes were proposed in this pull request?

Returns the observed metrics from commands.

### Why are the changes needed?

Currently the observed metrics on commands are not available. For example:

```py
>>> df = spark.range(10)
>>>
>>> observation = Observation()
>>> observed_df = df.observe(observation, count(lit(1)).alias("cnt"))
>>>
>>> observed_df.show()
...
>>> observation.get
{}
```

it should be:

```py
>>> observation.get
{'cnt': 10}
```

### Does this PR introduce _any_ user-facing change?

Yes, the observed metrics on commands will be available.

### How was this patch tested?

Added the related tests.

### Was this patch authored or co-authored using generative AI tooling?

No.
Re: [PR] [SPARK-43402][SQL] FileSourceScanExec supports push down data filter with scalar subquery [spark]
ulysses-you commented on PR #41088: URL: https://github.com/apache/spark/pull/41088#issuecomment-1797129478 @epa095 yes, updated that JIRA status
Re: [PR] [SPARK-45708][BUILD] Retry mvn deploy [spark]
LuciferYang commented on PR #43559: URL: https://github.com/apache/spark/pull/43559#issuecomment-1797103474 Would adding `-Dmaven.resolver.transport=wagon` to `MAVEN_OPTS` have any effect? The default transport implementation of the Maven 3.9 resolver has changed from Wagon to HttpClient.
Re: [PR] [SPARK-45222][PYTHON][DOCS] Refine docstring of `DataFrameReader.json` [spark]
HyukjinKwon commented on code in PR #43687:
URL: https://github.com/apache/spark/pull/43687#discussion_r1384213317

## python/pyspark/sql/readwriter.py:

```diff
+        Example 3: Read JSON from an RDD of JSON strings
+
+        >>> json_strings = ["{'name': 'Alice', 'age': 25}", "{'name': 'Bob', 'age': 30}"]
+        >>> rdd = spark.sparkContext.parallelize(json_strings)  # doctest: +SKIP
+        >>> df = spark.read.json(rdd)  # doctest: +SKIP
+        >>> df.show()  # doctest: +SKIP
```

Review Comment:
Actually, Spark Connect doesn't have a way to convert between DataFrame and RDD. So there's no way of doing similar stuff with Spark Connect.
Re: [PR] [SPARK-45810][Python] Create Python UDTF API to stop consuming rows from the input table [spark]
dtenedor commented on code in PR #43682:
URL: https://github.com/apache/spark/pull/43682#discussion_r1384198085

## python/pyspark/worker.py:

```diff
@@ -995,6 +995,8 @@ def verify_result(result):
         def func(*a: Any) -> Any:
             try:
                 return f(*a)
+            except StopIteration:
+                raise
```

Review Comment:
I looked at this, but unlike scalar PySpark UDFs, we want to apply custom treatment for UDTFs with respect to the `eval` vs. `terminate` vs. `cleanup` methods. The idea is that if `eval` invokes `raise StopIteration()`, we then call `terminate` as normal and return a successful result for the UDTF as a whole. To implement this behavior, if we used `fail_on_stopiteration` here, we'd have to later catch a `PySparkRuntimeException` and check if it's `error_class="STOP_ITERATION_OCCURRED"` manually, which seems pretty confusing. It seems simpler to just `raise` here and catch the `StopIteration` on L1062 and L1154 instead, and then call `terminate`.
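The control flow being debated here can be sketched outside of Spark. The following is a simplified illustration of the intended semantics (my own code with made-up names like `run_udtf` and `CountToThree`, not the actual `pyspark.worker` implementation): a `StopIteration` raised from `eval` stops input consumption early, while `terminate` still runs and may emit final rows.

```python
# Simplified model of the UDTF driver loop discussed in this review thread.
def run_udtf(rows, eval_fn, terminate_fn=None):
    out = []
    try:
        for row in rows:
            out.extend(eval_fn(row))
        if terminate_fn is not None:
            out.extend(terminate_fn())
    except StopIteration:
        # Stop consuming input, but still give terminate() a chance to run
        # and treat the UDTF call as successful overall.
        if terminate_fn is not None:
            out.extend(terminate_fn())
    return out

class CountToThree:
    """Counts input rows, then signals "stop reading" at the third row."""
    def __init__(self):
        self.total = 0

    def eval(self, row):
        self.total += 1
        if self.total >= 3:
            raise StopIteration("enough rows consumed")
        return []  # no output rows from eval in this example

    def terminate(self):
        yield self.total

udtf = CountToThree()
result = run_udtf(range(10), udtf.eval, udtf.terminate)
```

This also shows why wrapping `eval` with a `fail_on_stopiteration`-style converter would be awkward: the driver loop would then have to inspect a converted exception instead of simply catching `StopIteration`.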
Re: [PR] [SPARK-43242] Fix throw 'Unexpected type of BlockId' in shuffle corruption diagnose [spark]
github-actions[bot] closed pull request #40921: [SPARK-43242] Fix throw 'Unexpected type of BlockId' in shuffle corruption diagnose URL: https://github.com/apache/spark/pull/40921
Re: [PR] [SPARK-45222][PYTHON][DOCS] Refine docstring of `DataFrameReader.json` [spark]
allisonwang-db commented on code in PR #43687:
URL: https://github.com/apache/spark/pull/43687#discussion_r1384164698

## python/pyspark/sql/readwriter.py:

```diff
+        Example 3: Read JSON from an RDD of JSON strings
+
+        >>> json_strings = ["{'name': 'Alice', 'age': 25}", "{'name': 'Bob', 'age': 30}"]
+        >>> rdd = spark.sparkContext.parallelize(json_strings)  # doctest: +SKIP
+        >>> df = spark.read.json(rdd)  # doctest: +SKIP
+        >>> df.show()  # doctest: +SKIP
```

Review Comment:
Shall we remove or change this RDD example to something that can work with Spark Connect?
Re: [PR] [SPARK-45810][Python] Create Python UDTF API to stop consuming rows from the input table [spark]
allisonwang-db commented on code in PR #43682:
URL: https://github.com/apache/spark/pull/43682#discussion_r1384161755

## python/pyspark/worker.py:

```diff
@@ -995,6 +995,8 @@ def verify_result(result):
         def func(*a: Any) -> Any:
             try:
                 return f(*a)
+            except StopIteration:
+                raise
```

Review Comment:
Should we use something similar to `fail_on_stopiteration`?

```python
except StopIteration as exc:
    raise PySparkRuntimeError(
        error_class="STOP_ITERATION_OCCURRED",
        message_parameters={
            "exc": str(exc),
        },
    )
```
Re: [PR] [SPARK-45762][CORE] Support shuffle managers defined in user jars by changing startup order [spark]
abellina commented on PR #43627: URL: https://github.com/apache/spark/pull/43627#issuecomment-1796931246 @tgravescs @mridulm @beliefer I made a small tweak where the `executorEnvs` map in the `SparkContext` is populated with the configuration prefix `spark.executorEnv.*` after the driver plugin is instantiated (see the last two commits).
[PR] [SPARK-45812][BUILD][PYTHON][PS] Upgrade Pandas to 2.1.2 [spark]
itholic opened a new pull request, #43689:
URL: https://github.com/apache/spark/pull/43689

### What changes were proposed in this pull request?

This PR proposes to upgrade Pandas to 2.1.2. See https://pandas.pydata.org/docs/dev/whatsnew/v2.1.2.html for details.

### Why are the changes needed?

Pandas 2.1.2 is released, and we should support the latest Pandas.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

The existing CI should pass.

### Was this patch authored or co-authored using generative AI tooling?

No.
Re: [PR] [SPARK-45527][CORE] Use fraction to do the resource calculation [spark]
tgravescs commented on code in PR #43494:
URL: https://github.com/apache/spark/pull/43494#discussion_r1384004018

## core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala:

```diff
@@ -191,7 +191,10 @@ private[spark] class CoarseGrainedExecutorBackend(
       } else {
         val taskDesc = TaskDescription.decode(data.value)
         logInfo("Got assigned task " + taskDesc.taskId)
-        taskResources.put(taskDesc.taskId, taskDesc.resources)
+        // Convert resources amounts into ResourceInformation
+        val resources = taskDesc.resources.map { case (rName, addressesAmounts) =>
+          rName -> new ResourceInformation(rName, addressesAmounts.keys.toSeq.sorted.toArray)}
+        taskResources.put(taskDesc.taskId, resources)
```

Review Comment:
I don't think `taskResources` is needed at all anymore. Let's remove it unless you see it being used for something I'm missing. It was used in the `statusUpdate` call below that you removed. I actually think it wasn't needed even before that (changed in Spark 3.4), since the `TaskDescription` and `runningTasks` hold the same information and are now accessible.
Re: [PR] [SPARK-45223][PYTHON][DOCS] Refine docstring of `Column.when` [spark]
HyukjinKwon commented on PR #43688: URL: https://github.com/apache/spark/pull/43688#issuecomment-1796478652 This is the last one from me today :-).
[PR] [SPARK-45223][PYTHON][DOCS] Refine docstring of `Column.when` [spark]
HyukjinKwon opened a new pull request, #43688:
URL: https://github.com/apache/spark/pull/43688

### What changes were proposed in this pull request?

This PR proposes to improve the docstring of `Column.when`.

### Why are the changes needed?

For end users, and better usability of PySpark.

### Does this PR introduce _any_ user-facing change?

Yes, it fixes the user-facing documentation.

### How was this patch tested?

Manually tested.

### Was this patch authored or co-authored using generative AI tooling?

No.
Re: [PR] [SPARK-45222][PYTHON][DOCS] Refine docstring of `DataFrameReader.json` [spark]
HyukjinKwon commented on PR #43687: URL: https://github.com/apache/spark/pull/43687#issuecomment-1796414232 Thank you @dongjoon-hyun !!! @allisonwang-db BTW, do you plan to do this for all the other functions, or only some frequently used ones? With my PRs, (almost) all under SPARK-44728 are resolved. Let's get the rest done, or file JIRAs for all (if that's your intention).
Re: [PR] [SPARK-45803][CORE] Remove the no longer used `RpcAbortException` [spark]
dongjoon-hyun closed pull request #43673: [SPARK-45803][CORE] Remove the no longer used `RpcAbortException` URL: https://github.com/apache/spark/pull/43673
Re: [PR] [SPARK-45762][CORE] Support shuffle managers defined in user jars by changing startup order [spark]
tgravescs commented on PR #43627:
URL: https://github.com/apache/spark/pull/43627#issuecomment-1796408368

I agree that ideally we would finish SPARK-25299; I don't see that happening anytime soon. I also don't think it covers the case of people replacing the entire ShuffleManager vs. just the storage piece. The ShuffleManager API isn't public either, but we have multiple implementations doing that now (Uber's RSS, project Gluten, Spark RAPIDS, I thought Cosco was although it's not open source, etc.). One note is that SPARK-25299 had a sub-issue that was going to use the SparkPlugin for configuration (https://issues.apache.org/jira/browse/SPARK-30033, https://github.com/apache/spark/pull/26670), and that PR mentions the weird interaction with initialization and works around it in a different way. Overall, while there are a bunch of changes here, most of it is just moving initialization stuff around, which shouldn't impact anything else. The one user-impacting thing is the SparkEnv API change, which, if we only do it in 4.0, shouldn't be a big deal, unless there is some usage I'm not aware of. @mridulm Is there a specific part you are concerned with?
Re: [PR] [SPARK-45222][PYTHON][DOCS] Refine docstring of `DataFrameReader.json` [spark]
HyukjinKwon commented on code in PR #43687:
URL: https://github.com/apache/spark/pull/43687#discussion_r1383959348

## python/pyspark/sql/readwriter.py:

```diff
+        Example 3: Read JSON from an RDD of JSON strings
+
+        >>> json_strings = ["{'name': 'Alice', 'age': 25}", "{'name': 'Bob', 'age': 30}"]
+        >>> rdd = spark.sparkContext.parallelize(json_strings)  # doctest: +SKIP
+        >>> df = spark.read.json(rdd)  # doctest: +SKIP
+        >>> df.show()  # doctest: +SKIP
```

Review Comment:
Skipped because it doesn't work with Spark Connect, and the result might be nondeterministic itself.
[PR] [SPARK-45222][PYTHON][DOCS] Refine docstring of `DataFrameReader.json` [spark]
HyukjinKwon opened a new pull request, #43687:
URL: https://github.com/apache/spark/pull/43687

### What changes were proposed in this pull request?

This PR proposes to improve the docstring of `DataFrameReader.json`.

### Why are the changes needed?

For end users, and better usability of PySpark.

### Does this PR introduce _any_ user-facing change?

Yes, it fixes the user-facing documentation.

### How was this patch tested?

Manually tested.

### Was this patch authored or co-authored using generative AI tooling?

No.
[PR] [SPARK-45260][PYTHON][DOCS] Refine docstring of `count_distinct` [spark]
HyukjinKwon opened a new pull request, #43686:
URL: https://github.com/apache/spark/pull/43686

### What changes were proposed in this pull request?

This PR proposes to improve the docstring of `count_distinct`.

### Why are the changes needed?

For end users, and better usability of PySpark.

### Does this PR introduce _any_ user-facing change?

Yes, it fixes the user-facing documentation.

### How was this patch tested?

Manually tested.

### Was this patch authored or co-authored using generative AI tooling?

No.
Re: [PR] [SPARK-45805][SQL] Make `withOrigin` more generic [spark]
peter-toth commented on PR #43671: URL: https://github.com/apache/spark/pull/43671#issuecomment-1796379533 Merged to `master` (4.0), thanks @MaxGekk for the fix and @HyukjinKwon for the review.
Re: [PR] [SPARK-45805][SQL] Make `withOrigin` more generic [spark]
peter-toth commented on PR #43671: URL: https://github.com/apache/spark/pull/43671#issuecomment-1796375625 @HyukjinKwon, yes, those failures seem unrelated. I'm happy to merge it, but I've tested my permissions already... ;)
[PR] [SPARK-45259][PYTHON][DOCS] Refine docstring of `count` [spark]
HyukjinKwon opened a new pull request, #43685:
URL: https://github.com/apache/spark/pull/43685

### What changes were proposed in this pull request?

This PR proposes to improve the docstring of `count`.

### Why are the changes needed?

For end users, and better usability of PySpark.

### Does this PR introduce _any_ user-facing change?

Yes, it fixes the user-facing documentation.

### How was this patch tested?

Manually tested.

### Was this patch authored or co-authored using generative AI tooling?

No.
Re: [PR] [SPARK-45805][SQL] Make `withOrigin` more generic [spark]
peter-toth closed pull request #43671: [SPARK-45805][SQL] Make `withOrigin` more generic URL: https://github.com/apache/spark/pull/43671
Re: [PR] [SPARK-45709][BUILD] Deploy packages when all packages are built [spark]
EnricoMi commented on PR #43561: URL: https://github.com/apache/spark/pull/43561#issuecomment-1796362720 @LuciferYang @HyukjinKwon the publish snapshot workflow keeps failing due to HTTP errors, which still causes inconsistent snapshot packages: https://github.com/apache/spark/actions/workflows/publish_snapshot.yml Please consider this fix.
Re: [PR] [SPARK-45708][BUILD] Retry mvn deploy [spark]
EnricoMi commented on PR #43559: URL: https://github.com/apache/spark/pull/43559#issuecomment-1796354605 @LuciferYang @HyukjinKwon the publish snapshot workflow keeps failing due to HTTP errors: https://github.com/apache/spark/actions/workflows/publish_snapshot.yml Please consider this fix.
Re: [PR] [WIP][SPARK-45770][SQL][PYTHON][CONNECT] Introduce logical plan `UnresolvedDropColumns` for `Dataframe.drop` [spark]
zhengruifeng commented on code in PR #43683:
URL: https://github.com/apache/spark/pull/43683#discussion_r1383923949

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala:

```diff
@@ -235,6 +235,23 @@ object Project {
+case class UnresolvedDropColumns(dropList: Seq[Expression], child: LogicalPlan)
```

Review Comment:
Named it `UnresolvedDropColumns` to avoid a conflict with the `DropColumns` in `v2AlterTableCommands`.
[PR] [SPARK-45258][PYTHON][DOCS] Refine docstring of `sum` [spark]
HyukjinKwon opened a new pull request, #43684:
URL: https://github.com/apache/spark/pull/43684

### What changes were proposed in this pull request?

This PR proposes to improve the docstring of `sum`.

### Why are the changes needed?

For end users, and better usability of PySpark.

### Does this PR introduce _any_ user-facing change?

Yes, it fixes the user-facing documentation.

### How was this patch tested?

Manually tested.

### Was this patch authored or co-authored using generative AI tooling?

No.
[PR] [WIP][SPARK-45770][SQL][PYTHON][CONNECT] Introduce logical plan `UnresolvedDropColumns` for `Dataframe.drop` [spark]
zhengruifeng opened a new pull request, #43683:
URL: https://github.com/apache/spark/pull/43683

### What changes were proposed in this pull request?

Fix column resolution in `DataFrame.drop`.

### Why are the changes needed?

```py
from pyspark.sql.functions import col

# create first dataframe
left_df = spark.createDataFrame([(1, 'a'), (2, 'b'), (3, 'c')], ['join_key', 'value1'])

# create second dataframe
right_df = spark.createDataFrame([(1, 'aa'), (2, 'bb'), (4, 'dd')], ['join_key', 'value2'])

joined_df = left_df.join(right_df, on=left_df['join_key'] == right_df['join_key'], how='left')
display(joined_df)

cleaned_df = joined_df.drop(left_df['join_key'])
display(cleaned_df)  # error here
```

```
JVM stacktrace:
org.apache.spark.sql.AnalysisException: [AMBIGUOUS_REFERENCE] Reference `join_key` is ambiguous, could be: [`join_key`, `join_key`]. SQLSTATE: 42704
	at org.apache.spark.sql.errors.QueryCompilationErrors$.ambiguousReferenceError(QueryCompilationErrors.scala:1957)
	at org.apache.spark.sql.catalyst.expressions.package$AttributeSeq.resolve(package.scala:377)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolve(LogicalPlan.scala:156)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveQuoted(LogicalPlan.scala:167)
	at org.apache.spark.sql.Dataset.$anonfun$drop$4(Dataset.scala:3071)
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

CI.

### Was this patch authored or co-authored using generative AI tooling?

No.
Re: [PR] [SPARK-45810][Python] Create Python UDTF API to stop consuming rows from the input table [spark]
dtenedor commented on PR #43682: URL: https://github.com/apache/spark/pull/43682#issuecomment-1796329902

cc @ueshin @allisonwang-db
[PR] [SPARK-45810][Python] Create Python UDTF API to stop consuming rows from the input table [spark]
dtenedor opened a new pull request, #43682: URL: https://github.com/apache/spark/pull/43682

### What changes were proposed in this pull request?
This PR creates a Python UDTF API to stop consuming rows from the input table. If the UDTF raises a `StopIteration` exception in the `eval` method, then the UDTF stops consuming rows from the input table for that input partition, and finally calls the `terminate` method (if any). For example:

```
@udtf
class TestUDTF:
    def __init__(self):
        self._total = 0

    @staticmethod
    def analyze(_):
        return AnalyzeResult(
            schema=StructType().add("total", IntegerType()),
            withSinglePartition=True,
        )

    def eval(self, _: Row):
        self._total += 1
        if self._total >= 3:
            raise StopIteration("StopIteration at self._total >= 3")

    def terminate(self):
        yield self._total,
```

### Why are the changes needed?
This is useful when the UDTF logic knows that it does not have to scan the input table anymore, and can skip the rest of the I/O in that case.

### Does this PR introduce _any_ user-facing change?
Yes, see above.

### How was this patch tested?
This PR adds test coverage.

### Was this patch authored or co-authored using generative AI tooling?
No
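The consumption-stopping behavior described in the PR can be sketched in plain Python. This is a hypothetical simplification of the worker loop, not actual PySpark internals; `consume_partition` and `CountToThree` are illustrative names:

```python
def consume_partition(udtf, rows):
    """Feed rows to eval() until exhaustion or StopIteration, then terminate()."""
    results = []
    try:
        for row in rows:
            results.extend(udtf.eval(row) or ())
    except StopIteration:
        # Stop reading further input rows for this partition; the remaining
        # I/O for the partition is skipped.
        pass
    # terminate() still runs, whether or not eval() stopped early.
    results.extend(udtf.terminate() or ())
    return results


class CountToThree:
    def __init__(self):
        self._total = 0

    def eval(self, row):
        self._total += 1
        if self._total >= 3:
            raise StopIteration("stop at 3")

    def terminate(self):
        yield (self._total,)


# Only three of the ten rows are consumed before terminate() emits the count.
assert consume_partition(CountToThree(), range(10)) == [(3,)]
```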
[PR] [SPARK-45186][PYTHON][DOCS] Refine docstring of `schema_of_xml` [spark]
HyukjinKwon opened a new pull request, #43681: URL: https://github.com/apache/spark/pull/43681

### What changes were proposed in this pull request?
This PR proposes to improve the docstring of `schema_of_xml`.

### Why are the changes needed?
For end users, and better usability of PySpark.

### Does this PR introduce _any_ user-facing change?
Yes, it fixes the user-facing documentation.

### How was this patch tested?
Manually tested.

### Was this patch authored or co-authored using generative AI tooling?
No.
[PR] [SPARK-45809][PYTHON][DOCS] Refine docstring of `from_xml` [spark]
HyukjinKwon opened a new pull request, #43680: URL: https://github.com/apache/spark/pull/43680

### What changes were proposed in this pull request?
This PR proposes to improve the docstring of `from_xml`.

### Why are the changes needed?
For end users, and better usability of PySpark.

### Does this PR introduce _any_ user-facing change?
Yes, it fixes the user-facing documentation.

### How was this patch tested?
Manually tested.

### Was this patch authored or co-authored using generative AI tooling?
No.
Re: [PR] [SPARK-45808][CONNECT][PYTHON] Better error handling for SQL Exceptions [spark]
grundprinzip commented on code in PR #43667: URL: https://github.com/apache/spark/pull/43667#discussion_r1383889882

## python/pyspark/errors/exceptions/connect.py:
```
@@ -16,7 +16,7 @@
 #
 import pyspark.sql.connect.proto as pb2
 import json
-from typing import Dict, List, Optional, TYPE_CHECKING
+from typing import Dict, List, Optional, TYPE_CHECKING, overload
```

Review Comment: Done
Re: [PR] [SPARK-45798][CONNECT] Assert server-side session ID [spark]
grundprinzip commented on code in PR #43664: URL: https://github.com/apache/spark/pull/43664#discussion_r1383886945

## connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectAnalyzeHandler.scala:
```
@@ -201,7 +201,9 @@ private[connect] class SparkConnectAnalyzeHandler(
       case other => throw InvalidPlanInput(s"Unknown Analyze Method $other!")
     }
-    builder.setSessionId(request.getSessionId)
+    builder
+      .setSessionId(request.getSessionId)
+      .setServerSideSessionId(session.sessionUUID)
```

Review Comment: Done
[PR] [SPARK-45809][PYTHON][DOCS] Refine docstring of `lit` [spark]
HyukjinKwon opened a new pull request, #43679: URL: https://github.com/apache/spark/pull/43679

### What changes were proposed in this pull request?
This PR proposes to improve the docstring of `lit`.

### Why are the changes needed?
For end users, and better usability of PySpark.

### Does this PR introduce _any_ user-facing change?
Yes, it fixes the user-facing documentation.

### How was this patch tested?
Manually tested.

### Was this patch authored or co-authored using generative AI tooling?
No.
Re: [PR] [SPARK-45798][CONNECT] Assert server-side session ID [spark]
grundprinzip commented on code in PR #43664: URL: https://github.com/apache/spark/pull/43664#discussion_r1383865603

## connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala:
```
@@ -179,6 +184,9 @@ class ArtifactManager(
     val responseHandler = new StreamObserver[proto.AddArtifactsResponse] {
       private val summaries = mutable.Buffer.empty[ArtifactSummary]
       override def onNext(v: AddArtifactsResponse): Unit = {
+        if (v.getSessionId != sessionId) {
+          throw new IllegalStateException(s"Session ID mismatch: $sessionId != ${v.getSessionId}")
+        }
```

Review Comment: Fixed this to call onError()
Re: [PR] [SPARK-45791][CONNECT][TESTS] Rename `SparkConnectSessionHodlerSuite.scala` to `SparkConnectSessionHolderSuite.scala` [spark]
rangadi commented on PR #43657: URL: https://github.com/apache/spark/pull/43657#issuecomment-1796152154

Thanks for fixing this!
Re: [PR] [SPARK-45808][CONNECT][PYTHON] Better error handling for SQL Exceptions [spark]
HyukjinKwon commented on code in PR #43667: URL: https://github.com/apache/spark/pull/43667#discussion_r1383863124

## python/pyspark/errors/exceptions/connect.py:
```
@@ -16,7 +16,7 @@
 #
 import pyspark.sql.connect.proto as pb2
 import json
-from typing import Dict, List, Optional, TYPE_CHECKING
+from typing import Dict, List, Optional, TYPE_CHECKING, overload
```

Review Comment:
```suggestion
from typing import Dict, List, Optional, TYPE_CHECKING
```
Linter complains :-).
Re: [PR] [SPARK-45798][CONNECT] Assert server-side session ID [spark]
grundprinzip commented on code in PR #43664: URL: https://github.com/apache/spark/pull/43664#discussion_r1383861338

## connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/CustomSparkConnectBlockingStub.scala:
```
@@ -18,13 +18,93 @@ package org.apache.spark.sql.connect.client

 import scala.jdk.CollectionConverters._

+import com.google.protobuf.GeneratedMessageV3
 import io.grpc.ManagedChannel
+import io.grpc.stub.StreamObserver

 import org.apache.spark.connect.proto._

+// This is common logic shared between the blocking and non-blocking stubs.
+//
+// The common logic is responsible to verify the integrity of the response. The invariant is
+// that the same stub instance is used for all requests from the same client.
+private[client] class SparkConnectCommonStub {
+  // Server side session ID, used to detect if the server side session changed. This is set upon
+  // receiving the first response from the server. This value is used only for executions that
+  // do not use server-side streaming.
+  private var serverSideSessionId: Option[String] = None
+
+  protected def verifyResponse[RespT <: GeneratedMessageV3](fn: => RespT): RespT = {
+    val response = fn
+    val field = response.getDescriptorForType.findFieldByName("server_side_session_id")
+    // If the field does not exist, we ignore this for now.
```

Review Comment: Done
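The invariant discussed in this review thread, pin the server-side session ID on the first response and reject later mismatches, can be sketched in plain Python. `SessionGuard` is an illustrative name, not part of the actual Spark Connect client:

```python
class SessionGuard:
    """Remembers the first server-side session ID and rejects any change."""

    def __init__(self):
        self._server_session_id = None

    def verify(self, response_server_session_id):
        # Not every response type carries the field; an empty value is ignored.
        if not response_server_session_id:
            return
        if self._server_session_id is None:
            # First response from the server pins the ID.
            self._server_session_id = response_server_session_id
        elif self._server_session_id != response_server_session_id:
            raise RuntimeError(
                "Received incorrect server side session identifier for request. "
                "Please create a new Spark Session to reconnect."
            )


guard = SessionGuard()
guard.verify("abc")  # first response pins the ID
guard.verify("abc")  # same ID: fine
guard.verify("")     # field absent: ignored
```

Using the same guard instance for every response from one client mirrors the "same stub instance for all requests" invariant in the quoted Scala comment.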
Re: [PR] [SPARK-44751][SQL] Move `XSDToSchema` from `catalyst` to `core` package [spark]
HyukjinKwon commented on PR #43652: URL: https://github.com/apache/spark/pull/43652#issuecomment-1796140022

tests passed at https://github.com/shujingyang-db/spark/actions/runs/6750039898

Merged to master.
Re: [PR] [SPARK-44751][SQL] Move `XSDToSchema` from `catalyst` to `core` package [spark]
HyukjinKwon closed pull request #43652: [SPARK-44751][SQL] Move `XSDToSchema` from `catalyst` to `core` package URL: https://github.com/apache/spark/pull/43652
Re: [PR] [SPARK-44886][SQL] Introduce CLUSTER BY clause for CREATE/REPLACE TABLE [spark]
imback82 commented on code in PR #42577: URL: https://github.com/apache/spark/pull/42577#discussion_r1383858771

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala:
```
@@ -3973,18 +4000,31 @@ class AstBuilder extends DataTypeAstBuilder with SQLConfHelper with Logging {
    */
   override def visitReplaceTable(ctx: ReplaceTableContext): LogicalPlan = withOrigin(ctx) {
     val orCreate = ctx.replaceTableHeader().CREATE() != null
-    val (partTransforms, partCols, bucketSpec, properties, options, location, comment, serdeInfo) =
-      visitCreateTableClauses(ctx.createTableClauses())
+    val (partTransforms, partCols, bucketSpec, properties, options, location, comment, serdeInfo,
+      clusterBySpec) = visitCreateTableClauses(ctx.createTableClauses())
     val columns = Option(ctx.createOrReplaceTableColTypeList())
       .map(visitCreateOrReplaceTableColTypeList).getOrElse(Nil)
+
+    if (clusterBySpec.isDefined) {
+      if (partCols.nonEmpty || partTransforms.nonEmpty) {
+        throw QueryParsingErrors.clusterByWithPartitionedBy(ctx)
```

Review Comment: +1, done.

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala:
```
@@ -3958,6 +3983,8 @@ class AstBuilder extends DataTypeAstBuilder with SQLConfHelper with Logging {
  * replace_table_clauses (order insensitive):
  *   [OPTIONS table_property_list]
  *   [PARTITIONED BY (partition_fields)]
+ *   [CLUSTER BY (col_name, col_name, ...)]
+ *   [CLUSTER BY (col_name, col_name, ...)]
```

Review Comment: fixed.
Re: [PR] [SPARK-45808][CONNECT][PYTHON] Better error handling for SQL Exceptions [spark]
grundprinzip commented on PR #43667: URL: https://github.com/apache/spark/pull/43667#issuecomment-1796129182

Filed SPARK-45808
Re: [PR] [SPARK-XXX][CONNECT][PYTHON] Better error handling for SQL Exceptions [spark]
HyukjinKwon commented on PR #43667: URL: https://github.com/apache/spark/pull/43667#issuecomment-1796120836

Oh let's also file a JIRA btw
Re: [PR] [MINOR][INFRA] Correct Java version in RM Dockerfile description [spark]
HyukjinKwon closed pull request #43669: [MINOR][INFRA] Correct Java version in RM Dockerfile description URL: https://github.com/apache/spark/pull/43669
Re: [PR] [SPARK-44886][SQL] Introduce CLUSTER BY clause for CREATE/REPLACE TABLE [spark]
imback82 commented on code in PR #42577: URL: https://github.com/apache/spark/pull/42577#discussion_r1383855097

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala:
```
@@ -170,6 +170,23 @@ case class CatalogTablePartition(
   }
 }

+/**
+ * A container for clustering information.
+ *
+ * @param columnNames the names of the columns used for clustering.
+ */
+case class ClusterBySpec(columnNames: Seq[UnresolvedAttribute]) {
+  override def toString: String = columnNames.map(_.name).mkString(",")
+
+  lazy val toDDL: String = if (columnNames.nonEmpty) s"CLUSTER BY ($toString)" else ""
+}
+
+object ClusterBySpec {
+  def apply(columns: String): ClusterBySpec = columns match {
```

Review Comment: this is used for parsing property value back to ClusterBySpec. I renamed this to `fromProperty`.
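The round-trip mentioned in the review comment, serializing clustering columns to a table property and parsing them back, can be sketched in Python. This is a loose, hedged analogue of the Scala `ClusterBySpec` above, not the real Catalyst code; `to_property` is an illustrative name, and only `from_property` mirrors the renamed `fromProperty` constructor:

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ClusterBySpec:
    column_names: tuple  # clustering column names, in order

    def to_property(self) -> str:
        # Serialized form stored as a table property value.
        return ",".join(self.column_names)

    def to_ddl(self) -> str:
        # DDL fragment, empty when there are no clustering columns.
        return f"CLUSTER BY ({self.to_property()})" if self.column_names else ""

    @classmethod
    def from_property(cls, columns: str) -> "ClusterBySpec":
        # Parse the property value back into a spec.
        return cls(tuple(c for c in columns.split(",") if c))


spec = ClusterBySpec(("a", "b"))
assert spec.to_ddl() == "CLUSTER BY (a,b)"
assert ClusterBySpec.from_property(spec.to_property()) == spec
```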
Re: [PR] [MINOR][INFRA] Correct Java version in RM Dockerfile description [spark]
HyukjinKwon commented on PR #43669: URL: https://github.com/apache/spark/pull/43669#issuecomment-1796115085

Merged to master.
Re: [PR] [SPARK-44886][SQL] Introduce CLUSTER BY clause for CREATE/REPLACE TABLE [spark]
imback82 commented on code in PR #42577: URL: https://github.com/apache/spark/pull/42577#discussion_r1383854471

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala:
```
@@ -253,7 +270,8 @@ case class CatalogTable(
     tracksPartitionsInCatalog: Boolean = false,
     schemaPreservesCase: Boolean = true,
     ignoredProperties: Map[String, String] = Map.empty,
-    viewOriginalText: Option[String] = None) {
+    viewOriginalText: Option[String] = None,
+    clusterBySpec: Option[ClusterBySpec] = None) {
```

Review Comment: Done, I guess no need to make it as a reserved property like properties in `TableCatalog`?
Re: [PR] [SPARK-45805][SQL] Make `withOrigin` more generic [spark]
HyukjinKwon commented on PR #43671: URL: https://github.com/apache/spark/pull/43671#issuecomment-1796110374

test failure seems unrelated (https://github.com/MaxGekk/spark/actions/runs/6772663701/job/18414028124). @peter-toth wanna try merging a PR?
Re: [PR] [SPARK-45786][SQL] Fix inaccurate Decimal multiplication and division results [spark]
HyukjinKwon commented on PR #43678: URL: https://github.com/apache/spark/pull/43678#issuecomment-1796094167

test: https://github.com/kazuyukitanimura/spark/actions/runs/6775292999/job/18414265284