heyihong commented on code in PR #42377: URL: https://github.com/apache/spark/pull/42377#discussion_r1313613318
##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala:
##########
@@ -19,51 +19,184 @@ package org.apache.spark.sql.connect.client

 import java.time.DateTimeException

 import scala.collection.JavaConverters._
+import scala.collection.mutable
 import scala.reflect.ClassTag

 import com.google.rpc.ErrorInfo
 import io.grpc.StatusRuntimeException
 import io.grpc.protobuf.StatusProto
+import org.json4s.DefaultFormats
+import org.json4s.jackson.JsonMethods

 import org.apache.spark.{SparkArithmeticException, SparkArrayIndexOutOfBoundsException, SparkDateTimeException, SparkException, SparkIllegalArgumentException, SparkNumberFormatException, SparkRuntimeException, SparkUnsupportedOperationException, SparkUpgradeException}
+import org.apache.spark.connect.proto.{FetchErrorDetailsRequest, FetchErrorDetailsResponse, UserContext}
+import org.apache.spark.connect.proto.SparkConnectServiceGrpc.SparkConnectServiceBlockingStub
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NoSuchDatabaseException, NoSuchTableException, TableAlreadyExistsException, TempTableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.trees.Origin
-import org.apache.spark.util.JsonUtils

-private[client] object GrpcExceptionConverter extends JsonUtils {
-  def convert[T](f: => T): T = {
+/**
+ * GrpcExceptionConverter converts StatusRuntimeException to Spark exceptions.
+ * @param grpcStub
+ *   grpcStub for fetching error details from server.
+ */
+private[client] class GrpcExceptionConverter(grpcStub: SparkConnectServiceBlockingStub) {
+  import GrpcExceptionConverter._
+
+  def convert[T](sessionId: String, userContext: UserContext)(f: => T): T = {
     try {
       f
     } catch {
       case e: StatusRuntimeException =>
-        throw toThrowable(e)
+        throw toThrowable(e, sessionId, userContext)
     }
   }

-  def convertIterator[T](iter: CloseableIterator[T]): CloseableIterator[T] = {
+  def convertIterator[T](
+      sessionId: String,
+      userContext: UserContext,
+      iter: CloseableIterator[T]): CloseableIterator[T] = {
     new CloseableIterator[T] {
       override def hasNext: Boolean = {
-        convert {
+        convert(sessionId, userContext) {
           iter.hasNext
         }
       }

       override def next(): T = {
-        convert {
+        convert(sessionId, userContext) {
           iter.next()
         }
       }

       override def close(): Unit = {
-        convert {
+        convert(sessionId, userContext) {
           iter.close()
         }
       }
     }
   }

+  private def exceptionInfosToErrors(
+      errorId: String,
+      exceptionInfos: mutable.Map[String, FetchErrorDetailsResponse.ExceptionInfo]
+  ): List[Error] = {

Review Comment:
   https://github.com/apache/spark/pull/42377#discussion_r1313613119

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org