Re: Do transformation functions on RDD invoke a Job [sc.runJob]?
Cool!! Thanks for the clarification, Mike.

Thanking You

Praveen Devarao
Spark Technology Centre
IBM India Software Labs

"Courage doesn't always roar. Sometimes courage is the quiet voice at the end of the day saying I will try again"

From: Michael Armbrust <mich...@databricks.com>
To: Praveen Devarao/India/IBM@IBMIN
Cc: Reynold Xin <r...@databricks.com>, "d...@spark.apache.org" <d...@spark.apache.org>, user <user@spark.apache.org>
Date: 25/04/2016 10:59 pm
Subject: Re: Do transformation functions on RDD invoke a Job [sc.runJob]?
Re: Do transformation functions on RDD invoke a Job [sc.runJob]?
Spark SQL's query planner has always delayed building the RDD, so it has never needed to eagerly calculate the range boundaries (since Spark 1.0).

On Mon, Apr 25, 2016 at 2:04 AM, Praveen Devarao <praveen...@in.ibm.com> wrote:

> Thanks Reynold for the reason as to why sortByKey invokes a Job.
>
> When you say "DataFrame/Dataset does not have this issue", are you
> referring to Spark 2.0, or does the Spark 1.6 DataFrame API already
> have this built in?
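Michael's point can be illustrated with a toy, Spark-free sketch: if the sort is represented as a deferred plan node rather than an eagerly constructed RDD, the boundary computation folds into execution instead of running at transformation time. All names below (SortPlan, DeferredSortDemo) are invented for illustration; they are not Spark SQL's actual internals.

```scala
// A toy "plan" that defers all work, including any boundary computation,
// until execute() is called -- roughly the shape of the Spark SQL approach
// Michael describes, where building the plan launches no job.
final case class SortPlan(input: () => Seq[Int]) {
  // Nothing is scanned when the plan is constructed...
  def execute(): Seq[Int] = {
    val data = input() // ...the scan happens only here, at execution time,
    data.sorted        // together with the sort itself.
  }
}

object DeferredSortDemo {
  var scans = 0

  def main(args: Array[String]): Unit = {
    val plan = SortPlan { () => scans += 1; Seq(3, 1, 2) }
    assert(scans == 0)                   // building the plan ran no job
    assert(plan.execute() == Seq(1, 2, 3))
    assert(scans == 1)                   // exactly one pass, at execution time
  }
}
```

Contrast this with an RDD, whose partitioner (and hence the range boundaries) must exist as soon as the RDD object is created, which is what forces the eager job.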
Re: Do transformation functions on RDD invoke a Job [sc.runJob]?
Thanks Reynold for the reason as to why sortByKey invokes a Job.

When you say "DataFrame/Dataset does not have this issue", are you referring to Spark 2.0, or does the Spark 1.6 DataFrame API already have this built in?

Thanking You

Praveen Devarao
Spark Technology Centre
IBM India Software Labs

"Courage doesn't always roar. Sometimes courage is the quiet voice at the end of the day saying I will try again"

From: Reynold Xin <r...@databricks.com>
To: Praveen Devarao/India/IBM@IBMIN
Cc: "d...@spark.apache.org" <d...@spark.apache.org>, user <user@spark.apache.org>
Date: 25/04/2016 11:26 am
Subject: Re: Do transformation functions on RDD invoke a Job [sc.runJob]?
Do transformation functions on RDD invoke a Job [sc.runJob]?
Hi,

I have a streaming program with the block below [ref: https://github.com/agsachin/streamingBenchmark/blob/master/spark-benchmarks/src/main/scala/TwitterStreaming.scala ]:

1   val lines = messages.map(_._2)
2   val hashTags = lines.flatMap(status => status.split(" ").filter(_.startsWith("#")))

3   val topCounts60 = hashTags.map((_, 1)).reduceByKey(_ + _)
3a    .map { case (topic, count) => (count, topic) }
3b    .transform(_.sortByKey(false))

4a  topCounts60.foreachRDD( rdd => {
4b    val topList = rdd.take(10)
    })

This batch triggers 2 jobs: one at line 3b (sortByKey) and the other at line 4b (rdd.take). I agree that a Job is triggered at line 4b, since take() is an action on the RDD, whereas line 3b's sortByKey is just a transformation, which per the docs should be lazily evaluated. But I see that this line uses a RangePartitioner, and RangePartitioner on initialization invokes a method called sketch() that invokes collect(), triggering a Job.

My question: is it expected that sortByKey will invoke a Job? If yes, why is sortByKey listed as a transformation and not an action? Are there any other functions like this that invoke a Job, even though they are transformations and not actions?

I am on Spark 1.6

Thanking You

Praveen Devarao
Spark Technology Centre
IBM India Software Labs

"Courage doesn't always roar. Sometimes courage is the quiet voice at the end of the day saying I will try again"
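For readers following along: the job that sketch() launches exists to sample the keys, because range partitioning needs approximate boundaries before any key can be routed to a partition. Below is a rough, Spark-free sketch of that idea; the names (RangeBoundariesSketch, boundaries, partitionFor) are illustrative only and are not Spark's actual internals, which use reservoir sampling and binary search.

```scala
import scala.util.Random

object RangeBoundariesSketch {
  // Derive (numPartitions - 1) boundary keys from a sample of the data.
  // In Spark, gathering this sample is the extra job the question observes.
  def boundaries(sampleKeys: Seq[Int], numPartitions: Int): Seq[Int] = {
    val sorted = sampleKeys.sorted
    (1 until numPartitions).map { i =>
      sorted(((i.toLong * sorted.size) / numPartitions).toInt)
    }
  }

  // Route a key to a partition using the boundaries (linear scan here for
  // clarity; with many partitions a binary search would be used instead).
  def partitionFor(key: Int, bounds: Seq[Int]): Int =
    bounds.indexWhere(key < _) match {
      case -1 => bounds.size // key >= every boundary: last partition
      case i  => i
    }

  def main(args: Array[String]): Unit = {
    val keys   = Seq.fill(1000)(Random.nextInt(100))
    val bounds = boundaries(keys, numPartitions = 4)
    // Every key lands in exactly one of the 4 partitions.
    assert(keys.forall(k => (0 until 4).contains(partitionFor(k, bounds))))
  }
}
```

The key point is that boundaries() cannot be written without looking at the data, which is why a scan must happen before the sorted RDD's partitioner can exist.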
Re: Do transformation functions on RDD invoke a Job [sc.runJob]?
Usually no, but sortByKey does, because it needs the range boundaries in order to build the RDD. It is a long-standing problem that is unfortunately very difficult to solve without breaking the RDD API.

In DataFrame/Dataset we don't have this issue, though.
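The lazy-versus-eager distinction Reynold describes can be seen with plain Scala views, no Spark required: a mapped view does no work until forced, but sorting must see every element, so it forces the whole pipeline. This is only an analogy for why sortByKey must scan the data to pick its range boundaries, not Spark's actual code path.

```scala
object LazyVsEager {
  def main(args: Array[String]): Unit = {
    var evaluated = 0

    // A lazy pipeline: like an RDD transformation, map does no work yet.
    val mapped = (1 to 10).view.map { x => evaluated += 1; x * 2 }
    assert(evaluated == 0) // nothing has been computed so far

    // Sorting needs every element, so it forces the whole pipeline --
    // analogous to sortByKey needing a scan to determine range boundaries.
    val sorted = mapped.toList.sorted
    assert(evaluated == 10)
    assert(sorted.head == 2)
  }
}
```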