I am using Spark 1.3.1 to load event logs into Cassandra, but the job fails with an exception whose cause I have not been able to determine. Can anybody help me figure out what is going wrong?
Exception in thread "main" java.lang.IllegalArgumentException: Positive number of slices required at org.apache.spark.rdd.ParallelCollectionRDD$.slice(ParallelCollectionRDD.scala:119) at org.apache.spark.rdd.ParallelCollectionRDD.getPartitions(ParallelCollectionRDD.scala:97) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:222) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:220) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:220) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1367) at org.apache.spark.rdd.RDD.collect(RDD.scala:797) at com.siriusxm.sequencer.ingestor.EventLogClusterIngestor$$anonfun$processEventLogMapStreamDiff2$1$$anonfun$apply$7$$anonfun$apply$1$$anonfun$apply$mcV$sp$2$$anonfun$apply$2$$anonfun$apply$mcV$sp$4$$anonfun$apply$3.apply$mcV$sp(EventLogClusterIngestor.scala:155) at scala.util.control.Breaks.breakable(Breaks.scala:37) at com.siriusxm.sequencer.ingestor.EventLogClusterIngestor$$anonfun$processEventLogMapStreamDiff2$1$$anonfun$apply$7$$anonfun$apply$1$$anonfun$apply$mcV$sp$2$$anonfun$apply$2$$anonfun$apply$mcV$sp$4.apply(EventLogClusterIngestor.scala:145) at com.siriusxm.sequencer.ingestor.EventLogClusterIngestor$$anonfun$processEventLogMapStreamDiff2$1$$anonfun$apply$7$$anonfun$apply$1$$anonfun$apply$mcV$sp$2$$anonfun$apply$2$$anonfun$apply$mcV$sp$4.apply(EventLogClusterIngestor.scala:144) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at com.siriusxm.sequencer.ingestor.EventLogClusterIngestor$$anonfun$processEventLogMapStreamDiff2$1$$anonfun$apply$7$$anonfun$apply$1$$anonfun$apply$mcV$sp$2$$anonfun$apply$2.apply$mcV$sp(EventLogClusterIngestor.scala:144) at scala.util.control.Breaks.breakable(Breaks.scala:37) at 
com.siriusxm.sequencer.ingestor.EventLogClusterIngestor$$anonfun$processEventLogMapStreamDiff2$1$$anonfun$apply$7$$anonfun$apply$1$$anonfun$apply$mcV$sp$2.apply(EventLogClusterIngestor.scala:139) at com.siriusxm.sequencer.ingestor.EventLogClusterIngestor$$anonfun$processEventLogMapStreamDiff2$1$$anonfun$apply$7$$anonfun$apply$1$$anonfun$apply$mcV$sp$2.apply(EventLogClusterIngestor.scala:132) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at com.siriusxm.sequencer.ingestor.EventLogClusterIngestor$$anonfun$processEventLogMapStreamDiff2$1$$anonfun$apply$7$$anonfun$apply$1.apply$mcV$sp(EventLogClusterIngestor.scala:132) at scala.util.control.Breaks.breakable(Breaks.scala:37) at com.siriusxm.sequencer.ingestor.EventLogClusterIngestor$$anonfun$processEventLogMapStreamDiff2$1$$anonfun$apply$7.apply(EventLogClusterIngestor.scala:125) at com.siriusxm.sequencer.ingestor.EventLogClusterIngestor$$anonfun$processEventLogMapStreamDiff2$1$$anonfun$apply$7.apply(EventLogClusterIngestor.scala:115) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at com.siriusxm.sequencer.ingestor.EventLogClusterIngestor$$anonfun$processEventLogMapStreamDiff2$1.apply(EventLogClusterIngestor.scala:115) at com.siriusxm.sequencer.ingestor.EventLogClusterIngestor$$anonfun$processEventLogMapStreamDiff2$1.apply(EventLogClusterIngestor.scala:107) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at com.siriusxm.sequencer.ingestor.EventLogClusterIngestor$.processEventLogMapStreamDiff2(EventLogClusterIngestor.scala:107) at com.siriusxm.sequencer.ingestor.EventLogClusterIngestor$.main(EventLogClusterIngestor.scala:573) at 
com.siriusxm.sequencer.ingestor.EventLogClusterIngestor.main(EventLogClusterIngestor.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) This happens inside an RDD foreach. I have pasted the relevant lines below: 132 difFileRDD.filter(a=>a.size>0).collect().foreach(file => { . //... 135 val lines = sc.textFile("file:///" + file) 136 val elogs = lines.flatMap(_.split("\n")) 137 val numOfel = elogs.count() 138 //... 139 breakable { 140 if(numOfel <= 0) { 141 //...... 142 break 143 }else{ 144 elogs.filter(a=>a.size>0).foreach(log => { 145 breakable { 146 if (log == null || log.length == 0) { 147 break 148 }else { . //Here is the logical processes . } . } . }) . } . } .}) Any suggestions? Thanks! ---- Wong