[GitHub] [spark] HyukjinKwon closed pull request #42989: [SPARK-45210][DOCS][3.4] Switch languages consistently across docs for all code snippets (Spark 3.4 and below)
HyukjinKwon closed pull request #42989: [SPARK-45210][DOCS][3.4] Switch languages consistently across docs for all code snippets (Spark 3.4 and below) URL: https://github.com/apache/spark/pull/42989
[GitHub] [spark] HyukjinKwon commented on pull request #42989: [SPARK-45210][DOCS][3.4] Switch languages consistently across docs for all code snippets (Spark 3.4 and below)
HyukjinKwon commented on PR #42989: URL: https://github.com/apache/spark/pull/42989#issuecomment-1724870114

Merged to branch-3.4, branch-3.3, branch-3.2, and branch-3.1.
[GitHub] [spark] panbingkun commented on pull request #42917: [SPARK-45163][SQL] Merge UNSUPPORTED_VIEW_OPERATION & UNSUPPORTED_TABLE_OPERATION & fix some issue
panbingkun commented on PR #42917: URL: https://github.com/apache/spark/pull/42917#issuecomment-1724869552

> looks much better now, thanks for your patience!

Thank you very much for patiently reviewing the code. ❤️
[GitHub] [spark] panbingkun commented on a diff in pull request #42917: [SPARK-45163][SQL] Merge UNSUPPORTED_VIEW_OPERATION & UNSUPPORTED_TABLE_OPERATION & fix some issue
panbingkun commented on code in PR #42917: URL: https://github.com/apache/spark/pull/42917#discussion_r1329599785

## common/utils/src/main/resources/error/error-classes.json:

@@ -860,6 +860,50 @@
       "Exceeds char/varchar type length limitation: <limit>."
     ]
   },
+  "EXPECT_PERMANENT_VIEW_NOT_TEMP" : {
+    "message" : [
+      "'<operation>' expects a permanent view but <viewName> is a temp view."
+    ]
+  },
+  "EXPECT_TABLE_NOT_VIEW" : {
+    "message" : [
+      "'<operation>' expects a table but <viewName> is a view."
+    ],
+    "subClass" : {
+      "NO_ALTERNATIVE" : {
+        "message" : [
+          ""
+        ]
+      },
+      "USE_ALTER_VIEW" : {
+        "message" : [
+          "Please use ALTER VIEW instead."
+        ]
+      }
+    }
+  },
+  "EXPECT_TABLE_OR_PERMANENT_VIEW_NOT_TEMP" : {

Review Comment: That seems quite reasonable. Let me give it a try, and at the same time double-check the rationality of similar cases in the UTs.
[GitHub] [spark] panbingkun commented on pull request #42990: [SPARK-45212][INFRA] Install independent Python linter dependencies for branch-3.5
panbingkun commented on PR #42990: URL: https://github.com/apache/spark/pull/42990#issuecomment-1724855910

+1, LGTM.
[GitHub] [spark] rednaxelafx commented on pull request #42988: [WIP][SPARK-45209][CORE][UI] Flame Graph Support For Executor Thread Dump Page
rednaxelafx commented on PR #42988: URL: https://github.com/apache/spark/pull/42988#issuecomment-1724837849

One important aspect of flame graphs is the semantics of the "width" of the bars. It can be defined to mean anything, e.g. aggregated profiling ticks (i.e. number of samples) or wall clock duration etc. What's the intended semantics of "width" here for the thread dump snapshot?
[GitHub] [spark] zhengruifeng commented on pull request #42990: [SPARK-45212][INFRA] Install independent Python linter dependencies for branch-3.5
zhengruifeng commented on PR #42990: URL: https://github.com/apache/spark/pull/42990#issuecomment-1724834827

LGTM
[GitHub] [spark] rangadi commented on a diff in pull request #42986: [SPARK-44463][SS][CONNECT] Improve error handling for Connect steaming Python worker
rangadi commented on code in PR #42986: URL: https://github.com/apache/spark/pull/42986#discussion_r1329577300

## connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingQueryListenerHelper.scala:

@@ -76,16 +78,21 @@ class PythonStreamingQueryListener(listener: SimplePythonFunction, sessionHolder
   }

   private def handlePythonWorkerError(functionName: String): Unit = {
-    dataIn.readInt() match {
-      case ret if ret == 0 =>
-        logInfo(s"Streaming query listener function $functionName completed (ret: $ret)")
-      case SpecialLengths.PYTHON_EXCEPTION_THROWN =>
-        val exLength = dataIn.readInt()
-        val obj = new Array[Byte](exLength)
-        dataIn.readFully(obj)
-        val msg = new String(obj, StandardCharsets.UTF_8)
-        throw new IllegalStateException(s"Found error inside Streaming query listener Python " +
-          s"process for function $functionName: $msg")
+    try {
+      dataIn.readInt() match {
+        case ret if ret == 0 =>
+          logInfo(s"Streaming query listener function $functionName completed (ret: $ret)")
+        case SpecialLengths.PYTHON_EXCEPTION_THROWN =>
+          val exLength = dataIn.readInt()
+          val obj = new Array[Byte](exLength)
+          dataIn.readFully(obj)
+          val msg = new String(obj, StandardCharsets.UTF_8)
+          throw new PythonException(s"Found error inside Streaming query listener Python " +
+            s"process for function $functionName: $msg", null)
+      }
+    } catch {
+      case eof: EOFException =>

Review Comment: Absolutely. It should fail the query. I thought this code was wrapping it in a SparkException and throwing it.
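To make the fix rangadi describes concrete, here is a minimal Scala sketch (hypothetical helper name `failOnWorkerEof`; not the PR's actual code) of wrapping the worker's EOF in a `SparkException` so the streaming query fails rather than losing the listener silently:

```scala
import java.io.EOFException

import org.apache.spark.SparkException

object WorkerErrorUtils {
  // Sketch only: wrap the worker's EOF so the streaming query fails loudly.
  // `readReply` stands for the dataIn.readInt() match block quoted above.
  def failOnWorkerEof[T](functionName: String)(readReply: => T): T =
    try readReply
    catch {
      case eof: EOFException =>
        throw new SparkException(
          s"Python worker exited unexpectedly while handling $functionName", eof)
    }
}
```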
[GitHub] [spark] bogao007 commented on a diff in pull request #42986: [SPARK-44463][SS][CONNECT] Improve error handling for Connect steaming Python worker
bogao007 commented on code in PR #42986: URL: https://github.com/apache/spark/pull/42986#discussion_r1329574367

## connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingQueryListenerHelper.scala:

@@ -76,16 +78,21 @@ class PythonStreamingQueryListener(listener: SimplePythonFunction, sessionHolder
   }

   private def handlePythonWorkerError(functionName: String): Unit = {
-    dataIn.readInt() match {
-      case ret if ret == 0 =>
-        logInfo(s"Streaming query listener function $functionName completed (ret: $ret)")
-      case SpecialLengths.PYTHON_EXCEPTION_THROWN =>
-        val exLength = dataIn.readInt()
-        val obj = new Array[Byte](exLength)
-        dataIn.readFully(obj)
-        val msg = new String(obj, StandardCharsets.UTF_8)
-        throw new IllegalStateException(s"Found error inside Streaming query listener Python " +
-          s"process for function $functionName: $msg")
+    try {
+      dataIn.readInt() match {
+        case ret if ret == 0 =>
+          logInfo(s"Streaming query listener function $functionName completed (ret: $ret)")
+        case SpecialLengths.PYTHON_EXCEPTION_THROWN =>
+          val exLength = dataIn.readInt()
+          val obj = new Array[Byte](exLength)
+          dataIn.readFully(obj)
+          val msg = new String(obj, StandardCharsets.UTF_8)
+          throw new PythonException(s"Found error inside Streaming query listener Python " +
+            s"process for function $functionName: $msg", null)
+      }
+    } catch {
+      case eof: EOFException =>

Review Comment: Not sure if catching any `NonFatal(ex)` is a good solution here, since if an error occurs it's highly possible that something went wrong in the Python worker, and it is better to fail the query in this case. I'll leave it as it is for now, and I have also added a TODO in `StreamingForeachBatchHelper` to improve the handling of this scenario later.
[GitHub] [spark] bogao007 commented on a diff in pull request #42986: [SPARK-44463][SS][CONNECT] Improve error handling for Connect steaming Python worker
bogao007 commented on code in PR #42986: URL: https://github.com/apache/spark/pull/42986#discussion_r1329574367

## connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingQueryListenerHelper.scala:

@@ -76,16 +78,21 @@ class PythonStreamingQueryListener(listener: SimplePythonFunction, sessionHolder
   }

   private def handlePythonWorkerError(functionName: String): Unit = {
-    dataIn.readInt() match {
-      case ret if ret == 0 =>
-        logInfo(s"Streaming query listener function $functionName completed (ret: $ret)")
-      case SpecialLengths.PYTHON_EXCEPTION_THROWN =>
-        val exLength = dataIn.readInt()
-        val obj = new Array[Byte](exLength)
-        dataIn.readFully(obj)
-        val msg = new String(obj, StandardCharsets.UTF_8)
-        throw new IllegalStateException(s"Found error inside Streaming query listener Python " +
-          s"process for function $functionName: $msg")
+    try {
+      dataIn.readInt() match {
+        case ret if ret == 0 =>
+          logInfo(s"Streaming query listener function $functionName completed (ret: $ret)")
+        case SpecialLengths.PYTHON_EXCEPTION_THROWN =>
+          val exLength = dataIn.readInt()
+          val obj = new Array[Byte](exLength)
+          dataIn.readFully(obj)
+          val msg = new String(obj, StandardCharsets.UTF_8)
+          throw new PythonException(s"Found error inside Streaming query listener Python " +
+            s"process for function $functionName: $msg", null)
+      }
+    } catch {
+      case eof: EOFException =>

Review Comment: Not sure if catching any `NonFatal(ex)` is a good solution here, since if an error occurs it's highly possible that something went wrong in the Python worker, and it is better to fail the query in this case. I'll leave it as it is for now, and I have also added a TODO in `StreamingForeachBatchHelper` to improve the handling of this scenario later.

## connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingForeachBatchHelper.scala:

@@ -125,8 +128,21 @@
       dataOut.writeLong(args.batchId)
       dataOut.flush()

-      val ret = dataIn.readInt()
-      logInfo(s"Python foreach batch for dfId ${args.dfId} completed (ret: $ret)")
+      try {
+        dataIn.readInt() match {
+          case ret if ret == 0 =>
+            logInfo(s"Python foreach batch for dfId ${args.dfId} completed (ret: $ret)")
+          case SpecialLengths.PYTHON_EXCEPTION_THROWN =>
+            val exLength = dataIn.readInt()
+            val obj = new Array[Byte](exLength)
+            dataIn.readFully(obj)
+            val msg = new String(obj, StandardCharsets.UTF_8)
+            throw new PythonException(s"Found error inside foreachBatch Python process: $msg", null)

Review Comment: Updated.

## connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingQueryListenerHelper.scala:

@@ -38,33 +43,56 @@
     sessionHolder.sessionId,
     "pyspark.sql.connect.streaming.worker.listener_worker")

-  val (dataOut, _) = runner.init()
+  val (dataOut, dataIn) = runner.init()

   override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {
     PythonRDD.writeUTF(event.json, dataOut)
     dataOut.writeInt(0)
     dataOut.flush()
+    handlePythonWorkerError("onQueryStarted")
   }

   override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
     PythonRDD.writeUTF(event.json, dataOut)
     dataOut.writeInt(1)
     dataOut.flush()
+    handlePythonWorkerError("onQueryProgress")
   }

   override def onQueryIdle(event: StreamingQueryListener.QueryIdleEvent): Unit = {
     PythonRDD.writeUTF(event.json, dataOut)
     dataOut.writeInt(2)
     dataOut.flush()
+    handlePythonWorkerError("onQueryIdle")
   }

   override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {
     PythonRDD.writeUTF(event.json, dataOut)
     dataOut.writeInt(3)
     dataOut.flush()
+    handlePythonWorkerError("onQueryTerminated")
   }

   private[spark] def stopListenerProcess(): Unit = {
     runner.stop()
   }
+
+  private def handlePythonWorkerError(functionName: String): Unit = {
+    try {
+      dataIn.readInt() match {

Review Comment: Done.
[GitHub] [spark] dongjoon-hyun closed pull request #42914: [SPARK-44910][SQL][3.4] Encoders.bean does not support superclasses with generic type arguments
dongjoon-hyun closed pull request #42914: [SPARK-44910][SQL][3.4] Encoders.bean does not support superclasses with generic type arguments URL: https://github.com/apache/spark/pull/42914
[GitHub] [spark] dongjoon-hyun commented on pull request #42914: [SPARK-44910][SQL][3.4] Encoders.bean does not support superclasses with generic type arguments
dongjoon-hyun commented on PR #42914: URL: https://github.com/apache/spark/pull/42914#issuecomment-1724820449

Merged to branch-3.4.
[GitHub] [spark] dongjoon-hyun commented on pull request #42634: [SPARK-44910][SQL] Encoders.bean does not support superclasses with generic type arguments
dongjoon-hyun commented on PR #42634: URL: https://github.com/apache/spark/pull/42634#issuecomment-1724819818

Merged to master/3.5.
[GitHub] [spark] dongjoon-hyun closed pull request #42634: [SPARK-44910][SQL] Encoders.bean does not support superclasses with generic type arguments
dongjoon-hyun closed pull request #42634: [SPARK-44910][SQL] Encoders.bean does not support superclasses with generic type arguments URL: https://github.com/apache/spark/pull/42634
[GitHub] [spark] dongjoon-hyun closed pull request #42956: [SPARK-43654][CONNECT][PS][TESTS] Enable `InternalFrameParityTests.test_from_pandas`
dongjoon-hyun closed pull request #42956: [SPARK-43654][CONNECT][PS][TESTS] Enable `InternalFrameParityTests.test_from_pandas` URL: https://github.com/apache/spark/pull/42956
[GitHub] [spark] yaooqinn commented on pull request #42988: [WIP][SPARK-45209][CORE][UI] Flame Graph Support For Executor Thread Dump Page
yaooqinn commented on PR #42988: URL: https://github.com/apache/spark/pull/42988#issuecomment-1724810773

This PR mainly focuses on the UI, independent of the profiling steps. What we might have in the future are:

- Flame Graph Support For Task Thread Page, which [SPARK-45151](https://issues.apache.org/jira/browse/SPARK-45151) added
- Add a ProfilingExecutor(max, interval) message to profile the whole executor, which returns the same data structure as the TriggerThreadDump message
- Add a ProfileTask(taskId, max, interval) message to profile a specific task, which returns the same data structure as the TaskThreadDump message
- Different views for on/off/full CPUs
- Mixed-mode profiling, which might rely upon some external libs at runtime
- And so on.
[GitHub] [spark] LuciferYang opened a new pull request, #42990: [SPARK-45212][INFRA] Install independent Python linter dependencies for branch-3.5
LuciferYang opened a new pull request, #42990: URL: https://github.com/apache/spark/pull/42990

### What changes were proposed in this pull request?

### Why are the changes needed?

### Does this PR introduce _any_ user-facing change?

### How was this patch tested?

### Was this patch authored or co-authored using generative AI tooling?
[GitHub] [spark] mridulm commented on pull request #42988: [WIP][SPARK-45209][CORE][UI] Flame Graph Support For Executor Thread Dump Page
mridulm commented on PR #42988: URL: https://github.com/apache/spark/pull/42988#issuecomment-1724800071

The UI looks nice! Thanks for working on this @yaooqinn :-)

My main concern is around effectively capturing stack frames without safepoint bias, correlating them to the specific task, and, for the executor flame graph, which threads to capture. @HyukjinKwon might remember what we had built for Safari, but we did that outside of the scope of Spark due to limitations (which might not be relevant now, to be honest). This has been called out as out of scope - but details around what we are planning to support (in the first cut) would be great to understand.
[GitHub] [spark] sunchao commented on a diff in pull request #42612: [SPARK-44913][SQL] DS V2 supports push down V2 UDF that has magic method
sunchao commented on code in PR #42612: URL: https://github.com/apache/spark/pull/42612#discussion_r1329548074

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala:

@@ -279,7 +283,9 @@ case class StaticInvoke(
     inputTypes: Seq[AbstractDataType] = Nil,
     propagateNull: Boolean = true,
     returnNullable: Boolean = true,
-    isDeterministic: Boolean = true) extends InvokeLike {
+    isDeterministic: Boolean = true,
+    scalarFunctionName: Option[String] = None,

Review Comment: Yea, looks like it's not easy to re-use `StaticInvoke`. Instead of stacking the extra parameters on top of the class though, I wonder if it makes sense to create another class such as `V2StaticInvoke` or something for this specific use case. Just a nit.
[GitHub] [spark] allisonwang-db commented on a diff in pull request #42949: [SPARK-45093][CONNECT][PYTHON] Error reporting for addArtifacts query
allisonwang-db commented on code in PR #42949: URL: https://github.com/apache/spark/pull/42949#discussion_r1329547039

## python/pyspark/sql/connect/client/artifact.py:

@@ -243,11 +244,15 @@
     def _create_requests(
         self, *path: str, pyfile: bool, archive: bool, file: bool
     ) -> Iterator[proto.AddArtifactsRequest]:
         """Separated for the testing purpose."""
-        return self._add_artifacts(
-            chain(
-                *(self._parse_artifacts(p, pyfile=pyfile, archive=archive, file=file) for p in path)
+        try:
+            yield from self._add_artifacts(
+                chain(
+                    *(self._parse_artifacts(p, pyfile=pyfile, archive=archive, file=file) for p in path)
+                )
             )
-        )
+        except Exception as e:
+            logger.error(f"Failed to submit addArtifacts request: {e}")

Review Comment: I think we should show this error message regardless of the `SPARK_CONNECT_LOG_LEVEL` setting to make it more debuggable. Can we raise a more user-friendly error below?
[GitHub] [spark] LuciferYang commented on pull request #42981: [SPARK-45211][CONNECT] Eliminated ambiguous references in `CloseableIterator#apply` to fix Scala 2.13 daily test
LuciferYang commented on PR #42981: URL: https://github.com/apache/spark/pull/42981#issuecomment-1724793150

Thanks @juliuszsompolski
[GitHub] [spark] ConeyLiu commented on a diff in pull request #42612: [SPARK-44913][SQL] DS V2 supports push down V2 UDF that has magic method
ConeyLiu commented on code in PR #42612: URL: https://github.com/apache/spark/pull/42612#discussion_r1329541422

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala:

@@ -279,7 +283,9 @@ case class StaticInvoke(
     inputTypes: Seq[AbstractDataType] = Nil,
     propagateNull: Boolean = true,
     returnNullable: Boolean = true,
-    isDeterministic: Boolean = true) extends InvokeLike {
+    isDeterministic: Boolean = true,
+    scalarFunctionName: Option[String] = None,

Review Comment: Maybe we can try using `TreeNodeTag` to save those two names. This would avoid breaking the `StaticInvoke` constructor. However, I am not sure it is a good idea.
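For context, a minimal sketch of what the `TreeNodeTag` approach could look like (the tag names here are hypothetical, not the PR's code); whether the tags survive the relevant plan rewrites is exactly the kind of open question mentioned above:

```scala
import org.apache.spark.sql.catalyst.trees.TreeNodeTag

// Hypothetical tags carrying the two V2 UDF names on the expression node,
// leaving the StaticInvoke constructor unchanged.
object V2FunctionTags {
  val Name: TreeNodeTag[String] = TreeNodeTag[String]("v2FunctionName")
  val CanonicalName: TreeNodeTag[String] = TreeNodeTag[String]("v2CanonicalName")
}

// At resolution time (invoke: StaticInvoke, bound: BoundFunction):
//   invoke.setTagValue(V2FunctionTags.Name, bound.name())
//   invoke.setTagValue(V2FunctionTags.CanonicalName, bound.canonicalName())
// At pushdown time:
//   invoke.getTagValue(V2FunctionTags.CanonicalName)  // Option[String]
```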
[GitHub] [spark] ConeyLiu commented on a diff in pull request #42612: [SPARK-44913][SQL] DS V2 supports push down V2 UDF that has magic method
ConeyLiu commented on code in PR #42612: URL: https://github.com/apache/spark/pull/42612#discussion_r1329539929

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala:

@@ -279,7 +283,9 @@ case class StaticInvoke(
     inputTypes: Seq[AbstractDataType] = Nil,
     propagateNull: Boolean = true,
     returnNullable: Boolean = true,
-    isDeterministic: Boolean = true) extends InvokeLike {
+    isDeterministic: Boolean = true,
+    scalarFunctionName: Option[String] = None,

Review Comment: It seems it's not easy. The `BoundFunction` is created by calling the `bind` method of an `UnboundFunction`, which is loaded by the `FunctionCatalog`, and we have no guarantee that the `BoundFunction` has a no-arg constructor to create it with reflection.
[GitHub] [spark] zhengruifeng commented on pull request #42988: [WIP][SPARK-45209][CORE][UI] Flame Graph Support For Executor Thread Dump Page
zhengruifeng commented on PR #42988: URL: https://github.com/apache/spark/pull/42988#issuecomment-1724783234

awesome!
[GitHub] [spark] HyukjinKwon commented on pull request #42988: [WIP][SPARK-45209][CORE][UI] Flame Graph Support For Executor Thread Dump Page
HyukjinKwon commented on PR #42988: URL: https://github.com/apache/spark/pull/42988#issuecomment-1724783071

Looks cool. cc @mridulm FYI
[GitHub] [spark] LuciferYang commented on pull request #42981: [SPARK-45211][CONNECT] Eliminated ambiguous references in `CloseableIterator#apply` to fix Scala 2.13 daily test
LuciferYang commented on PR #42981: URL: https://github.com/apache/spark/pull/42981#issuecomment-1724777489

The connect module tests succeed with Scala 2.12 with this PR: https://github.com/LuciferYang/spark/runs/16908220090

(screenshot) https://github.com/apache/spark/assets/1475305/ac2b686a-cd9a-4459-8f6a-03ba393e17b3
[GitHub] [spark] cloud-fan commented on a diff in pull request #42612: [SPARK-44913][SQL] DS V2 supports push down V2 UDF that has magic method
cloud-fan commented on code in PR #42612: URL: https://github.com/apache/spark/pull/42612#discussion_r1329529759

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala:

@@ -279,7 +283,9 @@ case class StaticInvoke(
     inputTypes: Seq[AbstractDataType] = Nil,
     propagateNull: Boolean = true,
     returnNullable: Boolean = true,
-    isDeterministic: Boolean = true) extends InvokeLike {
+    isDeterministic: Boolean = true,
+    scalarFunctionName: Option[String] = None,

Review Comment: we can instantiate `staticObject` and call its `canonicalName` function?
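A minimal sketch of this suggestion (hypothetical helper; assumes `staticObject` is the `BoundFunction`'s `Class`). As noted elsewhere in this thread, a no-arg constructor is not guaranteed, so the reflective path can fail:

```scala
import scala.util.Try

import org.apache.spark.sql.connector.catalog.functions.BoundFunction

object V2FunctionNames {
  // Sketch only: reflectively instantiate the BoundFunction class and ask it
  // for its canonicalName. Returns None when instantiation is not possible.
  def canonicalNameOf(staticObject: Class[_]): Option[String] =
    if (classOf[BoundFunction].isAssignableFrom(staticObject)) {
      Try(staticObject.getDeclaredConstructor().newInstance()
        .asInstanceOf[BoundFunction].canonicalName()).toOption
    } else {
      None
    }
}
```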
[GitHub] [spark] cloud-fan commented on pull request #42917: [SPARK-45163][SQL] Merge UNSUPPORTED_VIEW_OPERATION & UNSUPPORTED_TABLE_OPERATION & fix some issue
cloud-fan commented on PR #42917: URL: https://github.com/apache/spark/pull/42917#issuecomment-1724772520

looks much better now, thanks for your patience!
[GitHub] [spark] yaooqinn commented on pull request #42969: [SPARK-45192][UI] Fix overdue lineInterpolate parameter for graphviz edge
yaooqinn commented on PR #42969: URL: https://github.com/apache/spark/pull/42969#issuecomment-1724771959

cc @sarutak @HyukjinKwon @dongjoon-hyun thanks
[GitHub] [spark] cloud-fan commented on a diff in pull request #42917: [SPARK-45163][SQL] Merge UNSUPPORTED_VIEW_OPERATION & UNSUPPORTED_TABLE_OPERATION & fix some issue
cloud-fan commented on code in PR #42917: URL: https://github.com/apache/spark/pull/42917#discussion_r1329519381

## common/utils/src/main/resources/error/error-classes.json:

@@ -860,6 +860,50 @@
       "Exceeds char/varchar type length limitation: <limit>."
     ]
   },
+  "EXPECT_PERMANENT_VIEW_NOT_TEMP" : {
+    "message" : [
+      "'<operation>' expects a permanent view but <viewName> is a temp view."
+    ]
+  },
+  "EXPECT_TABLE_NOT_VIEW" : {
+    "message" : [
+      "'<operation>' expects a table but <viewName> is a view."
+    ],
+    "subClass" : {
+      "NO_ALTERNATIVE" : {
+        "message" : [
+          ""
+        ]
+      },
+      "USE_ALTER_VIEW" : {
+        "message" : [
+          "Please use ALTER VIEW instead."
+        ]
+      }
+    }
+  },
+  "EXPECT_TABLE_OR_PERMANENT_VIEW_NOT_TEMP" : {

Review Comment: Actually, it seems we can reuse `EXPECT_PERMANENT_VIEW_NOT_TEMP`? If we get a temp view, it's probably good enough to notify users that the operation needs a permanent view, not a temp one; no need to mention that the operation accepts a table as well.
[GitHub] [spark] HyukjinKwon commented on pull request #42989: [SPARK-45210][DOCS][3.4] Switch languages consistently across docs for all code snippets (Spark 3.4 and below)
HyukjinKwon commented on PR #42989: URL: https://github.com/apache/spark/pull/42989#issuecomment-1724767346

cc @gengliangwang @panbingkun @sarutak @zhengruifeng FYI
[GitHub] [spark] HyukjinKwon opened a new pull request, #42989: [SPARK-45210][DOCS][3.4] Switch languages consistently across docs for all code snippets (Spark 3.4 and below)
HyukjinKwon opened a new pull request, #42989: URL: https://github.com/apache/spark/pull/42989

### What changes were proposed in this pull request?

This PR proposes to restore the ability to switch languages consistently across docs for all code snippets in Spark 3.4 and below, by using the proper class selector in the jQuery. Previously the selector was the string `.nav-link tab_python`, which does not comply with multiple-class selection: https://www.w3.org/TR/CSS21/selector.html#class-html. I assume it worked as a legacy behaviour somewhere. Now it uses the standard form `.nav-link.tab_python`. Note that https://github.com/apache/spark/pull/42657 works because there is only a single class assigned (after we refactored the site at https://github.com/apache/spark/pull/40269).

### Why are the changes needed?

This is a regression in our documentation site.

### Does this PR introduce _any_ user-facing change?

Yes; once you click the language tab, it will apply to the examples in the whole page.

### How was this patch tested?

Manually tested after building the site.

![Screenshot 2023-09-19 at 12 08 17 PM](https://github.com/apache/spark/assets/6477701/09d0c117-9774-4404-8e2e-d454b7f700a3)

### Was this patch authored or co-authored using generative AI tooling?

No.
[GitHub] [spark] cloud-fan commented on a diff in pull request #42917: [SPARK-45163][SQL] Merge UNSUPPORTED_VIEW_OPERATION & UNSUPPORTED_TABLE_OPERATION & fix some issue
cloud-fan commented on code in PR #42917: URL: https://github.com/apache/spark/pull/42917#discussion_r1329515855

## common/utils/src/main/resources/error/error-classes.json:

@@ -860,6 +860,50 @@
       "Exceeds char/varchar type length limitation: <limit>."
     ]
   },
+  "EXPECT_PERMANENT_VIEW_NOT_TEMP" : {
+    "message" : [
+      "'<operation>' expects a permanent view but <viewName> is a temp view."
+    ]
+  },
+  "EXPECT_TABLE_NOT_VIEW" : {
+    "message" : [
+      "'<operation>' expects a table but <viewName> is a view."
+    ],
+    "subClass" : {
+      "NO_ALTERNATIVE" : {
+        "message" : [
+          ""
+        ]
+      },
+      "USE_ALTER_VIEW" : {
+        "message" : [
+          "Please use ALTER VIEW instead."
+        ]
+      }
+    }
+  },
+  "EXPECT_TABLE_OR_PERMANENT_VIEW_NOT_TEMP" : {

Review Comment:
```suggestion
  "EXPECT_PERMANENT_TABLE_OR_VIEW_NOT_TEMP_VIEW" : {
```

Review Comment: in case we add temp table in the future.
[GitHub] [spark] yaooqinn commented on pull request #42982: [SPARK-45202][BUILD] Fix lint-js tool and js format
yaooqinn commented on PR #42982: URL: https://github.com/apache/spark/pull/42982#issuecomment-1724765818

The job containing lint-js passed. Thanks @sarutak @dongjoon-hyun @HyukjinKwon, merged to master
[GitHub] [spark] yaooqinn closed pull request #42982: [SPARK-45202][BUILD] Fix lint-js tool and js format
yaooqinn closed pull request #42982: [SPARK-45202][BUILD] Fix lint-js tool and js format URL: https://github.com/apache/spark/pull/42982
[GitHub] [spark] mridulm commented on a diff in pull request #42357: [SPARK-44306][YARN] Group FileStatus with few RPC calls within Yarn Client
mridulm commented on code in PR #42357: URL: https://github.com/apache/spark/pull/42357#discussion_r1329512411

## resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:

@@ -533,9 +536,12 @@
     // If preload is enabled, preload the statCache with the files in the directories
     val statCache = if (statCachePreloadEnabled) {
       // Consider only following configurations, as they involve the distribution of multiple files
-      val files = sparkConf.get(SPARK_JARS).orNull ++ sparkConf.get(JARS_TO_DISTRIBUTE) ++
-        sparkConf.get(FILES_TO_DISTRIBUTE) ++ sparkConf.get(ARCHIVES_TO_DISTRIBUTE) ++
-        sparkConf.get(PY_FILES) ++ pySparkArchives
+      var files = sparkConf.get(JARS_TO_DISTRIBUTE) ++ sparkConf.get(FILES_TO_DISTRIBUTE) ++
+        sparkConf.get(ARCHIVES_TO_DISTRIBUTE) ++ sparkConf.get(PY_FILES) ++ pySparkArchives
+      if (!sparkConf.get(SPARK_JARS).isEmpty) {

Review Comment: Why this `if` condition? We can add it directly to `files` in the previous line itself?

## resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:

@@ -494,11 +494,14 @@
     fsLookup: URI => FileSystem = FileSystem.get(_, hadoopConf)): HashMap[URI, FileStatus] = {
     val statCache = HashMap[URI, FileStatus]()
     directoriesToBePreloaded(files).foreach { case (dir: URI, filesInDir: HashSet[String]) =>
-      fsLookup(dir).listStatus(new Path(dir)).filter(_.isFile()).
-        filter(f => filesInDir.contains(f.getPath.getName)).foreach { fileStatus =>
-          val uri = fileStatus.getPath.toUri
+      fsLookup(dir).listStatus(new Path(dir), new PathFilter() {
+        override def accept(path: Path): Boolean = filesInDir.contains(path.getName)
+      }).filter(_.isFile()).foreach { fileStatus =>
+        val uri = fileStatus.getPath.toUri
+        if (uri != null) {

Review Comment: IIRC `uri` can't be `null` - why was this condition added?
[GitHub] [spark] yaooqinn opened a new pull request, #42988: [WIP][SPARK-45209][CORE][UI] Flame Graph Support For Executor Thread Dump Page
yaooqinn opened a new pull request, #42988: URL: https://github.com/apache/spark/pull/42988

### What changes were proposed in this pull request?

This PR draws a CPU flame graph from Java stack traces for executors and drivers. Currently, the Java stack traces are just a snapshot, not samples taken at a certain frequency over a period. Sampling might be considered as an upcoming feature, out of the scope of this PR.

![fg git](https://github.com/apache/spark/assets/8326978/c3f99a1a-78ee-4adb-be1f-e4afd5f307b7)

If you are new to flame graphs, there are also some references you can use to learn about the basic concepts and details.

[1] [Flame Graphs](https://www.brendangregg.com/flamegraphs.html)
[2] [FLIP-165: Operator's Flame Graphs](https://cwiki.apache.org/confluence/display/FLINK/FLIP-165%3A+Operator%27s+Flame+Graphs)
[3] [Java in Flames. mixed-mode flame graphs provide a… | by Netflix Technology Blog](https://netflixtechblog.com/java-in-flames-e763b3d32166)
[4] [HProf](https://docs.oracle.com/javase/7/docs/technotes/samples/hprof.html)

### Why are the changes needed?

Performance is always an important design factor in Spark. It is desirable to provide better visibility into the distribution of CPU resources while executing user code alongside the Spark kernel. One of the most visually effective means to do that is [Flame Graphs](http://www.brendangregg.com/FlameGraphs/cpuflamegraphs.html), which visually present the data gathered by performance profiling tools used by developers for performance tuning their applications.

### Does this PR introduce _any_ user-facing change?

Yes.

### How was this patch tested?

Locally.

### Was this patch authored or co-authored using generative AI tooling?

No.
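For readers new to the format, here is a small Scala sketch (an illustration of the general technique only, not this PR's implementation) that collapses one thread-dump snapshot into folded-stack lines, the classic flamegraph.pl input. With a single snapshot, a bar's "width" is simply how many threads share a stack prefix, which also makes the "width" question raised above concrete; under sampling, the count would be ticks per stack instead of threads per stack.

```scala
import scala.jdk.CollectionConverters._

object FoldedStacks {
  // Collapse the current JVM thread-dump snapshot into
  // "frameA;frameB;frameC count" lines (root frame first, leaf frame last).
  // Assumes Scala 2.13 collection converters.
  def snapshot(): Seq[String] = {
    Thread.getAllStackTraces.asScala.values.toSeq
      .map { frames =>
        frames.reverseIterator
          .map(f => s"${f.getClassName}.${f.getMethodName}")
          .mkString(";")
      }
      .filter(_.nonEmpty)
      .groupBy(identity)
      .map { case (stack, hits) => s"$stack ${hits.size}" }
      .toSeq
  }
}
```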
[GitHub] [spark] panbingkun commented on a diff in pull request #42917: [SPARK-45163][SQL] Merge UNSUPPORTED_VIEW_OPERATION & UNSUPPORTED_TABLE_OPERATION & fix some issue
panbingkun commented on code in PR #42917: URL: https://github.com/apache/spark/pull/42917#discussion_r1329502401

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:

@@ -1142,7 +1142,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
       throw QueryCompilationErrors.unsupportedViewOperationError(identifier, cmd, false, u)

Review Comment: Done.
[GitHub] [spark] panbingkun commented on a diff in pull request #42917: [SPARK-45163][SQL] Merge UNSUPPORTED_VIEW_OPERATION & UNSUPPORTED_TABLE_OPERATION & fix some issue
panbingkun commented on code in PR #42917: URL: https://github.com/apache/spark/pull/42917#discussion_r1329502973

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:

@@ -1142,7 +1142,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
       throw QueryCompilationErrors.unsupportedViewOperationError(identifier, cmd, false, u)

Review Comment: I also added this new error class: `EXPECT_TABLE_OR_PERMANENT_VIEW_NOT_TEMP`.
[GitHub] [spark] ConeyLiu commented on a diff in pull request #42612: [SPARK-44913][SQL] DS V2 supports push down V2 UDF that has magic method
ConeyLiu commented on code in PR #42612: URL: https://github.com/apache/spark/pull/42612#discussion_r1329489593

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala:

@@ -279,7 +283,9 @@ case class StaticInvoke(
     inputTypes: Seq[AbstractDataType] = Nil,
     propagateNull: Boolean = true,
     returnNullable: Boolean = true,
-    isDeterministic: Boolean = true) extends InvokeLike {
+    isDeterministic: Boolean = true,
+    scalarFunctionName: Option[String] = None,

Review Comment: For a V2 UDF, the `staticObject` here is the class of the `BoundFunction`, and the `functionName` is the magic name `invoke`. However, we need the `name` and `canonicalName` to build the `UserDefinedScalarFunc`.
[GitHub] [spark] yaooqinn commented on a diff in pull request #42982: [SPARK-45202][BUILD] Fix lint-js tool and js format
yaooqinn commented on code in PR #42982: URL: https://github.com/apache/spark/pull/42982#discussion_r1329460092

## dev/lint-js:

@@ -44,8 +44,14 @@
 if ! npm ls eslint > /dev/null; then
   npm ci eslint
 fi

-npx eslint -c "$SPARK_ROOT_DIR/dev/eslint.json" $LINT_TARGET_FILES | tee "$LINT_JS_REPORT_FILE_NAME"

Review Comment: SGTM, thanks for the suggestion
[GitHub] [spark] yaooqinn commented on a diff in pull request #42982: [SPARK-45202][BUILD] Fix lint-js tool and js format
yaooqinn commented on code in PR #42982: URL: https://github.com/apache/spark/pull/42982#discussion_r1329459486

## dev/lint-js:

@@ -44,8 +44,14 @@
 if ! npm ls eslint > /dev/null; then
   npm ci eslint
 fi

-npx eslint -c "$SPARK_ROOT_DIR/dev/eslint.json" $LINT_TARGET_FILES | tee "$LINT_JS_REPORT_FILE_NAME"
-lint_status=$?
+declare lint_status=0
+for dir in "${LINT_TARGET_FILES[@]}"; do
+  npx eslint -c "$SPARK_ROOT_DIR/dev/eslint.json" "$dir"
+  status=$?
+  if [ $status -ne 0 ] ; then
+    lint_status=$status
+  fi
+done

Review Comment:
```suggestion
npx eslint -c "$SPARK_ROOT_DIR/dev/eslint.json" ${LINT_TARGET_FILES[@]} | tee "$LINT_JS_REPORT_FILE_NAME"
lint_status=$?
```
[GitHub] [spark] rangadi commented on a diff in pull request #42986: [SPARK-44463][SS][CONNECT] Improve error handling for Connect steaming Python worker
rangadi commented on code in PR #42986: URL: https://github.com/apache/spark/pull/42986#discussion_r1329448797

## connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingForeachBatchHelper.scala:

@@ -125,8 +128,21 @@
       dataOut.writeLong(args.batchId)
       dataOut.flush()

-      val ret = dataIn.readInt()
-      logInfo(s"Python foreach batch for dfId ${args.dfId} completed (ret: $ret)")
+      try {
+        dataIn.readInt() match {
+          case ret if ret == 0 =>

Review Comment: Minor: Can simply be `case 0 =>`

## connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingQueryListenerHelper.scala:

@@ -38,33 +43,56 @@
     sessionHolder.sessionId,
     "pyspark.sql.connect.streaming.worker.listener_worker")

-  val (dataOut, _) = runner.init()
+  val (dataOut, dataIn) = runner.init()

   override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {
     PythonRDD.writeUTF(event.json, dataOut)
     dataOut.writeInt(0)
     dataOut.flush()
+    handlePythonWorkerError("onQueryStarted")
   }

   override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
     PythonRDD.writeUTF(event.json, dataOut)
     dataOut.writeInt(1)
     dataOut.flush()
+    handlePythonWorkerError("onQueryProgress")
   }

   override def onQueryIdle(event: StreamingQueryListener.QueryIdleEvent): Unit = {
     PythonRDD.writeUTF(event.json, dataOut)
     dataOut.writeInt(2)
     dataOut.flush()
+    handlePythonWorkerError("onQueryIdle")
   }

   override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {
     PythonRDD.writeUTF(event.json, dataOut)
     dataOut.writeInt(3)
     dataOut.flush()
+    handlePythonWorkerError("onQueryTerminated")
   }

   private[spark] def stopListenerProcess(): Unit = {
     runner.stop()
   }
+
+  private def handlePythonWorkerError(functionName: String): Unit = {
+    try {
+      dataIn.readInt() match {

Review Comment: Can you add a comment to reuse this code?

## connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingForeachBatchHelper.scala:

@@ -125,8 +128,21 @@
       dataOut.writeLong(args.batchId)
       dataOut.flush()

-      val ret = dataIn.readInt()
-      logInfo(s"Python foreach batch for dfId ${args.dfId} completed (ret: $ret)")
+      try {
+        dataIn.readInt() match {
+          case ret if ret == 0 =>
+            logInfo(s"Python foreach batch for dfId ${args.dfId} completed (ret: $ret)")
+          case SpecialLengths.PYTHON_EXCEPTION_THROWN =>
+            val exLength = dataIn.readInt()
+            val obj = new Array[Byte](exLength)
+            dataIn.readFully(obj)
+            val msg = new String(obj, StandardCharsets.UTF_8)
+            throw new PythonException(s"Found error inside foreachBatch Python process: $msg", null)

Review Comment: Handle other return here.

## connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingQueryListenerHelper.scala:

@@ -38,33 +43,56 @@
     sessionHolder.sessionId,
     "pyspark.sql.connect.streaming.worker.listener_worker")

-  val (dataOut, _) = runner.init()
+  val (dataOut, dataIn) = runner.init()

   override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {
     PythonRDD.writeUTF(event.json, dataOut)
     dataOut.writeInt(0)
     dataOut.flush()
+    handlePythonWorkerError("onQueryStarted")
   }

   override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
     PythonRDD.writeUTF(event.json, dataOut)
     dataOut.writeInt(1)
     dataOut.flush()
+    handlePythonWorkerError("onQueryProgress")
   }

   override def onQueryIdle(event: StreamingQueryListener.QueryIdleEvent): Unit = {
     PythonRDD.writeUTF(event.json, dataOut)
     dataOut.writeInt(2)
     dataOut.flush()
+    handlePythonWorkerError("onQueryIdle")
  }

   override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {
     PythonRDD.writeUTF(event.json, dataOut)
     dataOut.writeInt(3)
     dataOut.flush()
+    handlePythonWorkerError("onQueryTerminated")
   }

   private[spark] def stopListenerProcess(): Unit = {
     runner.stop()
   }
+
+  private def handlePythonWorkerError(functionName: String): Unit = {
+    try {
+      dataIn.readInt() match {
+        case ret if ret == 0 =>
+          logInfo(s"Streaming query listener function $functionName completed (ret: $ret)")
+        case SpecialLengths.PYTHON_EXCEPTION_THROWN =>
+          val exLength = dataIn.readInt()
[GitHub] [spark] ulysses-you commented on a diff in pull request #42967: [SPARK-45191][SQL] InMemoryTableScanExec simpleStringWithNodeId adds columnar info
ulysses-you commented on code in PR #42967: URL: https://github.com/apache/spark/pull/42967#discussion_r1329450429 ## sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala: ## @@ -264,8 +269,7 @@ case class CachedRDDBuilder( } private def buildBuffers(): RDD[CachedBatch] = { -val cb = if (cachedPlan.supportsColumnar && -serializer.supportsColumnarInput(cachedPlan.output)) { +val cb = if (supportsColumnarInput) { serializer.convertColumnarBatchToCachedBatch( Review Comment: The vanilla Spark default implementation is `throw new IllegalStateException("Columnar input is not supported")`. So only the people who use a custom cache serializer will be affected.
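For readers outside this thread, a hedged sketch of the behavior described, using stand-in types rather than Spark's real `CachedBatchSerializer` API: the default serializer rejects columnar input, so this code path is only reachable when a custom serializer explicitly opts in.

```scala
// Stand-in trait; the real API is Spark's CachedBatchSerializer.
trait CachedBatchSerializerSketch {
  def supportsColumnarInput(schema: Seq[String]): Boolean
  def convertColumnarBatchToCachedBatch(schema: Seq[String]): Unit
}

// Mirrors the default behavior ulysses-you quotes: the built-in serializer
// never accepts columnar input, and its convert method throws if reached.
object DefaultSerializerSketch extends CachedBatchSerializerSketch {
  override def supportsColumnarInput(schema: Seq[String]): Boolean = false
  override def convertColumnarBatchToCachedBatch(schema: Seq[String]): Unit =
    throw new IllegalStateException("Columnar input is not supported")
}
```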
[GitHub] [spark] cloud-fan commented on a diff in pull request #42612: [SPARK-44913][SQL] DS V2 supports push down V2 UDF that has magic method
cloud-fan commented on code in PR #42612: URL: https://github.com/apache/spark/pull/42612#discussion_r1329447311 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala: ## @@ -279,7 +283,9 @@ case class StaticInvoke( inputTypes: Seq[AbstractDataType] = Nil, propagateNull: Boolean = true, returnNullable: Boolean = true, -isDeterministic: Boolean = true) extends InvokeLike { +isDeterministic: Boolean = true, +scalarFunctionName: Option[String] = None, Review Comment: +1, we can check if `staticObject` is a subclass of `BoundFunction`
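A sketch of the suggested check. The `BoundFunction` trait and the helper `v2FunctionName` below are illustrative stand-ins (Spark's real interface lives in `org.apache.spark.sql.connector.catalog.functions`): instead of threading extra fields through `StaticInvoke`, derive the V2 UDF name from the invocation target class.

```scala
// Stand-in for Spark's BoundFunction interface.
trait BoundFunction { def name(): String }

object StaticInvokeSketch {
  // If the static invocation target implements BoundFunction, treat the
  // given functionName as a V2 scalar UDF name; otherwise it is a plain
  // static method call and no name applies.
  def v2FunctionName(staticObject: Class[_], functionName: String): Option[String] =
    if (classOf[BoundFunction].isAssignableFrom(staticObject)) Some(functionName)
    else None
}
```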
[GitHub] [spark] bogao007 commented on a diff in pull request #42986: [SPARK-44463][SS][CONNECT] Improve error handling for Connect steaming Python worker
bogao007 commented on code in PR #42986: URL: https://github.com/apache/spark/pull/42986#discussion_r1329447168 ## connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingQueryListenerHelper.scala: ## @@ -76,16 +78,21 @@ class PythonStreamingQueryListener(listener: SimplePythonFunction, sessionHolder } private def handlePythonWorkerError(functionName: String): Unit = { -dataIn.readInt() match { - case ret if ret == 0 => -logInfo(s"Streaming query listener function $functionName completed (ret: $ret)") - case SpecialLengths.PYTHON_EXCEPTION_THROWN => -val exLength = dataIn.readInt() -val obj = new Array[Byte](exLength) -dataIn.readFully(obj) -val msg = new String(obj, StandardCharsets.UTF_8) -throw new IllegalStateException(s"Found error inside Streaming query listener Python " + - s"process for function $functionName: $msg") +try { + dataIn.readInt() match { +case ret if ret == 0 => + logInfo(s"Streaming query listener function $functionName completed (ret: $ret)") +case SpecialLengths.PYTHON_EXCEPTION_THROWN => + val exLength = dataIn.readInt() + val obj = new Array[Byte](exLength) + dataIn.readFully(obj) + val msg = new String(obj, StandardCharsets.UTF_8) + throw new PythonException(s"Found error inside Streaming query listener Python " + +s"process for function $functionName: $msg", null) + } +} catch { + case eof: EOFException => Review Comment: If we want to add retry logic, adding it here would not solve the problem. If the EOF is due to an IOException during `write_` functions, we might need to add retries inside the workers. But I'm not sure how likely that scenario is, since if an IOException happens while writing, it's highly possible that the socket is already closed, and a retry would not help in that case.
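For the retry discussion, a generic retry-with-backoff sketch (not code from this PR; `withRetries` is a hypothetical helper). As noted above, it only helps for transient failures - if the socket is already closed, every retry hits EOF again.

```scala
object RetrySketch {
  import java.io.EOFException

  // Retry the given block on EOF, doubling the backoff each attempt.
  // Useful only when the failure is transient; a closed socket will keep failing.
  def withRetries[T](attempts: Int, backoffMs: Long)(body: => T): T =
    try body
    catch {
      case _: EOFException if attempts > 1 =>
        Thread.sleep(backoffMs)
        withRetries(attempts - 1, backoffMs * 2)(body)
    }
}
```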
[GitHub] [spark] copperybean commented on pull request #42495: [SPARK-44812][SQL] Push all predicates according to EqualTo and EqualNullSafe
copperybean commented on PR #42495: URL: https://github.com/apache/spark/pull/42495#issuecomment-1724694181 @cloud-fan @wangyum Could you review this PR, please?
[GitHub] [spark] itholic commented on pull request #42793: [SPARK-45065][PYTHON][PS] Support Pandas 2.1.0
itholic commented on PR #42793: URL: https://github.com/apache/spark/pull/42793#issuecomment-1724692463 Thanks all!
[GitHub] [spark] WweiL commented on a diff in pull request #42986: [SPARK-44463][SS][CONNECT] Improve error handling for Connect steaming Python worker
WweiL commented on code in PR #42986: URL: https://github.com/apache/spark/pull/42986#discussion_r1329441472 ## connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingQueryListenerHelper.scala: ## @@ -76,16 +78,21 @@ class PythonStreamingQueryListener(listener: SimplePythonFunction, sessionHolder } private def handlePythonWorkerError(functionName: String): Unit = { -dataIn.readInt() match { - case ret if ret == 0 => -logInfo(s"Streaming query listener function $functionName completed (ret: $ret)") - case SpecialLengths.PYTHON_EXCEPTION_THROWN => -val exLength = dataIn.readInt() -val obj = new Array[Byte](exLength) -dataIn.readFully(obj) -val msg = new String(obj, StandardCharsets.UTF_8) -throw new IllegalStateException(s"Found error inside Streaming query listener Python " + - s"process for function $functionName: $msg") +try { + dataIn.readInt() match { +case ret if ret == 0 => + logInfo(s"Streaming query listener function $functionName completed (ret: $ret)") +case SpecialLengths.PYTHON_EXCEPTION_THROWN => + val exLength = dataIn.readInt() + val obj = new Array[Byte](exLength) + dataIn.readFully(obj) + val msg = new String(obj, StandardCharsets.UTF_8) + throw new PythonException(s"Found error inside Streaming query listener Python " + +s"process for function $functionName: $msg", null) + } +} catch { + case eof: EOFException => Review Comment: @HyukjinKwon @taku-k Do you know if the EOFException happens frequently in existing pyspark tests? I was wondering if this is possible: say EOF occurs in the common PythonRunner; the task would likely fail, but Spark nicely handles the retry. But here, if the socket hits some error, we don't have any retry logic, and the streaming query would just fail. Given that streamingPythonRunners are long-running processes, I was wondering if you also think having retry logic would be beneficial as a followup. cc @rangadi
[GitHub] [spark] HeartSaVioR commented on pull request #42895: [SPARK-45138][SS] Define a new error class and apply it when checkpointing state to DFS fails
HeartSaVioR commented on PR #42895: URL: https://github.com/apache/spark/pull/42895#issuecomment-1724683270 Maybe it's the first time you are contributing to Apache Spark? If so, congrats on your first contribution! https://spark.apache.org/contributing.html Please check the `Pull request` section for the steps you need to follow to enable GitHub Actions.
[GitHub] [spark] HeartSaVioR commented on a diff in pull request #42940: [SPARK-45178][SS] Fallback to execute a single batch for Trigger.AvailableNow with unsupported sources rather than using wrappe
HeartSaVioR commented on code in PR #42940: URL: https://github.com/apache/spark/pull/42940#discussion_r1329427441 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala: ## @@ -52,11 +52,40 @@ class MicroBatchExecution( @volatile protected var sources: Seq[SparkDataStream] = Seq.empty - protected val triggerExecutor: TriggerExecutor = trigger match { -case t: ProcessingTimeTrigger => ProcessingTimeExecutor(t, triggerClock) -case OneTimeTrigger => SingleBatchExecutor() -case AvailableNowTrigger => MultiBatchExecutor() -case _ => throw new IllegalStateException(s"Unknown type of trigger: $trigger") + @volatile protected[sql] var triggerExecutor: TriggerExecutor = _ + + protected def getTrigger(): TriggerExecutor = { +assert(sources.nonEmpty, "sources should have been retrieved from the plan!") +trigger match { + case t: ProcessingTimeTrigger => ProcessingTimeExecutor(t, triggerClock) + case OneTimeTrigger => SingleBatchExecutor() + case AvailableNowTrigger => +// See SPARK-45178 for more details. +if (sparkSession.sqlContext.conf.getConf( +SQLConf.STREAMING_TRIGGER_AVAILABLE_NOW_WRAPPER_ENABLED)) { + logInfo("Configured to use the wrapper of Trigger.AvailableNow.") Review Comment: Yeah, let's include at least the queryId and runId to differentiate the query. Please note that this is an INFO log indicating that they turned on the flag (which is OK), and the WARN log will follow when they are falling back to using the wrapper (which could be problematic).
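A hedged sketch of the log line being asked for; `id` and `runId` are assumed to be the query and run UUIDs that `StreamExecution` already exposes, and `availableNowWrapperLog` is a hypothetical helper.

```scala
import java.util.UUID

// Shape of the enriched INFO message, so concurrent queries can be told apart.
def availableNowWrapperLog(id: UUID, runId: UUID): String =
  s"Query $id (runId: $runId) is configured to use the wrapper of Trigger.AvailableNow."
```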
[GitHub] [spark] panbingkun commented on a diff in pull request #42917: [SPARK-45163][SQL] Merge UNSUPPORTED_VIEW_OPERATION & UNSUPPORTED_TABLE_OPERATION & fix some issue
panbingkun commented on code in PR #42917: URL: https://github.com/apache/spark/pull/42917#discussion_r1329425447 ## common/utils/src/main/resources/error/error-classes.json: ## @@ -860,6 +860,35 @@ "Exceeds char/varchar type length limitation: ." ] }, + "EXPECT_TABLE_NOT_VIEW" : { +"message" : [ + "The view does not support ." Review Comment: Okay
[GitHub] [spark] HeartSaVioR commented on a diff in pull request #42940: [SPARK-45178][SS] Fallback to execute a single batch for Trigger.AvailableNow with unsupported sources rather than using wrappe
HeartSaVioR commented on code in PR #42940: URL: https://github.com/apache/spark/pull/42940#discussion_r1329424458 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala: ## @@ -52,11 +52,40 @@ class MicroBatchExecution( @volatile protected var sources: Seq[SparkDataStream] = Seq.empty - protected val triggerExecutor: TriggerExecutor = trigger match { -case t: ProcessingTimeTrigger => ProcessingTimeExecutor(t, triggerClock) -case OneTimeTrigger => SingleBatchExecutor() -case AvailableNowTrigger => MultiBatchExecutor() -case _ => throw new IllegalStateException(s"Unknown type of trigger: $trigger") + @volatile protected[sql] var triggerExecutor: TriggerExecutor = _ + + protected def getTrigger(): TriggerExecutor = { +assert(sources.nonEmpty, "sources should have been retrieved from the plan!") +trigger match { + case t: ProcessingTimeTrigger => ProcessingTimeExecutor(t, triggerClock) + case OneTimeTrigger => SingleBatchExecutor() + case AvailableNowTrigger => +// See SPARK-45178 for more details. +if (sparkSession.sqlContext.conf.getConf( Review Comment: The logic is correct, but I agree that it's very confusing. I'll add a comment to elaborate.
[GitHub] [spark] HeartSaVioR commented on a diff in pull request #42940: [SPARK-45178][SS] Fallback to execute a single batch for Trigger.AvailableNow with unsupported sources rather than using wrappe
HeartSaVioR commented on code in PR #42940: URL: https://github.com/apache/spark/pull/42940#discussion_r1329424143 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala: ## @@ -201,7 +207,15 @@ case class MemoryStream[A : Encoder]( override def initialOffset: OffsetV2 = LongOffset(-1) + override def prepareForTriggerAvailableNow(): Unit = synchronized { Review Comment: While I agree this is a side effect, it heavily increases test coverage as well. We test with three data sources in TriggerAvailableNowSuite, and none of the three supports Trigger.AvailableNow, so we are basically missing the case where a data source supports Trigger.AvailableNow in that suite. We've missed that for a long time; I'd like to handle this as well while we are here.
[GitHub] [spark] bogao007 commented on a diff in pull request #42986: [SPARK-44463][SS][CONNECT] Improve error handling for Connect steaming Python worker
bogao007 commented on code in PR #42986: URL: https://github.com/apache/spark/pull/42986#discussion_r1329422764 ## python/pyspark/sql/connect/streaming/worker/foreach_batch_worker.py: ## @@ -69,8 +73,32 @@ def process(df_id, batch_id): # type: ignore[no-untyped-def] while True: df_ref_id = utf8_deserializer.loads(infile) batch_id = read_long(infile) -process(df_ref_id, int(batch_id)) # TODO(SPARK-44463): Propagate error to the user. -write_int(0, outfile) +# Handle errors inside Python worker. Write 0 to outfile if no errors and write -2 with +# traceback string if error occurs. +try: +process(df_ref_id, int(batch_id)) +write_int(0, outfile) +except BaseException as e: Review Comment: done. ## connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingForeachBatchHelper.scala: ## @@ -125,8 +128,21 @@ object StreamingForeachBatchHelper extends Logging { dataOut.writeLong(args.batchId) dataOut.flush() - val ret = dataIn.readInt() - logInfo(s"Python foreach batch for dfId ${args.dfId} completed (ret: $ret)") + try { +dataIn.readInt() match { + case ret if ret == 0 => +logInfo(s"Python foreach batch for dfId ${args.dfId} completed (ret: $ret)") + case SpecialLengths.PYTHON_EXCEPTION_THROWN => +val exLength = dataIn.readInt() +val obj = new Array[Byte](exLength) +dataIn.readFully(obj) +val msg = new String(obj, StandardCharsets.UTF_8) +throw new IllegalStateException(s"Found error inside foreachBatch Python process: $msg") Review Comment: updated.
[GitHub] [spark] anishshri-db commented on a diff in pull request #42940: [SPARK-45178][SS] Fallback to execute a single batch for Trigger.AvailableNow with unsupported sources rather than using wrapp
anishshri-db commented on code in PR #42940: URL: https://github.com/apache/spark/pull/42940#discussion_r1329420413 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AvailableNowDataStreamWrapper.scala: ## @@ -28,6 +28,12 @@ import org.apache.spark.sql.connector.read.streaming class AvailableNowDataStreamWrapper(val delegate: SparkDataStream) extends SparkDataStream with SupportsTriggerAvailableNow with Logging { + // See SPARK-45178 for more details. + logWarning(s"Activating the wrapper implementation of Trigger.AvailableNow for source " + +s"[$delegate]. Note that this might introduce possibility of deduplication, dataloss, " + +s"correctness issue. Enable the config with extreme care. We strongly recommend to contact " + +"with data source developer to support Trigger.AvailableNow.") Review Comment: ok sounds good
[GitHub] [spark] cloud-fan commented on a diff in pull request #42971: [SPARK-43979][SQL][FOLLOWUP] Handle non alias-only project case
cloud-fan commented on code in PR #42971: URL: https://github.com/apache/spark/pull/42971#discussion_r1329419904 ## sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala: ## @@ -1410,6 +1410,21 @@ class AnalysisSuite extends AnalysisTest with Matchers { assertAnalysisSuccess(finalPlan) } + test("Remove extra project as long as all attributes are semantically the same") { +val analyzer = getAnalyzer +val inner = Project( + Seq( +Alias(testRelation2.output(0), "a")(), +testRelation2.output(1), +Alias(testRelation2.output(2), "c")(), +testRelation2.output(3), +testRelation2.output(4), + ), + testRelation2) +val actual_plan = analyzer.simplifyPlanForCollectedMetrics(inner) Review Comment: note: now we only call `simplifyPlanForCollectedMetrics` with the canonicalized plan. We should match that in the tests.
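A sketch of the adjustment being suggested, reusing `analyzer` and `inner` from the quoted test (assumption: `canonicalized` is the standard `QueryPlan` lazy val), so the test exercises the same input the analyzer now receives:

```scala
// Canonicalize before calling, matching how the rule is invoked in production.
val actualPlan = analyzer.simplifyPlanForCollectedMetrics(inner.canonicalized)
```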
[GitHub] [spark] HeartSaVioR commented on a diff in pull request #42940: [SPARK-45178][SS] Fallback to execute a single batch for Trigger.AvailableNow with unsupported sources rather than using wrappe
HeartSaVioR commented on code in PR #42940: URL: https://github.com/apache/spark/pull/42940#discussion_r1329418792 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/AvailableNowDataStreamWrapper.scala: ## @@ -28,6 +28,12 @@ import org.apache.spark.sql.connector.read.streaming class AvailableNowDataStreamWrapper(val delegate: SparkDataStream) extends SparkDataStream with SupportsTriggerAvailableNow with Logging { + // See SPARK-45178 for more details. + logWarning(s"Activating the wrapper implementation of Trigger.AvailableNow for source " + +s"[$delegate]. Note that this might introduce possibility of deduplication, dataloss, " + +s"correctness issue. Enable the config with extreme care. We strongly recommend to contact " + +"with data source developer to support Trigger.AvailableNow.") Review Comment: maybe I'd remove Spark here - they need to contact the 3rd party data source developer, not the Spark community.
[GitHub] [spark] anishshri-db commented on a diff in pull request #42940: [SPARK-45178][SS] Fallback to execute a single batch for Trigger.AvailableNow with unsupported sources rather than using wrapp
anishshri-db commented on code in PR #42940: URL: https://github.com/apache/spark/pull/42940#discussion_r1329417880 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala: ## @@ -52,11 +52,40 @@ class MicroBatchExecution( @volatile protected var sources: Seq[SparkDataStream] = Seq.empty - protected val triggerExecutor: TriggerExecutor = trigger match { -case t: ProcessingTimeTrigger => ProcessingTimeExecutor(t, triggerClock) -case OneTimeTrigger => SingleBatchExecutor() -case AvailableNowTrigger => MultiBatchExecutor() -case _ => throw new IllegalStateException(s"Unknown type of trigger: $trigger") + @volatile protected[sql] var triggerExecutor: TriggerExecutor = _ + + protected def getTrigger(): TriggerExecutor = { +assert(sources.nonEmpty, "sources should have been retrieved from the plan!") +trigger match { + case t: ProcessingTimeTrigger => ProcessingTimeExecutor(t, triggerClock) + case OneTimeTrigger => SingleBatchExecutor() + case AvailableNowTrigger => +// See SPARK-45178 for more details. +if (sparkSession.sqlContext.conf.getConf( Review Comment: Not sure I'm reading this correctly. If the flag is enabled, we use multi-batch directly? Shouldn't this be the case if the flag is disabled?
[GitHub] [spark] HeartSaVioR commented on a diff in pull request #42940: [SPARK-45178][SS] Fallback to execute a single batch for Trigger.AvailableNow with unsupported sources rather than using wrappe
HeartSaVioR commented on code in PR #42940: URL: https://github.com/apache/spark/pull/42940#discussion_r1329417758 ## sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala: ## @@ -2180,6 +2180,17 @@ object SQLConf { .booleanConf .createWithDefault(true) + val STREAMING_TRIGGER_AVAILABLE_NOW_WRAPPER_ENABLED = +buildConf("spark.sql.streaming.triggerAvailableNowWrapper.enabled") + .internal() + .doc("Whether to use the wrapper implementation of Trigger.AvailableNow if the source " + +"does not support Trigger.AvailableNow. Enabling this allows the benefits of " + +"Trigger.AvailableNow with sources which don't support it, but some sources " + +"may show unexpected behavior including duplication, data loss, etc. So use with " + Review Comment: I'm enumerating the possibilities - what we actually observed was duplication, but consider a data source that does not rely on Spark's offset management and instead maintains offsets in the source itself: an additional call might end up skipping some data.
[GitHub] [spark] itholic commented on a diff in pull request #40642: [SPARK-43010][PYTHON] Migrate Column errors into error class
itholic commented on code in PR #40642: URL: https://github.com/apache/spark/pull/40642#discussion_r1329417329 ## python/pyspark/errors/error_classes.py: ## @@ -24,6 +24,21 @@ "Argument `` is required when ." ] }, + "CANNOT_ACCESS_TO_DUNDER": { Review Comment: We can update it as a follow-up if there are any better suggestions for the names of the error classes (it's not an API change and only affects the error message, so it would be fine). Which one do you prefer, `CANNOT_ACCESS_DUNDER` or `ILLEGAL_ACCESS_TO_DUNDER`? Can you pick one of them, or do you have an even better idea? Then I can make a follow-up for you.
[GitHub] [spark] Hisoka-X commented on pull request #42960: [SPARK-45078][SQL][3.4] Fix `array_insert` ImplicitCastInputTypes not work
Hisoka-X commented on PR #42960: URL: https://github.com/apache/spark/pull/42960#issuecomment-1724658573 Thanks @MaxGekk @dongjoon-hyun
[GitHub] [spark] HeartSaVioR commented on a diff in pull request #42940: [SPARK-45178][SS] Fallback to execute a single batch for Trigger.AvailableNow with unsupported sources rather than using wrappe
HeartSaVioR commented on code in PR #42940: URL: https://github.com/apache/spark/pull/42940#discussion_r1329416576 ## sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala: ## @@ -2180,6 +2180,17 @@ object SQLConf { .booleanConf .createWithDefault(true) + val STREAMING_TRIGGER_AVAILABLE_NOW_WRAPPER_ENABLED = +buildConf("spark.sql.streaming.triggerAvailableNowWrapper.enabled") + .internal() + .doc("Whether to use the wrapper implementation of Trigger.AvailableNow if the source " + +"does not support Trigger.AvailableNow. Enabling this allows the benefits of " + +"Trigger.AvailableNow with sources which don't support it, but some sources " + +"may show unexpected behavior including duplication, data loss, etc. So use with " + Review Comment: This new config is marked as "internal", so it is not going to be part of the public documentation. We expect only very advanced users and/or operators to know about the existence of the config and to use it at their own risk. We don't even expose this config in the warning message - that's how I keep users from shooting themselves in the foot.
[GitHub] [spark] HeartSaVioR commented on a diff in pull request #42940: [SPARK-45178][SS] Fallback to execute a single batch for Trigger.AvailableNow with unsupported sources rather than using wrappe
HeartSaVioR commented on code in PR #42940: URL: https://github.com/apache/spark/pull/42940#discussion_r1329415427 ## sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala: ## @@ -2180,6 +2180,17 @@ object SQLConf { .booleanConf .createWithDefault(true) + val STREAMING_TRIGGER_AVAILABLE_NOW_WRAPPER_ENABLED = +buildConf("spark.sql.streaming.triggerAvailableNowWrapper.enabled") + .internal() + .doc("Whether to use the wrapper implementation of Trigger.AvailableNow if the source " + +"does not support Trigger.AvailableNow. Enabling this allows the benefits of " + +"Trigger.AvailableNow with sources which don't support it, but some sources " + +"may show unexpected behavior including duplication, data loss, etc. So use with " + +"extreme care! The ideal direction is to persuade developers of source(s) to " + +"support Trigger.AvailableNow.") Review Comment: I'd say we are going to break a bunch of 3rd party data sources then, which is what we have avoided so far.
[GitHub] [spark] github-actions[bot] closed pull request #41498: [SPARK-44001][Protobuf] spark protobuf: handle well known wrapper types
github-actions[bot] closed pull request #41498: [SPARK-44001][Protobuf] spark protobuf: handle well known wrapper types URL: https://github.com/apache/spark/pull/41498
[GitHub] [spark] zhengruifeng commented on pull request #42977: [SPARK-44788][PYTHON][DOCS][FOLLOW-UP] Move `from_xml`/`schema_of_xml` to `Xml Functions`
zhengruifeng commented on PR #42977: URL: https://github.com/apache/spark/pull/42977#issuecomment-1724649338 thank you guys!
[GitHub] [spark] sunchao commented on a diff in pull request #42612: [SPARK-44913][SQL] DS V2 supports push down V2 UDF that has magic method
sunchao commented on code in PR #42612: URL: https://github.com/apache/spark/pull/42612#discussion_r1329406870 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala: ## @@ -279,7 +283,9 @@ case class StaticInvoke( inputTypes: Seq[AbstractDataType] = Nil, propagateNull: Boolean = true, returnNullable: Boolean = true, -isDeterministic: Boolean = true) extends InvokeLike { +isDeterministic: Boolean = true, +scalarFunctionName: Option[String] = None, Review Comment: Hmm, instead of adding these two parameters, can we check `staticObject` and `functionName`?
[GitHub] [spark] HyukjinKwon closed pull request #42968: [SPARK-45113][PYTHON][DOCS][FOLLOWUP] Add sorting to the example of `collect_set/collect_list` to ensure stable results
HyukjinKwon closed pull request #42968: [SPARK-45113][PYTHON][DOCS][FOLLOWUP] Add sorting to the example of `collect_set/collect_list` to ensure stable results URL: https://github.com/apache/spark/pull/42968
[GitHub] [spark] HyukjinKwon commented on pull request #42968: [SPARK-45113][PYTHON][DOCS][FOLLOWUP] Add sorting to the example of `collect_set/collect_list` to ensure stable results
HyukjinKwon commented on PR #42968: URL: https://github.com/apache/spark/pull/42968#issuecomment-1724630071 Merged to master.
[GitHub] [spark] HyukjinKwon commented on pull request #42986: [SPARK-44463][SS][CONNECT] Improve error handling for Connect steaming Python worker
HyukjinKwon commented on PR #42986: URL: https://github.com/apache/spark/pull/42986#issuecomment-1724629485 Otherwise looks sane to me. cc @ueshin
[GitHub] [spark] HyukjinKwon commented on a diff in pull request #42986: [SPARK-44463][SS][CONNECT] Improve error handling for Connect steaming Python worker
HyukjinKwon commented on code in PR #42986: URL: https://github.com/apache/spark/pull/42986#discussion_r1329393005 ## python/pyspark/sql/connect/streaming/worker/foreach_batch_worker.py: ## @@ -69,8 +73,32 @@ def process(df_id, batch_id): # type: ignore[no-untyped-def] while True: df_ref_id = utf8_deserializer.loads(infile) batch_id = read_long(infile) -process(df_ref_id, int(batch_id)) # TODO(SPARK-44463): Propagate error to the user. -write_int(0, outfile) +# Handle errors inside Python worker. Write 0 to outfile if no errors and write -2 with +# traceback string if error occurs. +try: +process(df_ref_id, int(batch_id)) +write_int(0, outfile) +except BaseException as e: Review Comment: you could put that into `pyspark.util`
[GitHub] [spark] HyukjinKwon commented on a diff in pull request #42986: [SPARK-44463][SS][CONNECT] Improve error handling for Connect steaming Python worker
HyukjinKwon commented on code in PR #42986: URL: https://github.com/apache/spark/pull/42986#discussion_r1329392786 ## python/pyspark/sql/connect/streaming/worker/foreach_batch_worker.py: ## @@ -69,8 +73,32 @@ def process(df_id, batch_id): # type: ignore[no-untyped-def] while True: df_ref_id = utf8_deserializer.loads(infile) batch_id = read_long(infile) -process(df_ref_id, int(batch_id)) # TODO(SPARK-44463): Propagate error to the user. -write_int(0, outfile) +# Handle errors inside Python worker. Write 0 to outfile if no errors and write -2 with +# traceback string if error occurs. +try: +process(df_ref_id, int(batch_id)) +write_int(0, outfile) +except BaseException as e: Review Comment: I think we should probably put this handling into a util and reuse it in the regular `worker.py` too.
[GitHub] [spark] HyukjinKwon commented on a diff in pull request #42986: [SPARK-44463][SS][CONNECT] Improve error handling for Connect steaming Python worker
HyukjinKwon commented on code in PR #42986: URL: https://github.com/apache/spark/pull/42986#discussion_r1329392064 ## connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingForeachBatchHelper.scala: ## @@ -125,8 +128,21 @@ object StreamingForeachBatchHelper extends Logging { dataOut.writeLong(args.batchId) dataOut.flush() - val ret = dataIn.readInt() - logInfo(s"Python foreach batch for dfId ${args.dfId} completed (ret: $ret)") + try { +dataIn.readInt() match { + case ret if ret == 0 => +logInfo(s"Python foreach batch for dfId ${args.dfId} completed (ret: $ret)") + case SpecialLengths.PYTHON_EXCEPTION_THROWN => +val exLength = dataIn.readInt() +val obj = new Array[Byte](exLength) +dataIn.readFully(obj) +val msg = new String(obj, StandardCharsets.UTF_8) +throw new IllegalStateException(s"Found error inside foreachBatch Python process: $msg") Review Comment: Should it be `PythonException`, to match `PythonRunner.handlePythonException`? Or it'd be great to see if we can reuse them.
[GitHub] [spark] HyukjinKwon commented on a diff in pull request #42949: [SPARK-45093][CONNECT][PYTHON] Error reporting for addArtifacts query
HyukjinKwon commented on code in PR #42949: URL: https://github.com/apache/spark/pull/42949#discussion_r1329390408 ## python/pyspark/sql/connect/client/logging.py: ## @@ -0,0 +1,42 @@ +import logging Review Comment: Seems like the linter complains that there's no license header :-).
[GitHub] [spark] HyukjinKwon commented on pull request #42965: [SPARK-45167][CONNECT][PYTHON][FOLLOW-UP] Use lighter threading Rlock, and use the existing eventually util function
HyukjinKwon commented on PR #42965: URL: https://github.com/apache/spark/pull/42965#issuecomment-1724619612 Merged to branch-3.5 too.
[GitHub] [spark] HyukjinKwon closed pull request #42973: [SPARK-45167][CONNECT][PYTHON][3.5] Python client must call `release_all`
HyukjinKwon closed pull request #42973: [SPARK-45167][CONNECT][PYTHON][3.5] Python client must call `release_all` URL: https://github.com/apache/spark/pull/42973
[GitHub] [spark] heyihong opened a new pull request, #42987: [SPARK-45207][SQL][CONNECT] Implement FetchErrorDetails RPC
heyihong opened a new pull request, #42987: URL: https://github.com/apache/spark/pull/42987 ### What changes were proposed in this pull request? ### Why are the changes needed? ### Does this PR introduce _any_ user-facing change? ### How was this patch tested? ### Was this patch authored or co-authored using generative AI tooling?
[GitHub] [spark] HyukjinKwon commented on pull request #42973: [SPARK-45167][CONNECT][PYTHON][3.5] Python client must call `release_all`
HyukjinKwon commented on PR #42973: URL: https://github.com/apache/spark/pull/42973#issuecomment-1724618155 Merged to branch-3.5.
[GitHub] [spark] HyukjinKwon closed pull request #42910: [SPARK-45133][CONNECT][TESTS][FOLLOWUP] Add test that queries transition to FINISHED
HyukjinKwon closed pull request #42910: [SPARK-45133][CONNECT][TESTS][FOLLOWUP] Add test that queries transition to FINISHED URL: https://github.com/apache/spark/pull/42910
[GitHub] [spark] HyukjinKwon commented on pull request #42910: [SPARK-45133][CONNECT][TESTS][FOLLOWUP] Add test that queries transition to FINISHED
HyukjinKwon commented on PR #42910: URL: https://github.com/apache/spark/pull/42910#issuecomment-1724617224 Merged to master.
[GitHub] [spark] heyihong commented on a diff in pull request #42377: [SPARK-44622][SQL][CONNECT] Implement error enrichment and setting server-side stacktrace
heyihong commented on code in PR #42377: URL: https://github.com/apache/spark/pull/42377#discussion_r1329325417 ## connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala: ## @@ -26,47 +26,131 @@ import io.grpc.StatusRuntimeException import io.grpc.protobuf.StatusProto import org.apache.spark.{SparkArithmeticException, SparkArrayIndexOutOfBoundsException, SparkDateTimeException, SparkException, SparkIllegalArgumentException, SparkNumberFormatException, SparkRuntimeException, SparkUnsupportedOperationException, SparkUpgradeException} +import org.apache.spark.connect.proto.{FetchErrorDetailsRequest, FetchErrorDetailsResponse, UserContext} +import org.apache.spark.connect.proto.FetchErrorDetailsResponse.ExceptionInfo +import org.apache.spark.connect.proto.SparkConnectServiceGrpc.SparkConnectServiceBlockingStub import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NoSuchDatabaseException, NoSuchTableException, TableAlreadyExistsException, TempTableAlreadyExistsException} import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.catalyst.trees.Origin -import org.apache.spark.util.JsonUtils -private[client] object GrpcExceptionConverter extends JsonUtils { - def convert[T](f: => T): T = { +/** + * GrpcExceptionConverter handles the conversion of StatusRuntimeExceptions into Spark exceptions. + * It does so by utilizing the ErrorInfo defined in error_details.proto and making an additional + * FetchErrorDetails RPC call to retrieve the full error message and optionally the server-side + * stacktrace. + * + * If the FetchErrorDetails RPC call succeeds, the exceptions will be constructed based on the + * response. If the RPC call fails, the exception will be constructed based on the ErrorInfo. If + * the ErrorInfo is missing, the exception will be constructed based on the StatusRuntimeException + * itself. + */ +private[client] class GrpcExceptionConverter(grpcStub: SparkConnectServiceBlockingStub) { + import GrpcExceptionConverter._ + + def convert[T](sessionId: String, userContext: UserContext)(f: => T): T = { try { f } catch { case e: StatusRuntimeException => -throw toThrowable(e) +throw toThrowable(e, sessionId, userContext) } } - def convertIterator[T](iter: CloseableIterator[T]): CloseableIterator[T] = { + def convertIterator[T]( + sessionId: String, + userContext: UserContext, + iter: CloseableIterator[T]): CloseableIterator[T] = { new WrappedCloseableIterator[T] { override def innerIterator: Iterator[T] = iter override def hasNext: Boolean = { -convert { +convert(sessionId, userContext) { iter.hasNext } } override def next(): T = { -convert { +convert(sessionId, userContext) { iter.next() } } override def close(): Unit = { -convert { +convert(sessionId, userContext) { iter.close() } } } } + /** + * fetchEnrichedError fetches enriched errors with full exception message and optionally + * stacktrace by issuing an additional RPC call to fetch error details. The RPC call is + * best-effort at-most-once. + */ + private def fetchEnrichedError( + info: ErrorInfo, + sessionId: String, + userContext: UserContext): Option[Throwable] = { +val errorId = info.getMetadataOrDefault("errorId", null) +if (errorId == null) { + return None Review Comment: It is possible if a client with a newer version connects to a server with an older version, or if error enrichment is disabled.
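A minimal sketch of the fallback described here: a missing `errorId` metadata entry means the server cannot enrich the error (older server, or enrichment disabled), so the client returns `None` instead of failing. `errorIdFrom` is a hypothetical helper; `metadata` stands in for `ErrorInfo`'s metadata map.

```scala
// Absent "errorId" => enrichment unavailable; caller falls back to ErrorInfo.
def errorIdFrom(metadata: Map[String, String]): Option[String] =
  metadata.get("errorId")
```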
[GitHub] [spark] dongjoon-hyun commented on pull request #42793: [SPARK-45065][PYTHON][PS] Support Pandas 2.1.0
dongjoon-hyun commented on PR #42793: URL: https://github.com/apache/spark/pull/42793#issuecomment-1724614064 Thank you, @itholic and all!
[GitHub] [spark] dongjoon-hyun closed pull request #42793: [SPARK-45065][PYTHON][PS] Support Pandas 2.1.0
dongjoon-hyun closed pull request #42793: [SPARK-45065][PYTHON][PS] Support Pandas 2.1.0 URL: https://github.com/apache/spark/pull/42793
[GitHub] [spark] anishshri-db commented on a diff in pull request #42940: [SPARK-45178][SS] Fallback to execute a single batch for Trigger.AvailableNow with unsupported sources rather than using wrapp
anishshri-db commented on code in PR #42940: URL: https://github.com/apache/spark/pull/42940#discussion_r1329378160 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala: ## @@ -52,11 +52,40 @@ class MicroBatchExecution( @volatile protected var sources: Seq[SparkDataStream] = Seq.empty - protected val triggerExecutor: TriggerExecutor = trigger match { -case t: ProcessingTimeTrigger => ProcessingTimeExecutor(t, triggerClock) -case OneTimeTrigger => SingleBatchExecutor() -case AvailableNowTrigger => MultiBatchExecutor() -case _ => throw new IllegalStateException(s"Unknown type of trigger: $trigger") + @volatile protected[sql] var triggerExecutor: TriggerExecutor = _ + + protected def getTrigger(): TriggerExecutor = { +assert(sources.nonEmpty, "sources should have been retrieved from the plan!") +trigger match { + case t: ProcessingTimeTrigger => ProcessingTimeExecutor(t, triggerClock) + case OneTimeTrigger => SingleBatchExecutor() + case AvailableNowTrigger => +// See SPARK-45178 for more details. +if (sparkSession.sqlContext.conf.getConf( +SQLConf.STREAMING_TRIGGER_AVAILABLE_NOW_WRAPPER_ENABLED)) { + logInfo("Configured to use the wrapper of Trigger.AvailableNow.") + MultiBatchExecutor() +} else { + val supportsTriggerAvailableNow = sources.distinct.forall { src => +val supports = src.isInstanceOf[SupportsTriggerAvailableNow] +if (!supports) { + logWarning(s"source [$src] does not support Trigger.AvailableNow. Failing back to " + Review Comment: Nit: typo? `Falling back to`?
[GitHub] [spark] anishshri-db commented on a diff in pull request #42940: [SPARK-45178][SS] Fallback to execute a single batch for Trigger.AvailableNow with unsupported sources rather than using wrapper
anishshri-db commented on code in PR #42940: URL: https://github.com/apache/spark/pull/42940#discussion_r1329377950

## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala:

```
@@ -52,11 +52,40 @@ class MicroBatchExecution(
   @volatile protected var sources: Seq[SparkDataStream] = Seq.empty

-  protected val triggerExecutor: TriggerExecutor = trigger match {
-    case t: ProcessingTimeTrigger => ProcessingTimeExecutor(t, triggerClock)
-    case OneTimeTrigger => SingleBatchExecutor()
-    case AvailableNowTrigger => MultiBatchExecutor()
-    case _ => throw new IllegalStateException(s"Unknown type of trigger: $trigger")
+  @volatile protected[sql] var triggerExecutor: TriggerExecutor = _
+
+  protected def getTrigger(): TriggerExecutor = {
+    assert(sources.nonEmpty, "sources should have been retrieved from the plan!")
+    trigger match {
+      case t: ProcessingTimeTrigger => ProcessingTimeExecutor(t, triggerClock)
+      case OneTimeTrigger => SingleBatchExecutor()
+      case AvailableNowTrigger =>
+        // See SPARK-45178 for more details.
+        if (sparkSession.sqlContext.conf.getConf(
+            SQLConf.STREAMING_TRIGGER_AVAILABLE_NOW_WRAPPER_ENABLED)) {
+          logInfo("Configured to use the wrapper of Trigger.AvailableNow.")
```

Review Comment: Can we log some more query-specific info here? Or do we not have more info at this point?
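[Editor's note] A hedged usage sketch of what the change above affects. Everything here is illustrative: the app name, checkpoint path, and especially the config key string (guessed from the `STREAMING_TRIGGER_AVAILABLE_NOW_WRAPPER_ENABLED` entry in the diff) are assumptions, not confirmed API; check SQLConf for the exact key.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder().appName("available-now-demo").getOrCreate()

// Assumed key name: force the wrapper (MultiBatchExecutor) even for sources
// that do not implement SupportsTriggerAvailableNow. With it unset, such
// sources would run as a single batch (SingleBatchExecutor) under this change.
spark.conf.set("spark.sql.streaming.triggerAvailableNowWrapper.enabled", "true")

val query = spark.readStream
  .format("rate")
  .load()
  .writeStream
  .format("console")
  .option("checkpointLocation", "/tmp/available-now-demo")
  .trigger(Trigger.AvailableNow())
  .start()

query.awaitTermination()
```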
[GitHub] [spark] heyihong commented on a diff in pull request #42377: [SPARK-44622][SQL][CONNECT] Implement error enrichment and setting server-side stacktrace
heyihong commented on code in PR #42377: URL: https://github.com/apache/spark/pull/42377#discussion_r1329300383

## connector/connect/common/src/main/protobuf/spark/connect/base.proto:

```
@@ -778,6 +778,67 @@ message ReleaseExecuteResponse {
   optional string operation_id = 2;
 }

+message FetchErrorDetailsRequest {
+
+  // (Required)
+  // The session_id specifies a Spark session for a user identified by user_context.user_id.
+  // The id should be a UUID string of the format `00112233-4455-6677-8899-aabbccddeeff`.
+  string session_id = 1;
+
+  // User context
+  UserContext user_context = 2;
+
+  // (Required)
+  // The id of the error.
+  string error_id = 3;
+
+  // Specifies whether to include the stacktrace in the error message when
```

Review Comment: This is for the Python client. Unlike the Scala client, the Python client (or any non-JVM client) cannot easily reconstruct a JVM stacktrace from structured data, so we should render the stacktrace directly into the message.
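[Editor's note] To make the rendering idea concrete, here is a minimal Scala sketch. The helper name `renderMessageWithStacktrace` is hypothetical, not the PR's actual code; it only illustrates appending the JVM frames to the message text when the client asks for it.

```scala
// Sketch only: render the stacktrace into the message for clients that
// cannot rebuild JVM frames from structured data.
def renderMessageWithStacktrace(t: Throwable, includeStacktrace: Boolean): String = {
  val base = Option(t.getMessage).getOrElse(t.getClass.getName)
  if (includeStacktrace) {
    // StackTraceElement.toString yields "Class.method(File.scala:123)" frames.
    s"$base\n\nJVM stacktrace:\n${t.getStackTrace.mkString("\n")}"
  } else {
    base
  }
}
```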
[GitHub] [spark] heyihong commented on a diff in pull request #42377: [SPARK-44622][SQL][CONNECT] Implement error enrichment and setting server-side stacktrace
heyihong commented on code in PR #42377: URL: https://github.com/apache/spark/pull/42377#discussion_r1329300029

## connector/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/ErrorUtils.scala:

```
@@ -57,28 +69,105 @@ private[connect] object ErrorUtils extends Logging {
     classes.toSeq
   }

-  private def buildStatusFromThrowable(st: Throwable, stackTraceEnabled: Boolean): RPCStatus = {
+  private def serializeClasses(t: Throwable): String = {
+    JsonMethods.compact(JsonMethods.render(allClasses(t.getClass).map(_.getName)))
+  }
+
+  private[connect] val NUM_ERRORS_LIMIT = 5
+
+  // We can get full exception messages and optionally stacktrace by
+  // a separate RPC call if enrichErrorEnabled is true. So imposing a smaller
+  // limit to reduce the probability of hitting the 8KB header limit.
+  private val MAX_MESSAGE_SIZE = 512
+
+  // Convert Throwable to a protobuf message FetchErrorDetailsResponse.
+  // Truncate error messages by default.
+  private[connect] def throwableToFetchErrorDetailsResponse(
```

Review Comment: I will remove it then... The issue here is that Python and other non-JVM clients can directly fall back to the message and classes in ErrorInfo if the RPC fails. For the Scala client, such a fallback means losing cause exceptions that may affect control flow.
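[Editor's note] A small sketch of the truncation described in the diff's comment. The method and object names are assumed; the 512-character cap mirrors `MAX_MESSAGE_SIZE` above.

```scala
// Sketch only: truncate a message destined for RPC status metadata so the
// serialized status stays well under gRPC's default 8 KB header limit; the
// full text remains retrievable via the separate FetchErrorDetails RPC.
object MessageAbbreviation {
  private val MaxMessageSize = 512

  def abbreviate(message: String): String =
    if (message == null || message.length <= MaxMessageSize) message
    else message.take(MaxMessageSize) + "..."
}
```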
[GitHub] [spark] bogao007 commented on a diff in pull request #42986: [SPARK-44463][SS][CONNECT] Improve error handling for Connect streaming Python worker
bogao007 commented on code in PR #42986: URL: https://github.com/apache/spark/pull/42986#discussion_r1329356792

## python/pyspark/sql/connect/streaming/worker/listener_worker.py:

```
@@ -83,7 +86,14 @@ def process(listener_event_str, listener_event_type):  # type: ignore[no-untyped
     while True:
         event = utf8_deserializer.loads(infile)
         event_type = read_int(infile)
-        process(event, int(event_type))  # TODO(SPARK-44463): Propagate error to the user.
+        # Handle errors inside Python worker. Write 0 to outfile if no errors and write -2 with
+        # traceback string if error occurs.
+        try:
+            process(event, int(event_type))
+            write_int(0, outfile)
```

Review Comment: I think that's the only way to propagate something out of the Python worker; if that itself fails, I'm not sure anything else could propagate the error out. The [existing worker](https://github.com/apache/spark/blob/981312284f0776ca847c8d21411f74a72c639b22/python/pyspark/sql/worker/analyze_udtf.py#L161-L163) does nothing on an IOException except close the Python process. But let me update the code to handle it the same way they did, which should be slightly better.
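[Editor's note] To illustrate the 0 / -2 status protocol described above from the JVM side, here is a hypothetical Scala sketch. The method name and the length-prefixed UTF-8 framing of the traceback are assumptions for illustration, not the PR's actual driver code.

```scala
import java.io.DataInputStream
import java.nio.charset.StandardCharsets

// Sketch only: after each event, read a status int from the worker.
// 0 means the event was handled; -2 is assumed to be followed by a
// length-prefixed UTF-8 traceback string from the Python side.
def readListenerWorkerStatus(in: DataInputStream): Unit = {
  in.readInt() match {
    case 0 => () // event processed successfully
    case -2 =>
      val len = in.readInt()
      val bytes = new Array[Byte](len)
      in.readFully(bytes)
      val traceback = new String(bytes, StandardCharsets.UTF_8)
      throw new IllegalStateException(s"Python listener worker failed:\n$traceback")
    case other =>
      throw new IllegalStateException(s"Unexpected status from Python worker: $other")
  }
}
```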
[GitHub] [spark] mridulm commented on pull request #42893: [SPARK-44459][Core] Add System.runFinalization() to periodic cleanup
mridulm commented on PR #42893: URL: https://github.com/apache/spark/pull/42893#issuecomment-1724562144

If this is specific to this deployment, as @srowen mentioned, why not do this in user code or a library? You can run a thread that periodically does this.
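[Editor's note] A minimal sketch of the user-code approach suggested here: a daemon scheduler thread that periodically calls `System.runFinalization()`, requiring no change to Spark itself. The 30-minute period is an arbitrary assumption.

```scala
import java.util.concurrent.{Executors, ThreadFactory, TimeUnit}

// Daemon thread so the scheduler never blocks JVM shutdown.
val threadFactory = new ThreadFactory {
  override def newThread(r: Runnable): Thread = {
    val t = new Thread(r, "periodic-finalization")
    t.setDaemon(true)
    t
  }
}

val scheduler = Executors.newSingleThreadScheduledExecutor(threadFactory)

// Trigger finalization of unreachable objects on a fixed cadence.
scheduler.scheduleAtFixedRate(
  new Runnable { override def run(): Unit = System.runFinalization() },
  30, 30, TimeUnit.MINUTES)
```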
[GitHub] [spark] heyihong commented on a diff in pull request #42377: [SPARK-44622][SQL][CONNECT] Implement error enrichment and setting server-side stacktrace
heyihong commented on code in PR #42377: URL: https://github.com/apache/spark/pull/42377#discussion_r1329353558

## connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala:

```
@@ -291,15 +307,16 @@ object SparkConnectService extends Logging {
   }

   // Simple builder for creating the cache of Sessions.
-  private def cacheBuilder(cacheSize: Int, timeoutSeconds: Int): CacheBuilder[Object, Object] = {
+  private[service] def cacheBuilder(
```

Review Comment: Yes, SessionHolder needs to access cacheBuilder.
[GitHub] [spark] heyihong commented on a diff in pull request #42377: [SPARK-44622][SQL][CONNECT] Implement error enrichment and setting server-side stacktrace
heyihong commented on code in PR #42377: URL: https://github.com/apache/spark/pull/42377#discussion_r1329352625

## connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectFetchErrorDetailsHandler.scala:

```
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.service
+
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.FetchErrorDetailsResponse
+import org.apache.spark.sql.connect.config.Connect
+import org.apache.spark.sql.connect.utils.ErrorUtils
+import org.apache.spark.sql.internal.SQLConf
+
+class SparkConnectFetchErrorDetailsHandler(
```

Review Comment: Will add. The reason I didn't add a doc here is that I was trying to stay consistent with the documentation style in this package (the other handlers don't have docs either)...
[GitHub] [spark] mridulm commented on pull request #42426: [SPARK-44756][CORE] Executor hangs when RetryingBlockTransferor fails to initiate retry
mridulm commented on PR #42426: URL: https://github.com/apache/spark/pull/42426#issuecomment-1724557591

Very good callout, @Ngone51. We should probably add a checkstyle error as well to prevent its usage.
[GitHub] [spark] heyihong commented on a diff in pull request #42377: [SPARK-44622][SQL][CONNECT] Implement error enrichment and setting server-side stacktrace
heyihong commented on code in PR #42377: URL: https://github.com/apache/spark/pull/42377#discussion_r1329349965

## connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala:

```
@@ -93,33 +177,44 @@ private[client] object GrpcExceptionConverter extends JsonUtils {
         new SparkArrayIndexOutOfBoundsException(message)),
       errorConstructor[DateTimeException]((message, _) => new SparkDateTimeException(message)),
       errorConstructor((message, cause) => new SparkRuntimeException(message, cause)),
-      errorConstructor((message, cause) => new SparkUpgradeException(message, cause)))
+      errorConstructor((message, cause) => new SparkUpgradeException(message, cause)),
+      errorConstructor((message, cause) => new SparkException(message, cause.orNull)))

-  private def errorInfoToThrowable(info: ErrorInfo, message: String): Option[Throwable] = {
-    val classes =
-      mapper.readValue(info.getMetadataOrDefault("classes", "[]"), classOf[Array[String]])
+  private def exceptionInfoToThrowable(exceptionInfo: ExceptionInfo): Option[Throwable] = {
+    val classHierarchy = exceptionInfo.getErrorTypeHierarchyList.asScala

-    classes
-      .find(errorFactory.contains)
-      .map { cls =>
-        val constructor = errorFactory.get(cls).get
-        constructor(message, None)
-      }
-  }
-
-  private def toThrowable(ex: StatusRuntimeException): Throwable = {
-    val status = StatusProto.fromThrowable(ex)
-
-    val fallbackEx = new SparkException(ex.toString, ex.getCause)
+    if (classHierarchy.isEmpty) {
+      return None
+    }

-    val errorInfoOpt = status.getDetailsList.asScala
-      .find(_.is(classOf[ErrorInfo]))
+    val constructor =
+      classHierarchy
+        .flatMap(errorFactory.get)
+        .headOption
+        .getOrElse((message: String, cause: Option[Throwable]) =>
+          new SparkException(
+            s"Exception in server ${classHierarchy.head}: ${message}",
+            cause.orNull))
+
+    val causeOpt = if (exceptionInfo.hasCause) {
+      exceptionInfoToThrowable(exceptionInfo.getCause)
+    } else {
+      None
+    }
```

Review Comment: It falls back to the abbreviated FetchErrorDetailsResponse. The test case below covers the fallback case.

```
for (enrichErrorEnabled <- Seq(false, true)) {
  test(s"throw SparkException with large cause exception - $enrichErrorEnabled") {
    withSQLConf("spark.sql.connect.enrichError.enabled" -> enrichErrorEnabled.toString) {
      val session = spark
      import session.implicits._

      val throwException =
        udf((_: String) => throw new SparkException("test" * 1))

      val ex = intercept[SparkException] {
        Seq("1").toDS.withColumn("udf_val", throwException($"value")).collect()
      }

      assert(ex.getCause.isInstanceOf[SparkException])

      if (enrichErrorEnabled) {
        assert(ex.getCause.getMessage.contains("test" * 1))
      } else {
        assert(ex.getCause.getMessage.startsWith("test"))
      }
    }
  }
}
```
[GitHub] [spark] WweiL commented on a diff in pull request #42986: [SPARK-44463][SS][CONNECT] Improve error handling for Connect streaming Python worker
WweiL commented on code in PR #42986: URL: https://github.com/apache/spark/pull/42986#discussion_r1329337505

## python/pyspark/sql/connect/streaming/worker/listener_worker.py:

```
@@ -83,7 +86,14 @@ def process(listener_event_str, listener_event_type):  # type: ignore[no-untyped
     while True:
         event = utf8_deserializer.loads(infile)
         event_type = read_int(infile)
-        process(event, int(event_type))  # TODO(SPARK-44463): Propagate error to the user.
+        # Handle errors inside Python worker. Write 0 to outfile if no errors and write -2 with
+        # traceback string if error occurs.
+        try:
+            process(event, int(event_type))
+            write_int(0, outfile)
```

Review Comment: Do you think it's possible that the `read_` and `write_` methods could themselves throw exceptions?