Hi Tremblay,

Your file is in .gz format, which is not splittable by Hadoop, so the whole
file is likely loaded by a single executor.
How many executors do you start?
Calling repartition() after reading the file might solve it, I guess.


On Sun, Feb 26, 2017 at 3:33 AM, Henry Tremblay <paulhtremb...@gmail.com>
wrote:

> I am reading a single small file from Hadoop with wholeTextFiles. I
> process each line and create a row with two cells: the first cell is the
> name of the file, and the second cell is the line. That code runs fine.
>
> But if I just add two lines of code and change the first cell based on
> parsing a line, Spark runs out of memory. Any idea why such a simple
> process that would succeed quickly in a non-Spark application fails?
>
> Thanks!
>
> Henry
>
> CODE:
>
> [hadoop@ip-172-31-35-67 ~]$ hadoop fs -du /mnt/temp
> 3816096 /mnt/temp/CC-MAIN-20170116095123-00570-ip-10-171-10-70.ec2.internal.warc.gz
>
>
> In [1]: rdd1 = sc.wholeTextFiles("/mnt/temp")
> In [2]: rdd1.count()
> Out[2]: 1
>
>
> In [4]: def process_file(s):
>    ...:     text = s[1]
>    ...:     the_id = s[0]
>    ...:     d = {}
>    ...:     l =  text.split("\n")
>    ...:     final = []
>    ...:     for line in l:
>    ...:         d[the_id] = line
>    ...:         final.append(Row(**d))
>    ...:     return final
>    ...:
>
> In [5]: rdd2 = rdd1.map(process_file)
>
> In [6]: rdd2.count()
> Out[6]: 1
>
> In [7]: rdd3 = rdd2.flatMap(lambda x: x)
>
> In [8]: rdd3.count()
> Out[8]: 508310
>
> In [9]: rdd3.take(1)
> Out[9]: [Row(hdfs://ip-172-31-35-67.us-west-2.compute.internal:8020/mnt/temp/CC-MAIN-20170116095123-00570-ip-10-171-10-70.ec2.internal.warc.gz='WARC/1.0\r')]
>
> In [10]: def process_file(s):
>     ...:     text = s[1]
>     ...:     d = {}
>     ...:     l =  text.split("\n")
>     ...:     final = []
>     ...:     the_id = "init"
>     ...:     for line in l:
>     ...:         if line[0:15] == 'WARC-Record-ID:':
>     ...:             the_id = line[15:]
>     ...:         d[the_id] = line
>     ...:         final.append(Row(**d))
>     ...:     return final
>
> In [12]: rdd2 = rdd1.map(process_file)
>
> In [13]: rdd2.count()
> 17/02/25 19:03:03 ERROR YarnScheduler: Lost executor 5 on
> ip-172-31-41-89.us-west-2.compute.internal: Container killed by YARN for
> exceeding memory limits. 10.3 GB of 10.3 GB physical memory used. Consider
> boosting spark.yarn.executor.memoryOverhead.
> 17/02/25 19:03:03 WARN YarnSchedulerBackend$YarnSchedulerEndpoint:
> Container killed by YARN for exceeding memory limits. 10.3 GB of 10.3 GB
> physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
> 17/02/25 19:03:03 WARN TaskSetManager: Lost task 0.0 in stage 5.0 (TID 5,
> ip-172-31-41-89.us-west-2.compute.internal, executor 5):
> ExecutorLostFailure (executor 5 exited caused by one of the running tasks)
> Reason: Container killed by YARN for exceeding memory limits. 10.3 GB of
> 10.3 GB physical memory used. Consider boosting
> spark.yarn.executor.memoryOverhead.
>
>
> --
> Henry Tremblay
> Robert Half Technology
>
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>