Good to know it worked. If some of the tasks still failed, that can indicate
skew in your dataset. You may want to think about partitioning on a function
of the key.
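A minimal sketch of that idea, assuming the skew follows the grouping key id
in the alldat_idx DataFrame from the thread below (the partition count is
purely illustrative):

# Hedged sketch: colocate rows for the same id ahead of the expensive
# groupBy/collect_list step; 2000 partitions is just an example value.
alldat_idx = alldat_idx.repartition(2000, 'id')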
Also, do you still see containers killed by YARN? If so, at what point? You
should see a message along the lines of "your app is trying to use x GB while
YARN can provide only y GB". You have the option to go a little higher on
executor memory, maybe up to 18G with 2G of overhead. Finally, you may want
to tweak the memory fraction settings a little to see if you can salvage the
failed jobs.
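A rough sketch of those knobs (the values are illustrative, and they are
usually passed on the spark-submit command line rather than set in code):

# Hedged example of the memory settings discussed above. The overhead key
# below is the Spark 2.1-era name and is given in MB; 0.6 is the default
# memory fraction, shown here only as the knob to experiment with.
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName('feature_etl')  # hypothetical app name
         .config('spark.executor.memory', '18g')
         .config('spark.yarn.executor.memoryOverhead', '2048')  # ~2G, in MB
         .config('spark.memory.fraction', '0.6')
         .getOrCreate())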
Best
Ayan

On Mon, 17 Apr 2017 at 5:45 am, Patrick McCarthy <pmccar...@dstillery.com>
wrote:

> The partitions helped!
>
> I added repartition() and my function looks like this now:
>
> feature_df = (alldat_idx
>     .withColumn('label', alldat_idx['label_val'].cast('double'))
>     .groupBy('id', 'label')
>     .agg(sort_array(collect_list('indexedSegs')).alias('collect_list_is'))
>     .repartition(1000)
>     .withColumn('num_feat', lit(feature_vec_len))
>     .withColumn('features', list_to_sparse_udf('collect_list_is', 'num_feat'))
>     .drop('collect_list_is')
>     .drop('num_feat'))
>
> I got a few failed containers for memory overflow, but the job was able
> to finish successfully. I tried upping the repartition as high as 4000,
> but a few still failed.
>
> For posterity's sake, where would I look for the footprint you have in
> mind? On the executor tab?
>
> Since the audience part of the task finished successfully and the
> failure was on a df that didn't touch it, it shouldn't have made a
> difference.
>
> Thank you!
>
> On Sat, Apr 15, 2017 at 9:07 PM, ayan guha <guha.a...@gmail.com> wrote:
>
>> What I missed: try increasing the number of partitions using
>> repartition.
>>
>> On Sun, 16 Apr 2017 at 11:06 am, ayan guha <guha.a...@gmail.com> wrote:
>>
>>> It does not look like a Scala vs. Python thing. How big is your
>>> audience data store? Can it be broadcast?
>>>
>>> What is the memory footprint you are seeing? At what point is YARN
>>> killing containers? Depending on that, you may want to tweak the
>>> number of partitions of the input dataset and increase the number of
>>> executors.
>>>
>>> Ayan
>>>
>>> On Sat, 15 Apr 2017 at 2:10 am, Patrick McCarthy <
>>> pmccar...@dstillery.com> wrote:
>>>
>>>> Hello,
>>>>
>>>> I'm trying to build an ETL job which takes in 30-100gb of text data
>>>> and prepares it for SparkML. I don't speak Scala, so I've been trying
>>>> to implement it in PySpark on YARN, Spark 2.1.
>>>>
>>>> Despite the transformations being fairly simple, the job always fails
>>>> by running out of executor memory.
>>>>
>>>> The input table is long (~6bn rows) but composed of three simple
>>>> values:
>>>>
>>>> #####################################################################
>>>> all_data_long.printSchema()
>>>>
>>>> root
>>>>  |-- id: long (nullable = true)
>>>>  |-- label: short (nullable = true)
>>>>  |-- segment: string (nullable = true)
>>>>
>>>> #####################################################################
>>>>
>>>> First I join it to a table of particular segments of interest and do
>>>> an aggregation:
>>>>
>>>> #####################################################################
>>>>
>>>> audiences.printSchema()
>>>>
>>>> root
>>>>  |-- entry: integer (nullable = true)
>>>>  |-- descr: string (nullable = true)
>>>>
>>>> print("Num in adl: {}".format(str(all_data_long.count())))
>>>>
>>>> aud_str = audiences.select(audiences['entry'].cast('string'),
>>>>                            audiences['descr'])
>>>>
>>>> alldata_aud = all_data_long.join(aud_str,
>>>>                                  all_data_long['segment'] == aud_str['entry'],
>>>>                                  'left_outer')
>>>>
>>>> str_idx = StringIndexer(inputCol='segment', outputCol='indexedSegs')
>>>>
>>>> idx_df = str_idx.fit(alldata_aud)
>>>> label_df = idx_df.transform(alldata_aud).withColumnRenamed('label', 'label_val')
>>>>
>>>> id_seg = (label_df
>>>>     .filter(label_df.descr.isNotNull())
>>>>     .groupBy('id')
>>>>     .agg(collect_list('descr')))
>>>>
>>>> id_seg.write.saveAsTable("hive.id_seg")
>>>>
>>>> #####################################################################
>>>>
>>>> Then I use that StringIndexer again on the first data frame to
>>>> featurize the segment ID:
>>>>
>>>> #####################################################################
>>>>
>>>> alldat_idx = idx_df.transform(all_data_long).withColumnRenamed('label', 'label_val')
>>>>
>>>> #####################################################################
>>>>
>>>> My ultimate goal is to make a SparseVector, so I group the indexed
>>>> segments by id and try to cast them into a vector:
>>>>
>>>> #####################################################################
>>>>
>>>> list_to_sparse_udf = udf(lambda l, maxlen: Vectors.sparse(maxlen,
>>>>                          {v: 1.0 for v in l}), VectorUDT())
>>>>
>>>> alldat_idx.cache()
>>>>
>>>> feature_vec_len = (alldat_idx.select(max('indexedSegs')).first()[0] + 1)
>>>>
>>>> print("alldat_idx: {}".format(str(alldat_idx.count())))
>>>>
>>>> feature_df = (alldat_idx
>>>>     .withColumn('label', alldat_idx['label_val'].cast('double'))
>>>>     .groupBy('id', 'label')
>>>>     .agg(sort_array(collect_list('indexedSegs')).alias('collect_list_is'))
>>>>     .withColumn('num_feat', lit(feature_vec_len))
>>>>     .withColumn('features', list_to_sparse_udf('collect_list_is', 'num_feat'))
>>>>     .drop('collect_list_is')
>>>>     .drop('num_feat'))
>>>>
>>>> feature_df.cache()
>>>> print("Num in featuredf: {}".format(str(feature_df.count())))  ## <- failure occurs here
>>>>
>>>> #####################################################################
>>>>
>>>> Here, however, I always run out of memory on the executors (I've
>>>> twiddled driver and executor memory to check) and YARN kills off my
>>>> containers. I've gone as high as --executor-memory 15g but it still
>>>> doesn't help.
>>>>
>>>> Given that the number of segments is at most 50,000, I'm surprised
>>>> that a smallish row-wise operation is enough to blow up the process.
>>>>
>>>> Is it really the UDF that's killing me? Do I have to rewrite it in
>>>> Scala?
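[For anyone reproducing the snippets above, a guess at the imports they
assume on Spark 2.1; the original post does not show them, and the linalg
module could instead be pyspark.mllib.linalg if the downstream model uses
the RDD-based API.]

# Hedged guess at the imports for the code above (Spark 2.1).
from pyspark.ml.feature import StringIndexer
from pyspark.ml.linalg import Vectors, VectorUDT  # or pyspark.mllib.linalg
from pyspark.sql.functions import collect_list, sort_array, lit, udf, max  # max shadows the builtin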
>>>>
>>>>
>>>> Query plans for the failing stage:
>>>>
>>>> #####################################################################
>>>>
>>>> == Parsed Logical Plan ==
>>>> Aggregate [count(1) AS count#265L]
>>>> +- Project [id#0L, label#183, features#208]
>>>>    +- Project [id#0L, label#183, num_feat#202, features#208]
>>>>       +- Project [id#0L, label#183, collect_list_is#197, num_feat#202, <lambda>(collect_list_is#197, num_feat#202) AS features#208]
>>>>          +- Project [id#0L, label#183, collect_list_is#197, 56845.0 AS num_feat#202]
>>>>             +- Aggregate [id#0L, label#183], [id#0L, label#183, sort_array(collect_list(indexedSegs#93, 0, 0), true) AS collect_list_is#197]
>>>>                +- Project [id#0L, label_val#99, segment#2, indexedSegs#93, cast(label_val#99 as double) AS label#183]
>>>>                   +- Project [id#0L, label#1 AS label_val#99, segment#2, indexedSegs#93]
>>>>                      +- Project [id#0L, label#1, segment#2, UDF(cast(segment#2 as string)) AS indexedSegs#93]
>>>>                         +- MetastoreRelation pmccarthy, all_data_long
>>>>
>>>> == Analyzed Logical Plan ==
>>>> count: bigint
>>>> Aggregate [count(1) AS count#265L]
>>>> +- Project [id#0L, label#183, features#208]
>>>>    +- Project [id#0L, label#183, num_feat#202, features#208]
>>>>       +- Project [id#0L, label#183, collect_list_is#197, num_feat#202, <lambda>(collect_list_is#197, num_feat#202) AS features#208]
>>>>          +- Project [id#0L, label#183, collect_list_is#197, 56845.0 AS num_feat#202]
>>>>             +- Aggregate [id#0L, label#183], [id#0L, label#183, sort_array(collect_list(indexedSegs#93, 0, 0), true) AS collect_list_is#197]
>>>>                +- Project [id#0L, label_val#99, segment#2, indexedSegs#93, cast(label_val#99 as double) AS label#183]
>>>>                   +- Project [id#0L, label#1 AS label_val#99, segment#2, indexedSegs#93]
>>>>                      +- Project [id#0L, label#1, segment#2, UDF(cast(segment#2 as string)) AS indexedSegs#93]
>>>>                         +- MetastoreRelation pmccarthy, all_data_long
>>>>
>>>> == Optimized Logical Plan ==
>>>> Aggregate [count(1) AS count#265L]
>>>> +- Project
>>>>    +- InMemoryRelation [id#0L, label#183, features#208], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
>>>>       +- *Project [id#0L, label#183, pythonUDF0#244 AS features#208]
>>>>          +- BatchEvalPython [<lambda>(collect_list_is#197, 56845.0)], [id#0L, label#183, collect_list_is#197, pythonUDF0#244]
>>>>             +- SortAggregate(key=[id#0L, label#183], functions=[collect_list(indexedSegs#93, 0, 0)], output=[id#0L, label#183, collect_list_is#197])
>>>>                +- *Sort [id#0L ASC NULLS FIRST, label#183 ASC NULLS FIRST], false, 0
>>>>                   +- Exchange hashpartitioning(id#0L, label#183, 200)
>>>>                      +- *Project [id#0L, indexedSegs#93, cast(label_val#99 as double) AS label#183]
>>>>                         +- InMemoryTableScan [id#0L, indexedSegs#93, label_val#99]
>>>>                            +- InMemoryRelation [id#0L, label_val#99, segment#2, indexedSegs#93], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
>>>>                               +- *Project [id#0L, label#1 AS label_val#99, segment#2, UDF(segment#2) AS indexedSegs#93]
>>>>                                  +- HiveTableScan [id#0L, label#1, segment#2], MetastoreRelation pmccarthy, all_data_long
>>>>
>>>> == Physical Plan ==
>>>> *HashAggregate(keys=[], functions=[count(1)], output=[count#265L])
>>>> +- Exchange SinglePartition
>>>>    +- *HashAggregate(keys=[], functions=[partial_count(1)], output=[count#284L])
>>>>       +- InMemoryTableScan
>>>>          +- InMemoryRelation [id#0L, label#183, features#208], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
>>>>             +- *Project [id#0L, label#183, pythonUDF0#244 AS features#208]
>>>>                +- BatchEvalPython [<lambda>(collect_list_is#197, 56845.0)], [id#0L, label#183, collect_list_is#197, pythonUDF0#244]
>>>>                   +- SortAggregate(key=[id#0L, label#183], functions=[collect_list(indexedSegs#93, 0, 0)], output=[id#0L, label#183, collect_list_is#197])
>>>>                      +- *Sort [id#0L ASC NULLS FIRST, label#183 ASC NULLS FIRST], false, 0
>>>>                         +- Exchange hashpartitioning(id#0L, label#183, 200)
>>>>                            +- *Project [id#0L, indexedSegs#93, cast(label_val#99 as double) AS label#183]
>>>>                               +- InMemoryTableScan [id#0L, indexedSegs#93, label_val#99]
>>>>                                  +- InMemoryRelation [id#0L, label_val#99, segment#2, indexedSegs#93], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
>>>>                                     +- *Project [id#0L, label#1 AS label_val#99, segment#2, UDF(segment#2) AS indexedSegs#93]
>>>>                                        +- HiveTableScan [id#0L, label#1, segment#2], MetastoreRelation pmccarthy, all_data_long
>>>>
>>>>
>>> --
>>> Best Regards,
>>> Ayan Guha
>>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>
> --
Best Regards,
Ayan Guha