[GitHub] [spark] heyihong commented on a diff in pull request #42377: [SPARK-44622][SQL][CONNECT] Implement FetchErrorDetails RPC

2023-09-19 Thread via GitHub


heyihong commented on code in PR #42377:
URL: https://github.com/apache/spark/pull/42377#discussion_r1330215015



##
connector/connect/common/src/main/protobuf/spark/connect/base.proto:
##
@@ -778,6 +778,67 @@ message ReleaseExecuteResponse {
   optional string operation_id = 2;
 }
 
+message FetchErrorDetailsRequest {
+
+  // (Required)
+  // The session_id specifies a Spark session for a user identified by user_context.user_id.
+  // The id should be a UUID string of the format `00112233-4455-6677-8899-aabbccddeeff`.
+  string session_id = 1;
+
+  // User context
+  UserContext user_context = 2;
+
+  // (Required)
+  // The id of the error.
+  string error_id = 3;
+
+  // Specifies whether to include the stacktrace in the error message when

Review Comment:
   Let's move the discussion to https://github.com/apache/spark/pull/42377#discussion_r1330178093
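   For reference, the request message quoted above can be assembled from the JVM client roughly like this — a minimal sketch based on the fields defined in the proto, with placeholder ids:

   ```scala
   import java.util.UUID
   import org.apache.spark.connect.proto

   // Sketch: populate the FetchErrorDetailsRequest fields defined above.
   // The session id and error id here are placeholder UUIDs, not real values.
   val request = proto.FetchErrorDetailsRequest
     .newBuilder()
     .setSessionId(UUID.randomUUID().toString)
     .setUserContext(proto.UserContext.newBuilder().setUserId("user1"))
     .setErrorId(UUID.randomUUID().toString)
     .build()
   ```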






[GitHub] [spark] heyihong commented on a diff in pull request #42377: [SPARK-44622][SQL][CONNECT] Implement FetchErrorDetails RPC

2023-09-19 Thread via GitHub


heyihong commented on code in PR #42377:
URL: https://github.com/apache/spark/pull/42377#discussion_r1330216449


##
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/ErrorUtils.scala:
##
@@ -57,28 +69,105 @@ private[connect] object ErrorUtils extends Logging {
     classes.toSeq
   }
 
-  private def buildStatusFromThrowable(st: Throwable, stackTraceEnabled: Boolean): RPCStatus = {
+  private def serializeClasses(t: Throwable): String = {
+    JsonMethods.compact(JsonMethods.render(allClasses(t.getClass).map(_.getName)))
+  }
+
+  private[connect] val NUM_ERRORS_LIMIT = 5
+
+  // We can get full exception messages and optionally stacktrace by
+  // a separate RPC call if enrichErrorEnabled is true. So imposing a smaller
+  // limit to reduce the probability of hitting the 8KB header limit.
+  private val MAX_MESSAGE_SIZE = 512
+
+  // Convert Throwable to a protobuf message FetchErrorDetailsResponse.
+  // Truncate error messages by default.
+  private[connect] def throwableToFetchErrorDetailsResponse(
+      st: Throwable,
+      isTruncated: Boolean = true,
+      serverStackTraceEnabled: Boolean = false,
+      pySparkJVMStackTraceEnabled: Boolean = false,
+      stackTraceInMessage: Boolean = false): FetchErrorDetailsResponse = {
+    val errorChain = traverseCauses(st).take(NUM_ERRORS_LIMIT)
+
+    var lastErrorOpt: Option[ExceptionInfo] = None
+
+    errorChain.reverse.map { case error =>
+      val builder = ExceptionInfo
+        .newBuilder()
+        .setMessage(error.getMessage)
+        .addAllErrorTypeHierarchy(ErrorUtils.allClasses(error.getClass).map(_.getName).asJava)
+
+      if (isTruncated) {
+        builder.setMessage(SparkConnectService.extractErrorMessage(error, MAX_MESSAGE_SIZE))
+      } else {
+        if (stackTraceInMessage) {
+          if (serverStackTraceEnabled || pySparkJVMStackTraceEnabled) {
+            builder.setMessage(
+              s"${error.getMessage}\n\n" +
+                s"JVM stacktrace:\n${ExceptionUtils.getStackTrace(error)}")
+          }
+        } else {
+          if (serverStackTraceEnabled) {
+            builder.addAllStackTrace(
+              error.getStackTrace
+                .map { stackTraceElement =>
+                  FetchErrorDetailsResponse.StackTraceElement
+                    .newBuilder()
+                    .setDeclaringClass(stackTraceElement.getClassName)
+                    .setMethodName(stackTraceElement.getMethodName)
+                    .setFileName(stackTraceElement.getFileName)
+                    .setLineNumber(stackTraceElement.getLineNumber)
+                    .build()
+                }
+                .toIterable
+                .asJava)
+          }
+        }
+      }
+
+      lastErrorOpt.foreach(builder.setCause(_))
+      lastErrorOpt = Some(builder.build())
+    }
+
+    val responseBuilder = FetchErrorDetailsResponse.newBuilder()
+
+    lastErrorOpt.foreach(responseBuilder.setExceptionInfo(_))
+
+    responseBuilder.build()
+  }
+
+  private def buildStatusFromThrowable(
+      st: Throwable,
+      enrichErrorEnabled: Boolean,
+      userId: String,
+      sessionId: String): RPCStatus = {
     val errorInfo = ErrorInfo
       .newBuilder()
       .setReason(st.getClass.getName)
       .setDomain("org.apache.spark")
-      .putMetadata(
-        "classes",
-        JsonMethods.compact(JsonMethods.render(allClasses(st.getClass).map(_.getName))))
-
-    lazy val stackTrace = Option(ExceptionUtils.getStackTrace(st))
-    val withStackTrace = if (stackTraceEnabled && stackTrace.nonEmpty) {
-      val maxSize = SparkEnv.get.conf.get(Connect.CONNECT_JVM_STACK_TRACE_MAX_SIZE)
-      errorInfo.putMetadata("stackTrace", StringUtils.abbreviate(stackTrace.get, maxSize))
-    } else {
-      errorInfo
+      .putMetadata("classes", serializeClasses(st))
+
+    if (enrichErrorEnabled) {
+      val errorId = UUID.randomUUID().toString
+
+      // The errorId of the exception.
+      errorInfo.putMetadata("errorId", errorId)
+
+      SparkConnectService
+        .getOrCreateIsolatedSession(userId, sessionId)
+        .errorIdToError
+        .put(errorId, st)
     }
 
     RPCStatus
       .newBuilder()
       .setCode(RPCCode.INTERNAL_VALUE)
-      .addDetails(ProtoAny.pack(withStackTrace.build()))
-      .setMessage(SparkConnectService.extractErrorMessage(st))
+      .addAllDetails(
+        Seq(errorInfo.build(), throwableToFetchErrorDetailsResponse(st))
+          .map(ProtoAny.pack(_))
+          .asJava)
+      .setMessage(SparkConnectService.extractErrorMessage(st, MAX_MESSAGE_SIZE))

Review Comment:
   I undid the change.




[GitHub] [spark] heyihong commented on a diff in pull request #42377: [SPARK-44622][SQL][CONNECT] Implement FetchErrorDetails RPC

2023-09-19 Thread via GitHub


heyihong commented on code in PR #42377:
URL: https://github.com/apache/spark/pull/42377#discussion_r1330217235


##
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala:
##
@@ -213,4 +205,19 @@ object Connect {
 .version("3.5.0")
 .intConf
 .createWithDefault(200)
+
+  val CONNECT_ENRICH_ERROR_ENABLED =
+buildStaticConf("spark.connect.enrichError.enabled")
+  .doc("When true, it enriches errors with full exception messages on the 
client side.")
+  .version("4.0.0")
+  .booleanConf
+  .createWithDefault(true)
+
+  val CONNECT_SERVER_STACKTRACE_ENABLED =

Review Comment:
   Done






[GitHub] [spark] heyihong commented on a diff in pull request #42377: [SPARK-44622][SQL][CONNECT] Implement FetchErrorDetails RPC

2023-09-19 Thread via GitHub


heyihong commented on code in PR #42377:
URL: https://github.com/apache/spark/pull/42377#discussion_r1330178093


##
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/ErrorUtils.scala:
##
@@ -57,28 +79,103 @@ private[connect] object ErrorUtils extends Logging {
     classes.toSeq
   }
 
-  private def buildStatusFromThrowable(st: Throwable, stackTraceEnabled: Boolean): RPCStatus = {
+  private def serializeClasses(t: Throwable): String = {
+    JsonMethods.compact(JsonMethods.render(allClasses(t.getClass).map(_.getName)))
+  }
+
+  // The maximum length of the error chain.
+  private val MAX_ERROR_CHAIN_LENGTH = 5
+
+  // We can get full exception messages and optionally stacktrace by
+  // a separate RPC call if enrichErrorEnabled is true. So imposing a smaller
+  // limit to reduce the probability of hitting the 8KB header limit.
+  private val MAX_MESSAGE_SIZE = 2048
+
+  /**
+   * Convert Throwable to a protobuf message FetchErrorDetailsResponse.
+   * @param st
+   *   the Throwable to be converted
+   * @param serverStackTraceEnabled
+   *   whether to return the server stack trace.
+   * @param pySparkJVMStackTraceEnabled
+   *   whether to return the server stack trace for Python client.
+   * @param stackTraceInMessage
+   *   whether to include the server stack trace in the message.
+   * @return
+   *   FetchErrorDetailsResponse
+   */
+  private[connect] def throwableToFetchErrorDetailsResponse(
+      st: Throwable,
+      serverStackTraceEnabled: Boolean = false,
+      pySparkJVMStackTraceEnabled: Boolean = false,
+      stackTraceInMessage: Boolean = false): FetchErrorDetailsResponse = {

Review Comment:
   For example, if we want to include the stack traces of cause exceptions in the error messages (currently only the stack trace of the root exception is included), there is more work for all non-JVM clients (and it is harder to change due to multiple client versions).
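   For readers following along: throwableToFetchErrorDetailsResponse relies on a traverseCauses helper. A minimal sketch of what such a helper could look like, assuming it simply unfolds getCause links — not necessarily the PR's exact implementation:

   ```scala
   // Sketch: flatten a Throwable's cause chain into a Seq, stopping at null
   // or at a cycle. The PR's actual traverseCauses may differ in details.
   private def traverseCauses(t: Throwable): Seq[Throwable] = {
     val buf = scala.collection.mutable.ArrayBuffer.empty[Throwable]
     var cur = t
     while (cur != null && !buf.contains(cur)) {
       buf += cur
       cur = cur.getCause
     }
     buf.toSeq
   }
   ```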






[GitHub] [spark] heyihong commented on a diff in pull request #42377: [SPARK-44622][SQL][CONNECT] Implement FetchErrorDetails RPC

2023-09-19 Thread via GitHub


heyihong commented on code in PR #42377:
URL: https://github.com/apache/spark/pull/42377#discussion_r1330247802


##
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectFetchErrorDetailsHandler.scala:
##
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.service
+
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.FetchErrorDetailsResponse
+import org.apache.spark.sql.connect.config.Connect
+import org.apache.spark.sql.connect.utils.ErrorUtils
+import org.apache.spark.sql.internal.SQLConf
+
+class SparkConnectFetchErrorDetailsHandler(
+    responseObserver: StreamObserver[proto.FetchErrorDetailsResponse]) {
+
+  def handle(v: proto.FetchErrorDetailsRequest): Unit = {
+    val sessionHolder =
+      SparkConnectService
+        .getOrCreateIsolatedSession(v.getUserContext.getUserId, v.getSessionId)
+
+    val response = Option(sessionHolder.errorIdToError.getIfPresent(v.getErrorId))
+      .map { error =>
+        ErrorUtils.throwableToFetchErrorDetailsResponse(
+          st = error,
+          isTruncated = false,
+          serverStackTraceEnabled =
+            sessionHolder.session.conf.get(Connect.CONNECT_SERVER_STACKTRACE_ENABLED),
+          pySparkJVMStackTraceEnabled =
+            sessionHolder.session.conf.get(SQLConf.PYSPARK_JVM_STACKTRACE_ENABLED),

Review Comment:
   If we merge this into one flag, the Scala client may be affected by PYSPARK_JVM_STACKTRACE_ENABLED as well. But I think that is probably OK.






[GitHub] [spark] heyihong commented on a diff in pull request #42377: [SPARK-44622][SQL][CONNECT] Implement FetchErrorDetails RPC

2023-09-19 Thread via GitHub


heyihong commented on code in PR #42377:
URL: https://github.com/apache/spark/pull/42377#discussion_r1330248359


##
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala:
##
@@ -291,15 +307,16 @@ object SparkConnectService extends Logging {
   }
 
   // Simple builder for creating the cache of Sessions.
-  private def cacheBuilder(cacheSize: Int, timeoutSeconds: Int): 
CacheBuilder[Object, Object] = {
+  private[service] def cacheBuilder(

Review Comment:
   I will isolate them then






[GitHub] [spark] heyihong commented on a diff in pull request #42377: [SPARK-44622][SQL][CONNECT] Implement FetchErrorDetails RPC

2023-09-19 Thread via GitHub


heyihong commented on code in PR #42377:
URL: https://github.com/apache/spark/pull/42377#discussion_r1330261417


##
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/ErrorUtils.scala:
##
@@ -57,28 +69,105 @@ private[connect] object ErrorUtils extends Logging {
     classes.toSeq
   }
 
-  private def buildStatusFromThrowable(st: Throwable, stackTraceEnabled: Boolean): RPCStatus = {
+  private def serializeClasses(t: Throwable): String = {
+    JsonMethods.compact(JsonMethods.render(allClasses(t.getClass).map(_.getName)))
+  }
+
+  private[connect] val NUM_ERRORS_LIMIT = 5
+
+  // We can get full exception messages and optionally stacktrace by
+  // a separate RPC call if enrichErrorEnabled is true. So imposing a smaller
+  // limit to reduce the probability of hitting the 8KB header limit.
+  private val MAX_MESSAGE_SIZE = 512
+
+  // Convert Throwable to a protobuf message FetchErrorDetailsResponse.
+  // Truncate error messages by default.
+  private[connect] def throwableToFetchErrorDetailsResponse(
+      st: Throwable,
+      isTruncated: Boolean = true,
+      serverStackTraceEnabled: Boolean = false,
+      pySparkJVMStackTraceEnabled: Boolean = false,

Review Comment:
   Also merged them into one.






[GitHub] [spark] heyihong commented on a diff in pull request #42377: [SPARK-44622][SQL][CONNECT] Implement FetchErrorDetails RPC

2023-09-19 Thread via GitHub


heyihong commented on code in PR #42377:
URL: https://github.com/apache/spark/pull/42377#discussion_r1330327877


##
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectFetchErrorDetailsHandler.scala:
##
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.service
+
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.FetchErrorDetailsResponse
+import org.apache.spark.sql.connect.config.Connect
+import org.apache.spark.sql.connect.utils.ErrorUtils
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * Handles [[proto.FetchErrorDetailsRequest]]s for the [[SparkConnectService]].
+ *
+ * @param responseObserver
+ */
+class SparkConnectFetchErrorDetailsHandler(
+    responseObserver: StreamObserver[proto.FetchErrorDetailsResponse]) {
+
+  def handle(v: proto.FetchErrorDetailsRequest): Unit = {
+    val sessionHolder =
+      SparkConnectService
+        .getOrCreateIsolatedSession(v.getUserContext.getUserId, v.getSessionId)
+
+    val response = Option(sessionHolder.errorIdToError.getIfPresent(v.getErrorId))
+      .map { error =>
+        sessionHolder.errorIdToError.invalidate(v.getErrorId)
+
+        ErrorUtils.throwableToFetchErrorDetailsResponse(
+          st = error,
+          serverStackTraceEnabled = sessionHolder.session.conf.get(
+            Connect.CONNECT_SERVER_STACKTRACE_ENABLED) || sessionHolder.session.conf.get(
+            SQLConf.PYSPARK_JVM_STACKTRACE_ENABLED),
+          stackTraceInMessage = v.getStacktraceInMessage)
+      }
+      .getOrElse(FetchErrorDetailsResponse.newBuilder().build())

Review Comment:
   I changed it to use optional.






[GitHub] [spark] heyihong commented on a diff in pull request #42377: [SPARK-44622][SQL][CONNECT] Implement FetchErrorDetails RPC

2023-09-19 Thread via GitHub


heyihong commented on code in PR #42377:
URL: https://github.com/apache/spark/pull/42377#discussion_r1330327877


##
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectFetchErrorDetailsHandler.scala:
##
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.service
+
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.FetchErrorDetailsResponse
+import org.apache.spark.sql.connect.config.Connect
+import org.apache.spark.sql.connect.utils.ErrorUtils
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * Handles [[proto.FetchErrorDetailsRequest]]s for the [[SparkConnectService]].
+ *
+ * @param responseObserver
+ */
+class SparkConnectFetchErrorDetailsHandler(
+    responseObserver: StreamObserver[proto.FetchErrorDetailsResponse]) {
+
+  def handle(v: proto.FetchErrorDetailsRequest): Unit = {
+    val sessionHolder =
+      SparkConnectService
+        .getOrCreateIsolatedSession(v.getUserContext.getUserId, v.getSessionId)
+
+    val response = Option(sessionHolder.errorIdToError.getIfPresent(v.getErrorId))
+      .map { error =>
+        sessionHolder.errorIdToError.invalidate(v.getErrorId)
+
+        ErrorUtils.throwableToFetchErrorDetailsResponse(
+          st = error,
+          serverStackTraceEnabled = sessionHolder.session.conf.get(
+            Connect.CONNECT_SERVER_STACKTRACE_ENABLED) || sessionHolder.session.conf.get(
+            SQLConf.PYSPARK_JVM_STACKTRACE_ENABLED),
+          stackTraceInMessage = v.getStacktraceInMessage)
+      }
+      .getOrElse(FetchErrorDetailsResponse.newBuilder().build())

Review Comment:
   I think it depends on the meaning of an empty list. But I changed it to use optional.




[GitHub] [spark] heyihong commented on a diff in pull request #42377: [SPARK-44622][SQL][CONNECT] Implement FetchErrorDetails RPC

2023-09-19 Thread via GitHub


heyihong commented on code in PR #42377:
URL: https://github.com/apache/spark/pull/42377#discussion_r1330443354


##
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectFetchErrorDetailsHandler.scala:
##
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.service
+
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.FetchErrorDetailsResponse
+import org.apache.spark.sql.connect.config.Connect
+import org.apache.spark.sql.connect.utils.ErrorUtils
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * Handles [[proto.FetchErrorDetailsRequest]]s for the [[SparkConnectService]].
+ *
+ * @param responseObserver
+ */
+class SparkConnectFetchErrorDetailsHandler(
+    responseObserver: StreamObserver[proto.FetchErrorDetailsResponse]) {
+
+  def handle(v: proto.FetchErrorDetailsRequest): Unit = {
+    val sessionHolder =
+      SparkConnectService
+        .getOrCreateIsolatedSession(v.getUserContext.getUserId, v.getSessionId)
+
+    val response = Option(sessionHolder.errorIdToError.getIfPresent(v.getErrorId))
+      .map { error =>
+        sessionHolder.errorIdToError.invalidate(v.getErrorId)
+
+        ErrorUtils.throwableToFetchErrorDetailsResponse(
+          st = error,
+          serverStackTraceEnabled = sessionHolder.session.conf.get(
+            Connect.CONNECT_SERVER_STACKTRACE_ENABLED) || sessionHolder.session.conf.get(
+            SQLConf.PYSPARK_JVM_STACKTRACE_ENABLED),
+          stackTraceInMessage = v.getStacktraceInMessage)
+      }
+      .getOrElse(FetchErrorDetailsResponse.newBuilder().build())

Review Comment:
   I think we can change it to:
   ```
   message Response {
     optional int32 root_error_idx = 1;

     repeated Error errors = 2;
   }
   ```

   We can use root_error_idx to indicate whether the error exists.
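   Under that shape, a client would start at root_error_idx and follow each error's cause index through the flattened list — a sketch, assuming Error carries an optional cause_idx as in the snippets elsewhere in this PR, with the standard protobuf-generated getters:

   ```scala
   // Sketch: rebuild the cause chain from the flattened errors list.
   def formatChain(resp: FetchErrorDetailsResponse): Seq[String] = {
     if (!resp.hasRootErrorIdx) return Seq.empty // unknown or expired error id
     val out = scala.collection.mutable.ArrayBuffer.empty[String]
     var idx = resp.getRootErrorIdx
     var done = false
     while (!done) {
       val err = resp.getErrors(idx)
       out += s"${err.getErrorTypeHierarchy(0)}: ${err.getMessage}"
       if (err.hasCauseIdx) idx = err.getCauseIdx else done = true
     }
     out.toSeq
   }
   ```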
   






[GitHub] [spark] heyihong commented on a diff in pull request #42377: [SPARK-44622][SQL][CONNECT] Implement FetchErrorDetails RPC

2023-09-19 Thread via GitHub


heyihong commented on code in PR #42377:
URL: https://github.com/apache/spark/pull/42377#discussion_r1330578327


##
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala:
##
@@ -45,6 +49,15 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSession)
   private val executions: ConcurrentMap[String, ExecuteHolder] =
     new ConcurrentHashMap[String, ExecuteHolder]()
 
+  // The cache that maps an error id to a throwable. The throwables in the cache are
+  // independent of each other.
+  val errorIdToError = CacheBuilder

Review Comment:
   Good catch, it shouldn't... 
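   For context, errorIdToError is a Guava cache; a sketch of the shape it likely takes (the size and TTL constants below are assumptions, not the PR's values):

   ```scala
   import java.util.concurrent.TimeUnit
   import com.google.common.cache.CacheBuilder

   // Sketch: error id -> Throwable, bounded and expired on access so that
   // errors the client never fetches cannot accumulate. Constants are assumed.
   val errorIdToError = CacheBuilder
     .newBuilder()
     .maximumSize(100)
     .expireAfterAccess(5, TimeUnit.MINUTES)
     .build[String, Throwable]()
   ```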






[GitHub] [spark] heyihong commented on a diff in pull request #42377: [SPARK-44622][SQL][CONNECT] Implement FetchErrorDetails RPC

2023-09-19 Thread via GitHub


heyihong commented on code in PR #42377:
URL: https://github.com/apache/spark/pull/42377#discussion_r1330596345


##
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala:
##
@@ -213,4 +214,21 @@ object Connect {
 .version("3.5.0")
 .intConf
 .createWithDefault(200)
+
+  val CONNECT_ENRICH_ERROR_ENABLED =

Review Comment:
   +1, for now, it may be better to have a flag to disable this feature in case something is not working.







[GitHub] [spark] heyihong commented on a diff in pull request #42377: [SPARK-44622][SQL][CONNECT] Implement FetchErrorDetails RPC

2023-09-19 Thread via GitHub


heyihong commented on code in PR #42377:
URL: https://github.com/apache/spark/pull/42377#discussion_r1330611551


##
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/ErrorUtils.scala:
##
@@ -66,14 +137,26 @@ private[connect] object ErrorUtils extends Logging {
         "classes",
         JsonMethods.compact(JsonMethods.render(allClasses(st.getClass).map(_.getName))))
 
-    lazy val stackTrace = Option(ExceptionUtils.getStackTrace(st))
-    val withStackTrace = if (stackTraceEnabled && stackTrace.nonEmpty) {
-      val maxSize = SparkEnv.get.conf.get(Connect.CONNECT_JVM_STACK_TRACE_MAX_SIZE)
-      errorInfo.putMetadata("stackTrace", StringUtils.abbreviate(stackTrace.get, maxSize))
-    } else {
-      errorInfo
+    if (sessionHolder.session.conf.get(Connect.CONNECT_ENRICH_ERROR_ENABLED)) {
+      // Generate a new unique key for this exception.
+      val errorId = UUID.randomUUID().toString
+
+      errorInfo.putMetadata("errorId", errorId)
+
+      sessionHolder.errorIdToError
+        .put(errorId, st)
     }
 
+    lazy val stackTrace = Option(ExceptionUtils.getStackTrace(st))

Review Comment:
   I can follow up on this in a separate PR. This seems to be done for better performance, but I am not sure it is really necessary.



##
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/ErrorUtils.scala:
##
@@ -57,7 +76,59 @@ private[connect] object ErrorUtils extends Logging {
     classes.toSeq
   }
 
-  private def buildStatusFromThrowable(st: Throwable, stackTraceEnabled: Boolean): RPCStatus = {
+  // The maximum length of the error chain.
+  private val MAX_ERROR_CHAIN_LENGTH = 5
+
+  /**
+   * Convert Throwable to a protobuf message FetchErrorDetailsResponse.
+   * @param st
+   *   the Throwable to be converted
+   * @param serverStackTraceEnabled
+   *   whether to return the server stack trace.
+   * @return
+   *   FetchErrorDetailsResponse
+   */
+  private[connect] def throwableToFetchErrorDetailsResponse(
+      st: Throwable,
+      serverStackTraceEnabled: Boolean = false): FetchErrorDetailsResponse = {
+    val errors =
+      traverseCauses(st).take(MAX_ERROR_CHAIN_LENGTH).zipWithIndex.map { case (error, idx) =>
+        val builder = FetchErrorDetailsResponse.Error
+          .newBuilder()
+          .setMessage(error.getMessage)
+          .addAllErrorTypeHierarchy(ErrorUtils.allClasses(error.getClass).map(_.getName).asJava)
+
+        if (error.getCause != null) {
+          builder.setCauseIdx(idx + 1)

Review Comment:
   Good catch!
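   Presumably the catch here is that after take(MAX_ERROR_CHAIN_LENGTH) the last error can still have a cause, so idx + 1 would point past the end of the truncated list. A guarded version might look like:

   ```scala
   // Sketch: only point at the cause when it actually made it into the
   // truncated list, so cause_idx never dangles past the end.
   if (error.getCause != null && idx + 1 < MAX_ERROR_CHAIN_LENGTH) {
     builder.setCauseIdx(idx + 1)
   }
   ```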






[GitHub] [spark] heyihong commented on a diff in pull request #42377: [SPARK-44622][SQL][CONNECT] Implement FetchErrorDetails RPC

2023-09-19 Thread via GitHub


heyihong commented on code in PR #42377:
URL: https://github.com/apache/spark/pull/42377#discussion_r1330626485


##
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/ErrorUtils.scala:
##
@@ -66,14 +137,26 @@ private[connect] object ErrorUtils extends Logging {
         "classes",
         JsonMethods.compact(JsonMethods.render(allClasses(st.getClass).map(_.getName))))
 
-    lazy val stackTrace = Option(ExceptionUtils.getStackTrace(st))
-    val withStackTrace = if (stackTraceEnabled && stackTrace.nonEmpty) {
-      val maxSize = SparkEnv.get.conf.get(Connect.CONNECT_JVM_STACK_TRACE_MAX_SIZE)
-      errorInfo.putMetadata("stackTrace", StringUtils.abbreviate(stackTrace.get, maxSize))
-    } else {
-      errorInfo
+    if (sessionHolder.session.conf.get(Connect.CONNECT_ENRICH_ERROR_ENABLED)) {
+      // Generate a new unique key for this exception.
+      val errorId = UUID.randomUUID().toString
+
+      errorInfo.putMetadata("errorId", errorId)
+
+      sessionHolder.errorIdToError
+        .put(errorId, st)
     }
 
+    lazy val stackTrace = Option(ExceptionUtils.getStackTrace(st))

Review Comment:
   I think it is trying to use short-circuit evaluation, if I understand correctly... But I don't think it is necessary...
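   A tiny self-contained demo of that short-circuit (a sketch, not the PR's code): the lazy val body never runs when the left-hand flag is false.

   ```scala
   // `expensive` is never evaluated: `enabled` is false, `&&` short-circuits,
   // and `lazy val` defers the computation until first use.
   val enabled = false
   lazy val expensive: String = { println("computing stack trace"); "trace" }
   if (enabled && expensive.nonEmpty) println(expensive) // prints nothing
   ```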








[GitHub] [spark] heyihong commented on a diff in pull request #42377: [SPARK-44622][SQL][CONNECT] Implement FetchErrorDetails RPC

2023-09-19 Thread via GitHub


heyihong commented on code in PR #42377:
URL: https://github.com/apache/spark/pull/42377#discussion_r1330656751


##
connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/FetchErrorDetailsHandlerSuite.scala:
##
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.service
+
+import java.util.UUID
+
+import scala.concurrent.Promise
+import scala.concurrent.duration._
+
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.FetchErrorDetailsResponse
+import org.apache.spark.sql.connect.ResourceHelper
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.util.ThreadUtils
+
+private class FetchErrorDetailsResponseObserver(p: Promise[FetchErrorDetailsResponse])
+    extends StreamObserver[FetchErrorDetailsResponse] {
+  override def onNext(v: FetchErrorDetailsResponse): Unit = p.success(v)
+  override def onError(throwable: Throwable): Unit = throw throwable
+  override def onCompleted(): Unit = {}
+}
+
+class FetchErrorDetailsHandlerSuite extends SharedSparkSession with ResourceHelper {

Review Comment:
   Added






[GitHub] [spark] heyihong commented on a diff in pull request #42377: [SPARK-44622][SQL][CONNECT] Implement FetchErrorDetails RPC

2023-09-19 Thread via GitHub


heyihong commented on code in PR #42377:
URL: https://github.com/apache/spark/pull/42377#discussion_r1330657004


##
connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/FetchErrorDetailsHandlerSuite.scala:
##
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.service
+
+import java.util.UUID
+
+import scala.concurrent.Promise
+import scala.concurrent.duration._
+
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.FetchErrorDetailsResponse
+import org.apache.spark.sql.connect.ResourceHelper
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.util.ThreadUtils
+
+private class FetchErrorDetailsResponseObserver(p: Promise[FetchErrorDetailsResponse])
+    extends StreamObserver[FetchErrorDetailsResponse] {
+  override def onNext(v: FetchErrorDetailsResponse): Unit = p.success(v)
+  override def onError(throwable: Throwable): Unit = throw throwable
+  override def onCompleted(): Unit = {}
+}
+
+class FetchErrorDetailsHandlerSuite extends SharedSparkSession with ResourceHelper {
+
+  private val userId = "user1"
+
+  private val sessionId = UUID.randomUUID().toString
+
+  private def fetchErrorDetails(
+      userId: String,
+      sessionId: String,
+      errorId: String): FetchErrorDetailsResponse = {
+    val promise = Promise[FetchErrorDetailsResponse]
+    val handler =
+      new SparkConnectFetchErrorDetailsHandler(new FetchErrorDetailsResponseObserver(promise))
+    val context = proto.UserContext
+      .newBuilder()
+      .setUserId(userId)
+      .build()
+    val request = proto.FetchErrorDetailsRequest
+      .newBuilder()
+      .setUserContext(context)
+      .setSessionId(sessionId)
+      .setErrorId(errorId)
+      .build()
+    handler.handle(request)
+    ThreadUtils.awaitResult(promise.future, 5.seconds)
+  }
+
+  test("error chain is properly constructed") {
+    val testError =
+      new Exception("test1", new Exception("test2"))
+    val errorId = UUID.randomUUID().toString()
+
+    SparkConnectService
+      .getOrCreateIsolatedSession(userId, sessionId)
+      .errorIdToError
+      .put(errorId, testError)
+
+    val response = fetchErrorDetails(userId, sessionId, errorId)
+    assert(response.hasRootErrorIdx)
+    assert(response.getRootErrorIdx == 0)
+
+    assert(response.getErrorsCount == 2)
+    assert(response.getErrors(0).getMessage == "test1")
+    assert(response.getErrors(0).getErrorTypeHierarchy(0) == classOf[Exception].getName)

Review Comment:
   Exception, Throwable, Object




[GitHub] [spark] heyihong commented on a diff in pull request #42377: [SPARK-44622][SQL][CONNECT] Implement FetchErrorDetails RPC

2023-09-20 Thread via GitHub


heyihong commented on code in PR #42377:
URL: https://github.com/apache/spark/pull/42377#discussion_r1323060159


##
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/ClientStreamingQuerySuite.scala:
##
@@ -175,6 +175,36 @@ class ClientStreamingQuerySuite extends QueryTest with SQLHelper with Logging {
     }
   }
 
+  test("throw exception in streaming") {
+    val session = spark
+    import session.implicits._
+
+    val checkForTwo = udf((value: Int) => {
+      if (value == 2) {
+        throw new RuntimeException("Number 2 encountered!")
+      }
+      value
+    })
+
+    val query = spark.readStream
+      .format("rate")
+      .option("rowsPerSecond", "1")
+      .load()
+      .select(checkForTwo($"value").as("checkedValue"))
+      .writeStream
+      .outputMode("append")
+      .format("console")
+      .start()
+
+    val exception = intercept[SparkException] {
+      query.awaitTermination()
+    }
+
+    assert(
+      exception.getCause.getCause.getCause.getMessage

Review Comment:
   1. org.apache.spark.SparkException: org.apache.spark.sql.streaming.StreamingQueryException: Job aborted due to stage failure: Task 0 in stage 1384542.0 failed 4 times, most recent failure: Lost task 0.3 in stage ...
   2. org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1384542.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1384542.0 ...
   3. org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user defined function (`$Lambda$19844/1935914328`: (int) => int).
   4. org.apache.spark.SparkException: java.lang.RuntimeException: Number 2 encountered!
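   Chaining getCause three times by hand is brittle; a small hypothetical helper makes the same assertion easier to read:

   ```scala
   // Hypothetical helper: descend `depth` getCause links, stopping early at null.
   @scala.annotation.tailrec
   def causeAt(t: Throwable, depth: Int): Throwable =
     if (depth <= 0 || t.getCause == null) t
     else causeAt(t.getCause, depth - 1)

   // e.g. causeAt(exception, 3).getMessage should contain "Number 2 encountered!"
   ```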


