Can you show more of the code inside the while loop?

Which versions of Spark and Kinesis are you using?
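
I can't tell without seeing the loop body, but one common cause of "next on empty iterator" is calling next() more than once per hasNext() check, for example once per field. Below is a minimal sketch of the safe pattern, using plain java.util.Iterator so it runs without Spark. The Row class and the drain method are hypothetical stand-ins for your SFieldBean and your call(...) body, not your actual code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class PartitionDrain {
    // Hypothetical stand-in for SFieldBean: two fields read from each record.
    static final class Row {
        final String id;
        final String value;

        Row(String id, String value) {
            this.id = id;
            this.value = value;
        }
    }

    // Calls next() exactly once for every successful hasNext() check.
    static List<String> drain(Iterator<Row> iterValues) {
        List<String> out = new ArrayList<>();
        while (iterValues.hasNext()) {
            Row row = iterValues.next();        // the only next() call in the loop body
            out.add(row.id + "=" + row.value);  // reuse the local instead of calling next() again
        }
        return out;
    }

    public static void main(String[] args) {
        List<Row> partition = Arrays.asList(
                new Row("a", "1"), new Row("b", "2"), new Row("c", "3"));
        System.out.println(drain(partition.iterator())); // prints [a=1, b=2, c=3]
    }
}
```

If the loop instead reads something like iterValues.next().getA() followed by iterValues.next().getB(), the second call consumes a different element and fails on the last record of the partition.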

Thanks

On Mon, Apr 25, 2016 at 4:04 AM, Selvam Raman <sel...@gmail.com> wrote:

> I am reading data from a Kinesis stream (merging the shard values with a
> union stream) into Spark Streaming, then using the following code to push
> the data to the DB.
>
> splitCSV.foreachRDD(new VoidFunction2<JavaRDD<String[]>, Time>()
> {
>     private static final long serialVersionUID = 1L;
>
>     public void call(JavaRDD<String[]> rdd, Time time) throws Exception
>     {
>         JavaRDD<SFieldBean> varMapRDD = rdd.map(new Function<String[], SFieldBean>()
>         {
>             private static final long serialVersionUID = 1L;
>
>             public SFieldBean call(String[] values) throws Exception
>             {
>                 .....
>         );
>
>         varMapRDD.foreachPartition(new VoidFunction<Iterator<SFieldBean>>()
>         {
>             private static final long serialVersionUID = 1L;
>             MySQLConnectionHelper.getConnection("urlinfo");
>             @Override
>             public void call(Iterator<SFieldBean> iterValues) throws Exception
>             {
>                 ....
>                 while(iterValues.hasNext())
>                 {
>
>                 }
>             }
>
> Although I am using hasNext, it throws the following error:
>
> Caused by: java.util.NoSuchElementException: next on empty iterator
>         at scala.collection.Iterator$$anon$2.next(Iterator.scala:39)
>         at scala.collection.Iterator$$anon$2.next(Iterator.scala:37)
>         at scala.collection.IndexedSeqLike$Elements.next(IndexedSeqLike.scala:64)
>         at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
>         at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>         at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>         at scala.collection.convert.Wrappers$IteratorWrapper.next(Wrappers.scala:30)
>         at com.test.selva.KinesisToSpark$2$2.call(KinesisToSpark.java:319)
>         at com.test.selva.KinesisToSpark$2$2.call(KinesisToSpark.java:288)
>         at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:225)
>         at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:225)
>         at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920)
>         at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920)
>         at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
>         at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>         at org.apache.spark.scheduler.Task.run(Task.scala:89)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>         ... 3 more
>
>
> --
> Selvam Raman
> "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
>
