Hi kdmxen,

You want to remove the broadcast variables from the executors to avoid the "executor lost" failures, right? Have you tried using the unpersist method instead of destroy? Like this:

    itemSplitBroadcast.destroy(true);  =>  itemSplitBroadcast.unpersist(true);
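A minimal sketch of where that change would go in your loop; unpersist(true)
blocks until the broadcast's blocks are removed from the executors but keeps
the driver-side copy, so stages in later turns that still reference the
broadcast can re-fetch it instead of failing:

    // inside the loop, after the action that materializes curItemPairRdd
    curItemPairRdd.count();
    // removes executor-side copies only; the broadcast stays valid, so
    // there is no "Attempted to use Broadcast(...) after it was destroyed"
    itemSplitBroadcast.unpersist(true);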
LIN Chen

Date: Thu, 7 Jan 2016 22:01:27 +0800
Subject: How to split a huge rdd and broadcast it by turns?
From: kdm...@gmail.com
To: user@spark.apache.org

Description:
Our Spark version is 1.4.1.
We want to join two huge RDDs, one of which has skewed data, so a plain RDD join may run into memory problems. Our approach is to split the smaller RDD into several pieces and broadcast them in batches. On each turn, we collect one part of the smaller RDD to the driver, copy it into a HashMap, and broadcast that HashMap. Each executor then uses the broadcast value in a map operation over the bigger RDD. This is how we implement our skewed-data join.
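The body of that map function is not shown in this post; below is a hypothetical sketch of what it could look like, reusing the NormalizeScoreFunction name from the code further down (the lookup key and the division are assumptions made for illustration only):

    import java.util.Map;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.broadcast.Broadcast;
    import scala.Tuple2;

    // Hypothetical sketch: looks up each record's first key component in
    // the broadcast HashMap and normalizes the score when a value exists.
    public class NormalizeScoreFunction implements
            PairFunction<Tuple2<Tuple2<String, String>, Double>,
                    Tuple2<String, String>, Double> {

        private final Broadcast<Map<String, Double>> itemSplitBroadcast;

        public NormalizeScoreFunction(Broadcast<Map<String, Double>> broadcast) {
            this.itemSplitBroadcast = broadcast;
        }

        @Override
        public Tuple2<Tuple2<String, String>, Double> call(
                Tuple2<Tuple2<String, String>, Double> pair) {
            Double norm = itemSplitBroadcast.value().get(pair._1()._1());
            if (norm == null) {
                // key not in this split: pass the record through unchanged
                return pair;
            }
            return new Tuple2<Tuple2<String, String>, Double>(
                    pair._1(), pair._2() / norm);
        }
    }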
But we find that we cannot destroy the broadcast value after each turn's processing. If we call broadcast.destroy(), processing the next turn's data triggers errors like this:

    java.io.IOException: org.apache.spark.SparkException: Attempted to use Broadcast(6) after it was destroyed (destroy at xxx.java:369)
We have looked at the Spark source code and found that this problem is caused by RDD dependency relationships. Suppose rdd3 -> rdd2 -> rdd1 (the arrows show dependencies), rdd1 is produced using a broadcast variable b1, and rdd2 uses b2. When rdd3 is produced, the source code shows that both b1 and b2 need to be serialized. If b1 or b2 is destroyed before rdd3 is produced, it causes the failure listed above.
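A minimal, self-contained illustration of that failure mode (hypothetical names, assuming an existing JavaSparkContext jsc and Java 8 lambdas):

    import java.util.Arrays;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.broadcast.Broadcast;

    JavaRDD<Integer> input = jsc.parallelize(Arrays.asList(1, 2, 3));

    Broadcast<Integer> b1 = jsc.broadcast(10);
    JavaRDD<Integer> rdd1 = input.map(x -> x + b1.value());  // rdd1 depends on b1

    Broadcast<Integer> b2 = jsc.broadcast(20);
    JavaRDD<Integer> rdd2 = rdd1.map(x -> x + b2.value());   // rdd2 -> rdd1 -> b1

    b1.destroy();  // nothing has been computed yet, so this is premature

    JavaRDD<Integer> rdd3 = rdd2.map(x -> x * 2);
    rdd3.count();  // serializing rdd1's closure touches b1 and fails with
                   // "Attempted to use Broadcast(...) after it was destroyed"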
Question:
Is there a way to make rdd3 forget its dependencies, so that producing it requires only rdd2 and not b1 or b2?
Or is there another way to deal with the skewed join problem?
By the way, we have set a checkpoint on each turn and set spark.cleaner.ttl to 600, but the problem is still there. If we don't destroy the broadcast variables, executors are lost on the 5th turn.
Our code looks like this:
        for (int i = 0; i < times; i++) {
            JavaPairRDD<Tuple2<String, String>, Double> prevItemPairRdd = curItemPairRdd;

            // collect one slice of the smaller RDD to the driver
            List<Tuple2<String, Double>> itemSplit = itemZippedRdd
                    .filter(new FilterByHashFunction(times, i))
                    .collect();

            // copy the slice into a HashMap and broadcast it
            Map<String, Double> itemSplitMap = new HashMap<String, Double>();
            for (Tuple2<String, Double> item : itemSplit) {
                itemSplitMap.put(item._1(), item._2());
            }
            Broadcast<Map<String, Double>> itemSplitBroadcast = jsc
                    .broadcast(itemSplitMap);

            // map-side join of this slice against the bigger RDD
            curItemPairRdd = prevItemPairRdd
                    .mapToPair(new NormalizeScoreFunction(itemSplitBroadcast))
                    .persist(StorageLevel.DISK_ONLY());
            curItemPairRdd.count();

            // destroying here invalidates the broadcast for later turns
            itemSplitBroadcast.destroy(true);
            itemSplit.clear();
        }
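As a small aside (not from the thread): if itemZippedRdd is a JavaPairRDD<String, Double>, the collect() plus manual HashMap copy can be collapsed into collectAsMap(), which builds the driver-side map in one step:

    // assumes itemZippedRdd is a JavaPairRDD<String, Double>
    Map<String, Double> itemSplitMap = itemZippedRdd
            .filter(new FilterByHashFunction(times, i))
            .collectAsMap();
    Broadcast<Map<String, Double>> itemSplitBroadcast = jsc
            .broadcast(itemSplitMap);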
