Put a try/catch inside your code and, in the catch block, print out the length
of the list (or the list itself) that causes the ArrayIndexOutOfBoundsException.
It may be that some of your data is malformed.
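
For example, something along these lines (a minimal, standalone sketch rather
than your actual job; the class name PayloadCheck, the extractValues helper,
and the hard-coded count of 25 are my own assumptions, taken from the number
of get(...) calls in your snippet):

import java.util.ArrayList;
import java.util.List;

public class PayloadCheck {

    // Mirrors the parsing in your map() function: split the payload on '|',
    // treat the first four tokens as metadata, and take every second token
    // after that as a value. Log the record whenever it cannot supply the
    // 25 values that RowFactory.create(...) later asks for.
    static List<String> extractValues(String raw) {
        String[] data = raw.split("\\|");
        List<String> listvalues = new ArrayList<>();
        List<String> meta = new ArrayList<>();
        int i = 0;
        for (String token : data) {
            if (i > 3) {
                if (i % 2 != 0) {          // odd positions hold the values
                    listvalues.add(token);
                }
            } else {
                meta.add(token);
            }
            i++;
        }
        try {
            // touch the same indexes the Row construction needs
            for (int k = 0; k < 25; k++) {
                listvalues.get(k);
            }
        } catch (IndexOutOfBoundsException e) {
            System.err.println("Malformed record, only " + listvalues.size()
                    + " values instead of 25: " + raw);
        }
        return listvalues;
    }

    public static void main(String[] args) {
        // deliberately short record, just to show the log line
        extractValues("h1|h2|h3|h4|name1|value1|name2|value2");
    }
}

If a log line like that shows up in the executor logs, the failing records are
short or malformed, and the fix is in the data (or in a length check before
building the Row) rather than in the Spark/YARN setup.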

Thanks
Best Regards

On Mon, Jul 27, 2015 at 8:24 PM, Manohar753 <manohar.re...@happiestminds.com> wrote:

> Hi Team,
>
> Can somebody please help me figure out what I am doing wrong to get the
> exception below while running my app on a YARN cluster with Spark 1.4?
>
> I am receiving a Kafka stream, calling foreachRDD on it, and handing each
> RDD to a new thread for processing. Please find the code snippet below.
>
> JavaDStream<MessageAndMetadata> unionStreams = ReceiverLauncher.launch(
>         jsc, props, numberOfReceivers, StorageLevel.MEMORY_ONLY());
>
> unionStreams.foreachRDD(new Function2<JavaRDD<MessageAndMetadata>, Time, Void>() {
>     @Override
>     public Void call(JavaRDD<MessageAndMetadata> rdd, Time time) throws Exception {
>         new ThreadParam(rdd).start();
>         return null;
>     }
> });
> #############################
> public ThreadParam(JavaRDD<MessageAndMetadata> rdd) {
>     this.rdd = rdd;
>     // this.context = context;
> }
>
> public void run() {
>     final List<StructField> fields = new ArrayList<StructField>();
>     List<String> listvalues = new ArrayList<>();
>     final List<String> meta = new ArrayList<>();
>
>     JavaRDD<Row> rowrdd = rdd.map(new Function<MessageAndMetadata, Row>() {
>         @Override
>         public Row call(MessageAndMetadata arg0) throws Exception {
>             String[] data = new String(arg0.getPayload()).split("\\|");
>             int i = 0;
>             List<StructField> fields = new ArrayList<StructField>();
>             List<String> listvalues = new ArrayList<>();
>             List<String> meta = new ArrayList<>();
>             for (String string : data) {
>                 if (i > 3) {
>                     if (i % 2 == 0) {
>                         fields.add(DataTypes.createStructField(string, DataTypes.StringType, true));
>                         // System.out.println(splitarr[i]);
>                     } else {
>                         listvalues.add(string);
>                         // System.out.println(splitarr[i]);
>                     }
>                 } else {
>                     meta.add(string);
>                 }
>                 i++;
>             }
>             int size = listvalues.size();
>             return RowFactory.create(
>                     listvalues.get(25-25), listvalues.get(25-24), listvalues.get(25-23),
>                     listvalues.get(25-22), listvalues.get(25-21), listvalues.get(25-20),
>                     listvalues.get(25-19), listvalues.get(25-18), listvalues.get(25-17),
>                     listvalues.get(25-16), listvalues.get(25-15), listvalues.get(25-14),
>                     listvalues.get(25-13), listvalues.get(25-12), listvalues.get(25-11),
>                     listvalues.get(25-10), listvalues.get(25-9), listvalues.get(25-8),
>                     listvalues.get(25-7), listvalues.get(25-6), listvalues.get(25-5),
>                     listvalues.get(25-4), listvalues.get(25-3), listvalues.get(25-2),
>                     listvalues.get(25-1));
>         }
>     });
>
>     SQLContext sqlContext = new SQLContext(rowrdd.context());
>     StructType schema = DataTypes.createStructType(fields);
>     System.out.println("before creating schema");
>     DataFrame courseDf = sqlContext.createDataFrame(rowrdd, schema);
>     courseDf.registerTempTable("course");
>     courseDf.show();
>     System.out.println("after creating schema");
> }
>
> ################################################
> Below is the command I use to run the job; after it is the stack trace of the error.
> MASTER=yarn-client /home/hadoop/spark/bin/spark-submit --class com.person.Consumer \
>     /mnt1/manohar/spark-load-from-db/target/spark-load-from-db-1.0-SNAPSHOT-jar-with-dependencies.jar
>
>
> The error is:
>
>
> 15/07/27 14:45:01 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 4.0
> (TID 72, ip-10-252-7-73.us-west-2.compute.internal):
> java.lang.ArrayIndexOutOfBoundsException: 0
>         at org.apache.spark.sql.catalyst.CatalystTypeConverters$.convertRowWithConverters(CatalystTypeConverters.scala:348)
>         at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$4.apply(CatalystTypeConverters.scala:180)
>         at org.apache.spark.sql.SQLContext$$anonfun$9.apply(SQLContext.scala:488)
>         at org.apache.spark.sql.SQLContext$$anonfun$9.apply(SQLContext.scala:488)
>         at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>         at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>         at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>         at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>         at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>         at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>         at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
>         at scala.collection.AbstractIterator.to(Iterator.scala:1157)
>         at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>         at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
>         at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
>         at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
>         at org.apache.spark.sql.execution.SparkPlan$$anonfun$3.apply(SparkPlan.scala:143)
>         at org.apache.spark.sql.execution.SparkPlan$$anonfun$3.apply(SparkPlan.scala:143)
>         at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1765)
>         at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1765)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
>         at org.apache.spark.scheduler.Task.run(Task.scala:70)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)
>
> Thanks in advance for the reply, team.
>
> Thanks,
> Manohar