Hi, everyone    I come across a problem with changing the patition number of 
the rdd,  my code is as below:    val rdd1 = sc.textFile(path1)     val rdd2 = 
sc.textFile(path2)

    val rdd3 = sc.textFile(path3)



    val imeiList = parseParam(job.jobParams)

    val broadcastVar = sc.broadcast(imeiList)

    val structuredRDD1 = rdd1.map(line => {           val trunks = 
line.split("\t")

                                                                                
    if(trunks.length == 35){

                                                                                
            (trunks(6).trim, trunks(7).trim, trunks(3).trim, trunks(5).trim, 
trunks(12).trim, trunks(13).trim.toLong)

                                                                                
            }

                                                                                
})

    val structuredRDD2 = rdd2.map(line => {           val trunks = 
line.split("\t")

                                                                                
    if(trunks.length == 33){

                                                                                
            (trunks(6).trim, trunks(8).trim, trunks(9).trim, trunks(14).trim, 
trunks(12).trim, trunks(3).trim.toLong)

                                                                                
                }

                                                                                
})

    val structuredRDD3 = rdd3.map(line => {          val trunks = 
line.split("\t")

                                                                                
    if(trunks.length == 33){

                                                                                
            (trunks(6).trim, trunks(8).trim, trunks(9).trim, trunks(14).trim, 
trunks(12).trim, trunks(3).trim.toLong)

                                                                                
               }

                                                                        })

    val unionedRDD = structuredRDD1.union(structuredRDD2).union(structuredRDD3)

    val resRDD = unionedRDD.filter(arg => arg != null && arg != ())
.map(arg => arg.asInstanceOf[(String, String, String, String, String, Long)])
.filter(arg => imeiFilter(arg._1, broadcastVar.value, 0) || imeiFilter(arg._2, 
broadcastVar.value, 0))
    val jsonStrRDD = resRDD.map(arg => "{\"f_imei\" : \"" + arg._1 + "\", 
\"t_imei\" : \"" + arg._2 + "\", \"dgst\" : \"" + arg._3 + "\", \"n\" : \"" + 
arg._4 + "\", \"s\" : " + arg._5.toString() + ", \"ts\" : " + arg._6.toString() 
+ "}")

    val jsonArray = jsonStrRDD.collect
    I noticed that there are 3834 tasks by default,  and 3834 is the number of 
files in path1 and path2 and path3,  i want to change the number of patition by 
the code below:    val rdd1 = sc.textFile(path1, 1920) 
    val rdd2 = sc.textFile(path2, 1920) 
    val rdd3 = sc.textFile(path3, 1920)
    by doing this, i expect there are 1920 tasks totally, but i found the 
number of tasks becomes 8920, any idea what's going on here?
    Thanks!



qinwei

Reply via email to