When you create a DataFrame using the sqlContext.read.schema() API and pass in
a schema that's compatible with some of the records but incompatible with
others, it seems you can't do a .select on the problematic columns; instead
you get an AnalysisException.

I know loading the wrong data isn't good behaviour, but if you're reading data
from (for example) JSON files, there are going to be malformed files along the
way. I think it would be nice to handle this error more gracefully, though I
don't know the best way to approach it.

Before I raise a JIRA ticket about it, would people consider this to be a bug 
or expected behaviour?

I've attached a couple of sample JSON files, and the steps below reproduce the
problem by taking the inferred schema from simple1.json and applying it to a
union of simple1.json and simple2.json. If you do a .select on the parent
column and print the output, you can see the data has been parsed the way I
think you'd want, but when you do a .select on the problem column you get an
exception instead.

scala> val s1Rdd = sc.wholeTextFiles("/tmp/simple1.json").map(x => x._2)
s1Rdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[171] at map at <console>:27
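
(s2Rdd, used below, is just the union of the two files' contents. I haven't pasted my exact line, but it's equivalent to:)

scala> val s2Rdd = s1Rdd.union(sc.wholeTextFiles("/tmp/simple2.json").map(x => x._2))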

scala> val s1schema = sqlContext.read.json(s1Rdd).schema
s1schema: org.apache.spark.sql.types.StructType = StructType(StructField(data,ArrayType(StructType(StructField(stuff,ArrayType(StructType(StructField(onetype,ArrayType(StructType(StructField(id,LongType,true), StructField(name,StringType,true)),true),true), StructField(othertype,ArrayType(StructType(StructField(company,StringType,true), StructField(id,LongType,true)),true),true)),true),true)),true),true))
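
In case the attachments don't come through inline, simple1.json looks roughly like this (reconstructed from the schema above and the first output row below, so treat the attached file as authoritative):

{"data": [{"stuff": [
  {"onetype": [{"id": 1, "name": "John Doe"}, {"id": 2, "name": "Don Joeh"}]},
  {"othertype": [{"company": "ACME", "id": 2}]}
]}]}

simple2.json has the same overall shape but records that don't quite match this schema, which is what produces the nulls in the second output row below.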

scala> sqlContext.read.schema(s1schema).json(s2Rdd).select("data.stuff").take(2).foreach(println)
[WrappedArray(WrappedArray([WrappedArray([1,John Doe], [2,Don Joeh]),null], [null,WrappedArray([ACME,2])]))]
[WrappedArray(WrappedArray([null,WrappedArray([null,1], [null,2])], [WrappedArray([2,null]),null]))]
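
Note that data.stuff comes back as an array of arrays of structs (hence the nested WrappedArrays above), since extracting a field from an array of structs yields an array of that field's values. You can confirm the nesting with printSchema:

scala> sqlContext.read.schema(s1schema).json(s2Rdd).select("data.stuff").printSchema()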

scala> sqlContext.read.schema(s1schema).json(s2Rdd).select("data.stuff.onetype")
org.apache.spark.sql.AnalysisException: cannot resolve 'data.stuff[onetype]' due to data type mismatch: argument 2 requires integral type, however, 'onetype' is of string type.;
  at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:65)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:57)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:319)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:319)

(The full exception is attached too).
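
For what it's worth, one workaround sketch that seems to sidestep the problem is to flatten one level of the array nesting with explode before extracting the field, since field extraction on a singly-nested array of structs does resolve (I'd be happy to hear of a better approach):

scala> import org.apache.spark.sql.functions.explode
scala> val df = sqlContext.read.schema(s1schema).json(s2Rdd)
scala> df.select(explode(df("data")).as("d")).select("d.stuff.onetype").take(2).foreach(println)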

What do people think, is this a bug?

Thanks,
Ewan

Attachments: simple1.json, simple2.json

org.apache.spark.sql.AnalysisException: cannot resolve 'data.stuff[onetype]' due to data type mismatch: argument 2 requires integral type, however, 'onetype' is of string type.;
  at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:65)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:57)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:319)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:319)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:53)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:318)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:316)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:316)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:265)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
  at scala.collection.Iterator$class.foreach(Iterator.scala:727)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
  at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
  at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
  at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
  at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
  at scala.collection.AbstractIterator.to(Iterator.scala:1157)
  at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
  at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
  at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
  at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:305)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:316)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionUp$1(QueryPlan.scala:107)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2(QueryPlan.scala:117)
  at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2$1.apply(QueryPlan.scala:121)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
  at scala.collection.AbstractTraversable.map(Traversable.scala:105)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2(QueryPlan.scala:121)
  at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:125)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
  at scala.collection.Iterator$class.foreach(Iterator.scala:727)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
  at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
  at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
  at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
  at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
  at scala.collection.AbstractIterator.to(Iterator.scala:1157)
  at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
  at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
  at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
  at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:125)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:57)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:50)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:105)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:50)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:44)
  at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:34)
  at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:133)
  at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$withPlan(DataFrame.scala:2165)
  at org.apache.spark.sql.DataFrame.select(DataFrame.scala:751)
  at org.apache.spark.sql.DataFrame.select(DataFrame.scala:768)
  at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:34)
  at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:39)
  at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:41)
  at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:43)
  at $iwC$$iwC$$iwC$$iwC.<init>(<console>:45)
  at $iwC$$iwC$$iwC.<init>(<console>:47)
  at $iwC$$iwC.<init>(<console>:49)
  at $iwC.<init>(<console>:51)
  at <init>(<console>:53)
  at .<init>(<console>:57)
  at .<clinit>(<console>)
  at .<init>(<console>:7)
  at .<clinit>(<console>)
  at $print(<console>)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:606)
  at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
  at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346)
  at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
  at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
  at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
  at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
  at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
  at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
  at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
  at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
  at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
  at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
  at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
  at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
  at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
  at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
  at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
  at org.apache.spark.repl.Main$.main(Main.scala:31)
  at org.apache.spark.repl.Main.main(Main.scala)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:606)
  at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
  at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
  at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
  at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
  at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)