Hi Xiangrui,

A side question about MLlib. It looks like the current LBFGS in MLlib (version 1.0.2, and even v1.1) only supports L2 regularization; the doc explains it: "The L1 regularization by using L1Updater <http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.optimization.L1Updater> will not work since the soft-thresholding logic in L1Updater is designed for gradient descent."

Since the algorithm comes from Breeze, and I noticed that Breeze actually supports L1 (OWLQN <http://www.scalanlp.org/api/breeze/#breeze.optimize.OWLQN>), I am wondering whether there is some special consideration behind the current MLlib not supporting OWLQN, and whether there is any plan to add it?

Thanks for your time!
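For reference, a minimal sketch of how Breeze's OWLQN is typically driven for an L1-regularized objective. The toy loss function, the constructor arguments, and the type parameter are my assumptions (the OWLQN constructor varies across Breeze versions); this is not existing MLlib code:

import breeze.linalg.DenseVector
import breeze.optimize.{DiffFunction, OWLQN}

object OwlqnSketch {
  def main(args: Array[String]): Unit = {
    val dim = 3

    // Toy smooth part of the objective: f(w) = ||w - 1||^2 with gradient 2(w - 1).
    // In a real model this would be the data loss (e.g. logistic loss) and its
    // gradient, aggregated over the training RDD.
    val loss = new DiffFunction[DenseVector[Double]] {
      def calculate(w: DenseVector[Double]): (Double, DenseVector[Double]) = {
        val diff = w - DenseVector.ones[Double](dim)
        (diff dot diff, diff * 2.0)
      }
    }

    // Arguments: maxIter = 100, LBFGS corrections m = 10, L1 regularization weight = 0.1.
    // NOTE: constructor arity and type parameters differ between Breeze versions.
    val owlqn = new OWLQN[DenseVector[Double]](100, 10, 0.1)

    val wOpt = owlqn.minimize(loss, DenseVector.zeros[Double](dim))
    println(s"solution: $wOpt")
  }
}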
On Fri, Aug 22, 2014 at 12:56 PM, ZHENG, Xu-dong <dong...@gmail.com> wrote:

> Update.
>
> I just found a magic parameter *balanceSlack* in *CoalescedRDD*, which sounds like it could control the locality. The default value is 0.1 (a smaller value means lower locality). I changed it to 1.0 (full locality) and used the #3 approach, and then saw a big improvement (20%~40%). Although the Web UI still shows the task type as 'ANY' and the input as shuffle read, the real performance is much better than before changing this parameter.
>
> [image: Inline image 1]
>
> I think the benefits include:
>
> 1. This approach keeps the physical partition size small, but makes each task handle multiple partitions. So the memory requested for deserialization is reduced, which also reduces the GC time. That is exactly what we observed in our job.
>
> 2. This approach will not hit the 2G limitation, because it does not change the partition size.
>
> And I also think that Spark may want to change this default value, or at least expose this parameter to users (*CoalescedRDD* is a private class, and *RDD*.*coalesce* also doesn't have a parameter to control it).
>
> On Wed, Aug 13, 2014 at 12:28 AM, Xiangrui Meng <men...@gmail.com> wrote:
>
>> Sorry, I missed #2. My suggestion is the same as #2. You need to set a bigger numPartitions to avoid hitting the integer bound or the 2G limitation, at the cost of increased shuffle size per iteration. If you use a CombineInputFormat and then cache, it will try to give you roughly the same size per partition. There will be some remote fetches from HDFS, but it is still cheaper than calling RDD.repartition().
>>
>> For coalesce without shuffle, I don't know how to set the right number of partitions either ...
>>
>> -Xiangrui
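A minimal sketch of the CombineInputFormat route suggested above, assuming Hadoop 2's CombineTextInputFormat and its mapreduce.input.fileinputformat.split.maxsize setting; the input path, target split size, and line parser are placeholders, not anything prescribed in this thread:

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

object CombineSplitsSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("combine-splits"))

    // Pack multiple HDFS blocks into one input split, so the cached RDD ends up
    // with fewer partitions without coalescing (placeholder: ~1GB per split).
    sc.hadoopConfiguration.setLong(
      "mapreduce.input.fileinputformat.split.maxsize", 1024L * 1024 * 1024)

    val lines = sc.newAPIHadoopFile(
      "hdfs:///path/to/training-data",      // placeholder path
      classOf[CombineTextInputFormat],
      classOf[LongWritable],
      classOf[Text]).map(_._2.toString)

    // Placeholder parser: assumes a dense "label,v1 v2 v3 ..." line format;
    // replace with whatever parsing the real data needs (e.g. LIBSVM).
    val points = lines.map { line =>
      val Array(label, feats) = line.split(",", 2)
      LabeledPoint(label.toDouble, Vectors.dense(feats.trim.split(' ').map(_.toDouble)))
    }.cache()

    println(s"partitions after combining splits: ${points.partitions.length}")
  }
}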
>> On Tue, Aug 12, 2014 at 6:16 AM, ZHENG, Xu-dong <dong...@gmail.com> wrote:
>>
>>> Hi Xiangrui,
>>>
>>> Thanks for your reply!
>>>
>>> Yes, our data is very sparse, but RDD.repartition invokes RDD.coalesce(numPartitions, shuffle = true) internally, so I think it has the same effect as #2, right?
>>>
>>> For CombineInputFormat, I haven't tried it, but it sounds like it will combine multiple partitions into one large partition if I cache it, so the same issues as #1?
>>>
>>> For coalesce, could you share some best practices on how to set the right number of partitions to avoid locality problems?
>>>
>>> Thanks!
>>>
>>> On Tue, Aug 12, 2014 at 3:51 PM, Xiangrui Meng <men...@gmail.com> wrote:
>>>
>>>> Assuming that your data is very sparse, I would recommend RDD.repartition. But if that is not the case and you don't want to shuffle the data, you can try a CombineInputFormat and then parse the lines into labeled points. Coalesce may cause locality problems if you don't use the right number of partitions. -Xiangrui
>>>>
>>>> On Mon, Aug 11, 2014 at 10:39 PM, ZHENG, Xu-dong <dong...@gmail.com> wrote:
>>>>
>>>>> I think this has the same effect and issue as #1, right?
>>>>>
>>>>> On Tue, Aug 12, 2014 at 1:08 PM, Jiusheng Chen <chenjiush...@gmail.com> wrote:
>>>>>
>>>>>> How about increasing the HDFS file extent size? E.g., the current value is 128M; we could make it 512M or bigger.
>>>>>>
>>>>>> On Tue, Aug 12, 2014 at 11:46 AM, ZHENG, Xu-dong <dong...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi all,
>>>>>>>
>>>>>>> We are trying to use Spark MLlib to train super large data (100M features and 5B rows). The input data in HDFS has ~26K partitions. By default, MLlib will create a task for every partition at each iteration. But because our dimensionality is also very high, such a large number of tasks adds a lot of network overhead to transfer the weight vector. So we want to reduce the number of tasks; we tried the ways below:
>>>>>>>
>>>>>>> 1. Coalesce partitions without shuffling, then cache.
>>>>>>>
>>>>>>> data.coalesce(numPartitions).cache()
>>>>>>>
>>>>>>> This works fine for relatively small data, but as data grows and numPartitions stays fixed, the size of one partition becomes large. This introduces two issues: first, the larger partition needs larger objects and more memory at runtime, and triggers GC more frequently; second, we hit the 'Size exceeds Integer.MAX_VALUE' error, which seems to be caused by the size of one partition being larger than 2G (https://issues.apache.org/jira/browse/SPARK-1391).
>>>>>>>
>>>>>>> 2. Coalesce partitions with shuffling, then cache.
>>>>>>>
>>>>>>> data.coalesce(numPartitions, true).cache()
>>>>>>>
>>>>>>> It could mitigate the second issue in #1 to some degree, but the first issue is still there, and it also introduces a large amount of shuffling.
>>>>>>>
>>>>>>> 3. Cache data first, then coalesce partitions.
>>>>>>>
>>>>>>> data.cache().coalesce(numPartitions)
>>>>>>>
>>>>>>> In this way, the number of cached partitions does not change, but each task reads the data from multiple partitions. However, I find the tasks lose locality this way. I see a lot of 'ANY' tasks, which means those tasks read data from other nodes and become slower than tasks that read data from local memory.
>>>>>>>
>>>>>>> I think the best way should be like #3, but leveraging locality as much as possible. Is there any way to do that? Any suggestions?
>>>>>>>
>>>>>>> Thanks!
>>>>>>>
>>>>>>> --
>>>>>>> ZHENG, Xu-dong
>>>>>
>>>>> --
>>>>> 郑旭东
>>>>> ZHENG, Xu-dong
>>>
>>> --
>>> 郑旭东
>>> ZHENG, Xu-dong
>
> --
> 郑旭东
> ZHENG, Xu-dong
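For quick reference, a minimal sketch that puts the three coalesce variants from this thread side by side; numPartitions and the input path are placeholders:

import org.apache.spark.{SparkConf, SparkContext}

object CoalesceVariantsSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("coalesce-variants"))
    val numPartitions = 2000                                  // placeholder target task count
    val data = sc.textFile("hdfs:///path/to/training-data")   // placeholder path

    // #1: coalesce without shuffle, then cache -- few, large cached partitions;
    //     risks heavy GC and the 'Size exceeds Integer.MAX_VALUE' failure (SPARK-1391).
    val v1 = data.coalesce(numPartitions).cache()

    // #2: coalesce with shuffle, then cache -- mitigates the 2G issue to some degree,
    //     but cached partitions are still large and a full shuffle is added.
    val v2 = data.coalesce(numPartitions, shuffle = true).cache()

    // #3: cache first, then coalesce -- cached partitions stay small and each task
    //     reads several of them, but tasks may lose locality (show up as 'ANY').
    //     The balanceSlack knob discussed above lives in the private CoalescedRDD
    //     and is not exposed through RDD.coalesce in Spark 1.0/1.1.
    val v3 = data.cache().coalesce(numPartitions)

    println(Seq(v1, v2, v3).map(_.partitions.length).mkString(", "))
  }
}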