[ https://issues.apache.org/jira/browse/SPARK-50656?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17955005#comment-17955005 ]
Jie Han edited comment on SPARK-50656 at 5/29/25 4:16 PM:
----------------------------------------------------------

I used Cursor to generate the following Trino dialect implementation to parse array and map types:

{code:java}
package com.example.sparktest

import org.apache.spark.sql.jdbc.JdbcDialect
import org.apache.spark.sql.types.{
  ArrayType, BinaryType, BooleanType, ByteType, DataType, DateType, DecimalType,
  DoubleType, FloatType, IntegerType, LongType, MapType, MetadataBuilder,
  ShortType, StringType, TimestampType
}

import java.util.Locale
import scala.util.matching.Regex

class TrinoJdbcDialect extends JdbcDialect {

  override def canHandle(url: String): Boolean = {
    url.toLowerCase(Locale.ROOT).startsWith("jdbc:trino")
  }

  override def getCatalystType(
      sqlType: Int,
      typeName: String,
      size: Int,
      md: MetadataBuilder): Option[DataType] = {
    val cleanTypeName = typeName.toLowerCase(Locale.ROOT).trim
    if (cleanTypeName.startsWith("array(")) {
      parseArrayType(cleanTypeName)
    } else if (cleanTypeName.startsWith("map(")) {
      parseMapType(cleanTypeName)
    } else {
      None
    }
  }

  /** Parse Trino array types.
   *  Examples: array(varchar), array(integer), array(array(varchar)) */
  private def parseArrayType(typeName: String): Option[DataType] = {
    val arrayPattern: Regex = """array\s*\(\s*(.+)\s*\)""".r
    typeName match {
      case arrayPattern(elementTypeName) =>
        parseTrinoType(elementTypeName.trim)
          .map(elementType => ArrayType(elementType, containsNull = true))
      case _ => None
    }
  }

  /** Parse Trino map types.
   *  Examples: map(varchar, integer), map(varchar, array(varchar)) */
  private def parseMapType(typeName: String): Option[DataType] = {
    val mapPattern: Regex = """map\s*\(\s*(.+)\s*\)""".r
    typeName match {
      case mapPattern(inner) =>
        // Split only on the top-level comma so nested arguments such as
        // map(array(varchar), integer) stay intact.
        smartSplit(inner, ',') match {
          case keyTypeName :: valueTypeName :: Nil =>
            for {
              keyType <- parseTrinoType(keyTypeName)
              valueType <- parseTrinoType(valueTypeName)
            } yield MapType(keyType, valueType, valueContainsNull = true)
          case _ => None
        }
      case _ => None
    }
  }

  /** Parse Trino basic types and complex types. */
  private def parseTrinoType(typeName: String): Option[DataType] = {
    val cleanTypeName = typeName.toLowerCase(Locale.ROOT).trim

    // Handle nested array types
    if (cleanTypeName.startsWith("array(")) {
      return parseArrayType(cleanTypeName)
    }
    // Handle nested map types
    if (cleanTypeName.startsWith("map(")) {
      return parseMapType(cleanTypeName)
    }

    // Handle basic types
    cleanTypeName match {
      // String types
      case t if t.startsWith("varchar") => Some(StringType)
      case t if t.startsWith("char") => Some(StringType)
      case "text" => Some(StringType)
      // Numeric types
      case "boolean" => Some(BooleanType)
      case "tinyint" => Some(ByteType)
      case "smallint" => Some(ShortType)
      case "integer" | "int" => Some(IntegerType)
      case "bigint" => Some(LongType)
      case "real" | "float" => Some(FloatType)
      case "double" => Some(DoubleType)
      // Temporal types
      case "date" => Some(DateType)
      case t if t.startsWith("timestamp") => Some(TimestampType)
      case "time" => Some(StringType) // Spark doesn't have a Time type, use String instead
      // Decimal types
      case t if t.startsWith("decimal") => parseDecimalType(t)
      // Binary types
      case "varbinary" | "binary" => Some(BinaryType)
      // Other types mapped to string
      case "json" => Some(StringType)
      case "uuid" => Some(StringType)
      case "ipaddress" => Some(StringType)
      case _ => None // Return None for unknown types
    }
  }

  /** Parse decimal types, extracting precision and scale.
   *  Example: decimal(10,2) */
  private def parseDecimalType(typeName: String): Option[DataType] = {
    val decimalPattern: Regex = """decimal\s*\(\s*(\d+)\s*(?:,\s*(\d+))?\s*\)""".r
    typeName match {
      case decimalPattern(precision, scale) =>
        val precisionInt = precision.toInt
        val scaleInt = Option(scale).map(_.toInt).getOrElse(0)
        Some(DecimalType(precisionInt, scaleInt))
      case _ if typeName == "decimal" => Some(DecimalType(10, 0)) // Default precision
      case _ => None
    }
  }

  /** Split on a delimiter while respecting nested parentheses and quotes.
   *  Used for parsing complex nested types. */
  private def smartSplit(str: String, delimiter: Char): List[String] = {
    val result = scala.collection.mutable.ListBuffer[String]()
    val current = new StringBuilder()
    var depth = 0
    var inQuotes = false

    for (char <- str) {
      char match {
        case '\'' | '"' => inQuotes = !inQuotes
        case '(' | '<' if !inQuotes => depth += 1
        case ')' | '>' if !inQuotes => depth -= 1
        case c if c == delimiter && depth == 0 && !inQuotes =>
          result += current.toString.trim
          current.clear()
        case _ => // Keep accumulating characters
      }
      if (char != delimiter || depth > 0 || inQuotes) {
        current += char
      }
    }
    if (current.nonEmpty) {
      result += current.toString.trim
    }
    result.toList
  }
}
{code}

And then make it accessible to Spark:

{code:java}
JdbcDialects.registerDialect(new TrinoJdbcDialect)
{code}

But I haven't tried it in distributed mode yet. [~narayanbhawar]
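To make the registration step concrete, here is a minimal end-to-end sketch of how the dialect above could be wired into a job. It has not been verified against a real cluster; the object name is arbitrary, and the JDBC URL, driver, and {{minio.qa.nbcheck1}} table are simply reused from the issue description below. Note also that {{parseTrinoType}} above has no branch for {{row(...)}} element types, so the exact column from the reported error ({{array(row(city varchar, state varchar))}}) would still return {{None}} and hit the original error unless such a branch is added.

{code:java}
package com.example.sparktest

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.jdbc.JdbcDialects

object TrinoDialectExample {
  def main(args: Array[String]): Unit = {
    // Register the dialect before the first JDBC read so canHandle matches the Trino URL.
    JdbcDialects.registerDialect(new TrinoJdbcDialect)

    val spark = SparkSession.builder()
      .appName("trino-complex-types")
      .master("local[*]")
      .getOrCreate()

    // Same connection options as in the issue description below.
    val df = spark.read
      .format("jdbc")
      .option("driver", "io.trino.jdbc.TrinoDriver")
      .option("url", "jdbc:trino://localhost:8181")
      .option("query", "select address from minio.qa.nbcheck1")
      .load()

    df.printSchema()
    df.show(truncate = false)
  }
}
{code}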
> JDBC Reader Fails to Handle Complex Types (Array, Map) from Trino
> -----------------------------------------------------------------
>
>                 Key: SPARK-50656
>                 URL: https://issues.apache.org/jira/browse/SPARK-50656
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core, Spark Shell
>    Affects Versions: 3.5.4
>         Environment: {*}Environment{*}:
> * Spark version: 3.5.x
> * Trino version: 457
> * JDBC Driver: {{io.trino.jdbc.TrinoDriver}}
>            Reporter: Narayan Bhawar
>            Priority: Major
>              Labels: Trino, complextype, jdbc, spark
>
> {*}Description{*}:
> I am encountering an issue when using Spark to read data from a Trino instance via JDBC.
> Specifically, when querying complex types such as {{ARRAY}} or {{MAP}} from Trino, Spark throws an error indicating that it cannot recognize these SQL types. Below is the context:
>
> {*}Code Example{*}:
> {code:java}
> val sourceDF = spark.read
>   .format("jdbc")
>   .option("driver", "io.trino.jdbc.TrinoDriver")
>   .option("url", "jdbc:trino://localhost:8181")
>   .option("query", "select address from minio.qa.nbcheck1")
>   .load(){code}
>
> *Error Message:*
> {code:java}
> 2/04 03:49:59 INFO SparkContext: SparkContext already stopped.
> Exception in thread "main" org.apache.spark.SparkSQLException: [UNRECOGNIZED_SQL_TYPE] Unrecognized SQL type - name: array (row(city varchar, state varchar)), id: ARRAY.
>     at org.apache.spark.sql.errors.QueryExecutionErrors$.unrecognizedSqlTypeError(QueryExecutionErrors.scala:992){code}
>
> {*}Root Cause{*}:
> The error seems to be occurring because Spark's JDBC data source does not recognize complex SQL types like {{ARRAY}} or {{MAP}} from Trino by default. This is confirmed by the following relevant section of Spark's code:
> [https://github.com/apache/spark/blob/v3.5.4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala]
> {code:java}
> private def getCatalystType(
>     sqlType: Int,
>     typeName: String,
>     precision: Int,
>     scale: Int,
>     signed: Boolean,
>     isTimestampNTZ: Boolean): DataType = sqlType match {
>   ...
>   case _ =>
>     // For unmatched types:
>     // including java.sql.Types.ARRAY, DATALINK, DISTINCT, JAVA_OBJECT, NULL, OTHER, REF_CURSOR,
>     // TIME_WITH_TIMEZONE, TIMESTAMP_WITH_TIMEZONE, and others.
>     val jdbcType = classOf[JDBCType].getEnumConstants()
>       .find(_.getVendorTypeNumber == sqlType)
>       .map(_.getName)
>       .getOrElse(sqlType.toString)
>     throw QueryExecutionErrors.unrecognizedSqlTypeError(jdbcType, typeName){code}
> As you can see, the method for translating JDBC types to Spark Catalyst types doesn't currently handle ARRAY or MAP, among other types, leading to the error. The JDBC schema translation fails when complex types such as ARRAY or MAP are present.
>
> {*}Expected Behavior{*}:
> Spark should not fail when encountering complex types like {{ARRAY}} or {{MAP}} from a Trino JDBC source. Instead, it should either:
> # Convert these complex types into a serialized string format (e.g., JSON) over the wire.
> # Provide an option for users to manually handle such complex types after loading them into a DataFrame.
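Regarding option 1 in the expected behavior above, a possible stop-gap until Spark maps these types natively is to serialize the complex column on the Trino side so the JDBC driver returns a plain varchar. This is only a sketch: it assumes Trino's cast-to-JSON and {{json_format}} support, reuses the URL and table from the report, and expects an active SparkSession named {{spark}}:

{code:java}
// Ask Trino to serialize the complex column to a JSON string, so Spark's JDBC
// reader only sees a varchar column and never reaches the ARRAY/MAP mapping.
val jsonDF = spark.read
  .format("jdbc")
  .option("driver", "io.trino.jdbc.TrinoDriver")
  .option("url", "jdbc:trino://localhost:8181")
  .option("query",
    "select json_format(cast(address as json)) as address_json from minio.qa.nbcheck1")
  .load()

// The JSON string can then be turned back into a typed column with from_json,
// given a schema matching the Trino type array(row(city varchar, state varchar)).
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{ArrayType, StringType, StructField, StructType}

val addressSchema = ArrayType(StructType(Seq(
  StructField("city", StringType),
  StructField("state", StringType))))

val typedDF = jsonDF.select(from_json(col("address_json"), addressSchema).as("address"))
{code}

The extra {{json_format}} call is there to force a varchar result on the wire, rather than relying on how the driver reports Trino's {{json}} type.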