Hi Brandon,

It's relatively straightforward to try out different type options for this in 
the spark-shell: paste the attached code into spark-shell before you make 
a normal Postgres JDBC connection.

You can then experiment with the mappings without recompiling Spark or anything 
like that, and you can embed the same code in your own packages, outside of the 
main Spark releases.

Thanks,
Ewan

-----Original Message-----
From: BrandonBradley [mailto:bradleytas...@gmail.com] 
Sent: 22 January 2016 14:29
To: dev@spark.apache.org
Subject: Re: Spark 1.6.1

I'd like more complete Postgres JDBC support for ArrayType before the next 
release. Some of them are still broken in 1.6.0. It would save me much time.

Please see SPARK-12747 @ https://issues.apache.org/jira/browse/SPARK-12747

Cheers!
Brandon Bradley



--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/Spark-1-6-1-tp16009p16082.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org

import java.sql.{Connection, Types}

import org.apache.spark.sql.types._
import org.apache.spark.sql.jdbc
import org.apache.spark.sql.jdbc.JdbcDialects
import org.apache.spark.sql.jdbc.JdbcDialect
import org.apache.spark.sql.jdbc.JdbcType

/**
 * Looks up the Catalyst type that corresponds to a Postgres type name.
 *
 * @param typeName the type name to translate (e.g. "int4", "text"); for array
 *                 columns the caller is expected to strip the leading underscore first
 * @return the matching Catalyst type, or `None` when the name is null or not
 *         one of the names listed below
 */
def toCatalystType(typeName: String): Option[DataType] =
  Option(typeName).collect[DataType] {
    // Boolean and bit-string types
    case "bool" => BooleanType
    case "bit" => BinaryType
    // Integral types
    case "int2" => ShortType
    case "int4" => IntegerType
    case "int8" | "oid" => LongType
    // Floating-point types; "money" is also read as a double
    case "float4" => FloatType
    case "money" | "float8" => DoubleType
    // Everything that is surfaced as a plain string
    case "text" | "varchar" | "char" | "cidr" | "inet" | "json" | "jsonb" | "uuid" =>
      StringType
    case "bytea" => BinaryType
    // All time-of-day and timestamp variants collapse to TimestampType
    case "timestamp" | "timestamptz" | "time" | "timetz" => TimestampType
    case "date" => DateType
    case "numeric" => DecimalType.SYSTEM_DEFAULT
  }

/**
 * A JDBC dialect for Postgres that can be registered from the spark-shell to
 * experiment with column type mappings without rebuilding Spark.
 */
case object PostgresDialect extends JdbcDialect {

  /** Accepts any JDBC URL that begins with `jdbc:postgresql`. */
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:postgresql")

  /**
   * Chooses a Catalyst type for a column from its JDBC metadata.
   *
   * @param sqlType  the `java.sql.Types` code of the column
   * @param typeName the type name of the column
   * @param size     the reported size of the column
   * @param md       metadata builder for the column (not used here)
   * @return `Some(type)` when this dialect overrides the mapping, otherwise `None`
   */
  override def getCatalystType(
      sqlType: Int,
      typeName: String,
      size: Int,
      md: MetadataBuilder): Option[DataType] = sqlType match {
    // A "bit" column whose size is not 1 is read as binary.
    case Types.BIT if typeName.equals("bit") && size != 1 =>
      Some(BinaryType)
    // For OTHER, only type names that translate to StringType are accepted.
    case Types.OTHER =>
      toCatalystType(typeName).filter(_ == StringType)
    // Array type names are the element type name prefixed with an underscore.
    case Types.ARRAY if typeName.length > 1 && typeName.startsWith("_") =>
      toCatalystType(typeName.substring(1)).map(elementType => ArrayType(elementType))
    case _ =>
      None
  }
}

// Register the custom dialect with JdbcDialects; per the accompanying note this should be
// pasted before the Postgres JDBC connection is made.
// NOTE(review): presumably this takes precedence over Spark's built-in Postgres dialect — confirm.
JdbcDialects.registerDialect(PostgresDialect)
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org

Reply via email to