[ 
https://issues.apache.org/jira/browse/SPARK-22443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16239312#comment-16239312
 ] 

Hongbo commented on SPARK-22443:
--------------------------------

In our case, we want to map MySQL YEAR to ShortType instead of DateType. 
Ideally, I'd like to write a custom dialect that overrides only the 
getCatalystType method. But that doesn't work: once it is registered alongside 
the predefined MySQLDialect, the quoteIdentifier implemented in MySQLDialect 
is no longer used.

We have a workaround. The custom dialect overrides all the other methods as 
well and delegates them to the implementations in MySQLDialect. Then we 
unregister MySQLDialect and register our custom dialect. This is not robust: 
if new methods are added to JdbcDialect in the future, the workaround may 
break again.


> AggregatedDialect doesn't override quoteIdentifier and other methods in 
> JdbcDialects
> ------------------------------------------------------------------------------------
>
>                 Key: SPARK-22443
>                 URL: https://issues.apache.org/jira/browse/SPARK-22443
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.2.0
>            Reporter: Hongbo
>
> The AggregatedDialect only implements canHandle, getCatalystType, and 
> getJDBCType. It doesn't implement the other methods in JdbcDialect. 
> So if multiple dialects are registered for the same driver, their 
> implementations of those other methods are ignored and the default 
> implementation in JdbcDialect is used instead.
> Example:
> {code:java}
> package example
> import java.util.Properties
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects}
> import org.apache.spark.sql.types.{DataType, MetadataBuilder}
> object AnotherMySQLDialect extends JdbcDialect {
>   override def canHandle(url : String): Boolean = url.startsWith("jdbc:mysql")
>   override def getCatalystType(
>                                 sqlType: Int, typeName: String, size: Int, 
> md: MetadataBuilder): Option[DataType] = {
>     None
>   }
>   override def quoteIdentifier(colName: String): String = {
>     s"`$colName`"
>   }
> }
> object App {
>   def main(args: Array[String]) {
>     val spark = SparkSession.builder.master("local").appName("Simple 
> Application").getOrCreate()
>     JdbcDialects.registerDialect(AnotherMySQLDialect)
>     val jdbcUrl = s"jdbc:mysql://host:port/db?user=user&password=password"
>     spark.read.jdbc(jdbcUrl, "badge", new Properties()).show()
>   }
> }
> {code}
> This will throw an exception: 
> {code:none}
> 17/11/03 17:08:39 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
> java.sql.SQLDataException: Cannot determine value type from string 'id'
>       at 
> com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:530)
>       at 
> com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:513)
>       at 
> com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:505)
>       at 
> com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:479)
>       at 
> com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:489)
>       at 
> com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:89)
>       at 
> com.mysql.cj.jdbc.result.ResultSetImpl.getLong(ResultSetImpl.java:853)
>       at 
> org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$makeGetter$8.apply(JdbcUtils.scala:409)
>       at 
> org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$makeGetter$8.apply(JdbcUtils.scala:408)
>       at 
> org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon$1.getNext(JdbcUtils.scala:330)
>       at 
> org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon$1.getNext(JdbcUtils.scala:312)
>       at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
>       at 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
>       at 
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>       at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
>       at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>       at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
>       at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:234)
>       at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
>       at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
>       at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
>       at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>       at org.apache.spark.scheduler.Task.run(Task.scala:108)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: com.mysql.cj.core.exceptions.DataConversionException: Cannot 
> determine value type from string 'id'
>       at 
> com.mysql.cj.core.io.StringConverter.createFromBytes(StringConverter.java:121)
>       at 
> com.mysql.cj.core.io.MysqlTextValueDecoder.decodeByteArray(MysqlTextValueDecoder.java:232)
>       at 
> com.mysql.cj.mysqla.result.AbstractResultsetRow.decodeAndCreateReturnValue(AbstractResultsetRow.java:124)
>       at 
> com.mysql.cj.mysqla.result.AbstractResultsetRow.getValueFromBytes(AbstractResultsetRow.java:225)
>       at 
> com.mysql.cj.mysqla.result.ByteArrayRow.getValue(ByteArrayRow.java:84)
>       at 
> com.mysql.cj.jdbc.result.ResultSetImpl.getNonStringValueFromRow(ResultSetImpl.java:630)
>       ... 24 more
> {code}
> This happens even though quoteIdentifier is correctly implemented in both 
> Spark's MySQLDialect and our AnotherMySQLDialect.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to