Spark SQL: org.apache.spark.sql.AnalysisException: cannot resolve "some columns" given input columns.

2016-06-07 Thread Jerry Wong
Hi,

I have two JSON files, but one of them is missing some columns, for example:

{"firstName": "Jack", "lastName": "Nelson"}
{"firstName": "Landy", "middleName": "Ken", "lastName": "Yong"}

sqlContext.sql("select firstName as first_name, middleName as middle_name, lastName as last_name from jsonTable")

But there is an error:
org.apache.spark.sql.AnalysisException: cannot resolve 'middleName' given input columns firstName, lastName;

Can anybody share some wisdom or suggestions?

Thanks!
Jerry
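
A minimal sketch of one workaround, assuming Spark 1.4 or later (for the DataFrameReader API); the path, app name, and table name here are hypothetical. Reading both files together, e.g. from one directory, makes the inferred schema the union of all fields seen, so records without middleName simply return null for that column:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val sc = new SparkContext(new SparkConf().setAppName("json-union").setMaster("local[*]"))
val sqlContext = new SQLContext(sc)

// The inferred schema is the union of the fields across all files in the
// directory, so middleName becomes a nullable column.
val people = sqlContext.read.json("/path/to/json-dir") // hypothetical path
people.registerTempTable("jsonTable")

sqlContext.sql("select firstName as first_name, middleName as middle_name, lastName as last_name from jsonTable").show()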


Spark SQL Nested Array of JSON with empty field

2016-06-03 Thread Jerry Wong
Hi,

I ran into a problem with a missing field in a nested JSON file with Spark SQL. For instance, there are two records in the JSON file, as follows:

{
  "firstname": "Jack",
  "lastname": "Nelson",
  "address": {
    "state": "New York",
    "city": "New York"
  }
}
{
  "firstname": "Landy",
  "middlename": "Ken",
  "lastname": "Yong",
  "address": {
    "state": "California",
    "city": "Los Angeles"
  }
}

I query the files with Spark SQL like:
val row = sqlContext.sql("SELECT firstname, middlename, lastname, address.state, address.city FROM jsontable")
Spark reports an error for the first record: no "middlename".
How do I handle this case in the SQL?

Many thanks in advance!
Jerry
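
One workaround, sketched below under the assumption of Spark 1.4+ (the app name and file path are hypothetical): declare the schema explicitly instead of inferring it, so middlename always exists as a nullable column and records lacking it return null. Note also that Spark's JSON reader in this version expects one JSON object per line, so the pretty-printed records above would each need to be compacted onto a single line.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types._

val sc = new SparkContext(new SparkConf().setAppName("json-schema").setMaster("local[*]"))
val sqlContext = new SQLContext(sc)

// Explicit schema: every StructField is nullable by default, so a record
// without middlename maps to null rather than failing analysis.
val schema = StructType(Seq(
  StructField("firstname", StringType),
  StructField("middlename", StringType),
  StructField("lastname", StringType),
  StructField("address", StructType(Seq(
    StructField("state", StringType),
    StructField("city", StringType))))))

val df = sqlContext.read.schema(schema).json("/path/to/people.json") // hypothetical path
df.registerTempTable("jsontable")
sqlContext.sql("SELECT firstname, middlename, lastname, address.state, address.city FROM jsontable").show()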


Exception in thread "main" java.lang.IllegalArgumentException: Positive number of slices required

2015-10-29 Thread Jerry Wong
I used Spark 1.3.1 to populate event logs into Cassandra, but there is an exception whose cause I could not figure out. Can anybody give me any help?

Exception in thread "main" java.lang.IllegalArgumentException: Positive number of slices required
 at org.apache.spark.rdd.ParallelCollectionRDD$.slice(ParallelCollectionRDD.scala:119)
 at org.apache.spark.rdd.ParallelCollectionRDD.getPartitions(ParallelCollectionRDD.scala:97)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:222)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:220)
 at scala.Option.getOrElse(Option.scala:120)
 at org.apache.spark.rdd.RDD.partitions(RDD.scala:220)
 at org.apache.spark.SparkContext.runJob(SparkContext.scala:1367)
 at org.apache.spark.rdd.RDD.collect(RDD.scala:797)
 at com.siriusxm.sequencer.ingestor.EventLogClusterIngestor$$anonfun$processEventLogMapStreamDiff2$1$$anonfun$apply$7$$anonfun$apply$1$$anonfun$apply$mcV$sp$2$$anonfun$apply$2$$anonfun$apply$mcV$sp$4$$anonfun$apply$3.apply$mcV$sp(EventLogClusterIngestor.scala:155)
 at scala.util.control.Breaks.breakable(Breaks.scala:37)
 at com.siriusxm.sequencer.ingestor.EventLogClusterIngestor$$anonfun$processEventLogMapStreamDiff2$1$$anonfun$apply$7$$anonfun$apply$1$$anonfun$apply$mcV$sp$2$$anonfun$apply$2$$anonfun$apply$mcV$sp$4.apply(EventLogClusterIngestor.scala:145)
 at com.siriusxm.sequencer.ingestor.EventLogClusterIngestor$$anonfun$processEventLogMapStreamDiff2$1$$anonfun$apply$7$$anonfun$apply$1$$anonfun$apply$mcV$sp$2$$anonfun$apply$2$$anonfun$apply$mcV$sp$4.apply(EventLogClusterIngestor.scala:144)
 at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
 at com.siriusxm.sequencer.ingestor.EventLogClusterIngestor$$anonfun$processEventLogMapStreamDiff2$1$$anonfun$apply$7$$anonfun$apply$1$$anonfun$apply$mcV$sp$2$$anonfun$apply$2.apply$mcV$sp(EventLogClusterIngestor.scala:144)
 at scala.util.control.Breaks.breakable(Breaks.scala:37)
 at com.siriusxm.sequencer.ingestor.EventLogClusterIngestor$$anonfun$processEventLogMapStreamDiff2$1$$anonfun$apply$7$$anonfun$apply$1$$anonfun$apply$mcV$sp$2.apply(EventLogClusterIngestor.scala:139)
 at com.siriusxm.sequencer.ingestor.EventLogClusterIngestor$$anonfun$processEventLogMapStreamDiff2$1$$anonfun$apply$7$$anonfun$apply$1$$anonfun$apply$mcV$sp$2.apply(EventLogClusterIngestor.scala:132)
 at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
 at com.siriusxm.sequencer.ingestor.EventLogClusterIngestor$$anonfun$processEventLogMapStreamDiff2$1$$anonfun$apply$7$$anonfun$apply$1.apply$mcV$sp(EventLogClusterIngestor.scala:132)
 at scala.util.control.Breaks.breakable(Breaks.scala:37)
 at com.siriusxm.sequencer.ingestor.EventLogClusterIngestor$$anonfun$processEventLogMapStreamDiff2$1$$anonfun$apply$7.apply(EventLogClusterIngestor.scala:125)
 at com.siriusxm.sequencer.ingestor.EventLogClusterIngestor$$anonfun$processEventLogMapStreamDiff2$1$$anonfun$apply$7.apply(EventLogClusterIngestor.scala:115)
 at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
 at com.siriusxm.sequencer.ingestor.EventLogClusterIngestor$$anonfun$processEventLogMapStreamDiff2$1.apply(EventLogClusterIngestor.scala:115)
 at com.siriusxm.sequencer.ingestor.EventLogClusterIngestor$$anonfun$processEventLogMapStreamDiff2$1.apply(EventLogClusterIngestor.scala:107)
 at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
 at com.siriusxm.sequencer.ingestor.EventLogClusterIngestor$.processEventLogMapStreamDiff2(EventLogClusterIngestor.scala:107)
 at com.siriusxm.sequencer.ingestor.EventLogClusterIngestor$.main(EventLogClusterIngestor.scala:573)
 at com.siriusxm.sequencer.ingestor.EventLogClusterIngestor.main(EventLogClusterIngestor.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
 at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
 at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

This happens inside an RDD foreach. I pasted the relevant lines as follows:

132 difFileRDD.filter(a => a.size > 0).collect().foreach(file => {
    // ...
135   val lines = sc.textFile("file:///" + file)
136   val elogs = lines.flatMap(_.split("\n"))
137   val numOfel = elogs.count()
138   // ...
139   breakable {
140     if (numOfel <= 0) {
141       // ..
142       break
143     } else {
144       elogs.filter(a => a.size > 0).foreach
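
For reference, this exception is thrown by ParallelCollectionRDD.slice when it is asked for fewer than one slice, i.e. some sc.parallelize call ends up with a non-positive numSlices. A minimal sketch with hypothetical values (app name, slice count) that reproduces it:

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("slices-repro").setMaster("local[*]"))

// numSlices must be >= 1; zero (e.g. a partition count computed from an
// empty input) fails at the first action on the RDD.
val numSlices = 0
val rdd = sc.parallelize(Seq(1, 2, 3), numSlices)
rdd.collect() // IllegalArgumentException: Positive number of slices required

Since the trace reaches collect() from EventLogClusterIngestor.scala:155, a partition count computed near that line that can reach zero would be the first thing to check.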