You need to trigger an action on your rowrdd for it to execute the map; you can do a rowrdd.count() for that.
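For example, a minimal sketch (variable names are the ones from your snippet; the count result is discarded and is only there to force evaluation):

    // map() is lazy: Spark only records the transformation here,
    // so none of the println statements inside call() run yet.
    JavaRDD<Row> rowrdd = rdd.map(new Function<MessageAndMetadata, Row>() {
        @Override
        public Row call(MessageAndMetadata arg0) throws Exception {
            // ... your existing parsing logic ...
            return RowFactory.create(/* ... */);
        }
    });

    // count() is an action, so this is what actually executes the map.
    // On YARN the println output will appear in the executor logs, not
    // on the driver console.
    long n = rowrdd.count();

One thing to keep in mind once it does run: call() executes on the executors, so adding to a driver-side list such as fields from inside the map will not populate the driver's copy, and createDataFrame can end up seeing an empty schema. That would also explain the ArrayIndexOutOfBoundsException: 0 coming from CatalystTypeConverters. It is safer to build the schema on the driver before creating the DataFrame.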
Thanks
Best Regards

On Tue, Jul 28, 2015 at 2:18 PM, Manohar Reddy <manohar.re...@happiestminds.com> wrote:

> Hi Akhil,
>
> Thanks for the reply. I found the root cause but don't know how to solve it.
>
> Below is the cause: execution never goes inside this map function, and because of that all my list fields are empty. Please let me know what might be keeping this snippet of code from executing (the map below is never entered):
>
>     JavaRDD<Row> rowrdd = rdd.map(new Function<MessageAndMetadata, Row>() {
>         @Override
>         public Row call(MessageAndMetadata arg0) throws Exception {
>             System.out.println("inside thread map callllllllllllllll");
>             String[] data = new String(arg0.getPayload()).split("\\|");
>             int i = 0;
>             for (String string : data) {
>                 if (i > 3) {
>                     if (i % 2 == 0) {
>                         fields.add(DataTypes.createStructField(string, DataTypes.StringType, true));
>                         System.out.println(string);
>                     } else {
>                         listvalues.add(string);
>                         System.out.println(string);
>                     }
>                 }
>
> From: Akhil Das [mailto:ak...@sigmoidanalytics.com]
> Sent: Tuesday, July 28, 2015 1:52 PM
> To: Manohar Reddy
> Cc: user@spark.apache.org
> Subject: Re: java.lang.ArrayIndexOutOfBoundsException: 0 on Yarn Client
>
> Put a try/catch inside your code and, inside the catch, print out the length or the list itself that causes the ArrayIndexOutOfBounds. It may be that some of your data is malformed.
>
> Thanks
> Best Regards
>
> On Mon, Jul 27, 2015 at 8:24 PM, Manohar753 <manohar.re...@happiestminds.com> wrote:
>
> Hi Team,
>
> Can somebody please help me figure out what I am doing wrong to get the exception below while running my app on a YARN cluster with Spark 1.4?
>
> I am receiving a Kafka stream, calling foreachRDD on it, and handing each RDD to a new thread for processing. Please find the code snippet below:
>     JavaDStream<MessageAndMetadata> unionStreams = ReceiverLauncher.launch(
>             jsc, props, numberOfReceivers, StorageLevel.MEMORY_ONLY());
>     unionStreams.foreachRDD(new Function2<JavaRDD<MessageAndMetadata>, Time, Void>() {
>         @Override
>         public Void call(JavaRDD<MessageAndMetadata> rdd, Time time) throws Exception {
>             new ThreadParam(rdd).start();
>             return null;
>         }
>     });
>
>     #############################
>
>     public ThreadParam(JavaRDD<MessageAndMetadata> rdd) {
>         this.rdd = rdd;
>         // this.context = context;
>     }
>
>     public void run() {
>         final List<StructField> fields = new ArrayList<StructField>();
>         List<String> listvalues = new ArrayList<>();
>         final List<String> meta = new ArrayList<>();
>
>         JavaRDD<Row> rowrdd = rdd.map(new Function<MessageAndMetadata, Row>() {
>             @Override
>             public Row call(MessageAndMetadata arg0) throws Exception {
>                 String[] data = new String(arg0.getPayload()).split("\\|");
>                 int i = 0;
>                 List<StructField> fields = new ArrayList<StructField>();
>                 List<String> listvalues = new ArrayList<>();
>                 List<String> meta = new ArrayList<>();
>                 for (String string : data) {
>                     if (i > 3) {
>                         if (i % 2 == 0) {
>                             fields.add(DataTypes.createStructField(string, DataTypes.StringType, true));
>                             // System.out.println(splitarr[i]);
>                         } else {
>                             listvalues.add(string);
>                             // System.out.println(splitarr[i]);
>                         }
>                     } else {
>                         meta.add(string);
>                     }
>                     i++;
>                 }
>                 int size = listvalues.size();
>                 return RowFactory.create(
>                         listvalues.get(25-25), listvalues.get(25-24), listvalues.get(25-23),
>                         listvalues.get(25-22), listvalues.get(25-21), listvalues.get(25-20),
>                         listvalues.get(25-19), listvalues.get(25-18), listvalues.get(25-17),
>                         listvalues.get(25-16), listvalues.get(25-15), listvalues.get(25-14),
>                         listvalues.get(25-13), listvalues.get(25-12), listvalues.get(25-11),
>                         listvalues.get(25-10), listvalues.get(25-9), listvalues.get(25-8),
>                         listvalues.get(25-7), listvalues.get(25-6), listvalues.get(25-5),
>                         listvalues.get(25-4), listvalues.get(25-3), listvalues.get(25-2),
>                         listvalues.get(25-1));
>             }
>         });
>
>         SQLContext sqlContext = new SQLContext(rowrdd.context());
>         StructType schema = DataTypes.createStructType(fields);
>         System.out.println("before creating schema");
>         DataFrame courseDf = sqlContext.createDataFrame(rowrdd, schema);
>         courseDf.registerTempTable("course");
>         courseDf.show();
>         System.out.println("after creating schema");
>     }
>
>     ################################################
>
> Below is the command to run this, and after it is the stack trace error:
>
>     MASTER=yarn-client /home/hadoop/spark/bin/spark-submit --class com.person.Consumer /mnt1/manohar/spark-load-from-db/target/spark-load-from-db-1.0-SNAPSHOT-jar-with-dependencies.jar
>
> The error is:
>
>     15/07/27 14:45:01 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 4.0 (TID 72, ip-10-252-7-73.us-west-2.compute.internal): java.lang.ArrayIndexOutOfBoundsException: 0
>         at org.apache.spark.sql.catalyst.CatalystTypeConverters$.convertRowWithConverters(CatalystTypeConverters.scala:348)
>         at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$4.apply(CatalystTypeConverters.scala:180)
>         at org.apache.spark.sql.SQLContext$$anonfun$9.apply(SQLContext.scala:488)
>         at org.apache.spark.sql.SQLContext$$anonfun$9.apply(SQLContext.scala:488)
>         at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>         at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>         at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>         at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>         at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>         at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>         at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
>         at scala.collection.AbstractIterator.to(Iterator.scala:1157)
>         at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>         at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
>         at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
>         at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
>         at org.apache.spark.sql.execution.SparkPlan$$anonfun$3.apply(SparkPlan.scala:143)
>         at org.apache.spark.sql.execution.SparkPlan$$anonfun$3.apply(SparkPlan.scala:143)
>         at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1765)
>         at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1765)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
>         at org.apache.spark.scheduler.Task.run(Task.scala:70)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)
>
> Thanks in advance for the reply, team.
>
> Thanks,
> Manohar