Thanks for the clarifications, Mridul.

Thanks
Best Regards
On Fri, Aug 14, 2015 at 1:04 PM, Mridul Muralidharan <mri...@gmail.com> wrote:
> What I understood from Imran's mail (and what was referenced in it) is
> that the RDD mentioned seems to be violating some basic contracts on
> how partitions are used in Spark [1]: they cannot be arbitrarily
> numbered, contain duplicates, etc.
>
> Extending RDD to add functionality is typically for niche cases, and
> requires subclasses to adhere to the explicit (and implicit)
> contracts/lifecycles for them. Using existing RDDs as a template would
> be a good idea for customizations - one way to look at it is that
> using RDD is more in API space, while extending it is more in SPI
> space.
>
> Violations would actually not even be detectable by spark-core in the
> general case.
>
> Regards,
> Mridul
>
> [1] Ignoring the array out of bounds, etc. - I am assuming the intent
> is to show overlapping partitions, duplicates, index-to-partition
> mismatch - that sort of thing.
>
> On Thu, Aug 13, 2015 at 11:42 PM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>> Yep, and it works fine for operations which do not involve any
>> shuffle (like foreach, count etc.), while those which involve shuffle
>> operations end up in an infinite loop. Spark should somehow indicate
>> this instead of going into an infinite loop.
>>
>> Thanks
>> Best Regards
>>
>> On Thu, Aug 13, 2015 at 11:37 PM, Imran Rashid <iras...@cloudera.com> wrote:
>>> Oh I see, you are defining your own RDD & Partition types, and you
>>> had a bug where partition.index did not line up with the partition's
>>> slot in rdd.getPartitions. Is that correct?
>>>
>>> On Thu, Aug 13, 2015 at 2:40 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>>>> I figured that out, and these are my findings:
>>>>
>>>> -> It just enters an infinite loop when there's a duplicate
>>>> partition id.
>>>>
>>>> -> It enters an infinite loop when the partition id starts from 1
>>>> rather than 0.
>>>>
>>>> Something like this piece of code can reproduce it (in
>>>> getPartitions()):
>>>>
>>>> val total_partitions = 4
>>>> val partitionsArray: Array[Partition] =
>>>>   Array.ofDim[Partition](total_partitions)
>>>>
>>>> var i = 0
>>>>
>>>> // Writes partition ids 1..4 twice into a 4-slot array: the ids
>>>> // are duplicated, 1-based, and do not match their array slots
>>>> // (and the second pass runs out of bounds).
>>>> for (outer <- 0 to 1) {
>>>>   for (partition <- 1 to total_partitions) {
>>>>     partitionsArray(i) = new DeadLockPartitions(partition)
>>>>     i = i + 1
>>>>   }
>>>> }
>>>>
>>>> partitionsArray
>>>>
>>>> Thanks
>>>> Best Regards
>>>>
>>>> On Wed, Aug 12, 2015 at 10:57 PM, Imran Rashid <iras...@cloudera.com> wrote:
>>>>> Yikes.
>>>>>
>>>>> Was this a one-time thing? Or does it happen consistently? Can you
>>>>> turn on debug logging for o.a.s.scheduler? (Dunno if it will help,
>>>>> but maybe ...)
>>>>>
>>>>> On Tue, Aug 11, 2015 at 8:59 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>>>>>> Hi
>>>>>>
>>>>>> My Spark job (running in local[*] with Spark 1.4.1) reads data
>>>>>> from a thrift server (I created an RDD that computes its
>>>>>> partitions in the getPartitions() call, and whose compute()
>>>>>> iterator returns records from these partitions). count() and
>>>>>> foreach() are working fine and return the correct number of
>>>>>> records. But whenever there is a shuffle map stage (like
>>>>>> reduceByKey etc.),
>>>>>> all the tasks execute properly, but then it enters an infinite
>>>>>> loop saying:
>>>>>>
>>>>>> 15/08/11 13:05:54 INFO DAGScheduler: Resubmitting ShuffleMapStage
>>>>>> 1 (map at FilterMain.scala:59) because some of its tasks had
>>>>>> failed: 0, 3
>>>>>>
>>>>>> Here's the complete stack-trace: http://pastebin.com/hyK7cG8S
>>>>>>
>>>>>> What could be the root cause of this problem? I looked it up and
>>>>>> bumped into this closed JIRA (which is very, very old).
>>>>>>
>>>>>> Thanks
>>>>>> Best Regards
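
For reference, here is a minimal sketch of a custom RDD that honors the
partition contract Mridul and Imran describe: getPartitions returns an
array in which each Partition's index is 0-based, unique, and equal to
its slot in the array. The class and names are hypothetical, not from
the thread.

    import org.apache.spark.{Partition, SparkContext, TaskContext}
    import org.apache.spark.rdd.RDD

    // Hypothetical minimal custom RDD honoring the partition contract.
    class RecordsRDD(sc: SparkContext, numPartitions: Int)
      extends RDD[String](sc, Nil) {

      // Partition whose index is assigned explicitly by the RDD.
      private class RecordsPartition(override val index: Int)
        extends Partition

      // index == slot in the returned array, 0-based, no duplicates.
      override def getPartitions: Array[Partition] =
        Array.tabulate[Partition](numPartitions)(i => new RecordsPartition(i))

      // One record per partition, just enough to make the RDD runnable.
      override def compute(split: Partition, context: TaskContext): Iterator[String] =
        Iterator(s"record from partition ${split.index}")
    }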
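
And, assuming the sketch above, a quick way to exercise the shuffle path
from the original report (again hypothetical, using the same local[*]
setup):

    import org.apache.spark.{SparkConf, SparkContext}

    object RecordsRDDDemo {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setMaster("local[*]").setAppName("records-rdd-demo"))
        val rdd = new RecordsRDD(sc, numPartitions = 4)
        // With the contract honored, the shuffle stage completes instead
        // of looping on "Resubmitting ShuffleMapStage ...".
        rdd.map(r => (r.length, 1)).reduceByKey(_ + _).collect().foreach(println)
        sc.stop()
      }
    }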