You can use sc.wholeTextFiles to read a directory of text files.
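
For example, something along these lines (just a rough sketch; wholeTextFiles
gives you (path, content) pairs, so you have to split each file's content into
lines yourself, and "dir" plus the field indices are taken from your code):

  val yearMonthUser = sc.wholeTextFiles("dir")
    .flatMap { case (_, content) => content.split("\n") }    // break each file into lines
    .map { line =>
      val fields = line.split("\t")
      (fields(11), fields(6))                                // (year-month, user_id)
    }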

Also, it seems from your code that you are only interested in the current
year's count; if so, you can filter for that year before the distinct() and
then do a reduce to sum up the counts.
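
Something like this (again only a sketch; "2014" is a placeholder for the year
you care about, and I am assuming the year-month string in fields(11) starts
with the year):

  val uniqueUserCount = sc.textFile("dir/*")
    .map { line =>
      val fields = line.split("\t")
      (fields(11), fields(6))                                // (year-month, user_id)
    }
    .filter { case (yearMonth, _) => yearMonth.startsWith("2014") }  // keep only the year of interest
    .distinct()                                              // unique (year-month, user) pairs
    .map(_ => 1L)
    .reduce(_ + _)                                           // sum the counts into one number

This way you avoid the per-file loop and the countByKey entirely.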

Hope this helps!
Liquan


On Tue, Sep 30, 2014 at 1:59 PM, SK <skrishna...@gmail.com> wrote:

> Hi,
>
> I am  trying to compute the number of unique users from a year's worth of
> data. So there are about 300 files and each file is quite large (~GB).  I
> first tried this without a loop by reading all the files in the directory
> using the glob pattern:  sc.textFile("dir/*"). But the tasks were stalling
> and I was getting a "Too many open files" warning, even though I increased
> the nofile limit to 500K.  The number of shuffle tasks that were being
> created was more than 200K and they were all generating shuffle files.
> Setting consolidateFiles to true did not help.
>
> So now I am reading the files in a loop as shown in the code below. Now I
> don't run into the "Too many open files" issue, but the countByKey is
> taking a really long time (more than 15 hours and still ongoing). It
> appears from the logs that this operation is happening on a single node,
> and I am not able to figure out from the logs why it is taking so long.
> Each node has 16 GB of memory and the Mesos cluster has 16 nodes. I have
> set spark.serializer to KryoSerializer. I am not running into any out of
> memory errors so far. Is there some way to improve the performance? Thanks.
>
> for (i <- 1 to 300)
> {
>          var f = "file" + i    //name of the file
>          val user_time = sc.textFile(f)
>                         .map(line => {
>                              val fields = line.split("\t")
>                              (fields(11), fields(6))
>                             }) // extract (year-month, user_id)
>                         .distinct()
>                         .countByKey  // group by with year as the key
>
>         // now convert Map object to RDD in order to output results
>         val ut_rdd = sc.parallelize(user_time.toSeq)
>
>         // convert to array to extract the count. Need to find if there is an easier way to do this.
>         var ar = ut_rdd.toArray()
>
>         // aggregate the count for the year
>         ucount = ucount + ar(0)._2
> }
>
>
>
>
>


-- 
Liquan Pei
Department of Physics
University of Massachusetts Amherst
