Hi all,

I have a question about this text:

SortMergeJoin [c1#41], [c1#98]

What do the 41 and 98 stand for, please?
Thanks :)

---Original---
From: "Swapnil Shinde"<swapnilushi...@gmail.com>
Date: 2017/2/11 07:38:42
To: "Yong Zhang"<java8...@hotmail.com>;
Cc: "user@spark.apache.org"<user@spark.apache.org>;
Subject: Re: Spark's execution plan debugging


Thanks for your reply. I agree with your explanation of caching and can see that it's working as expected in your example. However, I am running the given snippet on Spark 2.0.1, and even with caching I can see it going back to dataframes a & b.

On Thu, Feb 9, 2017 at 3:41 PM, Yong Zhang <java8...@hotmail.com> wrote:
   
You may be misunderstanding what caching means. Caching a DF just means its data can be retrieved directly from memory, instead of going back to its parent dependencies to recompute it. In your example, even though C is cached, if you derive 2 DFs from it, then C will still be scanned 2 times in your application; those 2 scans are just served directly from memory, instead of going back to the A/B DFs, which are the parents that C is derived from.
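To put it another way, here is a rough sketch (made-up dataframe names, assuming the usual spark-shell implicits, not your exact code) of what caching does and does not change:

  // dfC is cached, then two DataFrames are derived from it.
  // Each derived DataFrame still scans dfC once; caching only means those
  // scans read the in-memory columnar cache instead of recomputing dfC
  // from its parents dfA and dfB.
  val dfA = sc.parallelize(Seq(("a", 1), ("b", 2))).toDF("k", "v")
  val dfB = sc.parallelize(Seq(("a", 10), ("b", 20))).toDF("k2", "w")
  val dfC = dfA.join(dfB, $"k" === $"k2").cache()
  val d1 = dfC.filter($"v" === 1)
  val d2 = dfC.filter($"v" === 2)
  d1.count()   // first action: computes dfC from dfA/dfB and fills the cache
  d2.count()   // second action: scans dfC again, but from memory this time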
 

 
 
In the Spark execution plan, Spark can detect whether any DF in the chain is cached, and it generates the right execution plan accordingly, as shown in the following example (tested with Spark 1.6.3). As you can see, if C is NOT cached, then your X has to go back to A/B (scanning the existing RDDs); but after C is cached, Spark reads it via "InMemoryColumnarTableScan". Caching, however, has nothing to do with how many times the data will be scanned.
 

 
 

scala> x.explain
== Physical Plan ==
SortMergeJoin [c1#41], [c1#98]
:- SortMergeJoin [c1#41], [d1#45]
:  :- Sort [c1#41 ASC], false, 0
:  :  +- TungstenExchange hashpartitioning(c1#41,200), None
:  :     +- Project [_1#39 AS c1#41,_2#40 AS c2#42]
:  :        +- Filter (_1#39 = a)
:  :           +- Scan ExistingRDD[_1#39,_2#40]
:  +- Sort [d1#45 ASC], false, 0
:     +- TungstenExchange hashpartitioning(d1#45,200), None
:        +- Project [_1#43 AS d1#45,_2#44 AS d2#46]
:           +- Scan ExistingRDD[_1#43,_2#44]
+- SortMergeJoin [c1#98], [d1#102]
   :- Sort [c1#98 ASC], false, 0
   :  +- TungstenExchange hashpartitioning(c1#98,200), None
   :     +- Project [_1#39 AS c1#98,_2#40 AS c2#99]
   :        +- Scan ExistingRDD[_1#39,_2#40]
   +- Sort [d1#102 ASC], false, 0
      +- TungstenExchange hashpartitioning(d1#102,200), None
         +- Project [_1#43 AS d1#102,_2#44 AS d2#103]
            +- Filter (_1#43 = b)
               +- Scan ExistingRDD[_1#43,_2#44]

scala> c.cache
res17: c.type = [c1: string, c2: int, d1: string, d2: int]

scala> x.explain
== Physical Plan ==
SortMergeJoin [c1#41], [c1#98]
:- Filter (c1#41 = a)
:  +- InMemoryColumnarTableScan [c1#41,c2#42,d1#45,d2#46], [(c1#41 = a)], InMemoryRelation [c1#41,c2#42,d1#45,d2#46], true, 10000, StorageLevel(true, true, false, true, 1), SortMergeJoin [c1#41], [d1#45], None
+- Sort [c1#98 ASC], false, 0
   +- TungstenExchange hashpartitioning(c1#98,200), None
      +- Filter (d1#102 = b)
         +- InMemoryColumnarTableScan [c1#98,c2#99,d1#102,d2#103], [(d1#102 = b)], InMemoryRelation [c1#98,c2#99,d1#102,d2#103], true, 10000, StorageLevel(true, true, false, true, 1), SortMergeJoin [c1#41], [d1#45], None
 
 
 

 
 
  
 From: Swapnil Shinde <swapnilushi...@gmail.com>
 Sent: Thursday, February 9, 2017 2:53 PM
 To: user@spark.apache.org
Subject: Re: Spark's execution plan debugging
 
Any suggestions, please?
 
On Wed, Feb 8, 2017 at 12:02 PM, Swapnil Shinde <swapnilushi...@gmail.com> wrote:

Hello,
I am trying to figure out how Spark generates its execution plan with and without caching. I have this small example to illustrate what I am doing:
 
 
val a = sc.parallelize(List(("a", 1), ("b", 2), ("c", 3))).toDF("c1", "c2")
val b = sc.parallelize(List(("a", 1), ("b", 2), ("c", 3))).toDF("d1", "d2")

// c joins a and b; d1 and d2 are both derived from c
val c = a.join(b, $"c1" === $"d1")
val d1 = c.map(x => { val c1 = x.getAs[String]("c1"); val c2 = x.getAs[Int]("c2"); (c1, c2 * 2) }).toDF("nC1", "nC2")
val d2 = c.map(x => { val d1 = x.getAs[String]("d1"); val d2 = x.getAs[Int]("d2"); (d1, d2 * 3) }).toDF("nD1", "nD2")

// x joins the two branches derived from c
val x = d1.as("a").join(d2.as("b"), $"a.nC1" === $"b.nD1")
 
 
 
A generic DAG for dataframe 'x' would be something like this (Fig1):
[Fig1: inline image not included]
 
 
Obviously, the physical plan (x.explain) generates something like this (without any caching):
[inline image: physical plan without caching, not included]
 
 
 
I am interpreting this as:
[inline image not included]
 
 
 
 
 
As per my understanding, dataframe 'c' is being used twice, so it would be good to cache it. I was hoping that if I cache 'c', the execution plan would look like the generic one (explained above in Fig1). However, I don't see it that way. Correct me if my understanding is wrong in interpreting this plan (here 'c' is cached):
[inline image: physical plan with 'c' cached, not included]
 
 
 
I don't think caching 'c' is helping here. Basically, the input dataframes 'a' & 'b' are being fetched twice. (In this example a and b are dataframes generated from a local collection, but in the real world they would be large files.)
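Here is a rough sketch of one way to check this programmatically (it reuses the snippet above; matching on the plan text is just a heuristic I use, not an official API, and the node name differs by version):

  // After caching c, inspect x's physical plan text to see whether it reads
  // from the in-memory cache or re-scans a and b.
  // ("InMemoryColumnarTableScan" in Spark 1.6, "InMemoryTableScan" in 2.x.)
  c.cache()
  val planText = x.queryExecution.executedPlan.toString
  if (planText.contains("InMemoryColumnarTableScan") || planText.contains("InMemoryTableScan"))
    println("x reads the cached 'c'")
  else
    println("x goes back to 'a' and 'b'")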
 
 
Question:
Why doesn't caching 'c' build a physical plan where 'a' & 'b' are fetched only once, 'c' is generated, and then d1 and d2 are built in parallel and provide the input for x (like Fig1)?
I understand I am missing something very basic about execution plans, so please correct me if I am wrong anywhere.
 
 
 Thanks
  Swapnil
