Hi kdmxen,

You want to delete the broadcast variables on the executors to avoid the executor-lost failures, right? Have you tried the unpersist method instead? Like this:

itemSplitBroadcast.destroy(true); => itemSplitBroadcast.unpersist(true);

unpersist only removes the broadcast's copies from the executors; they can be re-fetched from the driver if a later stage still needs the value, whereas destroy removes all state for the broadcast and makes any further use fail.

LIN Chen
Date: Thu, 7 Jan 2016 22:01:27 +0800
Subject: How to split a huge rdd and broadcast it by turns?
From: kdm...@gmail.com
To: user@spark.apache.org

Description:

Our Spark version is 1.4.1. We want to join two huge RDDs, one of them with skewed data, so the RDD join operation may lead to memory problems. We try to split the smaller one into several pieces and broadcast them in batches. On each broadcast turn, we collect one part of the smaller RDD to the driver, save it into a HashMap, and then broadcast the HashMap; each executor uses the broadcast value in a map operation against the bigger RDD. We implement our skewed-data join this way.

But we find that we cannot destroy the broadcast value after processing it in each turn. If we call broadcast.destroy(), processing data in the next turn triggers errors like this:

java.io.IOException: org.apache.spark.SparkException: Attempted to use Broadcast(6) after it was destroyed (destroy at xxx.java:369)

We have looked at the Spark source code and found that this problem is caused by the RDD dependency relationship. Suppose rdd3 -> rdd2 -> rdd1 (the arrow shows dependency), rdd1 was produced using a broadcast variable named b1, and rdd2 used b2. When producing rdd3, the source code shows it needs to serialize both b1 and b2. If b1 or b2 is destroyed before rdd3 is produced, it causes the failure listed above.

Question: Is there a way to let rdd3 forget its dependencies, so that producing it only requires rdd2 and not b1 or b2? Or is there another way to deal with the skewed-join problem?

By the way, we have set a checkpoint on each turn and set spark.cleaner.ttl to 600, and the problem is still there. If we don't destroy the broadcast variables, executors are lost on the 5th turn.
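The batching scheme described above can be modeled without Spark. Below is a minimal pure-Java sketch of the idea, not the poster's actual pipeline: the names (smallSide, bigSide) and the hash-splitting rule are assumptions for illustration. The small side is split into `times` batches by key hash, and each batch is built into an in-memory map and joined against the big side, so only one batch is held at a time.

```java
import java.util.*;
import java.util.AbstractMap.SimpleEntry;

public class BatchedMapJoin {
    // Join bigSide (a list of key -> score pairs, possibly skewed) against
    // smallSide (key -> weight) in `times` batches, holding only one batch
    // of the small side in memory at a time.
    static List<Map.Entry<String, Double>> join(
            List<Map.Entry<String, Double>> bigSide,
            Map<String, Double> smallSide,
            int times) {
        List<Map.Entry<String, Double>> out = new ArrayList<>();
        for (int i = 0; i < times; i++) {
            // Stand-in for filter(...).collect() + HashMap: select the
            // i-th hash bucket of the small side.
            Map<String, Double> batch = new HashMap<>();
            for (Map.Entry<String, Double> e : smallSide.entrySet()) {
                if (Math.floorMod(e.getKey().hashCode(), times) == i) {
                    batch.put(e.getKey(), e.getValue());
                }
            }
            // Stand-in for the broadcast map-side join on each executor:
            // each big-side row is matched against the current batch only.
            for (Map.Entry<String, Double> row : bigSide) {
                Double w = batch.get(row.getKey());
                if (w != null) {
                    out.add(new SimpleEntry<>(row.getKey(), row.getValue() * w));
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Double> small = new HashMap<>();
        small.put("a", 2.0);
        small.put("b", 3.0);
        List<Map.Entry<String, Double>> big = Arrays.asList(
                new SimpleEntry<>("a", 1.0),
                new SimpleEntry<>("a", 4.0),
                new SimpleEntry<>("b", 5.0),
                new SimpleEntry<>("c", 6.0)); // "c" has no match on the small side
        List<Map.Entry<String, Double>> joined = join(big, small, 3);
        System.out.println(joined.size() + " matched rows");
    }
}
```

Because the hash buckets partition the keys, every matching row is produced in exactly one batch, which is what makes the per-turn broadcast safe to discard once that turn's output has been materialized.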
our code is like this:

for (int i = 0; i < times; i++) {
    JavaPairRDD<Tuple2<String, String>, Double> prevItemPairRdd = curItemPairRdd;

    List<Tuple2<String, Double>> itemSplit = itemZippedRdd
            .filter(new FilterByHashFunction(times, i))
            .collect();
    Map<String, Double> itemSplitMap = new HashMap<String, Double>();
    for (Tuple2<String, Double> item : itemSplit) {
        itemSplitMap.put(item._1(), item._2());
    }
    Broadcast<Map<String, Double>> itemSplitBroadcast = jsc
            .broadcast(itemSplitMap);

    curItemPairRdd = prevItemPairRdd
            .mapToPair(new NormalizeScoreFunction(itemSplitBroadcast))
            .persist(StorageLevel.DISK_ONLY());
    curItemPairRdd.count();

    itemSplitBroadcast.destroy(true);
    itemSplit.clear();
}
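The body of FilterByHashFunction is not shown in the post. A plausible implementation (an assumption for illustration, not the poster's code) is a predicate that keeps a record when its key's hash falls into the i-th of `times` buckets, so that every key is selected in exactly one turn:

```java
import java.util.*;

public class FilterByHashDemo {
    // Hypothetical stand-in for the poster's FilterByHashFunction:
    // keep a key when its hash falls into bucket i of `times` buckets.
    // Math.floorMod avoids a negative bucket index for negative hash codes.
    static boolean keep(String key, int times, int i) {
        return Math.floorMod(key.hashCode(), times) == i;
    }

    public static void main(String[] args) {
        String[] keys = {"item-1", "item-2", "item-3", "item-4"};
        int times = 3;
        // The buckets partition the key space: each key is selected by
        // exactly one turn, so the batches cover the small RDD without overlap.
        for (String k : keys) {
            int hits = 0;
            for (int i = 0; i < times; i++) {
                if (keep(k, times, i)) hits++;
            }
            System.out.println(k + " selected in " + hits + " turn(s)");
        }
    }
}
```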