MaxGekk commented on code in PR #38344:
URL: https://github.com/apache/spark/pull/38344#discussion_r1008034311


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala:
##########
@@ -3210,4 +3211,190 @@ private[sql] object QueryCompilationErrors extends 
QueryErrorsBase {
       messageParameters = Map("expression" -> toSQLExpr(expression))
     )
   }
+
+  def cannotConvertProtobufTypeToSqlTypeError(
+      protobufColumn: String,
+      sqlColumn: String,
+      protobufType: String,
+      sqlType: DataType): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_TYPE_TO_SQL_TYPE_ERROR",
+      messageParameters = Map(
+        "protobufColumn" -> protobufColumn,
+        "sqlColumn" -> toSQLId(sqlColumn),
+        "protobufType" -> protobufType,
+        "sqlType" -> toSQLType(sqlType)))
+  }
+
+  def cannotConvertCatalystTypeToProtobufTypeError(
+      sqlColumn: String,
+      protobufColumn: String,
+      sqlType: DataType,
+      protobufType: String): Throwable = {
+    new AnalysisException(
+      errorClass = "SQL_TYPE_TO_PROTOBUF_TYPE_ERROR",
+      messageParameters = Map(
+        "sqlColumn" -> toSQLId(sqlColumn),
+        "protobufColumn" -> protobufColumn,
+        "sqlType" -> toSQLType(sqlType),
+        "protobufType" -> protobufType))
+  }
+
+  def cannotConvertCatalystTypeToProtobufEnumTypeError(
+      sqlColumn: String,
+      protobufColumn: String,
+      data: String,
+      enumString: String): Throwable = {
+    new AnalysisException(
+      errorClass = "CATALYST_TYPE_TO_PROTOBUF_ENUM_TYPE_ERROR",
+      messageParameters = Map(
+        "sqlColumn" -> sqlColumn,
+        "protobufColumn" -> protobufColumn,
+        "data" -> data,
+        "enumString" -> enumString))
+  }
+
+  def cannotConvertProtobufTypeToCatalystTypeError(
+      protobufType: String,
+      sqlType: DataType,
+      cause: Throwable): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_TYPE_TO_CATALYST_TYPE_ERROR",
+      messageParameters = Map(
+        "protobufType" -> protobufType,
+        "toType" -> toSQLType(sqlType)),
+      cause = Some(cause.getCause))
+  }
+
+  def cannotConvertSqlTypeToProtobufError(
+      protobufType: String,
+      sqlType: DataType,
+      cause: Throwable): Throwable = {
+    new AnalysisException(
+      errorClass = "UNABLE_TO_CONVERT_TO_PROTOBUF_TYPE",
+      messageParameters = Map(
+        "protobufType" -> protobufType,
+        "toType" -> toSQLType(sqlType)),
+      cause = Some(cause.getCause))
+  }
+
+  def protobufTypeUnsupportedYetError(protobufType: String): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_TYPE_NOT_SUPPORT_ERROR",
+      messageParameters = Map("protobufType" -> protobufType))
+  }
+
+  def unknownProtobufMessageTypeError(
+      descriptorName: String,
+      containingType: String): Throwable = {
+    new AnalysisException(
+      errorClass = "UNKNOWN_PROTOBUF_MESSAGE_TYPE",
+      messageParameters = Map(
+        "descriptorName" -> descriptorName,
+        "containingType" -> containingType))
+  }
+
+  def cannotFindCatalystTypeInProtobufSchemaError(catalystFieldPath: String): 
Throwable = {
+    new AnalysisException(
+      errorClass = "NO_CATALYST_TYPE_IN_PROTOBUF_SCHEMA",
+      messageParameters = Map("catalystFieldPath" -> catalystFieldPath))
+  }
+
+  def cannotFindProtobufFieldInCatalystError(field: String): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_FIELD_MISSING_IN_CATALYST_SCHEMA",
+      messageParameters = Map("field" -> field))
+  }
+
+  def protobufFieldMatchError(field: String,
+      protobufSchema: String,
+      matchSize: String,
+      matches: String): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_FIELD_MISSING_ERROR",
+      messageParameters = Map(
+        "field" -> field,
+        "protobufSchema" -> protobufSchema,
+        "matchSize" -> matchSize,
+        "matches" -> matches))
+  }
+
+  def unableToLocateProtobufMessageError(messageName: String): Throwable = {
+    new AnalysisException(
+      errorClass = "UNABLE_TO_LOCATE_PROTOBUF_MESSAGE_ERROR",
+      messageParameters = Map("messageName" -> messageName))
+  }
+
+  def descrioptorParseError(cause: Throwable): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_DESCRIPTOR_ERROR",
+      messageParameters = Map.empty(),
+      cause = Some(cause.getCause))
+  }
+
+  def cannotFindDescriptorFileError(filePath: String, cause: Throwable): 
Throwable = {
+    new AnalysisException(
+      errorClass = "CANNOT_FIND_PROTOBUF_DESCRIPTOR_FILE_ERROR",
+      messageParameters = Map("filePath" -> filePath),
+      cause = Some(cause.getCause))
+  }
+
+  def noProtobufMessageTypeReturnError(descriptorName: String): Throwable = {
+    new AnalysisException(
+      errorClass = "NO_PROTOBUF_MESSAGE_TYPE_ERROR",
+      messageParameters = Map("descriptorName" -> descriptorName))
+  }
+
+  def failedParsingDescriptorError(cause: Throwable): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_DESCRIPTOR_PARSING_ERROR",
+      messageParameters = Map.empty(),
+      cause = Some(cause.getCause))
+  }
+
+  def foundRecursionInProtobufSchema(fieldDescriptor: String): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_RECURSION_ERROR",
+      messageParameters = Map("fieldDescriptor" -> fieldDescriptor))
+  }
+
+  def protobufFieldTypeMismatchError(field: String): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_FIELD_TYPE_MISMATCH",
+      messageParameters = Map("field" -> field))
+  }
+
+  def protobufClassLoadError(
+      protobufClassName: String,
+      hasDots: Boolean,
+      cause: Throwable): Throwable = {
+    val message = if (hasDots) "" else ". Ensure the class name includes 
package prefix."
+    new AnalysisException(
+      errorClass = "PROTOBUF_CLASS_LOAD_ERROR",
+      messageParameters = Map("protobufClassName" -> protobufClassName, 
"message" -> message),
+      cause = Some(cause.getCause))
+  }
+
+  def protobufMessageTypeError(protobufClassName: String): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_MESSAGE_TYPE_ERROR",
+      messageParameters = Map("protobufClassName" -> protobufClassName))
+  }
+
+  def protobufDescriptorDependencyError(dependencyName: String): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_DEPENDENCY_ERROR",
+      messageParameters = Map("dependencyName" -> dependencyName))
+  }
+
+  def invalidByteStringFormatError(): Throwable = {
+    new AnalysisException(errorClass = "INVALID_BYTE_STRING_ERROR", 
messageParameters = Map.empty)
+  }
+
+  def malformedRecordsDetectedInRecordParsingError(cause: Throwable): 
Throwable = {

Review Comment:
   Is it a compilation error not runtime (execution) one?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala:
##########
@@ -3210,4 +3211,190 @@ private[sql] object QueryCompilationErrors extends 
QueryErrorsBase {
       messageParameters = Map("expression" -> toSQLExpr(expression))
     )
   }
+
+  def cannotConvertProtobufTypeToSqlTypeError(
+      protobufColumn: String,
+      sqlColumn: String,
+      protobufType: String,
+      sqlType: DataType): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_TYPE_TO_SQL_TYPE_ERROR",
+      messageParameters = Map(
+        "protobufColumn" -> protobufColumn,
+        "sqlColumn" -> toSQLId(sqlColumn),
+        "protobufType" -> protobufType,
+        "sqlType" -> toSQLType(sqlType)))
+  }
+
+  def cannotConvertCatalystTypeToProtobufTypeError(
+      sqlColumn: String,
+      protobufColumn: String,
+      sqlType: DataType,
+      protobufType: String): Throwable = {
+    new AnalysisException(
+      errorClass = "SQL_TYPE_TO_PROTOBUF_TYPE_ERROR",
+      messageParameters = Map(
+        "sqlColumn" -> toSQLId(sqlColumn),
+        "protobufColumn" -> protobufColumn,
+        "sqlType" -> toSQLType(sqlType),
+        "protobufType" -> protobufType))
+  }
+
+  def cannotConvertCatalystTypeToProtobufEnumTypeError(
+      sqlColumn: String,
+      protobufColumn: String,
+      data: String,
+      enumString: String): Throwable = {
+    new AnalysisException(
+      errorClass = "CATALYST_TYPE_TO_PROTOBUF_ENUM_TYPE_ERROR",
+      messageParameters = Map(
+        "sqlColumn" -> sqlColumn,
+        "protobufColumn" -> protobufColumn,
+        "data" -> data,
+        "enumString" -> enumString))
+  }
+
+  def cannotConvertProtobufTypeToCatalystTypeError(
+      protobufType: String,
+      sqlType: DataType,
+      cause: Throwable): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_TYPE_TO_CATALYST_TYPE_ERROR",
+      messageParameters = Map(
+        "protobufType" -> protobufType,
+        "toType" -> toSQLType(sqlType)),
+      cause = Some(cause.getCause))
+  }
+
+  def cannotConvertSqlTypeToProtobufError(
+      protobufType: String,
+      sqlType: DataType,
+      cause: Throwable): Throwable = {
+    new AnalysisException(
+      errorClass = "UNABLE_TO_CONVERT_TO_PROTOBUF_TYPE",
+      messageParameters = Map(
+        "protobufType" -> protobufType,
+        "toType" -> toSQLType(sqlType)),
+      cause = Some(cause.getCause))
+  }
+
+  def protobufTypeUnsupportedYetError(protobufType: String): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_TYPE_NOT_SUPPORT_ERROR",
+      messageParameters = Map("protobufType" -> protobufType))
+  }
+
+  def unknownProtobufMessageTypeError(
+      descriptorName: String,
+      containingType: String): Throwable = {
+    new AnalysisException(
+      errorClass = "UNKNOWN_PROTOBUF_MESSAGE_TYPE",
+      messageParameters = Map(
+        "descriptorName" -> descriptorName,
+        "containingType" -> containingType))
+  }
+
+  def cannotFindCatalystTypeInProtobufSchemaError(catalystFieldPath: String): 
Throwable = {
+    new AnalysisException(
+      errorClass = "NO_CATALYST_TYPE_IN_PROTOBUF_SCHEMA",
+      messageParameters = Map("catalystFieldPath" -> catalystFieldPath))
+  }
+
+  def cannotFindProtobufFieldInCatalystError(field: String): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_FIELD_MISSING_IN_CATALYST_SCHEMA",
+      messageParameters = Map("field" -> field))
+  }
+
+  def protobufFieldMatchError(field: String,
+      protobufSchema: String,
+      matchSize: String,
+      matches: String): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_FIELD_MISSING_ERROR",
+      messageParameters = Map(
+        "field" -> field,
+        "protobufSchema" -> protobufSchema,
+        "matchSize" -> matchSize,
+        "matches" -> matches))
+  }
+
+  def unableToLocateProtobufMessageError(messageName: String): Throwable = {
+    new AnalysisException(
+      errorClass = "UNABLE_TO_LOCATE_PROTOBUF_MESSAGE_ERROR",
+      messageParameters = Map("messageName" -> messageName))
+  }
+
+  def descrioptorParseError(cause: Throwable): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_DESCRIPTOR_ERROR",
+      messageParameters = Map.empty(),
+      cause = Some(cause.getCause))
+  }
+
+  def cannotFindDescriptorFileError(filePath: String, cause: Throwable): 
Throwable = {
+    new AnalysisException(
+      errorClass = "CANNOT_FIND_PROTOBUF_DESCRIPTOR_FILE_ERROR",
+      messageParameters = Map("filePath" -> filePath),
+      cause = Some(cause.getCause))
+  }
+
+  def noProtobufMessageTypeReturnError(descriptorName: String): Throwable = {
+    new AnalysisException(
+      errorClass = "NO_PROTOBUF_MESSAGE_TYPE_ERROR",
+      messageParameters = Map("descriptorName" -> descriptorName))
+  }
+
+  def failedParsingDescriptorError(cause: Throwable): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_DESCRIPTOR_PARSING_ERROR",
+      messageParameters = Map.empty(),
+      cause = Some(cause.getCause))

Review Comment:
   Some -> Option?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala:
##########
@@ -3210,4 +3211,190 @@ private[sql] object QueryCompilationErrors extends 
QueryErrorsBase {
       messageParameters = Map("expression" -> toSQLExpr(expression))
     )
   }
+
+  def cannotConvertProtobufTypeToSqlTypeError(
+      protobufColumn: String,
+      sqlColumn: String,
+      protobufType: String,
+      sqlType: DataType): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_TYPE_TO_SQL_TYPE_ERROR",
+      messageParameters = Map(
+        "protobufColumn" -> protobufColumn,
+        "sqlColumn" -> toSQLId(sqlColumn),
+        "protobufType" -> protobufType,
+        "sqlType" -> toSQLType(sqlType)))
+  }
+
+  def cannotConvertCatalystTypeToProtobufTypeError(
+      sqlColumn: String,
+      protobufColumn: String,
+      sqlType: DataType,
+      protobufType: String): Throwable = {
+    new AnalysisException(
+      errorClass = "SQL_TYPE_TO_PROTOBUF_TYPE_ERROR",
+      messageParameters = Map(
+        "sqlColumn" -> toSQLId(sqlColumn),
+        "protobufColumn" -> protobufColumn,
+        "sqlType" -> toSQLType(sqlType),
+        "protobufType" -> protobufType))
+  }
+
+  def cannotConvertCatalystTypeToProtobufEnumTypeError(
+      sqlColumn: String,
+      protobufColumn: String,
+      data: String,
+      enumString: String): Throwable = {
+    new AnalysisException(
+      errorClass = "CATALYST_TYPE_TO_PROTOBUF_ENUM_TYPE_ERROR",
+      messageParameters = Map(
+        "sqlColumn" -> sqlColumn,
+        "protobufColumn" -> protobufColumn,
+        "data" -> data,
+        "enumString" -> enumString))
+  }
+
+  def cannotConvertProtobufTypeToCatalystTypeError(
+      protobufType: String,
+      sqlType: DataType,
+      cause: Throwable): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_TYPE_TO_CATALYST_TYPE_ERROR",
+      messageParameters = Map(
+        "protobufType" -> protobufType,
+        "toType" -> toSQLType(sqlType)),
+      cause = Some(cause.getCause))

Review Comment:
   Some -> Option



##########
connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufSerializer.scala:
##########
@@ -215,10 +217,12 @@ private[sql] class ProtobufSerializer(
           duration.build()
 
       case _ =>
-        throw new IncompatibleSchemaException(
-          errorPrefix +
-            s"schema is incompatible (sqlType = ${catalystType.sql}, " +
-            s"protoType = ${fieldDescriptor.getJavaType})")
+        throw 
QueryCompilationErrors.cannotConvertCatalystTypeToProtobufTypeError(
+          toFieldStr(catalystPath),

Review Comment:
   Please, don't quote it twice if you do that inside of 
`cannotConvertCatalystTypeToProtobufTypeError()` by `toSQLId`



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala:
##########
@@ -3210,4 +3211,190 @@ private[sql] object QueryCompilationErrors extends 
QueryErrorsBase {
       messageParameters = Map("expression" -> toSQLExpr(expression))
     )
   }
+
+  def cannotConvertProtobufTypeToSqlTypeError(
+      protobufColumn: String,
+      sqlColumn: String,
+      protobufType: String,
+      sqlType: DataType): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_TYPE_TO_SQL_TYPE_ERROR",
+      messageParameters = Map(
+        "protobufColumn" -> protobufColumn,
+        "sqlColumn" -> toSQLId(sqlColumn),
+        "protobufType" -> protobufType,
+        "sqlType" -> toSQLType(sqlType)))
+  }
+
+  def cannotConvertCatalystTypeToProtobufTypeError(
+      sqlColumn: String,
+      protobufColumn: String,
+      sqlType: DataType,
+      protobufType: String): Throwable = {
+    new AnalysisException(
+      errorClass = "SQL_TYPE_TO_PROTOBUF_TYPE_ERROR",
+      messageParameters = Map(
+        "sqlColumn" -> toSQLId(sqlColumn),
+        "protobufColumn" -> protobufColumn,
+        "sqlType" -> toSQLType(sqlType),
+        "protobufType" -> protobufType))
+  }
+
+  def cannotConvertCatalystTypeToProtobufEnumTypeError(
+      sqlColumn: String,
+      protobufColumn: String,
+      data: String,
+      enumString: String): Throwable = {
+    new AnalysisException(
+      errorClass = "CATALYST_TYPE_TO_PROTOBUF_ENUM_TYPE_ERROR",
+      messageParameters = Map(
+        "sqlColumn" -> sqlColumn,

Review Comment:
   If it is a column id, wrap it by `toSQLId()`



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala:
##########
@@ -3210,4 +3211,190 @@ private[sql] object QueryCompilationErrors extends 
QueryErrorsBase {
       messageParameters = Map("expression" -> toSQLExpr(expression))
     )
   }
+
+  def cannotConvertProtobufTypeToSqlTypeError(
+      protobufColumn: String,
+      sqlColumn: String,
+      protobufType: String,
+      sqlType: DataType): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_TYPE_TO_SQL_TYPE_ERROR",
+      messageParameters = Map(
+        "protobufColumn" -> protobufColumn,
+        "sqlColumn" -> toSQLId(sqlColumn),
+        "protobufType" -> protobufType,
+        "sqlType" -> toSQLType(sqlType)))
+  }
+
+  def cannotConvertCatalystTypeToProtobufTypeError(
+      sqlColumn: String,
+      protobufColumn: String,
+      sqlType: DataType,
+      protobufType: String): Throwable = {
+    new AnalysisException(
+      errorClass = "SQL_TYPE_TO_PROTOBUF_TYPE_ERROR",
+      messageParameters = Map(
+        "sqlColumn" -> toSQLId(sqlColumn),
+        "protobufColumn" -> protobufColumn,
+        "sqlType" -> toSQLType(sqlType),
+        "protobufType" -> protobufType))
+  }
+
+  def cannotConvertCatalystTypeToProtobufEnumTypeError(
+      sqlColumn: String,
+      protobufColumn: String,
+      data: String,
+      enumString: String): Throwable = {
+    new AnalysisException(
+      errorClass = "CATALYST_TYPE_TO_PROTOBUF_ENUM_TYPE_ERROR",
+      messageParameters = Map(
+        "sqlColumn" -> sqlColumn,
+        "protobufColumn" -> protobufColumn,
+        "data" -> data,
+        "enumString" -> enumString))
+  }
+
+  def cannotConvertProtobufTypeToCatalystTypeError(
+      protobufType: String,
+      sqlType: DataType,
+      cause: Throwable): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_TYPE_TO_CATALYST_TYPE_ERROR",
+      messageParameters = Map(
+        "protobufType" -> protobufType,
+        "toType" -> toSQLType(sqlType)),
+      cause = Some(cause.getCause))
+  }
+
+  def cannotConvertSqlTypeToProtobufError(
+      protobufType: String,
+      sqlType: DataType,
+      cause: Throwable): Throwable = {
+    new AnalysisException(
+      errorClass = "UNABLE_TO_CONVERT_TO_PROTOBUF_TYPE",
+      messageParameters = Map(
+        "protobufType" -> protobufType,
+        "toType" -> toSQLType(sqlType)),
+      cause = Some(cause.getCause))

Review Comment:
   Some -> Option



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala:
##########
@@ -3210,4 +3211,190 @@ private[sql] object QueryCompilationErrors extends 
QueryErrorsBase {
       messageParameters = Map("expression" -> toSQLExpr(expression))
     )
   }
+
+  def cannotConvertProtobufTypeToSqlTypeError(
+      protobufColumn: String,
+      sqlColumn: String,
+      protobufType: String,
+      sqlType: DataType): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_TYPE_TO_SQL_TYPE_ERROR",
+      messageParameters = Map(
+        "protobufColumn" -> protobufColumn,
+        "sqlColumn" -> toSQLId(sqlColumn),
+        "protobufType" -> protobufType,
+        "sqlType" -> toSQLType(sqlType)))
+  }
+
+  def cannotConvertCatalystTypeToProtobufTypeError(
+      sqlColumn: String,
+      protobufColumn: String,
+      sqlType: DataType,
+      protobufType: String): Throwable = {
+    new AnalysisException(
+      errorClass = "SQL_TYPE_TO_PROTOBUF_TYPE_ERROR",
+      messageParameters = Map(
+        "sqlColumn" -> toSQLId(sqlColumn),
+        "protobufColumn" -> protobufColumn,
+        "sqlType" -> toSQLType(sqlType),
+        "protobufType" -> protobufType))
+  }
+
+  def cannotConvertCatalystTypeToProtobufEnumTypeError(
+      sqlColumn: String,
+      protobufColumn: String,
+      data: String,
+      enumString: String): Throwable = {
+    new AnalysisException(
+      errorClass = "CATALYST_TYPE_TO_PROTOBUF_ENUM_TYPE_ERROR",
+      messageParameters = Map(
+        "sqlColumn" -> sqlColumn,
+        "protobufColumn" -> protobufColumn,
+        "data" -> data,
+        "enumString" -> enumString))
+  }
+
+  def cannotConvertProtobufTypeToCatalystTypeError(
+      protobufType: String,
+      sqlType: DataType,
+      cause: Throwable): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_TYPE_TO_CATALYST_TYPE_ERROR",
+      messageParameters = Map(
+        "protobufType" -> protobufType,
+        "toType" -> toSQLType(sqlType)),
+      cause = Some(cause.getCause))
+  }
+
+  def cannotConvertSqlTypeToProtobufError(
+      protobufType: String,
+      sqlType: DataType,
+      cause: Throwable): Throwable = {
+    new AnalysisException(
+      errorClass = "UNABLE_TO_CONVERT_TO_PROTOBUF_TYPE",
+      messageParameters = Map(
+        "protobufType" -> protobufType,
+        "toType" -> toSQLType(sqlType)),
+      cause = Some(cause.getCause))
+  }
+
+  def protobufTypeUnsupportedYetError(protobufType: String): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_TYPE_NOT_SUPPORT_ERROR",
+      messageParameters = Map("protobufType" -> protobufType))
+  }
+
+  def unknownProtobufMessageTypeError(
+      descriptorName: String,
+      containingType: String): Throwable = {
+    new AnalysisException(
+      errorClass = "UNKNOWN_PROTOBUF_MESSAGE_TYPE",
+      messageParameters = Map(
+        "descriptorName" -> descriptorName,
+        "containingType" -> containingType))
+  }
+
+  def cannotFindCatalystTypeInProtobufSchemaError(catalystFieldPath: String): 
Throwable = {
+    new AnalysisException(
+      errorClass = "NO_CATALYST_TYPE_IN_PROTOBUF_SCHEMA",
+      messageParameters = Map("catalystFieldPath" -> catalystFieldPath))
+  }
+
+  def cannotFindProtobufFieldInCatalystError(field: String): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_FIELD_MISSING_IN_CATALYST_SCHEMA",
+      messageParameters = Map("field" -> field))
+  }
+
+  def protobufFieldMatchError(field: String,
+      protobufSchema: String,
+      matchSize: String,
+      matches: String): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_FIELD_MISSING_ERROR",
+      messageParameters = Map(
+        "field" -> field,
+        "protobufSchema" -> protobufSchema,
+        "matchSize" -> matchSize,
+        "matches" -> matches))
+  }
+
+  def unableToLocateProtobufMessageError(messageName: String): Throwable = {
+    new AnalysisException(
+      errorClass = "UNABLE_TO_LOCATE_PROTOBUF_MESSAGE_ERROR",
+      messageParameters = Map("messageName" -> messageName))
+  }
+
+  def descrioptorParseError(cause: Throwable): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_DESCRIPTOR_ERROR",
+      messageParameters = Map.empty(),
+      cause = Some(cause.getCause))

Review Comment:
   Some -> Option



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala:
##########
@@ -3210,4 +3211,190 @@ private[sql] object QueryCompilationErrors extends 
QueryErrorsBase {
       messageParameters = Map("expression" -> toSQLExpr(expression))
     )
   }
+
+  def cannotConvertProtobufTypeToSqlTypeError(
+      protobufColumn: String,
+      sqlColumn: String,
+      protobufType: String,
+      sqlType: DataType): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_TYPE_TO_SQL_TYPE_ERROR",
+      messageParameters = Map(
+        "protobufColumn" -> protobufColumn,
+        "sqlColumn" -> toSQLId(sqlColumn),
+        "protobufType" -> protobufType,
+        "sqlType" -> toSQLType(sqlType)))
+  }
+
+  def cannotConvertCatalystTypeToProtobufTypeError(
+      sqlColumn: String,
+      protobufColumn: String,
+      sqlType: DataType,
+      protobufType: String): Throwable = {
+    new AnalysisException(
+      errorClass = "SQL_TYPE_TO_PROTOBUF_TYPE_ERROR",
+      messageParameters = Map(
+        "sqlColumn" -> toSQLId(sqlColumn),
+        "protobufColumn" -> protobufColumn,
+        "sqlType" -> toSQLType(sqlType),
+        "protobufType" -> protobufType))
+  }
+
+  def cannotConvertCatalystTypeToProtobufEnumTypeError(
+      sqlColumn: String,
+      protobufColumn: String,
+      data: String,
+      enumString: String): Throwable = {
+    new AnalysisException(
+      errorClass = "CATALYST_TYPE_TO_PROTOBUF_ENUM_TYPE_ERROR",
+      messageParameters = Map(
+        "sqlColumn" -> sqlColumn,
+        "protobufColumn" -> protobufColumn,
+        "data" -> data,
+        "enumString" -> enumString))
+  }
+
+  def cannotConvertProtobufTypeToCatalystTypeError(
+      protobufType: String,
+      sqlType: DataType,
+      cause: Throwable): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_TYPE_TO_CATALYST_TYPE_ERROR",
+      messageParameters = Map(
+        "protobufType" -> protobufType,
+        "toType" -> toSQLType(sqlType)),
+      cause = Some(cause.getCause))
+  }
+
+  def cannotConvertSqlTypeToProtobufError(
+      protobufType: String,
+      sqlType: DataType,
+      cause: Throwable): Throwable = {
+    new AnalysisException(
+      errorClass = "UNABLE_TO_CONVERT_TO_PROTOBUF_TYPE",
+      messageParameters = Map(
+        "protobufType" -> protobufType,
+        "toType" -> toSQLType(sqlType)),
+      cause = Some(cause.getCause))
+  }
+
+  def protobufTypeUnsupportedYetError(protobufType: String): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_TYPE_NOT_SUPPORT_ERROR",
+      messageParameters = Map("protobufType" -> protobufType))
+  }
+
+  def unknownProtobufMessageTypeError(
+      descriptorName: String,
+      containingType: String): Throwable = {
+    new AnalysisException(
+      errorClass = "UNKNOWN_PROTOBUF_MESSAGE_TYPE",
+      messageParameters = Map(
+        "descriptorName" -> descriptorName,
+        "containingType" -> containingType))
+  }
+
+  def cannotFindCatalystTypeInProtobufSchemaError(catalystFieldPath: String): 
Throwable = {
+    new AnalysisException(
+      errorClass = "NO_CATALYST_TYPE_IN_PROTOBUF_SCHEMA",
+      messageParameters = Map("catalystFieldPath" -> catalystFieldPath))
+  }
+
+  def cannotFindProtobufFieldInCatalystError(field: String): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_FIELD_MISSING_IN_CATALYST_SCHEMA",
+      messageParameters = Map("field" -> field))
+  }
+
+  def protobufFieldMatchError(field: String,
+      protobufSchema: String,
+      matchSize: String,
+      matches: String): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_FIELD_MISSING_ERROR",
+      messageParameters = Map(
+        "field" -> field,
+        "protobufSchema" -> protobufSchema,
+        "matchSize" -> matchSize,
+        "matches" -> matches))
+  }
+
+  def unableToLocateProtobufMessageError(messageName: String): Throwable = {
+    new AnalysisException(
+      errorClass = "UNABLE_TO_LOCATE_PROTOBUF_MESSAGE_ERROR",
+      messageParameters = Map("messageName" -> messageName))
+  }
+
+  def descrioptorParseError(cause: Throwable): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_DESCRIPTOR_ERROR",
+      messageParameters = Map.empty(),
+      cause = Some(cause.getCause))
+  }
+
+  def cannotFindDescriptorFileError(filePath: String, cause: Throwable): 
Throwable = {
+    new AnalysisException(
+      errorClass = "CANNOT_FIND_PROTOBUF_DESCRIPTOR_FILE_ERROR",
+      messageParameters = Map("filePath" -> filePath),
+      cause = Some(cause.getCause))
+  }
+
+  def noProtobufMessageTypeReturnError(descriptorName: String): Throwable = {
+    new AnalysisException(
+      errorClass = "NO_PROTOBUF_MESSAGE_TYPE_ERROR",
+      messageParameters = Map("descriptorName" -> descriptorName))
+  }
+
+  def failedParsingDescriptorError(cause: Throwable): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_DESCRIPTOR_PARSING_ERROR",
+      messageParameters = Map.empty(),
+      cause = Some(cause.getCause))
+  }
+
+  def foundRecursionInProtobufSchema(fieldDescriptor: String): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_RECURSION_ERROR",
+      messageParameters = Map("fieldDescriptor" -> fieldDescriptor))
+  }
+
+  def protobufFieldTypeMismatchError(field: String): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_FIELD_TYPE_MISMATCH",
+      messageParameters = Map("field" -> field))
+  }
+
+  def protobufClassLoadError(
+      protobufClassName: String,
+      hasDots: Boolean,
+      cause: Throwable): Throwable = {
+    val message = if (hasDots) "" else ". Ensure the class name includes 
package prefix."
+    new AnalysisException(
+      errorClass = "PROTOBUF_CLASS_LOAD_ERROR",
+      messageParameters = Map("protobufClassName" -> protobufClassName, 
"message" -> message),
+      cause = Some(cause.getCause))

Review Comment:
   `getCause()` can return `null`, correct. If so, could you wrap it by 
`Option(cause.getCause)`



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala:
##########
@@ -3210,4 +3211,190 @@ private[sql] object QueryCompilationErrors extends 
QueryErrorsBase {
       messageParameters = Map("expression" -> toSQLExpr(expression))
     )
   }
+
+  def cannotConvertProtobufTypeToSqlTypeError(
+      protobufColumn: String,
+      sqlColumn: String,
+      protobufType: String,
+      sqlType: DataType): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_TYPE_TO_SQL_TYPE_ERROR",
+      messageParameters = Map(
+        "protobufColumn" -> protobufColumn,
+        "sqlColumn" -> toSQLId(sqlColumn),
+        "protobufType" -> protobufType,
+        "sqlType" -> toSQLType(sqlType)))
+  }
+
+  def cannotConvertCatalystTypeToProtobufTypeError(
+      sqlColumn: String,
+      protobufColumn: String,
+      sqlType: DataType,
+      protobufType: String): Throwable = {
+    new AnalysisException(
+      errorClass = "SQL_TYPE_TO_PROTOBUF_TYPE_ERROR",
+      messageParameters = Map(
+        "sqlColumn" -> toSQLId(sqlColumn),
+        "protobufColumn" -> protobufColumn,
+        "sqlType" -> toSQLType(sqlType),
+        "protobufType" -> protobufType))
+  }
+
+  def cannotConvertCatalystTypeToProtobufEnumTypeError(
+      sqlColumn: String,
+      protobufColumn: String,
+      data: String,
+      enumString: String): Throwable = {
+    new AnalysisException(
+      errorClass = "CATALYST_TYPE_TO_PROTOBUF_ENUM_TYPE_ERROR",
+      messageParameters = Map(
+        "sqlColumn" -> sqlColumn,
+        "protobufColumn" -> protobufColumn,
+        "data" -> data,
+        "enumString" -> enumString))
+  }
+
+  def cannotConvertProtobufTypeToCatalystTypeError(
+      protobufType: String,
+      sqlType: DataType,
+      cause: Throwable): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_TYPE_TO_CATALYST_TYPE_ERROR",
+      messageParameters = Map(
+        "protobufType" -> protobufType,
+        "toType" -> toSQLType(sqlType)),
+      cause = Some(cause.getCause))
+  }
+
+  def cannotConvertSqlTypeToProtobufError(
+      protobufType: String,
+      sqlType: DataType,
+      cause: Throwable): Throwable = {
+    new AnalysisException(
+      errorClass = "UNABLE_TO_CONVERT_TO_PROTOBUF_TYPE",
+      messageParameters = Map(
+        "protobufType" -> protobufType,
+        "toType" -> toSQLType(sqlType)),
+      cause = Some(cause.getCause))
+  }
+
+  def protobufTypeUnsupportedYetError(protobufType: String): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_TYPE_NOT_SUPPORT_ERROR",
+      messageParameters = Map("protobufType" -> protobufType))
+  }
+
+  def unknownProtobufMessageTypeError(
+      descriptorName: String,
+      containingType: String): Throwable = {
+    new AnalysisException(
+      errorClass = "UNKNOWN_PROTOBUF_MESSAGE_TYPE",
+      messageParameters = Map(
+        "descriptorName" -> descriptorName,
+        "containingType" -> containingType))
+  }
+
+  def cannotFindCatalystTypeInProtobufSchemaError(catalystFieldPath: String): 
Throwable = {
+    new AnalysisException(
+      errorClass = "NO_CATALYST_TYPE_IN_PROTOBUF_SCHEMA",
+      messageParameters = Map("catalystFieldPath" -> catalystFieldPath))
+  }
+
+  def cannotFindProtobufFieldInCatalystError(field: String): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_FIELD_MISSING_IN_CATALYST_SCHEMA",
+      messageParameters = Map("field" -> field))
+  }
+
+  def protobufFieldMatchError(field: String,
+      protobufSchema: String,
+      matchSize: String,
+      matches: String): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_FIELD_MISSING_ERROR",
+      messageParameters = Map(
+        "field" -> field,
+        "protobufSchema" -> protobufSchema,
+        "matchSize" -> matchSize,
+        "matches" -> matches))
+  }
+
+  def unableToLocateProtobufMessageError(messageName: String): Throwable = {
+    new AnalysisException(
+      errorClass = "UNABLE_TO_LOCATE_PROTOBUF_MESSAGE_ERROR",
+      messageParameters = Map("messageName" -> messageName))
+  }
+
+  def descrioptorParseError(cause: Throwable): Throwable = {
+    new AnalysisException(
+      errorClass = "PROTOBUF_DESCRIPTOR_ERROR",
+      messageParameters = Map.empty(),
+      cause = Some(cause.getCause))
+  }
+
+  def cannotFindDescriptorFileError(filePath: String, cause: Throwable): 
Throwable = {
+    new AnalysisException(
+      errorClass = "CANNOT_FIND_PROTOBUF_DESCRIPTOR_FILE_ERROR",
+      messageParameters = Map("filePath" -> filePath),
+      cause = Some(cause.getCause))

Review Comment:
   Some -> Option



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to