[ https://issues.apache.org/jira/browse/SPARK-20846?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16024252#comment-16024252 ]
Takeshi Yamamuro commented on SPARK-20846:
------------------------------------------

Basically, I think we do not allow nested arrays in the JDBC datasource:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala#L473
Also, it seems difficult to handle this case because int[] and int[][] have the same metadata in the ResultSetMetaData of the PostgreSQL JDBC driver. In fact, in a postgresql shell they have the same type (I'm not sure why, though...):

{code}
postgres=# create table tx(a int[], b int[][]);
CREATE TABLE
postgres=# \d tx
      Table "public.tx"
 Column |   Type    | Modifiers
--------+-----------+-----------
 a      | integer[] |
 b      | integer[] |
{code}
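To make the metadata point concrete, here is a minimal sketch (an editor's addition, not part of the original comment) that prints the type OID PostgreSQL reports for each column; the connection string is a placeholder. Both columns come back with the same OID, so the declared dimensionality is invisible to any driver that relies on column metadata:

{code:python}
# Minimal sketch: PostgreSQL reports the same type OID for int[] and
# int[][] columns. Requires psycopg2; connection params are placeholders.
import psycopg2

conn = psycopg2.connect("dbname=postgres user=postgres")
cur = conn.cursor()
cur.execute("CREATE TEMP TABLE tx (a int[], b int[][])")
cur.execute("SELECT a, b FROM tx")
for desc in cur.description:
    # desc = (name, type_code, ...); type_code is the pg_type OID.
    # Both columns print 1007 (_int4): the dimensions are not recorded.
    print desc[0], desc[1]
{code}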
> Incorrect postgresql array column schema inferred from table.
> --------------------------------------------------------------
>
>                 Key: SPARK-20846
>                 URL: https://issues.apache.org/jira/browse/SPARK-20846
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.1.0
>            Reporter: Stuart Reynolds
>
> When reading a table containing an int[][] column from postgres, the column is
> inferred as int[] (it should be int[][]).
> {code:python}
> from pyspark.sql import SQLContext
> import pandas as pd
> from dataIngest.util.sqlUtil import asSQLAlchemyEngine
>
> user, password = ..., ...
> url = "postgresql://hostname:5432/dbname"
> url = 'jdbc:' + url
> properties = {'user': user, 'password': password}
> engine = ... sql alchemy engine ...
>
> # Create a pandas df with int[] and int[][] columns
> df = pd.DataFrame({
>     'a1': [[1, 2, None], [1, 2, 3], None],
>     'b2': [[[1], [None], [3]], [[1], [2], [3]], None]
> })
>
> # Store df into postgres as table _dfjunk
> with engine.connect().execution_options(autocommit=True) as con:
>     con.execute("""
>         DROP TABLE IF EXISTS _dfjunk;
>
>         CREATE TABLE _dfjunk (
>             a1 int[] NULL,
>             b2 int[][] NULL
>         );
>     """)
> df.to_sql("_dfjunk", con, index=False, if_exists="append")
>
> # Let's access it via spark
> sc = get_spark_context(master="local")
> sqlContext = SQLContext(sc)
>
> print "pandas DF as spark DF:"
> df = sqlContext.createDataFrame(df)
> df.printSchema()
> df.show()
> df.registerTempTable("df")
> print sqlContext.sql("select * from df").collect()
>
> ### Export _dfjunk as table df3
> df3 = sqlContext.read.format("jdbc"). \
>     option("url", url). \
>     option("driver", "org.postgresql.Driver"). \
>     option("useUnicode", "true"). \
>     option("continueBatchOnError", "true"). \
>     option("useSSL", "false"). \
>     option("user", user). \
>     option("password", password). \
>     option("dbtable", "_dfjunk"). \
>     load()
> df3.registerTempTable("df3")
>
> print "DF inferred from postgres:"
> df3.printSchema()
> df3.show()
>
> print "DF queried from postgres:"
> df3 = sqlContext.sql("select * from df3")
> df3.printSchema()
> df3.show()
> print df3.collect()
> {code}
> Errors out with:
> {noformat}
> pandas DF as spark DF:
> root
>  |-- a1: array (nullable = true)
>  |    |-- element: long (containsNull = true)
>  |-- b2: array (nullable = true)
>  |    |-- element: array (containsNull = true)
>  |    |    |-- element: long (containsNull = true)   <<< ****** THIS IS CORRECT !!!!
>
> +------------+--------------------+
> |          a1|                  b2|
> +------------+--------------------+
> |[1, 2, null]|[WrappedArray(1),...|
> |   [1, 2, 3]|[WrappedArray(1),...|
> |        null|                null|
> +------------+--------------------+
>
> [Row(a1=[1, 2, None], b2=[[1], [None], [3]]), Row(a1=[1, 2, 3], b2=[[1], [2], [3]]), Row(a1=None, b2=None)]
>
> DF inferred from postgres:
> root
>  |-- a1: array (nullable = true)
>  |    |-- element: integer (containsNull = true)
>  |-- b2: array (nullable = true)
>  |    |-- element: integer (containsNull = true)   <<< ****** THIS IS WRONG!!!! (b2 is an array of arrays.)
>
> 17/05/22 15:00:39 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 2)
> java.lang.ClassCastException: [Ljava.lang.Integer; cannot be cast to java.lang.Integer
> 	at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:101)
> 	at org.apache.spark.sql.catalyst.util.GenericArrayData.getInt(GenericArrayData.scala:62)
> 	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
> 	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> 	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
> 	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
> 	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
> 	at org.apache.spark.scheduler.Task.run(Task.scala:99)
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> {noformat}
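For completeness, a possible workaround sketch on the Spark side (an editor's addition, untested against the original setup): push a cast down to postgres so the nested array arrives as a single text column, then rebuild the nesting with a UDF. The subquery-as-dbtable form is a standard JDBC-source idiom; parse_2d is a hypothetical helper that only covers this simple integer case, and url/user/password are the variables from the repro above.

{code:python}
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, IntegerType

def parse_2d(s):
    # Parse a postgres array literal like '{{1},{NULL},{3}}'
    # into [[1], [None], [3]]. Integer-only; no quoting/escaping handled.
    if s is None:
        return None
    body = s.strip('{}')
    if not body:
        return []
    return [[None if v == 'NULL' else int(v) for v in row.split(',')]
            for row in body.split('},{')]

parse_2d_udf = udf(parse_2d, ArrayType(ArrayType(IntegerType())))

# Cast b2 to text inside the pushed-down subquery, then parse it back.
df3 = sqlContext.read.format("jdbc"). \
    option("url", url). \
    option("driver", "org.postgresql.Driver"). \
    option("user", user). \
    option("password", password). \
    option("dbtable", "(SELECT a1, b2::text AS b2 FROM _dfjunk) t"). \
    load(). \
    withColumn("b2", parse_2d_udf("b2"))
{code}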