This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 913991046c6 [SPARK-45240][SQL][CONNECT] Implement Error Enrichment for Python Client
913991046c6 is described below

commit 913991046c6d2b707eab64bd8ca874f9b9bb6581
Author: Yihong He <heyihong...@gmail.com>
AuthorDate: Mon Sep 25 09:35:06 2023 +0900

[SPARK-45240][SQL][CONNECT] Implement Error Enrichment for Python Client

### What changes were proposed in this pull request?

- Implemented the reconstruction of exceptions with un-truncated error messages and the full server-side stacktrace (including cause exceptions) based on the responses of the FetchErrorDetails RPC.

Example: `./bin/pyspark --remote local`

```python
>>> spark.sql("""select from_json('{"d": "02-29"}', 'd date', map('dateFormat', 'MM-dd'))""").collect()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/Users/yihonghe/Workspace/spark/python/pyspark/sql/connect/session.py", line 556, in sql
    data, properties = self.client.execute_command(cmd.command(self._client))
  File "/Users/yihonghe/Workspace/spark/python/pyspark/sql/connect/client/core.py", line 958, in execute_command
    data, _, _, _, properties = self._execute_and_fetch(req)
  File "/Users/yihonghe/Workspace/spark/python/pyspark/sql/connect/client/core.py", line 1259, in _execute_and_fetch
    for response in self._execute_and_fetch_as_iterator(req):
  File "/Users/yihonghe/Workspace/spark/python/pyspark/sql/connect/client/core.py", line 1240, in _execute_and_fetch_as_iterator
    self._handle_error(error)
  File "/Users/yihonghe/Workspace/spark/python/pyspark/sql/connect/client/core.py", line 1479, in _handle_error
    self._handle_rpc_error(error)
  File "/Users/yihonghe/Workspace/spark/python/pyspark/sql/connect/client/core.py", line 1533, in _handle_rpc_error
    raise convert_exception(
pyspark.errors.exceptions.connect.SparkUpgradeException: [INCONSISTENT_BEHAVIOR_CROSS_VERSION.PARSE_DATETIME_BY_NEW_PARSER] You may get a different result due to the upgrading to Spark >= 3.0: Fail to parse '02-29' in the new parser. You can set "spark.sql.legacy.timeParserPolicy" to "LEGACY" to restore the behavior before Spark 3.0, or set to "CORRECTED" and treat it as an invalid datetime string.

JVM stacktrace:
org.apache.spark.SparkUpgradeException: [INCONSISTENT_BEHAVIOR_CROSS_VERSION.PARSE_DATETIME_BY_NEW_PARSER] You may get a different result due to the upgrading to Spark >= 3.0: Fail to parse '02-29' in the new parser. You can set "spark.sql.legacy.timeParserPolicy" to "LEGACY" to restore the behavior before Spark 3.0, or set to "CORRECTED" and treat it as an invalid datetime string.
at org.apache.spark.sql.errors.ExecutionErrors.failToParseDateTimeInNewParserError(ExecutionErrors.scala:54) at org.apache.spark.sql.errors.ExecutionErrors.failToParseDateTimeInNewParserError$(ExecutionErrors.scala:48) at org.apache.spark.sql.errors.ExecutionErrors$.failToParseDateTimeInNewParserError(ExecutionErrors.scala:218) at org.apache.spark.sql.catalyst.util.DateTimeFormatterHelper$$anonfun$checkParsedDiff$1.applyOrElse(DateTimeFormatterHelper.scala:142) at org.apache.spark.sql.catalyst.util.DateTimeFormatterHelper$$anonfun$checkParsedDiff$1.applyOrElse(DateTimeFormatterHelper.scala:135) at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:35) at org.apache.spark.sql.catalyst.util.Iso8601DateFormatter.parse(DateFormatter.scala:59) at org.apache.spark.sql.catalyst.json.JacksonParser$$anonfun$$nestedInanonfun$makeConverter$11$1.applyOrElse(JacksonParser.scala:302) at org.apache.spark.sql.catalyst.json.JacksonParser$$anonfun$$nestedInanonfun$makeConverter$11$1.applyOrElse(JacksonParser.scala:299) at org.apache.spark.sql.catalyst.json.JacksonParser.parseJsonToken(JacksonParser.scala:404) at org.apache.spark.sql.catalyst.json.JacksonParser.$anonfun$makeConverter$11(JacksonParser.scala:299) at org.apache.spark.sql.catalyst.json.JacksonParser.org$apache$spark$sql$catalyst$json$JacksonParser$$convertObject(JacksonParser.scala:457) at org.apache.spark.sql.catalyst.json.JacksonParser$$anonfun$$nestedInanonfun$makeStructRootConverter$3$1.applyOrElse(JacksonParser.scala:123) at org.apache.spark.sql.catalyst.json.JacksonParser$$anonfun$$nestedInanonfun$makeStructRootConverter$3$1.applyOrElse(JacksonParser.scala:122) at org.apache.spark.sql.catalyst.json.JacksonParser.parseJsonToken(JacksonParser.scala:404) at org.apache.spark.sql.catalyst.json.JacksonParser.$anonfun$makeStructRootConverter$3(JacksonParser.scala:122) at org.apache.spark.sql.catalyst.json.JacksonParser.$anonfun$parse$2(JacksonParser.scala:582) at org.apache.spark.util.SparkErrorUtils.tryWithResource(SparkErrorUtils.scala:48) at org.apache.spark.util.SparkErrorUtils.tryWithResource$(SparkErrorUtils.scala:46) at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:95) at org.apache.spark.sql.catalyst.json.JacksonParser.parse(JacksonParser.scala:577) at org.apache.spark.sql.catalyst.expressions.JsonToStructs.$anonfun$parser$3(jsonExpressions.scala:618) at org.apache.spark.sql.catalyst.util.FailureSafeParser.parse(FailureSafeParser.scala:60) at org.apache.spark.sql.catalyst.expressions.JsonToStructs.nullSafeEval(jsonExpressions.scala:630) at org.apache.spark.sql.catalyst.expressions.UnaryExpression.eval(Expression.scala:547) at org.apache.spark.sql.catalyst.optimizer.ConstantFolding$.org$apache$spark$sql$catalyst$optimizer$ConstantFolding$$constantFolding(expressions.scala:81) at org.apache.spark.sql.catalyst.optimizer.ConstantFolding$.$anonfun$constantFolding$4(expressions.scala:91) at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1217) at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1216) at org.apache.spark.sql.catalyst.expressions.UnaryExpression.mapChildren(Expression.scala:533) at org.apache.spark.sql.catalyst.optimizer.ConstantFolding$.org$apache$spark$sql$catalyst$optimizer$ConstantFolding$$constantFolding(expressions.scala:91) at org.apache.spark.sql.catalyst.optimizer.ConstantFolding$$anonfun$apply$1.$anonfun$applyOrElse$1(expressions.scala:95) at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$1(QueryPlan.scala:208) at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76) at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:208) at org.apache.spark.sql.catalyst.plans.QueryPlan.recursiveTransform$1(QueryPlan.scala:219) at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$3(QueryPlan.scala:224) at scala.collection.immutable.List.map(List.scala:246) at scala.collection.immutable.List.map(List.scala:79) at org.apache.spark.sql.catalyst.plans.QueryPlan.recursiveTransform$1(QueryPlan.scala:224) at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$4(QueryPlan.scala:229) at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:306) at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:229) at org.apache.spark.sql.catalyst.optimizer.ConstantFolding$$anonfun$apply$1.applyOrElse(expressions.scala:95) at org.apache.spark.sql.catalyst.optimizer.ConstantFolding$$anonfun$apply$1.applyOrElse(expressions.scala:94) at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:463) at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:463) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32) at org.apache.spark.sql.catalyst.trees.TreeNode.transformWithPruning(TreeNode.scala:429) at org.apache.spark.sql.catalyst.optimizer.ConstantFolding$.apply(expressions.scala:94) at org.apache.spark.sql.catalyst.optimizer.ConstantFolding$.apply(expressions.scala:47) at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:222) at scala.collection.LinearSeqOps.foldLeft(LinearSeq.scala:183) at scala.collection.LinearSeqOps.foldLeft$(LinearSeq.scala:179) at scala.collection.immutable.List.foldLeft(List.scala:79) at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:219) at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:211) at scala.collection.immutable.List.foreach(List.scala:333) at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:211) at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:182) at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:89) at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:182) at org.apache.spark.sql.execution.QueryExecution.$anonfun$optimizedPlan$1(QueryExecution.scala:162) at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:138) at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:229) at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:556) at 
org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:229) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900) at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:228) at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:158) at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:154) at org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:172) at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:192) at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:189) at org.apache.spark.sql.execution.QueryExecution.assertExecutedPlanPrepared(QueryExecution.scala:204) at org.apache.spark.sql.connect.planner.SparkConnectPlanner.handleSqlCommand(SparkConnectPlanner.scala:2551) at org.apache.spark.sql.connect.planner.SparkConnectPlanner.process(SparkConnectPlanner.scala:2455) at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.handleCommand(ExecuteThreadRunner.scala:202) at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1(ExecuteThreadRunner.scala:158) at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1$adapted(ExecuteThreadRunner.scala:132) at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$2(SessionHolder.scala:202) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900) at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$1(SessionHolder.scala:202) at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94) at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withContextClassLoader$1(SessionHolder.scala:189) at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:179) at org.apache.spark.sql.connect.service.SessionHolder.withContextClassLoader(SessionHolder.scala:188) at org.apache.spark.sql.connect.service.SessionHolder.withSession(SessionHolder.scala:201) at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.executeInternal(ExecuteThreadRunner.scala:132) at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.org$apache$spark$sql$connect$execution$ExecuteThreadRunner$$execute(ExecuteThreadRunner.scala:84) at org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.run(ExecuteThreadRunner.scala:228) Caused by: java.time.DateTimeException: Invalid date 'February 29' as '1970' is not a leap year at java.time.LocalDate.create(LocalDate.java:459) at java.time.LocalDate.of(LocalDate.java:273) at org.apache.spark.sql.catalyst.util.DateTimeFormatterHelper.toLocalDate(DateTimeFormatterHelper.scala:71) at org.apache.spark.sql.catalyst.util.DateTimeFormatterHelper.toLocalDate$(DateTimeFormatterHelper.scala:53) at org.apache.spark.sql.catalyst.util.Iso8601DateFormatter.toLocalDate(DateFormatter.scala:41) at org.apache.spark.sql.catalyst.util.Iso8601DateFormatter.parse(DateFormatter.scala:57) at org.apache.spark.sql.catalyst.json.JacksonParser$$anonfun$$nestedInanonfun$makeConverter$11$1.applyOrElse(JacksonParser.scala:302) at org.apache.spark.sql.catalyst.json.JacksonParser$$anonfun$$nestedInanonfun$makeConverter$11$1.applyOrElse(JacksonParser.scala:299) at org.apache.spark.sql.catalyst.json.JacksonParser.parseJsonToken(JacksonParser.scala:404) at 
org.apache.spark.sql.catalyst.json.JacksonParser.$anonfun$makeConverter$11(JacksonParser.scala:299) at org.apache.spark.sql.catalyst.json.JacksonParser.org$apache$spark$sql$catalyst$json$JacksonParser$$convertObject(JacksonParser.scala:457) at org.apache.spark.sql.catalyst.json.JacksonParser$$anonfun$$nestedInanonfun$makeStructRootConverter$3$1.applyOrElse(JacksonParser.scala:123) at org.apache.spark.sql.catalyst.json.JacksonParser$$anonfun$$nestedInanonfun$makeStructRootConverter$3$1.applyOrElse(JacksonParser.scala:122) at org.apache.spark.sql.catalyst.json.JacksonParser.parseJsonToken(JacksonParser.scala:404) at org.apache.spark.sql.catalyst.json.JacksonParser.$anonfun$makeStructRootConverter$3(JacksonParser.scala:122) at org.apache.spark.sql.catalyst.json.JacksonParser.$anonfun$parse$2(JacksonParser.scala:582) at org.apache.spark.util.SparkErrorUtils.tryWithResource(SparkErrorUtils.scala:48) at org.apache.spark.util.SparkErrorUtils.tryWithResource$(SparkErrorUtils.scala:46) at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:95) at org.apache.spark.sql.catalyst.json.JacksonParser.parse(JacksonParser.scala:577) at org.apache.spark.sql.catalyst.expressions.JsonToStructs.$anonfun$parser$3(jsonExpressions.scala:618) at org.apache.spark.sql.catalyst.util.FailureSafeParser.parse(FailureSafeParser.scala:60) at org.apache.spark.sql.catalyst.expressions.JsonToStructs.nullSafeEval(jsonExpressions.scala:630) at org.apache.spark.sql.catalyst.expressions.UnaryExpression.eval(Expression.scala:547) at org.apache.spark.sql.catalyst.optimizer.ConstantFolding$.org$apache$spark$sql$catalyst$optimizer$ConstantFolding$$constantFolding(expressions.scala:81) at org.apache.spark.sql.catalyst.optimizer.ConstantFolding$.$anonfun$constantFolding$4(expressions.scala:91) at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1217) at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1216) at org.apache.spark.sql.catalyst.expressions.UnaryExpression.mapChildren(Expression.scala:533) at org.apache.spark.sql.catalyst.optimizer.ConstantFolding$.org$apache$spark$sql$catalyst$optimizer$ConstantFolding$$constantFolding(expressions.scala:91) at org.apache.spark.sql.catalyst.optimizer.ConstantFolding$$anonfun$apply$1.$anonfun$applyOrElse$1(expressions.scala:95) at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$1(QueryPlan.scala:208) at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76) at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:208) at org.apache.spark.sql.catalyst.plans.QueryPlan.recursiveTransform$1(QueryPlan.scala:219) at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$3(QueryPlan.scala:224) at scala.collection.immutable.List.map(List.scala:246) at scala.collection.immutable.List.map(List.scala:79) at org.apache.spark.sql.catalyst.plans.QueryPlan.recursiveTransform$1(QueryPlan.scala:224) at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$4(QueryPlan.scala:229) at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:306) at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:229) at org.apache.spark.sql.catalyst.optimizer.ConstantFolding$$anonfun$apply$1.applyOrElse(expressions.scala:95) at org.apache.spark.sql.catalyst.optimizer.ConstantFolding$$anonfun$apply$1.applyOrElse(expressions.scala:94) at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:463) at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:463) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32) at org.apache.spark.sql.catalyst.trees.TreeNode.transformWithPruning(TreeNode.scala:429) at org.apache.spark.sql.catalyst.optimizer.ConstantFolding$.apply(expressions.scala:94) at org.apache.spark.sql.catalyst.optimizer.ConstantFolding$.apply(expressions.scala:47) at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:222) at scala.collection.LinearSeqOps.foldLeft(LinearSeq.scala:183) at scala.collection.LinearSeqOps.foldLeft$(LinearSeq.scala:179) at scala.collection.immutable.List.foldLeft(List.scala:79) at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:219) at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:211) at scala.collection.immutable.List.foreach(List.scala:333) at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:211) at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:182) at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:89) at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:182) at org.apache.spark.sql.execution.QueryExecution.$anonfun$optimizedPlan$1(QueryExecution.scala:162) at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:138) at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:229) at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:556) at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:229) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900) at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:228) at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:158) at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:154) at org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:172) at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:192) at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:189) at org.apache.spark.sql.execution.QueryExecution.assertExecutedPlanPrepared(QueryExecution.scala:204) at org.apache.spark.sql.connect.planner.SparkConnectPlanner.handleSqlCommand(SparkConnectPlanner.scala:2551) at org.apache.spark.sql.connect.planner.SparkConnectPlanner.process(SparkConnectPlanner.scala:2455) at 
org.apache.spark.sql.connect.execution.ExecuteThreadRunner.handleCommand(ExecuteThreadRunner.scala:202) at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1(ExecuteThreadRunner.scala:158) at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1$adapted(ExecuteThreadRunner.scala:132) at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$2(SessionHolder.scala:202) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900) at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$1(SessionHolder.scala:202) at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94) at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withContextClassLoader$1(SessionHolder.scala:189) at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:179) at org.apache.spark.sql.connect.service.SessionHolder.withContextClassLoader(SessionHolder.scala:188) at org.apache.spark.sql.connect.service.SessionHolder.withSession(SessionHolder.scala:201) at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.executeInternal(ExecuteThreadRunner.scala:132) at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.org$apache$spark$sql$connect$execution$ExecuteThreadRunner$$execute(ExecuteThreadRunner.scala:84) at org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.run(ExecuteThreadRunner.scala:228)
```

### Why are the changes needed?

- Un-truncated error messages and the full server-side stacktrace are useful for debugging.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

- Existing tests

### Was this patch authored or co-authored using generative AI tooling?

Closes #43034 from heyihong/SPARK-45240.

Authored-by: Yihong He <heyihong...@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 python/pyspark/errors/exceptions/connect.py   | 46 +++++++++++--
 python/pyspark/sql/connect/client/core.py     | 23 ++++++-
 .../sql/tests/connect/test_connect_basic.py   | 76 +++++++++++++++++-----
 3 files changed, 124 insertions(+), 21 deletions(-)
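The new behavior is gated by SQL confs exercised by the updated tests below (`spark.sql.connect.enrichError.enabled` and `spark.sql.connect.serverStacktrace.enabled`, alongside the existing `spark.sql.pyspark.jvmStacktrace.enabled`). A minimal sketch of exercising this from a Spark Connect client follows; the conf names mirror the tests in this commit, while the local session setup is only illustrative:

```python
# Illustrative sketch (not part of the commit): exercising error enrichment
# from a Spark Connect client. Conf names mirror the tests below; the local
# remote URL is an assumption for the example.
from pyspark.sql import SparkSession
from pyspark.errors.exceptions.connect import SparkUpgradeException

spark = SparkSession.builder.remote("local[4]").getOrCreate()

# Ask the server to keep un-truncated messages and attach the JVM stacktrace.
spark.conf.set("spark.sql.connect.enrichError.enabled", True)
spark.conf.set("spark.sql.connect.serverStacktrace.enabled", True)

try:
    spark.sql(
        """select from_json('{"d": "02-29"}', 'd date', map('dateFormat', 'MM-dd'))"""
    ).collect()
except SparkUpgradeException as e:
    # With enrichment on, the client-side message carries the server-side
    # stacktrace, including the "Caused by: java.time.DateTimeException" cause.
    print("JVM stacktrace" in e.message)  # True
```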
diff --git a/python/pyspark/errors/exceptions/connect.py b/python/pyspark/errors/exceptions/connect.py
index 48b213080c1..dd645000e7b 100644
--- a/python/pyspark/errors/exceptions/connect.py
+++ b/python/pyspark/errors/exceptions/connect.py
@@ -14,8 +14,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+import pyspark.sql.connect.proto as pb2
 import json
-from typing import Dict, Optional, TYPE_CHECKING
+from typing import Dict, List, Optional, TYPE_CHECKING
 
 from pyspark.errors.exceptions.base import (
@@ -45,14 +46,22 @@ class SparkConnectException(PySparkException):
     """
 
 
-def convert_exception(info: "ErrorInfo", message: str) -> SparkConnectException:
+def convert_exception(
+    info: "ErrorInfo", truncated_message: str, resp: Optional[pb2.FetchErrorDetailsResponse]
+) -> SparkConnectException:
     classes = []
     if "classes" in info.metadata:
         classes = json.loads(info.metadata["classes"])
 
-    if "stackTrace" in info.metadata:
-        stackTrace = info.metadata["stackTrace"]
-        message += f"\n\nJVM stacktrace:\n{stackTrace}"
+    if resp is not None and resp.HasField("root_error_idx"):
+        message = resp.errors[resp.root_error_idx].message
+        stacktrace = _extract_jvm_stacktrace(resp)
+    else:
+        message = truncated_message
+        stacktrace = info.metadata["stackTrace"] if "stackTrace" in info.metadata else ""
+
+    if len(stacktrace) > 0:
+        message += f"\n\nJVM stacktrace:\n{stacktrace}"
 
     if "org.apache.spark.sql.catalyst.parser.ParseException" in classes:
         return ParseException(message)
@@ -89,6 +98,33 @@ def convert_exception(info: "ErrorInfo", message: str) -> SparkConnectException:
     return SparkConnectGrpcException(message, reason=info.reason)
 
 
+def _extract_jvm_stacktrace(resp: pb2.FetchErrorDetailsResponse) -> str:
+    if len(resp.errors[resp.root_error_idx].stack_trace) == 0:
+        return ""
+
+    lines: List[str] = []
+
+    def format_stacktrace(error: pb2.FetchErrorDetailsResponse.Error) -> None:
+        message = f"{error.error_type_hierarchy[0]}: {error.message}"
+        if len(lines) == 0:
+            lines.append(message)
+        else:
+            lines.append(f"Caused by: {message}")
+        for elem in error.stack_trace:
+            lines.append(
+                f"\tat {elem.declaring_class}.{elem.method_name}"
+                f"({elem.file_name}:{elem.line_number})"
+            )
+
+        # If this error has a cause, format that recursively
+        if error.HasField("cause_idx"):
+            format_stacktrace(resp.errors[error.cause_idx])
+
+    format_stacktrace(resp.errors[resp.root_error_idx])
+
+    return "\n".join(lines)
+
+
 class SparkConnectGrpcException(SparkConnectException):
     """
     Base class to handle the errors from GRPC.
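To make the cause-chain formatting in `_extract_jvm_stacktrace` concrete: it walks from `root_error_idx` through `cause_idx` links and renders each error as a `Class: message` header followed by `\tat ...` frames. A self-contained restatement over plain Python objects follows; the dataclasses are stand-ins for the `pb2.FetchErrorDetailsResponse` messages, assumed only so the sketch runs without the generated protos:

```python
# Stand-in sketch of the formatting performed by _extract_jvm_stacktrace above;
# plain dataclasses replace the proto messages for illustration.
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class Frame:
    declaring_class: str
    method_name: str
    file_name: str
    line_number: int


@dataclass
class Error:
    message: str
    error_type_hierarchy: List[str]
    stack_trace: List[Frame] = field(default_factory=list)
    cause_idx: Optional[int] = None


def format_chain(errors: List[Error], root_idx: int) -> str:
    # Walk the cause chain from the root error, appending one header line per
    # error and one "\tat ..." line per stack frame.
    lines: List[str] = []
    idx: Optional[int] = root_idx
    while idx is not None:
        err = errors[idx]
        prefix = "" if not lines else "Caused by: "
        lines.append(f"{prefix}{err.error_type_hierarchy[0]}: {err.message}")
        for f in err.stack_trace:
            lines.append(f"\tat {f.declaring_class}.{f.method_name}({f.file_name}:{f.line_number})")
        idx = err.cause_idx
    return "\n".join(lines)


errors = [
    Error(
        message="[INCONSISTENT_BEHAVIOR_CROSS_VERSION.PARSE_DATETIME_BY_NEW_PARSER] ...",
        error_type_hierarchy=["org.apache.spark.SparkUpgradeException"],
        stack_trace=[
            Frame(
                "org.apache.spark.sql.errors.ExecutionErrors",
                "failToParseDateTimeInNewParserError",
                "ExecutionErrors.scala",
                54,
            )
        ],
        cause_idx=1,
    ),
    Error(
        message="Invalid date 'February 29' as '1970' is not a leap year",
        error_type_hierarchy=["java.time.DateTimeException"],
        stack_trace=[Frame("java.time.LocalDate", "create", "LocalDate.java", 459)],
    ),
]
print(format_chain(errors, 0))
```

The output mirrors the `JVM stacktrace:` block in the example above, with the `java.time.DateTimeException` cause rendered as a `Caused by:` section.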
diff --git a/python/pyspark/sql/connect/client/core.py b/python/pyspark/sql/connect/client/core.py
index 34a867ce101..ec5998b0b17 100644
--- a/python/pyspark/sql/connect/client/core.py
+++ b/python/pyspark/sql/connect/client/core.py
@@ -96,6 +96,7 @@ from pyspark.errors import PySparkValueError
 
 
 if TYPE_CHECKING:
+    from google.rpc.error_details_pb2 import ErrorInfo
     from pyspark.sql.connect._typing import DataTypeOrString
 
 
@@ -1483,6 +1484,23 @@ class SparkConnectClient(object):
                 ) from None
             raise error
 
+    def _fetch_enriched_error(self, info: "ErrorInfo") -> Optional[pb2.FetchErrorDetailsResponse]:
+        if "errorId" not in info.metadata:
+            return None
+
+        req = pb2.FetchErrorDetailsRequest(
+            session_id=self._session_id,
+            client_type=self._builder.userAgent,
+            error_id=info.metadata["errorId"],
+        )
+        if self._user_id:
+            req.user_context.user_id = self._user_id
+
+        try:
+            return self._stub.FetchErrorDetails(req)
+        except grpc.RpcError:
+            return None
+
     def _handle_rpc_error(self, rpc_error: grpc.RpcError) -> NoReturn:
         """
         Error handling helper for dealing with GRPC Errors. On the server side, certain
@@ -1511,7 +1529,10 @@ class SparkConnectClient(object):
                 if d.Is(error_details_pb2.ErrorInfo.DESCRIPTOR):
                     info = error_details_pb2.ErrorInfo()
                     d.Unpack(info)
-                    raise convert_exception(info, status.message) from None
+
+                    raise convert_exception(
+                        info, status.message, self._fetch_enriched_error(info)
+                    ) from None
 
             raise SparkConnectGrpcException(status.message) from None
         else:
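For context on where the `ErrorInfo` (and hence the `errorId` used by `_fetch_enriched_error`) comes from: `_handle_rpc_error` unpacks it from the gRPC status details, as the unchanged context lines above show. A hedged sketch of that extraction; it assumes the standard `grpc_status` helper, which is not shown in this diff, and the helper function name is invented for the example:

```python
# Sketch only: pull a google.rpc ErrorInfo out of a failed Spark Connect RPC.
# The Is/Unpack pattern matches the context lines in _handle_rpc_error above;
# rpc_status.from_call is the standard grpc_status helper (an assumption here).
import grpc
from grpc_status import rpc_status
from google.rpc import error_details_pb2


def error_info_from_rpc_error(rpc_error: grpc.RpcError):
    status = rpc_status.from_call(rpc_error)
    if status is None:
        return None
    for d in status.details:
        if d.Is(error_details_pb2.ErrorInfo.DESCRIPTOR):
            info = error_details_pb2.ErrorInfo()
            d.Unpack(info)
            return info  # info.metadata may carry "errorId", "classes", "stackTrace"
    return None
```

If the returned `ErrorInfo` carries no `errorId`, or the follow-up FetchErrorDetails call itself fails, `_fetch_enriched_error` returns `None` and `convert_exception` falls back to the truncated message plus the optional `stackTrace` metadata, so enrichment stays best-effort.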
diff --git a/python/pyspark/sql/tests/connect/test_connect_basic.py b/python/pyspark/sql/tests/connect/test_connect_basic.py
index c5a127136d6..620c2f9d76a 100644
--- a/python/pyspark/sql/tests/connect/test_connect_basic.py
+++ b/python/pyspark/sql/tests/connect/test_connect_basic.py
@@ -62,6 +62,7 @@ from pyspark.errors.exceptions.connect import (
     AnalysisException,
     ParseException,
     SparkConnectException,
+    SparkUpgradeException,
 )
 
 if should_test_connect:
@@ -3296,22 +3297,66 @@ class SparkConnectSessionTests(ReusedConnectTestCase):
             self.spark.conf.get("some.conf")
         self._check_no_active_session_error(e.exception)
 
-    def test_error_stack_trace(self):
-        with self.sql_conf({"spark.sql.pyspark.jvmStacktrace.enabled": True}):
-            with self.assertRaises(AnalysisException) as e:
-                self.spark.sql("select x").collect()
-            self.assertTrue("JVM stacktrace" in e.exception.message)
-            self.assertTrue(
-                "at org.apache.spark.sql.catalyst.analysis.CheckAnalysis" in e.exception.message
-            )
-
-        with self.sql_conf({"spark.sql.pyspark.jvmStacktrace.enabled": False}):
+    def test_error_enrichment_message(self):
+        with self.sql_conf(
+            {
+                "spark.sql.connect.enrichError.enabled": True,
+                "spark.sql.connect.serverStacktrace.enabled": False,
+                "spark.sql.pyspark.jvmStacktrace.enabled": False,
+            }
+        ):
+            name = "test" * 10000
             with self.assertRaises(AnalysisException) as e:
-                self.spark.sql("select x").collect()
+                self.spark.sql("select " + name).collect()
+            self.assertTrue(name in e.exception.message)
             self.assertFalse("JVM stacktrace" in e.exception.message)
-            self.assertFalse(
-                "at org.apache.spark.sql.catalyst.analysis.CheckAnalysis" in e.exception.message
-            )
+
+    def test_error_enrichment_jvm_stacktrace(self):
+        with self.sql_conf(
+            {
+                "spark.sql.connect.enrichError.enabled": True,
+                "spark.sql.pyspark.jvmStacktrace.enabled": False,
+            }
+        ):
+            with self.sql_conf({"spark.sql.connect.serverStacktrace.enabled": False}):
+                with self.assertRaises(SparkUpgradeException) as e:
+                    self.spark.sql(
+                        """select from_json(
+                            '{"d": "02-29"}', 'd date', map('dateFormat', 'MM-dd'))"""
+                    ).collect()
+                self.assertFalse("JVM stacktrace" in e.exception.message)
+
+            with self.sql_conf({"spark.sql.connect.serverStacktrace.enabled": True}):
+                with self.assertRaises(SparkUpgradeException) as e:
+                    self.spark.sql(
+                        """select from_json(
+                            '{"d": "02-29"}', 'd date', map('dateFormat', 'MM-dd'))"""
+                    ).collect()
+                self.assertTrue("JVM stacktrace" in e.exception.message)
+                self.assertTrue("org.apache.spark.SparkUpgradeException:" in e.exception.message)
+                self.assertTrue(
+                    "at org.apache.spark.sql.errors.ExecutionErrors"
+                    ".failToParseDateTimeInNewParserError" in e.exception.message
+                )
+                self.assertTrue("Caused by: java.time.DateTimeException:" in e.exception.message)
+
+    def test_error_stack_trace(self):
+        with self.sql_conf({"spark.sql.connect.enrichError.enabled": False}):
+            with self.sql_conf({"spark.sql.pyspark.jvmStacktrace.enabled": True}):
+                with self.assertRaises(AnalysisException) as e:
+                    self.spark.sql("select x").collect()
+                self.assertTrue("JVM stacktrace" in e.exception.message)
+                self.assertTrue(
+                    "at org.apache.spark.sql.catalyst.analysis.CheckAnalysis" in e.exception.message
+                )
+
+            with self.sql_conf({"spark.sql.pyspark.jvmStacktrace.enabled": False}):
+                with self.assertRaises(AnalysisException) as e:
+                    self.spark.sql("select x").collect()
+                self.assertFalse("JVM stacktrace" in e.exception.message)
+                self.assertFalse(
+                    "at org.apache.spark.sql.catalyst.analysis.CheckAnalysis" in e.exception.message
+                )
 
         # Create a new session with a different stack trace size.
         self.spark.stop()
@@ -3321,7 +3366,8 @@ class SparkConnectSessionTests(ReusedConnectTestCase):
             .remote("local[4]")
             .getOrCreate()
         )
-        spark.conf.set("spark.sql.pyspark.jvmStacktrace.enabled", "true")
+        spark.conf.set("spark.sql.connect.enrichError.enabled", False)
+        spark.conf.set("spark.sql.pyspark.jvmStacktrace.enabled", True)
         with self.assertRaises(AnalysisException) as e:
             spark.sql("select x").collect()
         self.assertTrue("JVM stacktrace" in e.exception.message)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org