heyihong commented on code in PR #42377: URL: https://github.com/apache/spark/pull/42377#discussion_r1330211093
########## connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala: ########## @@ -26,47 +26,131 @@ import io.grpc.StatusRuntimeException import io.grpc.protobuf.StatusProto import org.apache.spark.{SparkArithmeticException, SparkArrayIndexOutOfBoundsException, SparkDateTimeException, SparkException, SparkIllegalArgumentException, SparkNumberFormatException, SparkRuntimeException, SparkUnsupportedOperationException, SparkUpgradeException} +import org.apache.spark.connect.proto.{FetchErrorDetailsRequest, FetchErrorDetailsResponse, UserContext} +import org.apache.spark.connect.proto.FetchErrorDetailsResponse.ExceptionInfo +import org.apache.spark.connect.proto.SparkConnectServiceGrpc.SparkConnectServiceBlockingStub import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NoSuchDatabaseException, NoSuchTableException, TableAlreadyExistsException, TempTableAlreadyExistsException} import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.catalyst.trees.Origin -import org.apache.spark.util.JsonUtils -private[client] object GrpcExceptionConverter extends JsonUtils { - def convert[T](f: => T): T = { +/** + * GrpcExceptionConverter handles the conversion of StatusRuntimeExceptions into Spark exceptions. + * It does so by utilizing the ErrorInfo defined in error_details.proto and making an additional + * FetchErrorDetails RPC call to retrieve the full error message and optionally the server-side + * stacktrace. + * + * If the FetchErrorDetails RPC call succeeds, the exceptions will be constructed based on the + * response. If the RPC call fails, the exception will be constructed based on the ErrorInfo. If + * the ErrorInfo is missing, the exception will be constructed based on the StatusRuntimeException + * itself. + */ +private[client] class GrpcExceptionConverter(grpcStub: SparkConnectServiceBlockingStub) { + import GrpcExceptionConverter._ + + def convert[T](sessionId: String, userContext: UserContext)(f: => T): T = { try { f } catch { case e: StatusRuntimeException => - throw toThrowable(e) + throw toThrowable(e, sessionId, userContext) } } - def convertIterator[T](iter: CloseableIterator[T]): CloseableIterator[T] = { + def convertIterator[T]( + sessionId: String, + userContext: UserContext, + iter: CloseableIterator[T]): CloseableIterator[T] = { new WrappedCloseableIterator[T] { override def innerIterator: Iterator[T] = iter override def hasNext: Boolean = { - convert { + convert(sessionId, userContext) { iter.hasNext } } override def next(): T = { - convert { + convert(sessionId, userContext) { iter.next() } } override def close(): Unit = { - convert { + convert(sessionId, userContext) { iter.close() } } } } + /** + * fetchEnrichedError fetches enriched errors with full exception message and optionally + * stacktrace by issuing an additional RPC call to fetch error details. The RPC call is + * best-effort at-most-once. + */ + private def fetchEnrichedError( + info: ErrorInfo, + sessionId: String, + userContext: UserContext): Option[Throwable] = { + val errorId = info.getMetadataOrDefault("errorId", null) + if (errorId == null) { + return None Review Comment: Let's defer this discussion to the client-side change -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org