Good to know it worked. The fact that some of the tasks still failed may
indicate skew in your dataset. You may want to think about partitioning by a
function of the key.
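
To see why more partitions alone may not rescue a skewed job, here is a
minimal pure-Python sketch. It is an illustration only, not Spark's actual
partitioner: Spark hashes keys with its own hash function, whereas this uses
Python's built-in hash(), and the helper name partition_sizes and the toy data
are made up. The point is that every row of a given key lands in the same
partition, so the largest partition can never be smaller than the heaviest key.

```python
from collections import Counter


def partition_sizes(keys, num_partitions):
    """Count how many rows land in each partition when rows are
    assigned by hash(key) modulo num_partitions."""
    counts = Counter(hash(k) % num_partitions for k in keys)
    return [counts.get(i, 0) for i in range(num_partitions)]


# Toy data (hypothetical): 1,000 ordinary ids with 10 rows each,
# plus one "hot" id that owns 50,000 rows.
keys = [i for i in range(1000) for _ in range(10)] + [424242] * 50000

for n in (200, 1000, 4000):
    sizes = partition_sizes(keys, n)
    print(n, "partitions -> largest partition holds", max(sizes), "rows")
```

If the largest partition stays roughly the same size as you go from 200 to
4000 partitions, a few heavy ids are the likely cause, and repartition() by
itself will not split them.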

Also, do you still see containers killed by YARN? If so, at what point? You
should see a message along the lines of: your app is trying to use x GB while
YARN can provide only y GB. You have the option to go a little higher on
executor memory, maybe up to 18G with 2G of overhead. Finally, you may want to
tweak the memory fraction settings a little to see if you can salvage the
failed jobs.
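
For a rough feel for the numbers, here is a small sketch of how the YARN
container request and Spark's unified memory region are sized. It assumes the
Spark 2.x defaults as I understand them (overhead of max(384 MB, 10% of
executor memory) when spark.yarn.executor.memoryOverhead is not set, 300 MB of
reserved heap, spark.memory.fraction = 0.6). The function names are made up
for illustration, so check the configuration docs for your version.

```python
MIN_OVERHEAD_MB = 384     # assumed Spark 2.x minimum overhead
OVERHEAD_FACTOR = 0.10    # assumed default share of executor memory
RESERVED_HEAP_MB = 300    # assumed heap reserved by Spark itself


def container_request_mb(executor_memory_mb, overhead_mb=None):
    """Memory YARN must grant per executor: JVM heap plus off-heap overhead."""
    if overhead_mb is None:
        overhead_mb = max(MIN_OVERHEAD_MB,
                          int(executor_memory_mb * OVERHEAD_FACTOR))
    return executor_memory_mb + overhead_mb


def unified_memory_mb(executor_memory_mb, memory_fraction=0.6):
    """Heap shared by execution and storage (spark.memory.fraction)."""
    return (executor_memory_mb - RESERVED_HEAP_MB) * memory_fraction


GB = 1024
# 15g heap with the assumed default overhead, as in the original run
print(container_request_mb(15 * GB))
# 18g heap with an explicit 2g overhead, as suggested above
print(container_request_mb(18 * GB, 2 * GB))
# Heap available to execution + storage at 18g with the assumed fraction
print(unified_memory_mb(18 * GB))
```

On the command line that would correspond to something like
--executor-memory 18g --conf spark.yarn.executor.memoryOverhead=2048. The
Python worker processes (where the UDF runs) live outside the JVM heap, so
their memory counts against the overhead portion of the container.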

Best
Ayan


On Mon, 17 Apr 2017 at 5:45 am, Patrick McCarthy <pmccar...@dstillery.com>
wrote:

> The partitions helped!
>
> I added repartition() and my function looks like this now:
>
> feature_df = (alldat_idx
>         .withColumn('label',alldat_idx['label_val'].cast('double'))
>         .groupBy('id','label')
>         .agg(sort_array(collect_list('indexedSegs')).alias('collect_list_is'))
>         .repartition(1000)
>         .withColumn('num_feat',lit(feature_vec_len))
>         .withColumn('features',list_to_sparse_udf('collect_list_is','num_feat'))
>         .drop('collect_list_is')
>         .drop('num_feat'))
>
> I got a few failed containers for memory overflow, but the job was able to
> finish successfully. I tried upping the repartition as high as 4000 but a
> few still failed.
>
> For posterity's sake, where would I look for the footprint you have in
> mind? On the executor tab?
>
> Since the audience part of the task finished successfully and the failure
> was on a df that didn't touch it, it shouldn't've made a difference.
>
> Thank you!
>
> On Sat, Apr 15, 2017 at 9:07 PM, ayan guha <guha.a...@gmail.com> wrote:
>
>> What I missed: try increasing the number of partitions using repartition().
>>
>> On Sun, 16 Apr 2017 at 11:06 am, ayan guha <guha.a...@gmail.com> wrote:
>>
>>> It does not look like a Scala vs. Python thing. How big is your audience
>>> data store? Can it be broadcast?
>>>
>>> What is the memory footprint you are seeing? At what point is YARN
>>> killing the containers? Depending on that, you may want to tweak the
>>> number of partitions of the input dataset and increase the number of
>>> executors.
>>>
>>> Ayan
>>>
>>>
>>> On Sat, 15 Apr 2017 at 2:10 am, Patrick McCarthy <
>>> pmccar...@dstillery.com> wrote:
>>>
>>>> Hello,
>>>>
>>>> I'm trying to build an ETL job which takes in 30-100gb of text data and
>>>> prepares it for SparkML. I don't speak Scala so I've been trying to
>>>> implement in PySpark on YARN, Spark 2.1.
>>>>
>>>> Despite the transformations being fairly simple, the job always fails
>>>> by running out of executor memory.
>>>>
>>>> The input table is long (~6bn rows) but composed of three simple values:
>>>>
>>>> #####################################################################
>>>> all_data_long.printSchema()
>>>>
>>>> root
>>>> |-- id: long (nullable = true)
>>>> |-- label: short (nullable = true)
>>>> |-- segment: string (nullable = true)
>>>>
>>>> #####################################################################
>>>>
>>>> First I join it to a table of particular segments of interest and do
>>>> an aggregation,
>>>>
>>>> #####################################################################
>>>>
>>>> audiences.printSchema()
>>>>
>>>> root
>>>>  |-- entry: integer (nullable = true)
>>>>  |-- descr: string (nullable = true)
>>>>
>>>>
>>>> print("Num in adl: {}".format(str(all_data_long.count())))
>>>>
>>>> aud_str = audiences.select(audiences['entry'].cast('string'),
>>>>         audiences['descr'])
>>>>
>>>> alldata_aud = all_data_long.join(aud_str,
>>>>         all_data_long['segment']==aud_str['entry'],
>>>>         'left_outer')
>>>>
>>>> str_idx  = StringIndexer(inputCol='segment',outputCol='indexedSegs')
>>>>
>>>> idx_df   = str_idx.fit(alldata_aud)
>>>> label_df = idx_df.transform(alldata_aud).withColumnRenamed('label','label_val')
>>>>
>>>> id_seg = (label_df
>>>>         .filter(label_df.descr.isNotNull())
>>>>         .groupBy('id')
>>>>         .agg(collect_list('descr')))
>>>>
>>>> id_seg.write.saveAsTable("hive.id_seg")
>>>>
>>>> #####################################################################
>>>>
>>>> Then, I use that StringIndexer again on the first data frame to
>>>> featurize the segment ID
>>>>
>>>> #####################################################################
>>>>
>>>> alldat_idx = idx_df.transform(all_data_long).withColumnRenamed('label','label_val')
>>>>
>>>> #####################################################################
>>>>
>>>>
>>>> My ultimate goal is to make a SparseVector, so I group the indexed
>>>> segments by id and try to cast it into a vector
>>>>
>>>> #####################################################################
>>>>
>>>> list_to_sparse_udf = udf(lambda l, maxlen: Vectors.sparse(maxlen,
>>>> {v:1.0 for v in l}),VectorUDT())
>>>>
>>>> alldat_idx.cache()
>>>>
>>>> feature_vec_len = (alldat_idx.select(max('indexedSegs')).first()[0] + 1)
>>>>
>>>> print("alldat_idx: {}".format(str(alldat_idx.count())))
>>>>
>>>> feature_df = (alldat_idx
>>>>         .withColumn('label',alldat_idx['label_val'].cast('double'))
>>>>         .groupBy('id','label')
>>>>         .agg(sort_array(collect_list('indexedSegs')).alias('collect_list_is'))
>>>>         .withColumn('num_feat',lit(feature_vec_len))
>>>>         .withColumn('features',list_to_sparse_udf('collect_list_is','num_feat'))
>>>>         .drop('collect_list_is')
>>>>         .drop('num_feat'))
>>>>
>>>> feature_df.cache()
>>>> print("Num in featuredf: {}".format(str(feature_df.count())))  ## <- failure occurs here
>>>>
>>>> #####################################################################
>>>>
>>>> Here, however, I always run out of memory on the executors (I've
>>>> twiddled driver and executor memory to check) and YARN kills off my
>>>> containers. I've gone as high as --executor-memory 15g but it still doesn't
>>>> help.
>>>>
>>>> Given the number of segments is at most 50,000 I'm surprised that a
>>>> smallish row-wise operation is enough to blow up the process.
>>>>
>>>>
>>>> Is it really the UDF that's killing me? Do I have to rewrite it in
>>>> Scala?
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> Query plans for the failing stage:
>>>>
>>>> #####################################################################
>>>>
>>>>
>>>> == Parsed Logical Plan ==
>>>> Aggregate [count(1) AS count#265L]
>>>> +- Project [id#0L, label#183, features#208]
>>>>    +- Project [id#0L, label#183, num_feat#202, features#208]
>>>>       +- Project [id#0L, label#183, collect_list_is#197, num_feat#202,
>>>> <lambda>(collect_list_is#197, num_feat#202) AS features#208]
>>>>          +- Project [id#0L, label#183, collect_list_is#197, 56845.0 AS
>>>> num_feat#202]
>>>>             +- Aggregate [id#0L, label#183], [id#0L, label#183,
>>>> sort_array(collect_list(indexedSegs#93, 0, 0), true) AS 
>>>> collect_list_is#197]
>>>>                +- Project [id#0L, label_val#99, segment#2,
>>>> indexedSegs#93, cast(label_val#99 as double) AS label#183]
>>>>                   +- Project [id#0L, label#1 AS label_val#99,
>>>> segment#2, indexedSegs#93]
>>>>                      +- Project [id#0L, label#1, segment#2,
>>>> UDF(cast(segment#2 as string)) AS indexedSegs#93]
>>>>                         +- MetastoreRelation pmccarthy, all_data_long
>>>>
>>>> == Analyzed Logical Plan ==
>>>> count: bigint
>>>> Aggregate [count(1) AS count#265L]
>>>> +- Project [id#0L, label#183, features#208]
>>>>    +- Project [id#0L, label#183, num_feat#202, features#208]
>>>>       +- Project [id#0L, label#183, collect_list_is#197, num_feat#202,
>>>> <lambda>(collect_list_is#197, num_feat#202) AS features#208]
>>>>          +- Project [id#0L, label#183, collect_list_is#197, 56845.0 AS
>>>> num_feat#202]
>>>>             +- Aggregate [id#0L, label#183], [id#0L, label#183,
>>>> sort_array(collect_list(indexedSegs#93, 0, 0), true) AS 
>>>> collect_list_is#197]
>>>>                +- Project [id#0L, label_val#99, segment#2,
>>>> indexedSegs#93, cast(label_val#99 as double) AS label#183]
>>>>                   +- Project [id#0L, label#1 AS label_val#99,
>>>> segment#2, indexedSegs#93]
>>>>                      +- Project [id#0L, label#1, segment#2,
>>>> UDF(cast(segment#2 as string)) AS indexedSegs#93]
>>>>                         +- MetastoreRelation pmccarthy, all_data_long
>>>>
>>>> == Optimized Logical Plan ==
>>>> Aggregate [count(1) AS count#265L]
>>>> +- Project
>>>>    +- InMemoryRelation [id#0L, label#183, features#208], true, 10000,
>>>> StorageLevel(disk, memory, deserialized, 1 replicas)
>>>>          +- *Project [id#0L, label#183, pythonUDF0#244 AS features#208]
>>>>             +- BatchEvalPython [<lambda>(collect_list_is#197,
>>>> 56845.0)], [id#0L, label#183, collect_list_is#197, pythonUDF0#244]
>>>>                +- SortAggregate(key=[id#0L, label#183],
>>>> functions=[collect_list(indexedSegs#93, 0, 0)], output=[id#0L, label#183,
>>>> collect_list_is#197])
>>>>                   +- *Sort [id#0L ASC NULLS FIRST, label#183 ASC NULLS
>>>> FIRST], false, 0
>>>>                      +- Exchange hashpartitioning(id#0L, label#183, 200)
>>>>                         +- *Project [id#0L, indexedSegs#93,
>>>> cast(label_val#99 as double) AS label#183]
>>>>                            +- InMemoryTableScan [id#0L, indexedSegs#93,
>>>> label_val#99]
>>>>                                  +- InMemoryRelation [id#0L,
>>>> label_val#99, segment#2, indexedSegs#93], true, 10000, StorageLevel(disk,
>>>> memory, deserialized, 1 replicas)
>>>>                                        +- *Project [id#0L, label#1 AS
>>>> label_val#99, segment#2, UDF(segment#2) AS indexedSegs#93]
>>>>                                           +- HiveTableScan [id#0L,
>>>> label#1, segment#2], MetastoreRelation pmccarthy, all_data_long
>>>>
>>>> == Physical Plan ==
>>>> *HashAggregate(keys=[], functions=[count(1)], output=[count#265L])
>>>> +- Exchange SinglePartition
>>>>    +- *HashAggregate(keys=[], functions=[partial_count(1)],
>>>> output=[count#284L])
>>>>       +- InMemoryTableScan
>>>>             +- InMemoryRelation [id#0L, label#183, features#208], true,
>>>> 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
>>>>                   +- *Project [id#0L, label#183, pythonUDF0#244 AS
>>>> features#208]
>>>>                      +- BatchEvalPython [<lambda>(collect_list_is#197,
>>>> 56845.0)], [id#0L, label#183, collect_list_is#197, pythonUDF0#244]
>>>>                         +- SortAggregate(key=[id#0L, label#183],
>>>> functions=[collect_list(indexedSegs#93, 0, 0)], output=[id#0L, label#183,
>>>> collect_list_is#197])
>>>>                            +- *Sort [id#0L ASC NULLS FIRST, label#183
>>>> ASC NULLS FIRST], false, 0
>>>>                               +- Exchange hashpartitioning(id#0L,
>>>> label#183, 200)
>>>>                                  +- *Project [id#0L, indexedSegs#93,
>>>> cast(label_val#99 as double) AS label#183]
>>>>                                     +- InMemoryTableScan [id#0L,
>>>> indexedSegs#93, label_val#99]
>>>>                                           +- InMemoryRelation [id#0L,
>>>> label_val#99, segment#2, indexedSegs#93], true, 10000, StorageLevel(disk,
>>>> memory, deserialized, 1 replicas)
>>>>                                                 +- *Project [id#0L,
>>>> label#1 AS label_val#99, segment#2, UDF(segment#2) AS indexedSegs#93]
>>>>                                                    +- HiveTableScan
>>>> [id#0L, label#1, segment#2], MetastoreRelation pmccarthy, all_data_long
>>>>
>>>>
>>>> --
>>> Best Regards,
>>> Ayan Guha
>>>
