Hi Team, can please some body help me out what am doing wrong to get the below exception while running my app on Yarn cluster with spark 1.4.
Kafka stream am getting AND DOING foreachRDD and giving it to new thread for process.please find the below code snippet. JavaDStream<MessageAndMetadata> unionStreams = ReceiverLauncher.launch( jsc, props, numberOfReceivers, StorageLevel.MEMORY_ONLY()); unionStreams .foreachRDD(new Function2<JavaRDD<MessageAndMetadata>, Time, Void>() { @Override public Void call(JavaRDD<MessageAndMetadata> rdd, Time time) throws Exception { new ThreadParam(rdd).start(); return null; } }); ############################# public ThreadParam(JavaRDD<MessageAndMetadata> rdd) { this.rdd = rdd; // this.context=context; } public void run(){ final List<StructField> fields = new ArrayList<StructField>(); List<String> listvalues=new ArrayList<>(); final List<String> meta=new ArrayList<>(); JavaRDD<Row> rowrdd=rdd.map(new Function<MessageAndMetadata, Row>() { @Override public Row call(MessageAndMetadata arg0) throws Exception { String[] data=new String(arg0.getPayload()).split("\\|"); int i=0; List<StructField> fields = new ArrayList<StructField>(); List<String> listvalues=new ArrayList<>(); List<String> meta=new ArrayList<>(); for (String string : data) { if(i>3){ if(i%2==0){ fields.add(DataTypes.createStructField(string, DataTypes.StringType, true)); // System.out.println(splitarr[i]); }else{ listvalues.add(string); // System.out.println(splitarr[i]); } }else{ meta.add(string); } i++; }int size=listvalues.size(); return RowFactory.create(listvalues.get(25-25),listvalues.get(25-24),listvalues.get(25-23), listvalues.get(25-22),listvalues.get(25-21),listvalues.get(25-20), listvalues.get(25-19),listvalues.get(25-18),listvalues.get(25-17), listvalues.get(25-16),listvalues.get(25-15),listvalues.get(25-14), listvalues.get(25-13),listvalues.get(25-12),listvalues.get(25-11), listvalues.get(25-10),listvalues.get(25-9),listvalues.get(25-8), listvalues.get(25-7),listvalues.get(25-6),listvalues.get(25-5), listvalues.get(25-4),listvalues.get(25-3),listvalues.get(25-2),listvalues.get(25-1)); } }); SQLContext sqlContext = new SQLContext(rowrdd.context()); StructType schema = DataTypes.createStructType(fields); System.out.println("before creating schema"); DataFrame courseDf=sqlContext.createDataFrame(rowrdd, schema); courseDf.registerTempTable("course"); courseDf.show(); System.out.println("after creating schema"); ################################################ BELOW IS THE COMMAND TO RUN THIS AND XENT FOR THAT IS THE STACKTRACE eRROR MASTER=yarn-client /home/hadoop/spark/bin/spark-submit --class com.person.Consumer /mnt1/manohar/spark-load-from-db/targetpark-load-from-db-1.0-SNAPSHOT-jar-with-dependencies.jar ERROR IS AS 15/07/27 14:45:01 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 4.0 (TID 72, ip-10-252-7-73.us-west-2.compute.internal): java.lang.ArrayIndexOutOfBoundsException: 0 at org.apache.spark.sql.catalyst.CatalystTypeConverters$.convertRowWithConverters(CatalystTypeConverters.scala:348) at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$4.apply(CatalystTypeConverters.scala:180) at org.apache.spark.sql.SQLContext$$anonfun$9.apply(SQLContext.scala:488) at org.apache.spark.sql.SQLContext$$anonfun$9.apply(SQLContext.scala:488) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$$anon$10.next(Iterator.scala:312) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) at scala.collection.AbstractIterator.to(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) at org.apache.spark.sql.execution.SparkPlan$$anonfun$3.apply(SparkPlan.scala:143) at org.apache.spark.sql.execution.SparkPlan$$anonfun$3.apply(SparkPlan.scala:143) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1765) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1765) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63) at org.apache.spark.scheduler.Task.run(Task.scala:70) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Thanks In advance for the reply Team Thanks, Manohar -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-ArrayIndexOutOfBoundsException-0-on-Yarn-Client-tp24009.html Sent from the Apache Spark User List mailing list archive at Nabble.com. --------------------------------------------------------------------- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org