Re: large scheduler delay in pyspark

2015-08-05 Thread gen tang
Hi,
Thanks a lot for your reply.


It seems the problem is indeed caused by the slowness of the second code.
I rewrote the code as list(set([i.items for i in a] + [i.items for i in b]))
and the program now runs normally.
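For dicts specifically, a similar dedup can be sketched by mapping each dict to a hashable form first (a minimal sketch, not from this thread; `merge_dedup` is a hypothetical helper, and it assumes the dict values are themselves hashable):

```python
# Hypothetical sketch (not from the thread): dedupe a merged list of dicts
# by mapping each dict to a hashable key, since dicts cannot go into a set
# directly. Assumes the dict values are hashable.
def merge_dedup(a, b):
    seen = {frozenset(d.items()) for d in a}  # order-independent key per dict
    merged = list(a)
    for d in b:
        key = frozenset(d.items())
        if key not in seen:  # O(1) set lookup instead of an O(N) list scan
            seen.add(key)
            merged.append(d)
    return merged

a = [{"x": 1}, {"y": 2}]
b = [{"y": 2}, {"z": 3}]
print(merge_dedup(a, b))  # [{'x': 1}, {'y': 2}, {'z': 3}]
```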

By the way, I noticed that while the computation is running, the UI shows
the elapsed time as scheduler delay, even though it is not really scheduler
delay. Once the computation finishes, the UI shows the correct scheduler
delay time.

Cheers
Gen


On Tue, Aug 4, 2015 at 3:13 PM, Davies Liu dav...@databricks.com wrote:

 On Mon, Aug 3, 2015 at 9:00 AM, gen tang gen.tan...@gmail.com wrote:
  Hi,
 
  Recently, I met some problems with scheduler delay in pyspark. I worked
  on this problem for several days without success, so I have come here to
  ask for help.
 
  I have a key-value pair RDD, like rdd[(key, list[dict])], and I tried to
  merge the values by concatenating the two lists.
 
  If I do reduceByKey as follows:
      rdd.reduceByKey(lambda a, b: a + b)
  it works fine; the scheduler delay is less than 10s. However, if I do
  reduceByKey with:
      def f(a, b):
          for i in b:
              if i not in a:
                  a.append(i)
          return a
      rdd.reduceByKey(f)

 Is it possible that you have a large object that is also named `i`, `a`,
 or `b`?

 Btw, the second one could be slower than the first one, because you look
 up an object in a list, which is O(N) per lookup, especially when the
 objects are large (dicts).

  It will cause a very large scheduler delay, about 15-20 minutes. (The
  data I deal with is about 300 MB, and I use 5 machines with 32 GB of
  memory each.)

 If you see scheduler delay, it means there may be a large broadcast
 involved.

  I know the second code is not the same as the first. In fact, my purpose
  is to implement the second, but it does not work, so I tried the first
  one. I don't know whether this is related to the data (with long strings)
  or to Spark on Yarn, but the first code works fine on the same data.
 
  Is there any way to find the logs for when Spark stalls in scheduler
  delay, please? Or any ideas about this problem?
 
  Thanks a lot in advance for your help.
 
  Cheers
  Gen
 
 



Re: large scheduler delay in pyspark

2015-08-05 Thread ayan guha
It seems you want to dedupe your data after the merge, so set(a + b) should
also work; you can ditch the list comprehension operation.
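For hashable elements that is indeed enough (a quick sketch of the suggestion; note that plain dicts are not hashable, so they would need converting first):

```python
# Sketch of the set(a + b) suggestion: for hashable elements it dedupes the
# merged lists in one step. Plain dicts are not hashable, so they would
# need converting (e.g. to tuples of items) before this works on them.
a = [1, 2, 3]
b = [3, 4]
merged = sorted(set(a + b))
print(merged)  # [1, 2, 3, 4]
```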
On 5 Aug 2015 23:55, gen tang gen.tan...@gmail.com wrote:

 Hi,
 Thanks a lot for your reply.


 It seems the problem is indeed caused by the slowness of the second code.
 I rewrote the code as list(set([i.items for i in a] + [i.items for i in b]))
 and the program now runs normally.

 By the way, I noticed that while the computation is running, the UI shows
 the elapsed time as scheduler delay, even though it is not really scheduler
 delay. Once the computation finishes, the UI shows the correct scheduler
 delay time.

 Cheers
 Gen






Re: large scheduler delay in pyspark

2015-08-04 Thread Davies Liu
On Mon, Aug 3, 2015 at 9:00 AM, gen tang gen.tan...@gmail.com wrote:
 Hi,

 Recently, I met some problems with scheduler delay in pyspark. I worked
 on this problem for several days without success, so I have come here to
 ask for help.

 I have a key-value pair RDD, like rdd[(key, list[dict])], and I tried to
 merge the values by concatenating the two lists.

 If I do reduceByKey as follows:
     rdd.reduceByKey(lambda a, b: a + b)
 it works fine; the scheduler delay is less than 10s. However, if I do
 reduceByKey with:
     def f(a, b):
         for i in b:
             if i not in a:
                 a.append(i)
         return a
     rdd.reduceByKey(f)

Is it possible that you have a large object that is also named `i`, `a`, or `b`?

Btw, the second one could be slower than the first one, because you look up
an object in a list, which is O(N) per lookup, especially when the objects
are large (dicts).
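As a rough illustration of this cost difference (a sketch, not from the original message; absolute numbers depend on the machine):

```python
# Sketch of the O(N) vs O(1) point above: membership tests on a list scan
# the whole list, while a set uses hashing for near-constant-time lookups.
import timeit

items = list(range(10000))
as_list = items
as_set = set(items)

# Look up a worst-case element (the last one) many times in each structure.
t_list = timeit.timeit(lambda: 9999 in as_list, number=2000)
t_set = timeit.timeit(lambda: 9999 in as_set, number=2000)
print(t_list > t_set)  # the list version is far slower
```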

 It will cause a very large scheduler delay, about 15-20 minutes. (The data
 I deal with is about 300 MB, and I use 5 machines with 32 GB of memory
 each.)

If you see scheduler delay, it means there may be a large broadcast involved.

 I know the second code is not the same as the first. In fact, my purpose is
 to implement the second, but it does not work, so I tried the first one.
 I don't know whether this is related to the data (with long strings) or to
 Spark on Yarn, but the first code works fine on the same data.

 Is there any way to find the logs for when Spark stalls in scheduler delay,
 please? Or any ideas about this problem?

 Thanks a lot in advance for your help.

 Cheers
 Gen






large scheduler delay in pyspark

2015-08-03 Thread gen tang
Hi,

Recently, I met some problems with scheduler delay in pyspark. I worked on
this problem for several days without success, so I have come here to ask
for help.

I have a key-value pair RDD, like rdd[(key, list[dict])], and I tried to
merge the values by concatenating the two lists.

If I do reduceByKey as follows:
    rdd.reduceByKey(lambda a, b: a + b)
it works fine; the scheduler delay is less than 10s. However, if I do
reduceByKey with:
    def f(a, b):
        for i in b:
            if i not in a:
                a.append(i)
        return a
    rdd.reduceByKey(f)
it will cause a very large scheduler delay, about 15-20 minutes. (The data
I deal with is about 300 MB, and I use 5 machines with 32 GB of memory
each.)
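Run locally with functools.reduce instead of Spark, the two merge functions behave like this (a sketch to show why they are not equivalent; the sample data is made up):

```python
# Local sketch of the two merge functions from above, applied with
# functools.reduce instead of Spark: plain concatenation keeps duplicates,
# while the membership-test version dedupes as it merges.
from functools import reduce

def concat(a, b):           # the lambda a, b: a + b version
    return a + b

def merge_unique(a, b):     # the def f(a, b) version
    for i in b:
        if i not in a:
            a.append(i)
    return a

parts = [[{"k": 1}], [{"k": 1}], [{"k": 2}]]
print(reduce(concat, parts))                           # keeps the duplicate
print(reduce(merge_unique, [list(p) for p in parts]))  # drops it
```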

I know the second code is not the same as the first. In fact, my purpose is
to implement the second, but it does not work, so I tried the first one.
I don't know whether this is related to the data (with long strings) or to
Spark on Yarn, but the first code works fine on the same data.

Is there any way to find the logs for when Spark stalls in scheduler delay,
please? Or any ideas about this problem?

Thanks a lot in advance for your help.

Cheers
Gen