Hi Brandon,
It's relatively straightforward to try out different type mappings for this in
the spark-shell: paste the attached code into spark-shell before you make
a normal Postgres JDBC connection.
You can then experiment with the mappings without recompiling Spark or anything
like that, and you can embed the same code in your own packages, outside of the
main Spark releases.
Thanks,
Ewan
-----Original Message-----
From: BrandonBradley [mailto:[email protected]]
Sent: 22 January 2016 14:29
To: [email protected]
Subject: Re: Spark 1.6.1
I'd like more complete Postgres JDBC support for ArrayType before the next
release. Some array element types are still broken in 1.6.0; fixing them would save me a lot of time.
Please see SPARK-12747 @ https://issues.apache.org/jira/browse/SPARK-12747
Cheers!
Brandon Bradley
--
View this message in context:
http://apache-spark-developers-list.1001551.n3.nabble.com/Spark-1-6-1-tp16009p16082.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
import java.sql.{Connection, Types}
import org.apache.spark.sql.types._
import org.apache.spark.sql.jdbc
import org.apache.spark.sql.jdbc.JdbcDialects
import org.apache.spark.sql.jdbc.JdbcDialect
import org.apache.spark.sql.jdbc.JdbcType
/**
 * Maps a PostgreSQL type name to a Catalyst [[DataType]].
 *
 * The dialect below calls this with the raw type name for `Types.OTHER` columns.
 * For `Types.ARRAY` columns it passes the element name, i.e. with the leading `_` removed.
 *
 * PostgreSQL reports `char(n)` / `character(n)` columns as `bpchar` ("blank-padded
 * char"), so arrays of them arrive as `_bpchar`. The bare name `char` denotes PostgreSQL's
 * single-byte internal `"char"` type. Both are mapped to [[StringType]].
 *
 * @param typeName PostgreSQL type name, e.g. "int4", "text", "bpchar"
 * @return the matching Catalyst type, or `None` when the name is not recognised
 */
def toCatalystType(typeName: String): Option[DataType] = typeName match {
  case "bool" => Some(BooleanType)
  case "bit" => Some(BinaryType)
  case "int2" => Some(ShortType)
  case "int4" => Some(IntegerType)
  case "int8" | "oid" => Some(LongType)
  case "float4" => Some(FloatType)
  case "money" | "float8" => Some(DoubleType)
  case "text" | "varchar" | "char" | "bpchar" | "cidr" | "inet" | "json" | "jsonb" |
       "uuid" =>
    Some(StringType)
  case "bytea" => Some(BinaryType)
  case "timestamp" | "timestamptz" | "time" | "timetz" => Some(TimestampType)
  case "date" => Some(DateType)
  // Precision/scale are not available here, so every NUMERIC maps to Spark's
  // system-default decimal (DecimalType.SYSTEM_DEFAULT).
  case "numeric" => Some(DecimalType.SYSTEM_DEFAULT)
  case _ => None
}
/**
 * JDBC dialect for `jdbc:postgresql` URLs.
 *
 * It overrides how selected PostgreSQL column types are read into Catalyst types.
 * Type-name lookups are delegated to `toCatalystType`.
 */
case object PostgresDialect extends JdbcDialect {

  override def canHandle(url: String): Boolean = url.startsWith("jdbc:postgresql")

  /**
   * Chooses a Catalyst type from a column's JDBC type code and PostgreSQL type name.
   *
   *  - A `Types.BIT` column named "bit" whose reported size is not 1 becomes binary.
   *  - A `Types.OTHER` column is mapped only when its name resolves to a string type
   *    (json, jsonb, uuid, inet, cidr, ...); anything else yields `None`.
   *  - A `Types.ARRAY` column named `_<element>` becomes an array of the element's type.
   *
   * Every other combination yields `None`, i.e. no override from this dialect.
   */
  override def getCatalystType(
      sqlType: Int,
      typeName: String,
      size: Int,
      md: MetadataBuilder): Option[DataType] = sqlType match {
    case Types.BIT if typeName.equals("bit") && size != 1 =>
      Some(BinaryType)
    case Types.OTHER =>
      toCatalystType(typeName).filter(_ == StringType)
    case Types.ARRAY if typeName.length > 1 && typeName.charAt(0) == '_' =>
      // PostgreSQL prefixes array type names with '_' (e.g. "_int4"); strip it
      // to look up the element type.
      val elementName = typeName.drop(1)
      toCatalystType(elementName).map(elementType => ArrayType(elementType))
    case _ =>
      None
  }
}
// Register the custom dialect defined above so that JdbcDialects lookups for
// jdbc:postgresql URLs can use it.
JdbcDialects.registerDialect(dialect = PostgresDialect)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]