Using wholeTextFiles to process formats that cannot be split per line is not
"old".

And there are plenty of problems for which RDDs are currently still better
suited than Datasets or DataFrames (this might change in the near future, when
Dataset gets some crucial optimizations fixed).
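
For example, a rough sketch of that kind of use (the directory path and the
record delimiter below are only placeholders):

from pyspark import SparkContext

sc = SparkContext.getOrCreate()

# wholeTextFiles yields one (path, content) pair per file, which is what
# you want when a file cannot be split safely on line boundaries
pairs = sc.wholeTextFiles("/some/dir")

# split each file into records with whatever delimiter the format uses,
# then flatten everything into a single RDD of records
records = pairs.flatMap(lambda kv: kv[1].split("\r\n\r\n"))
print(records.count())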

On Sun, Feb 26, 2017 at 3:14 PM, Gourav Sengupta <gourav.sengu...@gmail.com>
wrote:

> Hi Henry,
>
> Those guys in Databricks training are nuts and still use Spark 1.x for
> their exams. The book Learning Spark teaches a VERY VERY VERY old way of
> solving problems using SPARK.
>
> The core engine of SPARK, as even I understand, has gone through
> several fundamental changes.
>
> Just try reading the file using dataframes and try using SPARK 2.1.
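>
> As a rough sketch of what I mean (assuming a SparkSession named spark, as in
> any Spark 2.x shell; the path is the one from your example below):
>
> from pyspark.sql import SparkSession
>
> spark = SparkSession.builder.getOrCreate()
>
> # spark.read.text gives one row per line, in a single "value" column,
> # and it reads gzipped files transparently
> df = spark.read.text("/mnt/temp")
> df.show(5, truncate=False)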
>
> In other words, it may be of tremendous benefit if you were learning to
> solve problems which exist rather than problems which do not exist any
> more.
>
> Please let me know in case I can be of any further help.
>
> Regards,
> Gourav
>
> On Sun, Feb 26, 2017 at 7:09 PM, Henry Tremblay <paulhtremb...@gmail.com>
> wrote:
>
>> The file is so small that a stand-alone Python script, independent of
>> Spark, can process it in under a second.
>>
>> Also, the following fails:
>>
>> 1. Read the whole file in with wholeTextFiles
>>
>> 2. Use flatMap to get 50,000 rows that look like: Row(id="path",
>> line="line")
>>
>> 3. Save the results as CSV to HDFS
>>
>> 4. Read the files (there are 20) from HDFS into a df using
>> sqlContext.read.csv(<path>)
>>
>> 5. Convert the df to an rdd.
>>
>> 6. Create key-value pairs with the key being the file path and the value
>> being the line
>>
>> 7. Iterate through the values
>>
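>> In rough outline (the output path below is made up, and the real job has a
>> bit more to it), the code for those steps looks something like:
>>
>> from pyspark.sql import Row
>>
>> rdd1 = sc.wholeTextFiles("/mnt/temp")                        # step 1
>> rows = rdd1.flatMap(
>>     lambda kv: [Row(id=kv[0], line=l) for l in kv[1].split("\n")])  # step 2
>> sqlContext.createDataFrame(rows).write.csv("/mnt/csv_out")   # step 3
>> df = sqlContext.read.csv("/mnt/csv_out")                     # step 4
>> kv_pairs = df.rdd.map(lambda r: (r[0], r[1]))                # steps 5 and 6
>> kv_pairs.foreach(lambda pair: None)                          # step 7 (placeholder)
>>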
>> What happens is Spark either runs out of memory, or, in my last try with
>> a slight variation, just hangs for 12 hours.
>>
>> Henry
>>
>> On 02/26/2017 03:31 AM, 颜发才(Yan Facai) wrote:
>>
>> Hi, Tremblay.
>> Your file is in .gz format, which is not splittable for Hadoop. Perhaps the
>> file is loaded by only one executor.
>> How many executors do you start?
>> Perhaps the repartition method could solve it, I guess.
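>>
>> For example (a rough sketch; the partition count of 100 is an arbitrary
>> guess), repartitioning after the read might spread the later work out:
>>
>> rdd1 = sc.wholeTextFiles("/mnt/temp")   # a single .gz file lands in one partition
>> rdd2 = rdd1.map(process_file)           # your process_file from below
>> rdd3 = rdd2.flatMap(lambda x: x).repartition(100)
>> rdd3.count()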
>>
>>
>> On Sun, Feb 26, 2017 at 3:33 AM, Henry Tremblay <paulhtremb...@gmail.com>
>> wrote:
>>
>>> I am reading in a single small file from Hadoop with wholeTextFiles. I
>>> process each line and create a row with two cells, the first cell equal to
>>> the name of the file and the second cell equal to the line. That code runs
>>> fine.
>>>
>>> But if I just add two lines of code and change the first cell based on
>>> parsing a line, Spark runs out of memory. Any idea why such a simple
>>> process that would succeed quickly in a non-Spark application fails?
>>>
>>> Thanks!
>>>
>>> Henry
>>>
>>> CODE:
>>>
>>> [hadoop@ip-172-31-35-67 ~]$ hadoop fs -du /mnt/temp
>>> 3816096 /mnt/temp/CC-MAIN-20170116095123-00570-ip-10-171-10-70.ec2.internal.warc.gz
>>>
>>>
>>> In [1]: rdd1 = sc.wholeTextFiles("/mnt/temp")
>>> In [2]: rdd1.count()
>>> Out[2]: 1
>>>
>>>
>>> In [4]: def process_file(s):
>>>    ...:     text = s[1]
>>>    ...:     the_id = s[0]
>>>    ...:     d = {}
>>>    ...:     l =  text.split("\n")
>>>    ...:     final = []
>>>    ...:     for line in l:
>>>    ...:         d[the_id] = line
>>>    ...:         final.append(Row(**d))
>>>    ...:     return final
>>>    ...:
>>>
>>> In [5]: rdd2 = rdd1.map(process_file)
>>>
>>> In [6]: rdd2.count()
>>> Out[6]: 1
>>>
>>> In [7]: rdd3 = rdd2.flatMap(lambda x: x)
>>>
>>> In [8]: rdd3.count()
>>> Out[8]: 508310
>>>
>>> In [9]: rdd3.take(1)
>>> Out[9]: [Row(hdfs://ip-172-31-35-67.us-west-2.compute.internal:8020/mnt/temp/CC-MAIN-20170116095123-00570-ip-10-171-10-70.ec2.internal.warc.gz='WARC/1.0\r')]
>>>
>>> In [10]: def process_file(s):
>>>     ...:     text = s[1]
>>>     ...:     d = {}
>>>     ...:     l =  text.split("\n")
>>>     ...:     final = []
>>>     ...:     the_id = "init"
>>>     ...:     for line in l:
>>>     ...:         if line[0:15] == 'WARC-Record-ID:':
>>>     ...:             the_id = line[15:]
>>>     ...:         d[the_id] = line
>>>     ...:         final.append(Row(**d))
>>>     ...:     return final
>>>
>>> In [12]: rdd2 = rdd1.map(process_file)
>>>
>>> In [13]: rdd2.count()
>>> 17/02/25 19:03:03 ERROR YarnScheduler: Lost executor 5 on
>>> ip-172-31-41-89.us-west-2.compute.internal: Container killed by YARN
>>> for exceeding memory limits. 10.3 GB of 10.3 GB physical memory used.
>>> Consider boosting spark.yarn.executor.memoryOverhead.
>>> 17/02/25 19:03:03 WARN YarnSchedulerBackend$YarnSchedulerEndpoint:
>>> Container killed by YARN for exceeding memory limits. 10.3 GB of 10.3 GB
>>> physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
>>> 17/02/25 19:03:03 WARN TaskSetManager: Lost task 0.0 in stage 5.0 (TID
>>> 5, ip-172-31-41-89.us-west-2.compute.internal, executor 5):
>>> ExecutorLostFailure (executor 5 exited caused by one of the running tasks)
>>> Reason: Container killed by YARN for exceeding memory limits. 10.3 GB of
>>> 10.3 GB physical memory used. Consider boosting
>>> spark.yarn.executor.memoryOverhead.
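>>>
>>> If I follow the log's suggestion, my understanding is that the overhead has
>>> to be set before the context starts, e.g. something like this when building
>>> the session (the 2048 MB value is only an example):
>>>
>>> from pyspark.sql import SparkSession
>>>
>>> spark = (SparkSession.builder
>>>          .config("spark.yarn.executor.memoryOverhead", "2048")  # MB, example value
>>>          .getOrCreate())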
>>>
>>>
>>> --
>>> Henry Tremblay
>>> Robert Half Technology
>>>
>>>
>>>
>>
>> --
>> Henry Tremblay
>> Robert Half Technology
>>
>>
>
