[ https://issues.apache.org/jira/browse/SPARK-50656?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17955005#comment-17955005 ]

Jie Han edited comment on SPARK-50656 at 5/29/25 4:16 PM:
----------------------------------------------------------

I used Cursor to generate the following Trino dialect implementation to parse Array and
Map types:
{code:java}
package com.example.sparktest

import org.apache.spark.sql.jdbc.JdbcDialect
import org.apache.spark.sql.types.{
  ArrayType,
  DataType,
  MapType,
  MetadataBuilder,
  StringType,
  IntegerType,
  LongType,
  DoubleType,
  BooleanType,
  TimestampType,
  DateType,
  DecimalType
}

import java.util.Locale
import scala.util.matching.Regex

class TrinoJdbcDialect extends JdbcDialect {

  override def canHandle(url: String): Boolean = {
    url.toLowerCase(Locale.ROOT).startsWith("jdbc:trino")
  }

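  /** Entry point Spark's JDBC reader consults for each column before applying
    * its built-in mappings. Complex Trino types arrive here as type-name strings
    * such as "array(varchar)" or "map(varchar, integer)".
    */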
  override def getCatalystType(
      sqlType: Int,
      typeName: String,
      size: Int,
      md: MetadataBuilder
  ): Option[DataType] = {
    if (typeName.startsWith("array(")) {
      parseArrayType(typeName)
    } else if (typeName.startsWith("map(")) {
      parseMapType(typeName)
    } else {
      None
    }
  }

  /** Parse Trino array types
    * Examples: array(varchar), array(integer), array(array(varchar))
    */
  private def parseArrayType(typeName: String): Option[DataType] = {
    val arrayPattern: Regex = """array\s*\(\s*(.+)\s*\)""".r

    typeName match {
      case arrayPattern(elementTypeName) =>
        parseTrinoType(elementTypeName.trim) match {
          case Some(elementType) =>
            Some(ArrayType(elementType, containsNull = true))
          case None => None
        }
      case _ => None
    }
  }

  /** Parse Trino map types
    * Examples: map(varchar, integer), map(varchar, array(varchar))
    */
  private def parseMapType(typeName: String): Option[DataType] = {
    val mapPattern: Regex = """map\s*\(\s*(.+?)\s*,\s*(.+)\s*\)""".r

    typeName match {
      case mapPattern(keyTypeName, valueTypeName) =>
        val keyTypeOpt = parseTrinoType(keyTypeName.trim)
        val valueTypeOpt = parseTrinoType(valueTypeName.trim)

        (keyTypeOpt, valueTypeOpt) match {
          case (Some(keyType), Some(valueType)) =>
            Some(MapType(keyType, valueType, valueContainsNull = true))
          case _ => None
        }
      case _ => None
    }
  }

  /** Parse Trino basic types and complex types
    */
  private def parseTrinoType(typeName: String): Option[DataType] = {
    val cleanTypeName = typeName.toLowerCase(Locale.ROOT).trim

    // Handle nested array types
    if (cleanTypeName.startsWith("array(")) {
      return parseArrayType(cleanTypeName)
    }

    // Handle nested map types
    if (cleanTypeName.startsWith("map(")) {
      return parseMapType(cleanTypeName)
    }

    // Handle basic types
    cleanTypeName match {
      // String types
      case t if t.startsWith("varchar") || t == "varchar" => Some(StringType)
      case t if t.startsWith("char")                      => Some(StringType)
      case "text"                                         => Some(StringType)

      // Numeric types
      case "boolean"         => Some(BooleanType)
      case "tinyint"         => Some(org.apache.spark.sql.types.ByteType)
      case "smallint"        => Some(org.apache.spark.sql.types.ShortType)
      case "integer" | "int" => Some(IntegerType)
      case "bigint"          => Some(LongType)
      case "real" | "float"  => Some(org.apache.spark.sql.types.FloatType)
      case "double"          => Some(DoubleType)

      // Temporal types
      case "date"                         => Some(DateType)
      case t if t.startsWith("timestamp") => Some(TimestampType)
      case "time" =>
        Some(StringType) // Spark doesn't have Time type, use String instead

      // Decimal types
      case t if t.startsWith("decimal") =>
        parseDecimalType(t)

      // Binary types
      case "varbinary" => Some(org.apache.spark.sql.types.BinaryType)
      case "binary"    => Some(org.apache.spark.sql.types.BinaryType)

      // Other types mapped to string
      case "json"      => Some(StringType)
      case "uuid"      => Some(StringType)
      case "ipaddress" => Some(StringType)

      case _ => None // Return None for unknown types
    }
  }

  /** Parse decimal types, extracting precision and scale
    * Example: decimal(10,2)
    */
  private def parseDecimalType(typeName: String): Option[DataType] = {
    val decimalPattern: Regex =
      """decimal\s*\(\s*(\d+)\s*(?:,\s*(\d+))?\s*\)""".r

    typeName match {
      case decimalPattern(precision, scale) =>
        val precisionInt = precision.toInt
        val scaleInt = Option(scale).map(_.toInt).getOrElse(0)
        Some(DecimalType(precisionInt, scaleInt))
      case _ if typeName == "decimal" =>
        Some(DecimalType(10, 0)) // Default precision
      case _ => None
    }
  }

  /** Split on a delimiter while respecting nested parentheses/brackets and quotes.
    * Intended for parsing the arguments of complex nested types
    * (note: not currently called by the parsing methods above).
    */
  private def smartSplit(str: String, delimiter: Char): List[String] = {
    val result = scala.collection.mutable.ListBuffer[String]()
    val current = new StringBuilder()
    var depth = 0
    var inQuotes = false

    for (char <- str) {
      char match {
        case '\'' | '"' if !inQuotes => inQuotes = true
        case '\'' | '"' if inQuotes  => inQuotes = false
        case '(' | '<' if !inQuotes  => depth += 1
        case ')' | '>' if !inQuotes  => depth -= 1
        case c if c == delimiter && depth == 0 && !inQuotes =>
          result += current.toString.trim
          current.clear()
        case _ => // Continue adding characters
      }
      if (char != delimiter || depth > 0 || inQuotes) {
        current += char
      }
    }

    if (current.nonEmpty) {
      result += current.toString.trim
    }

    result.toList
  }
}
 {code}
Then register it so Spark picks it up:
{code:java}
import org.apache.spark.sql.jdbc.JdbcDialects

JdbcDialects.registerDialect(new TrinoJdbcDialect)
{code}
But I haven't tried it in distributed mode.
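For reference, a small sketch of how the registered dialect would be exercised. The catalog/schema/table and the "tags" column below are made up for illustration; the URL and driver follow the reporter's example:
{code:java}
// Hypothetical table with an array(varchar) column named "tags"; register the
// dialect (as above) before performing the read.
val df = spark.read
  .format("jdbc")
  .option("driver", "io.trino.jdbc.TrinoDriver")
  .option("url", "jdbc:trino://localhost:8181")
  .option("query", "select tags from minio.qa.some_table")
  .load()

// With the dialect in place, "tags" should resolve to ArrayType(StringType)
// instead of failing with UNRECOGNIZED_SQL_TYPE.
df.printSchema()
{code}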

[~narayanbhawar] 



> JDBC Reader Fails to Handle Complex Types (Array, Map) from Trino
> -----------------------------------------------------------------
>
>                 Key: SPARK-50656
>                 URL: https://issues.apache.org/jira/browse/SPARK-50656
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core, Spark Shell
>    Affects Versions: 3.5.4
>         Environment: {*}Environment{*}:
>  * Spark version: 3.5.x 
>  * Trino version: 457
>  * JDBC Driver: {{io.trino.jdbc.TrinoDriver}}
>            Reporter: Narayan Bhawar
>            Priority: Major
>              Labels: Trino, complextype, jdbc, spark
>
> {*}Description{*}:
> I am encountering an issue when using Spark to read data from a Trino 
> instance via JDBC. Specifically, when querying complex types such as 
> {{ARRAY}} or {{MAP}} from Trino, Spark throws an error indicating that it 
> cannot recognize these SQL types. Below is the context:
> {*}Code Example{*}:
>  
> {code:java}
> val sourceDF = spark.read
>   .format("jdbc")
>   .option("driver", "io.trino.jdbc.TrinoDriver")
>   .option("url", "jdbc:trino://localhost:8181")
>   .option("query", "select address from minio.qa.nbcheck1")
>   .load(){code}
>  
>  
> *Error Message:*
>  
> {code:java}
> 2/04 03:49:59 INFO SparkContext: SparkContext already stopped.
> Exception in thread "main" org.apache.spark.SparkSQLException: 
> [UNRECOGNIZED_SQL_TYPE] Unrecognized SQL type - name: array (row(city 
> varchar, state varchar)), id: ARRAY.
> at 
> org.apache.spark.sql.errors.QueryExecutionErrors$.unrecognizedSqlTypeError(QueryExecutionErrors.scala:992){code}
>  
>  
> {*}Root Cause{*}:
> The error seems to be occurring because Spark's JDBC data source does not 
> recognize complex SQL types like {{ARRAY}} or {{MAP}} from Trino by default. 
> This is confirmed by the following relevant section of Spark's code:
> [https://github.com/apache/spark/blob/v3.5.4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala]
> {code:java}
> private def getCatalystType(
>   sqlType: Int,
>   typeName: String,
>   precision: Int,
>   scale: Int,
>   signed: Boolean,
>   isTimestampNTZ: Boolean): DataType = sqlType match {
>     ...
>     case _ =>
>       // For unmatched types:
>       // including java.sql.Types.ARRAY, DATALINK, DISTINCT, JAVA_OBJECT, 
> NULL, OTHER, REF_CURSOR,
>       // TIME_WITH_TIMEZONE, TIMESTAMP_WITH_TIMEZONE, and others.
>       val jdbcType = classOf[JDBCType].getEnumConstants()
>         .find(_.getVendorTypeNumber == sqlType)
>         .map(_.getName)
>         .getOrElse(sqlType.toString)
>       throw QueryExecutionErrors.unrecognizedSqlTypeError(jdbcType, 
> typeName){code}
> As you can see, the method for translating JDBC types to Spark Catalyst types 
> doesn't currently handle ARRAY or MAP, among other types, leading to the 
> error. The JDBC schema translation fails when complex types such as ARRAY or 
> MAP are present.
> {*}Expected Behavior{*}:
> Spark should not fail when encountering complex types like {{ARRAY}} or 
> {{MAP}} from a Trino JDBC source. Instead, it should either:
>  # Convert these complex types into a serialized string format (e.g., JSON) 
> over the wire.
>  # Provide an option for users to manually handle such complex types after 
> loading them into a DataFrame.
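A minimal sketch of workaround (1) above, keeping everything as varchar on the wire. The table and column names follow the report's example query; the cast-to-JSON trick and the "address_json" alias are illustrative and untested against the reporter's environment:
{code:java}
// Serialize the complex column to a JSON string inside the Trino query,
// so Spark's JDBC reader only ever sees a varchar column.
val raw = spark.read
  .format("jdbc")
  .option("driver", "io.trino.jdbc.TrinoDriver")
  .option("url", "jdbc:trino://localhost:8181")
  .option("query",
    "select json_format(cast(address as json)) as address_json from minio.qa.nbcheck1")
  .load()

// address_json arrives as a plain string; it can then be parsed on the Spark side
// with from_json, given a schema matching array(row(city varchar, state varchar)).
raw.printSchema()
{code}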


