(Most of this code is not relevant to the question and can be refactored too. The casts and null checks look unnecessary.)
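The cast and the `()`/null filter are only needed because each `map` closure in the quoted code uses an `if` without an `else`, so a non-matching line yields `()` and the RDD element type widens to `Any`. Here is a minimal sketch of one way to avoid that, assuming the column layouts from the quoted code: each parser returns an `Option`, and non-matching lines are simply dropped by `flatMap`. `RecordParsing`, `parseWide` and `parseNarrow` are hypothetical names, not part of the original job.

```scala
// Hypothetical refactor of the parsing step. Returning Option keeps the
// element type precise, so no null/() filter or asInstanceOf cast is needed.
object RecordParsing {
  // Field order copied from the quoted code: (f_imei, t_imei, dgst, n, s, ts)
  type Record = (String, String, String, String, String, Long)

  // Files under path1: lines with 35 tab-separated columns
  def parseWide(line: String): Option[Record] = {
    val t = line.split("\t")
    if (t.length == 35)
      Some((t(6).trim, t(7).trim, t(3).trim, t(5).trim, t(12).trim, t(13).trim.toLong))
    else None
  }

  // Files under path2 and path3: lines with 33 tab-separated columns
  def parseNarrow(line: String): Option[Record] = {
    val t = line.split("\t")
    if (t.length == 33)
      Some((t(6).trim, t(8).trim, t(9).trim, t(14).trim, t(12).trim, t(3).trim.toLong))
    else None
  }
}
```

In the Spark job this would be used as, e.g., `rdd1.flatMap(line => RecordParsing.parseWide(line))`, so the unioned RDD is already typed as `RDD[(String, String, String, String, String, Long)]`. As in the original, a non-numeric timestamp column still throws from `toLong`.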
You are unioning RDDs, so the result has the sum of their partitions. The partition count you pass to textFile is really only a hint to Hadoop (a suggested minimum), so the total is not even necessarily 3 x 1920. Try not specifying the partitions at the source, and instead try repartition after the union to reduce the number of partitions.

On Sep 28, 2014 7:36 AM, "qinwei" <wei....@dewmobile.net> wrote:
> Hi, everyone
> I came across a problem with changing the partition number of the RDD.
> My code is as below:
>
> val rdd1 = sc.textFile(path1)
> val rdd2 = sc.textFile(path2)
> val rdd3 = sc.textFile(path3)
>
> val imeiList = parseParam(job.jobParams)
> val broadcastVar = sc.broadcast(imeiList)
> val structuredRDD1 = rdd1.map(line => {
>   val trunks = line.split("\t")
>   if (trunks.length == 35) {
>     (trunks(6).trim, trunks(7).trim, trunks(3).trim,
>      trunks(5).trim, trunks(12).trim, trunks(13).trim.toLong)
>   }
> })
> val structuredRDD2 = rdd2.map(line => {
>   val trunks = line.split("\t")
>   if (trunks.length == 33) {
>     (trunks(6).trim, trunks(8).trim, trunks(9).trim,
>      trunks(14).trim, trunks(12).trim, trunks(3).trim.toLong)
>   }
> })
> val structuredRDD3 = rdd3.map(line => {
>   val trunks = line.split("\t")
>   if (trunks.length == 33) {
>     (trunks(6).trim, trunks(8).trim, trunks(9).trim,
>      trunks(14).trim, trunks(12).trim, trunks(3).trim.toLong)
>   }
> })
>
> val unionedRDD = structuredRDD1.union(structuredRDD2).union(structuredRDD3)
> val resRDD = unionedRDD.filter(arg => arg != null && arg != ())
>   .map(arg => arg.asInstanceOf[(String, String, String, String, String, Long)])
>   .filter(arg => imeiFilter(arg._1, broadcastVar.value, 0) ||
>     imeiFilter(arg._2, broadcastVar.value, 0))
> val jsonStrRDD = resRDD.map(arg => "{\"f_imei\" : \"" + arg._1 +
>   "\", \"t_imei\" : \"" + arg._2 + "\", \"dgst\" : \"" + arg._3 +
>   "\", \"n\" : \"" + arg._4 + "\", \"s\" : " + arg._5.toString() +
>   ", \"ts\" : " + arg._6.toString() + "}")
> val jsonArray = jsonStrRDD.collect
>
> I noticed that there are 3834 tasks by default, and 3834 is the number of
> files in path1, path2 and path3. I want to change the number of partitions
> with the code below:
>
> val rdd1 = sc.textFile(path1, 1920)
> val rdd2 = sc.textFile(path2, 1920)
> val rdd3 = sc.textFile(path3, 1920)
>
> By doing this, I expected 1920 tasks in total, but I found the number of
> tasks becomes 8920. Any idea what's going on here?
>
> Thanks!
>
> ------------------------------
> qinwei
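A minimal sketch of the partition arithmetic described in the reply, assuming a local Spark installation on the classpath. `sc.parallelize` with 4 slices stands in for each `sc.textFile(pathN)` so no input files are needed; `UnionPartitions` and `partitionCounts` are hypothetical names. Union adds the inputs' partition counts, and `coalesce` or `repartition` after the union brings the count back down.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object UnionPartitions {
  // Returns (partitions after union, after coalesce(5), after repartition(5)).
  def partitionCounts(): (Int, Int, Int) = {
    val conf = new SparkConf().setAppName("union-partitions").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      // Stand-ins for sc.textFile(path1), (path2), (path3): 4 partitions each
      val rdd1 = sc.parallelize(1 to 100, 4)
      val rdd2 = sc.parallelize(101 to 200, 4)
      val rdd3 = sc.parallelize(201 to 300, 4)

      // No shared partitioner, so union simply concatenates partitions: 4 + 4 + 4
      val unioned = rdd1.union(rdd2).union(rdd3)

      // coalesce merges existing partitions without a shuffle;
      // repartition(n) is coalesce(n, shuffle = true)
      val coalesced = unioned.coalesce(5)
      val repartitioned = unioned.repartition(5)

      (unioned.partitions.length, coalesced.partitions.length, repartitioned.partitions.length)
    } finally {
      sc.stop()
    }
  }
}
```

For the original job, the equivalent would be to drop the 1920 arguments and call `.coalesce(1920)` (or `.repartition(1920)`, which adds a full shuffle) on the unioned RDD. Without a shuffle, `coalesce` can only reduce the partition count, never increase it.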