[ 
https://issues.apache.org/jira/browse/SPARK-30959?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mikhail Kumachev updated SPARK-30959:
-------------------------------------
    Description: 
My initial goal is to save UUID values to a BINARY(16) column in SQL Server/Azure DWH.

For example, I have a demo table:
{code:java}
CREATE TABLE [Events] ([EventId] [binary](16) NOT NULL){code}

I want to write data to it using Spark like this:
{code:java}
import java.nio.{ByteBuffer, ByteOrder}
import java.util.UUID

import org.apache.spark.sql.types._
import spark.implicits._  // spark-shell session; brings .toDF into scope

val uuid = UUID.randomUUID()

// Lay out the UUID as 16 big-endian bytes.
val uuidBytes = Array.ofDim[Byte](16)
ByteBuffer.wrap(uuidBytes)
  .order(ByteOrder.BIG_ENDIAN)
  .putLong(uuid.getMostSignificantBits())
  .putLong(uuid.getLeastSignificantBits())

val schema = StructType(
  List(
    StructField("EventId", BinaryType, nullable = false)
  )
)
val data = Seq(uuidBytes).toDF("EventId").rdd
val df = spark.createDataFrame(data, schema)

df.write
  .format("jdbc")
  .option("url", "<DATABASE_CONNECTION_URL>")
  .option("dbtable", "Events")
  .mode(org.apache.spark.sql.SaveMode.Append)
  .save()
{code}

This code returns an error:
{noformat}
java.sql.BatchUpdateException: Conversion from variable or parameter type VARBINARY to target column type BINARY is not supported.{noformat}

My question is: how do I cope with this situation and insert a UUID value into a BINARY(16) column?
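
One way to isolate the problem is to attempt the same insert with plain JDBC, taking Spark out of the picture. A minimal sketch, assuming the Microsoft JDBC driver is on the classpath and using the same <DATABASE_CONNECTION_URL> placeholder as above:
{code:java}
import java.nio.{ByteBuffer, ByteOrder}
import java.sql.DriverManager
import java.util.UUID

// Plain-JDBC reproduction with no Spark involved. If this fails with the
// same conversion error, the VARBINARY-vs-BINARY mismatch comes from the
// driver/DWH side (setBytes parameters are typically described as
// VARBINARY), not from Spark's dialect mapping.
val conn = DriverManager.getConnection("<DATABASE_CONNECTION_URL>")
try {
  val uuid = UUID.randomUUID()
  val uuidBytes = Array.ofDim[Byte](16)
  ByteBuffer.wrap(uuidBytes)
    .order(ByteOrder.BIG_ENDIAN)
    .putLong(uuid.getMostSignificantBits())
    .putLong(uuid.getLeastSignificantBits())

  val stmt = conn.prepareStatement("INSERT INTO [Events] ([EventId]) VALUES (?)")
  stmt.setBytes(1, uuidBytes)
  stmt.executeUpdate()
  stmt.close()
} finally {
  conn.close()
}
{code}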


My investigation:

Spark uses the concept of a JdbcDialect and has a mapping from each Catalyst type to a database type and vice versa. For example, here is [MsSqlServerDialect|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala], which is used when working against SQL Server or Azure DWH. In the method `getJDBCType` you can see the mapping:
{code:java}
case BinaryType => Some(JdbcType("VARBINARY(MAX)", java.sql.Types.VARBINARY)){code}
and I think this is the root of my problem.
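
This mapping is easy to confirm from a spark-shell session by asking the dialect that Spark resolves for a SQL Server URL (a minimal sketch; the <HOST>/<DB> placeholders are mine):
{code:java}
import org.apache.spark.sql.jdbc.JdbcDialects
import org.apache.spark.sql.types.BinaryType

// JdbcDialects.get picks the dialect whose canHandle matches the URL;
// for jdbc:sqlserver URLs that is the built-in MsSqlServerDialect.
val dialect = JdbcDialects.get("jdbc:sqlserver://<HOST>;database=<DB>")

// Expected: Some(JdbcType(VARBINARY(MAX),-3)), i.e. java.sql.Types.VARBINARY
println(dialect.getJDBCType(BinaryType))
{code}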


So I decided to implement my own JdbcDialect to override this behavior:
{code:java}
import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects, JdbcType}
import org.apache.spark.sql.types.{BinaryType, DataType}

class SqlServerDialect extends JdbcDialect {
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:sqlserver")

  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
    case BinaryType => Option(JdbcType("BINARY(16)", java.sql.Types.BINARY))
    case _ => None  // fall back to the default mappings
  }
}

val dialect = new SqlServerDialect
JdbcDialects.registerDialect(dialect)
{code}
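As a sanity check that the registration took effect (a sketch continuing the session above; the URL placeholders are mine), the lookup can be repeated after `registerDialect`:
{code:java}
// Registered dialects should be consulted before the built-in ones, so
// the custom BINARY(16) mapping is expected to win for matching URLs.
val resolved = JdbcDialects.get("jdbc:sqlserver://<HOST>;database=<DB>")

// Expected: Some(JdbcType(BINARY(16),-2)), i.e. java.sql.Types.BINARY
println(resolved.getJDBCType(BinaryType))
{code}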
With this modification I still get exactly the same error. It looks like Spark does not use the mapping from my custom dialect, even though I have checked that the dialect is registered. So it is a strange situation.



> How to write to a column of BINARY type in SQL Server / Azure DWH using the JDBC driver?
> ----------------------------------------------------------------------------------
>
>                 Key: SPARK-30959
>                 URL: https://issues.apache.org/jira/browse/SPARK-30959
>             Project: Spark
>          Issue Type: Question
>          Components: Spark Core
>    Affects Versions: 2.4.4
>            Reporter: Mikhail Kumachev
>            Priority: Minor
>


