You need to trigger an action on your rowrdd for the map to execute; calling
rowrdd.count() will do that.
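
For example, a minimal sketch (assuming rowrdd is the JavaRDD<Row> you build
in your run() method):

        // map() is a lazy transformation; count() is an action that forces it to run.
        long rows = rowrdd.count();
        System.out.println("rowrdd has " + rows + " rows");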

Thanks
Best Regards

On Tue, Jul 28, 2015 at 2:18 PM, Manohar Reddy <
manohar.re...@happiestminds.com> wrote:

>  Hi Akhil,
>
>
>
> Thanks for the reply. I found the root cause but don't know how to solve it.
>
> The cause is below: this map function never executes, and because of that
> all my list fields stay empty.
>
> Please let me know what might prevent this snippet of code from executing;
> the map below is never entered.
>
> JavaRDD<Row> rowrdd = rdd.map(new Function<MessageAndMetadata, Row>() {
>         @Override
>         public Row call(MessageAndMetadata arg0) throws Exception {
>                 System.out.println("inside thread map callllllllllllllll");
>                 String[] data = new String(arg0.getPayload()).split("\\|");
>                 int i = 0;
>                 for (String string : data) {
>                         if (i > 3) {
>                                 if (i % 2 == 0) {
>                                         fields.add(DataTypes.createStructField(
>                                                 string, DataTypes.StringType, true));
>                                         System.out.println(string);
>                                 } else {
>                                         listvalues.add(string);
>                                         System.out.println(string);
>                                 }
>
>
>
> *From:* Akhil Das [mailto:ak...@sigmoidanalytics.com]
> *Sent:* Tuesday, July 28, 2015 1:52 PM
> *To:* Manohar Reddy
> *Cc:* user@spark.apache.org
> *Subject:* Re: java.lang.ArrayIndexOutOfBoundsException: 0 on Yarn Client
>
>
>
> Put a try/catch inside your code and, in the catch block, print out the
> length of (or the entire) list that causes the ArrayIndexOutOfBounds. It may
> be that some of your data is malformed.
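>
> Something along these lines, as a rough sketch inside your map's call()
> (using the listvalues list from your snippet):
>
>         try {
>                 return RowFactory.create(listvalues.get(0), listvalues.get(1) /* ... */);
>         } catch (IndexOutOfBoundsException e) {
>                 // Print what was actually parsed so the malformed record shows up
>                 // in the executor logs.
>                 System.out.println("bad record: size=" + listvalues.size()
>                                 + " values=" + listvalues);
>                 throw e;
>         }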
>
>
>   Thanks
>
> Best Regards
>
>
>
> On Mon, Jul 27, 2015 at 8:24 PM, Manohar753 <
> manohar.re...@happiestminds.com> wrote:
>
> Hi Team,
>
> Can somebody please help me figure out what I am doing wrong to get the
> exception below while running my app on a YARN cluster with Spark 1.4?
>
> I am reading a Kafka stream, calling foreachRDD, and handing each RDD to a
> new thread for processing. Please find the code snippet below.
>
> JavaDStream<MessageAndMetadata> unionStreams = ReceiverLauncher.launch(
>                 jsc, props, numberOfReceivers, StorageLevel.MEMORY_ONLY());
> unionStreams.foreachRDD(
>                 new Function2<JavaRDD<MessageAndMetadata>, Time, Void>() {
>                         @Override
>                         public Void call(JavaRDD<MessageAndMetadata> rdd, Time time)
>                                         throws Exception {
>                                 new ThreadParam(rdd).start();
>                                 return null;
>                         }
>                 });
> #############################
> public ThreadParam(JavaRDD<MessageAndMetadata> rdd) {
>         this.rdd = rdd;
> //      this.context = context;
> }
>
> public void run() {
>         final List<StructField> fields = new ArrayList<StructField>();
>         List<String> listvalues = new ArrayList<>();
>         final List<String> meta = new ArrayList<>();
>
>         JavaRDD<Row> rowrdd = rdd.map(new Function<MessageAndMetadata, Row>() {
>                 @Override
>                 public Row call(MessageAndMetadata arg0) throws Exception {
>                         String[] data = new String(arg0.getPayload()).split("\\|");
>                         int i = 0;
>                         List<StructField> fields = new ArrayList<StructField>();
>                         List<String> listvalues = new ArrayList<>();
>                         List<String> meta = new ArrayList<>();
>                         for (String string : data) {
>                                 if (i > 3) {
>                                         if (i % 2 == 0) {
>                                                 fields.add(DataTypes.createStructField(
>                                                         string, DataTypes.StringType, true));
> //                                              System.out.println(splitarr[i]);
>                                         } else {
>                                                 listvalues.add(string);
> //                                              System.out.println(splitarr[i]);
>                                         }
>                                 } else {
>                                         meta.add(string);
>                                 }
>                                 i++;
>                         }
>                         int size = listvalues.size();
>                         return RowFactory.create(
>                                 listvalues.get(0), listvalues.get(1), listvalues.get(2),
>                                 listvalues.get(3), listvalues.get(4), listvalues.get(5),
>                                 listvalues.get(6), listvalues.get(7), listvalues.get(8),
>                                 listvalues.get(9), listvalues.get(10), listvalues.get(11),
>                                 listvalues.get(12), listvalues.get(13), listvalues.get(14),
>                                 listvalues.get(15), listvalues.get(16), listvalues.get(17),
>                                 listvalues.get(18), listvalues.get(19), listvalues.get(20),
>                                 listvalues.get(21), listvalues.get(22), listvalues.get(23),
>                                 listvalues.get(24));
>                 }
>         });
>
>         SQLContext sqlContext = new SQLContext(rowrdd.context());
>         StructType schema = DataTypes.createStructType(fields);
>         System.out.println("before creating schema");
>         DataFrame courseDf = sqlContext.createDataFrame(rowrdd, schema);
>         courseDf.registerTempTable("course");
>         courseDf.show();
>         System.out.println("after creating schema");
>
> ################################################
> Below is the command used to run this, and after it the stack trace error:
>  MASTER=yarn-client /home/hadoop/spark/bin/spark-submit --class
> com.person.Consumer
>
> /mnt1/manohar/spark-load-from-db/targetpark-load-from-db-1.0-SNAPSHOT-jar-with-dependencies.jar
>
>
> The error is:
>
>
> 15/07/27 14:45:01 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 4.0
> (TID 72, ip-10-252-7-73.us-west-2.compute.internal):
> java.lang.ArrayIndexOutOfBoundsException: 0
>         at
>
> org.apache.spark.sql.catalyst.CatalystTypeConverters$.convertRowWithConverters(CatalystTypeConverters.scala:348)
>         at
>
> org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$4.apply(CatalystTypeConverters.scala:180)
>         at
> org.apache.spark.sql.SQLContext$$anonfun$9.apply(SQLContext.scala:488)
>         at
> org.apache.spark.sql.SQLContext$$anonfun$9.apply(SQLContext.scala:488)
>         at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>         at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>         at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>         at
> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>         at
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>         at
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>         at
> scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
>         at scala.collection.AbstractIterator.to(Iterator.scala:1157)
>         at
> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>         at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
>         at
> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
>         at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
>         at
>
> org.apache.spark.sql.execution.SparkPlan$$anonfun$3.apply(SparkPlan.scala:143)
>         at
>
> org.apache.spark.sql.execution.SparkPlan$$anonfun$3.apply(SparkPlan.scala:143)
>         at
>
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1765)
>         at
>
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1765)
>         at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
>         at org.apache.spark.scheduler.Task.run(Task.scala:70)
>         at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>         at
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)
>
> Thanks in advance for the reply, team.
>
> Thanks,
> Manohar
>
>
>
>
