I am reading a data from Kinesis stream (merging shard values with union
stream) to spark streaming. then doing the following code to push the data
to DB.
​

splitCSV.foreachRDD(new VoidFunction2<JavaRDD<String[]>,Time>()
{

private static final long serialVersionUID = 1L;

public void call(JavaRDD<String[]> rdd, Time time) throws Exception
{
JavaRDD<SFieldBean> varMapRDD = rdd.map(new Function<String[],SFieldBean>()
{
private static final long serialVersionUID = 1L;

public SFieldBean call(String[] values) throws Exception
{
.....
);

varMapRDD.foreachPartition(new VoidFunction<Iterator<SFieldBean>>(
{
private static final long serialVersionUID = 1L;
MySQLConnectionHelper.getConnection("urlinfo");
@Override
public void call(Iterator<SFieldBean> iterValues) throws Exception
{
....
while(iterValues.hasNext())
{

}
}

Though I am using hasNext but it throws the follwing error
​
Caused by: java.util.NoSuchElementException: next on empty iterator
        at scala.collection.Iterator$$anon$2.next(Iterator.scala:39)
        at scala.collection.Iterator$$anon$2.next(Iterator.scala:37)
        at
scala.collection.IndexedSeqLike$Elements.next(IndexedSeqLike.scala:64)
        at
org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at
scala.collection.convert.Wrappers$IteratorWrapper.next(Wrappers.scala:30)
        at com.test.selva.KinesisToSpark$2$2.call(KinesisToSpark.java:319)
        at com.test.selva.KinesisToSpark$2$2.call(KinesisToSpark.java:288)
        at
org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:225)
        at
org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:225)
        at
org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920)
        at
org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920)
        at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
        at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
        at
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        ... 3 more


-- 
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"

Reply via email to