Can you show more of the code inside your while loop? Which versions of Spark and Kinesis are you using?
Thanks

On Mon, Apr 25, 2016 at 4:04 AM, Selvam Raman <sel...@gmail.com> wrote:

> I am reading data from a Kinesis stream (merging shard values with a union
> stream) into Spark Streaming, then using the following code to push the
> data to the DB.
>
>
> splitCSV.foreachRDD(new VoidFunction2<JavaRDD<String[]>,Time>()
> {
>
>     private static final long serialVersionUID = 1L;
>
>     public void call(JavaRDD<String[]> rdd, Time time) throws Exception
>     {
>         JavaRDD<SFieldBean> varMapRDD = rdd.map(new Function<String[],SFieldBean>()
>         {
>             private static final long serialVersionUID = 1L;
>
>             public SFieldBean call(String[] values) throws Exception
>             {
>                 .....
>         );
>
>         varMapRDD.foreachPartition(new VoidFunction<Iterator<SFieldBean>>(
>         {
>             private static final long serialVersionUID = 1L;
>             MySQLConnectionHelper.getConnection("urlinfo");
>             @Override
>             public void call(Iterator<SFieldBean> iterValues) throws Exception
>             {
>                 ....
>                 while(iterValues.hasNext())
>                 {
>
>                 }
>             }
>
> Though I am using hasNext, it throws the following error:
>
> Caused by: java.util.NoSuchElementException: next on empty iterator
>     at scala.collection.Iterator$$anon$2.next(Iterator.scala:39)
>     at scala.collection.Iterator$$anon$2.next(Iterator.scala:37)
>     at scala.collection.IndexedSeqLike$Elements.next(IndexedSeqLike.scala:64)
>     at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
>     at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>     at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>     at scala.collection.convert.Wrappers$IteratorWrapper.next(Wrappers.scala:30)
>     at com.test.selva.KinesisToSpark$2$2.call(KinesisToSpark.java:319)
>     at com.test.selva.KinesisToSpark$2$2.call(KinesisToSpark.java:288)
>     at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:225)
>     at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:225)
>     at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920)
>     at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920)
>     at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
>     at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>     at org.apache.spark.scheduler.Task.run(Task.scala:89)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>     ... 3 more
>
>
> --
> Selvam Raman
> "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
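
One thing worth checking while you gather the loop body: the trace shows next() failing inside your call() at KinesisToSpark.java:319. A pattern that often produces "next on empty iterator" despite a hasNext() guard is calling next() more than once per hasNext() check. This is only a guess, since the loop body is not shown. The sketch below uses plain java.util iterators rather than Spark, and the class and method names (IteratorLoopSketch, drainOncePerCheck, failsWithTwoNextCalls) are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical illustration only; not taken from the original KinesisToSpark code.
class IteratorLoopSketch {

    // Safe pattern: exactly one next() for each successful hasNext() check.
    static List<String> drainOncePerCheck(Iterator<String> values) {
        List<String> out = new ArrayList<>();
        while (values.hasNext()) {
            String value = values.next();   // read the element once into a local
            out.add(value.toUpperCase());   // reuse the local for all further work
        }
        return out;
    }

    // Suspected bug pattern: two next() calls guarded by a single hasNext().
    // Returns true if the loop ran past the end of the iterator.
    static boolean failsWithTwoNextCalls(Iterator<String> values) {
        try {
            while (values.hasNext()) {
                String first = values.next();
                String second = values.next(); // not guarded; throws if no element is left
            }
            return false;
        } catch (NoSuchElementException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        List<String> data = Arrays.asList("a", "b", "c");
        System.out.println(drainOncePerCheck(data.iterator()));
        System.out.println(failsWithTwoNextCalls(data.iterator()));
    }
}
```

If your loop reads each bean with something like iterValues.next().getX() followed by iterValues.next().getY(), assigning iterValues.next() to a local variable once per iteration should avoid the exception.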