Hi Brandon, It's relatively straightforward to try out different type options for this in the spark-shell. Try pasting the attached code into spark-shell before you make a normal postgres JDBC connection.
You can then experiment with the mappings without recompiling Spark or anything like that, and you can embed the same code in your own packages, outside of the main Spark releases. Thanks, Ewan -----Original Message----- From: BrandonBradley [mailto:bradleytas...@gmail.com] Sent: 22 January 2016 14:29 To: dev@spark.apache.org Subject: Re: Spark 1.6.1 I'd like more complete Postgres JDBC support for ArrayType before the next release. Some of them are still broken in 1.6.0. It would save me much time. Please see SPARK-12747 @ https://issues.apache.org/jira/browse/SPARK-12747 Cheers! Brandon Bradley -- View this message in context: http://apache-spark-developers-list.1001551.n3.nabble.com/Spark-1-6-1-tp16009p16082.html Sent from the Apache Spark Developers List mailing list archive at Nabble.com. --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org For additional commands, e-mail: dev-h...@spark.apache.org
import java.sql.{Connection, Types}
import org.apache.spark.sql.types._
import org.apache.spark.sql.jdbc
import org.apache.spark.sql.jdbc.JdbcDialects
import org.apache.spark.sql.jdbc.JdbcDialect
import org.apache.spark.sql.jdbc.JdbcType

/**
 * Maps a Postgres type name (as reported by the JDBC driver) to the
 * corresponding Catalyst DataType, or None when the name is not recognized.
 */
def toCatalystType(typeName: String): Option[DataType] = {
  // Postgres type names that all surface to Spark as plain strings.
  val stringLike = Set("text", "varchar", "char", "cidr", "inet", "json", "jsonb", "uuid")
  // Postgres temporal type names that all surface as TimestampType.
  val timestampLike = Set("timestamp", "timestamptz", "time", "timetz")
  typeName match {
    case "bool"                => Some(BooleanType)
    case "bit"                 => Some(BinaryType)
    case "int2"                => Some(ShortType)
    case "int4"                => Some(IntegerType)
    case "int8" | "oid"        => Some(LongType)
    case "float4"              => Some(FloatType)
    case "money" | "float8"    => Some(DoubleType)
    case s if stringLike(s)    => Some(StringType)
    case "bytea"               => Some(BinaryType)
    case t if timestampLike(t) => Some(TimestampType)
    case "date"                => Some(DateType)
    case "numeric"             => Some(DecimalType.SYSTEM_DEFAULT)
    case _                     => None
  }
}

/**
 * JDBC dialect for PostgreSQL. Paste into spark-shell (before opening the
 * JDBC connection) to experiment with type mappings without rebuilding Spark.
 */
case object PostgresDialect extends JdbcDialect {

  /** Claims any PostgreSQL JDBC URL. */
  override def canHandle(url: String): Boolean =
    url.startsWith("jdbc:postgresql")

  /**
   * Resolves the Catalyst type for a column, given the JDBC SQL type code,
   * the Postgres type name, and the reported column size.
   */
  override def getCatalystType(
      sqlType: Int,
      typeName: String,
      size: Int,
      md: MetadataBuilder): Option[DataType] = {
    sqlType match {
      // bit(n) with n > 1 is a bit string; surface it as binary.
      // bit(1) falls through to the default driver mapping.
      case Types.BIT if typeName == "bit" && size != 1 =>
        Some(BinaryType)
      // Types.OTHER covers Postgres-specific types (json, jsonb, uuid, ...);
      // only accept the ones that map to StringType.
      case Types.OTHER =>
        toCatalystType(typeName).filter(_ == StringType)
      // Postgres reports array element types with a leading underscore,
      // e.g. "_int4" is an array of int4.
      case Types.ARRAY if typeName.length > 1 && typeName(0) == '_' =>
        toCatalystType(typeName.drop(1)).map(ArrayType(_))
      case _ =>
        None
    }
  }
}

JdbcDialects.registerDialect(PostgresDialect)
--------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org For additional commands, e-mail: dev-h...@spark.apache.org