(Most of this code is not relevant to the question, though it could be
refactored as well; the casts and null checks look unnecessary.)
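On the casts and null checks: here is a minimal sketch of one way to drop them. It assumes the field layouts in the question are right. The names LineParsers, parseType1, parseType2 and Record are hypothetical. Each parser returns an Option instead of "a tuple or Unit", so lines with the wrong field count simply vanish when used with flatMap.

```scala
// Hypothetical helpers; names are illustrative, not from the original job.
object LineParsers {
  // Same shape as the tuples built in the question.
  type Record = (String, String, String, String, String, Long)

  // Files under path1: 35 tab-separated fields.
  def parseType1(line: String): Option[Record] = {
    val t = line.split("\t")
    if (t.length == 35)
      Some((t(6).trim, t(7).trim, t(3).trim, t(5).trim, t(12).trim, t(13).trim.toLong))
    else None // wrong field count: drop the line instead of returning Unit
  }

  // Files under path2 and path3: 33 tab-separated fields.
  def parseType2(line: String): Option[Record] = {
    val t = line.split("\t")
    if (t.length == 33)
      Some((t(6).trim, t(8).trim, t(9).trim, t(14).trim, t(12).trim, t(3).trim.toLong))
    else None
  }
}
```

In the job this would be used as rdd1.flatMap(line => LineParsers.parseType1(line)), and parseType2 for rdd2 and rdd3. The resulting RDDs are already typed as tuples, so the union needs neither the null/() filter nor asInstanceOf. As in the original, a malformed number still throws NumberFormatException.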

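You are unioning RDDs, so the result has the sum of their partitions. The
number you pass to textFile is only a hint to Hadoop (the parameter is
minPartitions), so the total is not even necessarily 3 x 1920: every input
file still produces at least one split, and a smaller target split size cuts
the larger files into more pieces.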
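To make the "only a hint" point concrete, here is a simplified model of how Hadoop's old-API FileInputFormat.getSplits sizes splits. sc.textFile goes through that method, passing minPartitions as the requested split count. This is a sketch under stated assumptions, not Hadoop's code. It assumes every file is splittable (so no gzip), a minimum split size of 1 byte, and no block locality. The object SplitModel and countSplits are hypothetical names.

```scala
// Simplified model of old-API FileInputFormat split sizing (assumptions:
// all files splittable, minimum split size 1 byte, locality ignored).
object SplitModel {
  // A trailing piece up to 10% larger than splitSize is not split again.
  private val SplitSlop = 1.1

  def countSplits(fileSizes: Seq[Long], minPartitions: Int, blockSize: Long): Int = {
    val totalSize = fileSizes.sum
    // minPartitions only sets a goal size; it is not a cap on the split count.
    val goalSize = totalSize / math.max(minPartitions, 1)
    val splitSize = math.max(1L, math.min(goalSize, blockSize))
    fileSizes.map { size =>
      if (size == 0) 1 // an empty file still yields one (empty) split
      else {
        var remaining = size
        var splits = 0
        while (remaining.toDouble / splitSize > SplitSlop) {
          splits += 1
          remaining -= splitSize
        }
        if (remaining != 0) splits += 1 // the tail becomes its own split
        splits
      }
    }.sum // every non-empty file contributes at least one split
  }
}
```

For example, three 100-byte files with a hint of 4 give a 75-byte target and a 25-byte tail per file. That yields 6 splits rather than 4. Ten 10-byte files with a hint of 2 still give 10 splits.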

Try not specifying the number of partitions at the source; instead, call
repartition after the union to reduce the number of partitions.
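Here is a minimal local-mode sketch of that advice. It assumes spark-core is on the classpath. sc.parallelize with explicit slice counts stands in for the three textFile calls so the partition counts are predictable. The object name UnionThenRepartition and the target of 3 are illustrative. The sketch also shows coalesce, which reduces the partition count without a shuffle, next to repartition, which does a full shuffle.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object UnionThenRepartition {
  // Returns (partitions after union, after coalesce, after repartition).
  def partitionCounts(sc: SparkContext, target: Int): (Int, Int, Int) = {
    // In-memory stand-ins for sc.textFile(path1), sc.textFile(path2), sc.textFile(path3).
    val rdd1 = sc.parallelize(1 to 100, 4)
    val rdd2 = sc.parallelize(101 to 200, 5)
    val rdd3 = sc.parallelize(201 to 300, 6)

    val unioned = rdd1.union(rdd2).union(rdd3)   // keeps all parent partitions: 4 + 5 + 6
    val merged = unioned.coalesce(target)        // merges partitions, no shuffle
    val reshuffled = unioned.repartition(target) // full shuffle, more even partitions
    (unioned.partitions.length, merged.partitions.length, reshuffled.partitions.length)
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("union-repartition").setMaster("local[2]"))
    try {
      println(partitionCounts(sc, 3)) // prints (15,3,3)
    } finally {
      sc.stop()
    }
  }
}
```

For the job in the question, the equivalent would be something like unionedRDD.repartition(1920), where 1920 is just the number taken from the question. coalesce is usually cheaper when you are only reducing the count, while repartition evens out partition sizes at the cost of a shuffle.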
On Sep 28, 2014 7:36 AM, "qinwei" <wei....@dewmobile.net> wrote:

> Hi, everyone
>     I came across a problem when changing the number of partitions of an
> RDD. My code is below:
>     val rdd1 = sc.textFile(path1)
>     val rdd2 = sc.textFile(path2)
>     val rdd3 = sc.textFile(path3)
>
>     val imeiList = parseParam(job.jobParams)
>     val broadcastVar = sc.broadcast(imeiList)
>     val structuredRDD1 = rdd1.map(line => {
>       val trunks = line.split("\t")
>       if (trunks.length == 35) {
>         (trunks(6).trim, trunks(7).trim, trunks(3).trim,
>           trunks(5).trim, trunks(12).trim, trunks(13).trim.toLong)
>       }
>     })
>     val structuredRDD2 = rdd2.map(line => {
>       val trunks = line.split("\t")
>       if (trunks.length == 33) {
>         (trunks(6).trim, trunks(8).trim, trunks(9).trim,
>           trunks(14).trim, trunks(12).trim, trunks(3).trim.toLong)
>       }
>     })
>     val structuredRDD3 = rdd3.map(line => {
>       val trunks = line.split("\t")
>       if (trunks.length == 33) {
>         (trunks(6).trim, trunks(8).trim, trunks(9).trim,
>           trunks(14).trim, trunks(12).trim, trunks(3).trim.toLong)
>       }
>     })
>
>     val unionedRDD = structuredRDD1.union(structuredRDD2).union(structuredRDD3)
>     val resRDD = unionedRDD
>       .filter(arg => arg != null && arg != ())
>       .map(arg => arg.asInstanceOf[(String, String, String, String, String, Long)])
>       .filter(arg => imeiFilter(arg._1, broadcastVar.value, 0) ||
>         imeiFilter(arg._2, broadcastVar.value, 0))
>     val jsonStrRDD = resRDD.map(arg => "{\"f_imei\" : \"" + arg._1 +
>       "\", \"t_imei\" : \"" + arg._2 + "\", \"dgst\" : \"" + arg._3 +
>       "\", \"n\" : \"" + arg._4 + "\", \"s\" : " + arg._5.toString() +
>       ", \"ts\" : " + arg._6.toString() + "}")
>     val jsonArray = jsonStrRDD.collect
>
>     I noticed that there are 3834 tasks by default, and 3834 is the total
> number of files under path1, path2 and path3. I wanted to change the
> number of partitions with the code below:
>     val rdd1 = sc.textFile(path1, 1920)
>     val rdd2 = sc.textFile(path2, 1920)
>     val rdd3 = sc.textFile(path3, 1920)
>
>     By doing this, I expected 1920 tasks in total, but the number of tasks
> became 8920. Any idea what's going on here?
>
>     Thanks!
>
>
> ------------------------------
> qinwei
>
