Re: large scheduler delay in pyspark
Hi,

Thanks a lot for your reply. It does seem that the problem was the slowness of the second function. I rewrote the code as list(set([i.items for i in a] + [i.items for i in b])), and the program now runs normally.

By the way, I noticed that while the computation is running, the UI shows scheduler delay, although it is not actually scheduler delay. Once the computation finishes, the UI shows the correct scheduler delay time.

Cheers
Gen

On Tue, Aug 4, 2015 at 3:13 PM, Davies Liu dav...@databricks.com wrote:
> On Mon, Aug 3, 2015 at 9:00 AM, gen tang gen.tan...@gmail.com wrote:
>> Hi,
>>
>> Recently, I ran into some problems with scheduler delay in pyspark. I worked on this for several days without success, so I am coming here to ask for help.
>>
>> I have a key-value pair RDD like rdd[(key, list[dict])], and I tried to merge the values by concatenating the two lists. If I do reduceByKey as follows:
>>
>>     rdd.reduceByKey(lambda a, b: a+b)
>>
>> it works fine; scheduler delay is less than 10s. However, if I do reduceByKey with:
>>
>>     def f(a, b):
>>         for i in b:
>>             if i not in a:
>>                 a.append(i)
>>         return a
>>
>>     rdd.reduceByKey(f)
>
> Is it possible that you have a large object that is also named `i`, `a` or `b`?
>
> Btw, the second one could be slower than the first one, because you look up an object in a list, which is O(N), especially when the object is large (dict).
>
>> It causes a very large scheduler delay, about 15-20 mins. (The data I deal with is about 300 MB, and I use 5 machines with 32 GB memory.)
>
> If you see scheduler delay, it means there may be a large broadcast involved.
>
>> I know the second code is not the same as the first. In fact, my goal is to implement the second, but it does not work, so I tried the first one. I don't know whether this is related to the data (with long strings) or to Spark on Yarn, but the first code works fine on the same data.
>>
>> Is there any way to find the logs when Spark stalls in scheduler delay, please? Or any ideas about this problem?
>>
>> Thanks a lot in advance for your help.
>>
>> Cheers
>> Gen
Re: large scheduler delay in pyspark
It seems you want to dedupe your data after the merge, so set(a+b) should also work. You may ditch the list comprehension operation.

On 5 Aug 2015 23:55, gen tang gen.tan...@gmail.com wrote:
> Hi,
>
> Thanks a lot for your reply. It does seem that the problem was the slowness of the second function. I rewrote the code as list(set([i.items for i in a] + [i.items for i in b])), and the program now runs normally.
>
> By the way, I noticed that while the computation is running, the UI shows scheduler delay, although it is not actually scheduler delay. Once the computation finishes, the UI shows the correct scheduler delay time.
>
> Cheers
> Gen
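A note on the set(a+b) suggestion above: the values in this thread are lists of dicts, and Python dicts are not hashable, so set(a + b) would likely raise a TypeError as written. Below is a minimal sketch of one possible workaround, assuming every dict has hashable values (for example strings or numbers). The helper names freeze and merge_dedupe and the sample data are made up for illustration and do not come from the thread.

```python
def freeze(d):
    """Turn a dict into a hashable key (assumes the dict's values are hashable)."""
    return frozenset(d.items())


def merge_dedupe(a, b):
    """Merge two lists of dicts, keeping one dict per distinct content."""
    unique = {freeze(d): d for d in a + b}
    return list(unique.values())


# Hypothetical sample data standing in for two values of the same key.
a = [{"id": 1, "name": "x"}, {"id": 2, "name": "y"}]
b = [{"id": 2, "name": "y"}, {"id": 3, "name": "z"}]

try:
    set(a + b)
    set_worked = True
except TypeError:
    # dicts are unhashable, so they cannot be members of a set directly
    set_worked = False

merged = merge_dedupe(a, b)
```

If the dicts contain unhashable values such as nested lists, freeze would need a different key, for instance a serialized form of the dict.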
Re: large scheduler delay in pyspark
On Mon, Aug 3, 2015 at 9:00 AM, gen tang gen.tan...@gmail.com wrote:
> Hi,
>
> Recently, I ran into some problems with scheduler delay in pyspark. I worked on this for several days without success, so I am coming here to ask for help.
>
> I have a key-value pair RDD like rdd[(key, list[dict])], and I tried to merge the values by concatenating the two lists. If I do reduceByKey as follows:
>
>     rdd.reduceByKey(lambda a, b: a+b)
>
> it works fine; scheduler delay is less than 10s. However, if I do reduceByKey with:
>
>     def f(a, b):
>         for i in b:
>             if i not in a:
>                 a.append(i)
>         return a
>
>     rdd.reduceByKey(f)

Is it possible that you have a large object that is also named `i`, `a` or `b`?

Btw, the second one could be slower than the first one, because you look up an object in a list, which is O(N), especially when the object is large (dict).

> It causes a very large scheduler delay, about 15-20 mins. (The data I deal with is about 300 MB, and I use 5 machines with 32 GB memory.)

If you see scheduler delay, it means there may be a large broadcast involved.

> I know the second code is not the same as the first. In fact, my goal is to implement the second, but it does not work, so I tried the first one. I don't know whether this is related to the data (with long strings) or to Spark on Yarn, but the first code works fine on the same data.
>
> Is there any way to find the logs when Spark stalls in scheduler delay, please? Or any ideas about this problem?
>
> Thanks a lot in advance for your help.
>
> Cheers
> Gen
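To illustrate the O(N) lookup point above, here is a minimal sketch of a merge function that keeps the "append only if not already present" behaviour but tracks seen elements in a set, so each membership test takes constant time on average instead of scanning the list. It assumes the dicts have hashable values. The names merge_unique and _key and the sample data are hypothetical. Unlike the original f, this version also removes duplicates already inside a and returns a new list instead of mutating a.

```python
from functools import reduce


def _key(d):
    # Hashable stand-in for a dict; assumes all of its values are hashable.
    return frozenset(d.items())


def merge_unique(a, b):
    """Return the distinct dicts of a followed by the dicts of b not seen yet."""
    seen = set()
    out = []
    for d in a + b:
        k = _key(d)
        if k not in seen:  # average O(1), versus O(len(a)) for `d not in a`
            seen.add(k)
            out.append(d)
    return out


# Local check without Spark: fold the value lists of one key pairwise,
# similar to what reduceByKey does for each key.
values = [[{"id": 1}, {"id": 2}], [{"id": 2}, {"id": 3}], [{"id": 1}, {"id": 4}]]
result = reduce(merge_unique, values)
```

With an RDD of the shape described in the thread, the same function could be passed as rdd.reduceByKey(merge_unique). Whether this removes the delay seen in the UI depends on the actual data and has not been measured here.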
large scheduler delay in pyspark
Hi,

Recently, I ran into some problems with scheduler delay in pyspark. I worked on this for several days without success, so I am coming here to ask for help.

I have a key-value pair RDD like rdd[(key, list[dict])], and I tried to merge the values by concatenating the two lists. If I do reduceByKey as follows:

    rdd.reduceByKey(lambda a, b: a+b)

it works fine; scheduler delay is less than 10s. However, if I do reduceByKey with:

    def f(a, b):
        for i in b:
            if i not in a:
                a.append(i)
        return a

    rdd.reduceByKey(f)

it causes a very large scheduler delay, about 15-20 mins. (The data I deal with is about 300 MB, and I use 5 machines with 32 GB memory.)

I know the second code is not the same as the first. In fact, my goal is to implement the second, but it does not work, so I tried the first one. I don't know whether this is related to the data (with long strings) or to Spark on Yarn, but the first code works fine on the same data.

Is there any way to find the logs when Spark stalls in scheduler delay, please? Or any ideas about this problem?

Thanks a lot in advance for your help.

Cheers
Gen