You can use sc.wholeTextFiles to read a directory of text files. Also, since it seems from your code that you are only interested in a single year's count, you can apply a filter before the distinct() and then perform a reduce to sum up the counts.
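For example, here is a minimal sketch of that approach (assuming the field
layout from your code, i.e. tab-separated lines with the year-month in field 11
and the user_id in field 6, and a hypothetical target year of "2014"):

val targetYear = "2014"  // hypothetical; set to the year you are interested in

val uniqueUsers = sc.textFile("dir/*")  // sc.wholeTextFiles("dir") also reads a directory, as (path, content) pairs
  .map { line =>
    val fields = line.split("\t")
    (fields(11), fields(6))  // (year-month, user_id)
  }
  .filter { case (yearMonth, _) => yearMonth.startsWith(targetYear) }  // filter before distinct()
  .map { case (_, userId) => userId }  // keep only the user_id
  .distinct()  // dedupe the users across all 300 files
  .count()  // one reduce that returns a single Long

println(s"Unique users in $targetYear: $uniqueUsers")

Filtering first shrinks the data before the expensive distinct() shuffle, and
count() avoids pulling a per-key Map back to the driver the way countByKey
does.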
Hope this helps!

Liquan

On Tue, Sep 30, 2014 at 1:59 PM, SK <skrishna...@gmail.com> wrote:
> Hi,
>
> I am trying to compute the number of unique users from a year's worth of
> data. There are about 300 files and each file is quite large (~1 GB). I
> first tried this without a loop by reading all the files in the directory
> using the glob pattern sc.textFile("dir/*"). But the tasks were stalling
> and I was getting a "Too many open files" warning, even though I increased
> the nofile limit to 500K. More than 200K shuffle tasks were being created,
> and they were all generating shuffle files. Setting consolidateFiles to
> true did not help.
>
> So now I am reading the files in a loop, as shown in the code below, and I
> no longer run into the "Too many open files" issue. But the countByKey is
> taking a really long time (more than 15 hours and still ongoing). It
> appears from the logs that this operation is happening on a single node,
> but I am not able to figure out from the logs why it is taking so long.
> Each node has 16 GB of memory and the Mesos cluster has 16 nodes. I have
> set spark.serializer to KryoSerializer. I am not running into any
> out-of-memory errors so far. Is there some way to improve the performance?
> Thanks.
>
> var ucount = 0L  // running total for the year (assumed to be declared before the loop)
>
> for (i <- 1 to 300) {
>   val f = "file" + i  // name of the file
>   val user_time = sc.textFile(f)
>     .map(line => {
>       val fields = line.split("\t")
>       (fields(11), fields(6))
>     })  // extract (year-month, user_id)
>     .distinct()
>     .countByKey  // group by with year-month as the key
>
>   // now convert the Map object to an RDD in order to output the results
>   val ut_rdd = sc.parallelize(user_time.toSeq)
>
>   // convert to an array to extract the count. Need to find out if there
>   // is an easier way to do this.
>   val ar = ut_rdd.toArray()
>
>   // aggregate the count for the year
>   ucount = ucount + ar(0)._2
> }

--
Liquan Pei
Department of Physics
University of Massachusetts Amherst