+DB & David (They implemented OWLQN on Spark today.)

On Sep 3, 2014 7:18 PM, "Jiusheng Chen" <chenjiush...@gmail.com> wrote:
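For context on the question below: Breeze's OWLQN is driven the same way as its LBFGS, by passing a DiffFunction that computes only the smooth part of the loss and its gradient; OWLQN applies the L1 penalty and the orthant-wise update internally, so no L1Updater-style soft-thresholding is needed. A minimal, self-contained sketch follows. The constructor arguments and the tiny logistic-loss example are illustrative assumptions, and the exact OWLQN signature differs across Breeze versions.

import breeze.linalg.DenseVector
import breeze.optimize.{DiffFunction, OWLQN}

object OwlqnSketch {
  def main(args: Array[String]): Unit = {
    // Toy data (label, features); a real job would aggregate loss/gradient over an RDD.
    val data = Seq((1.0, DenseVector(1.0, 0.5)), (0.0, DenseVector(-0.3, 1.2)))

    // Smooth part only: logistic loss and its gradient. OWLQN adds the L1 term itself.
    val loss = new DiffFunction[DenseVector[Double]] {
      def calculate(w: DenseVector[Double]): (Double, DenseVector[Double]) = {
        var f = 0.0
        val g = DenseVector.zeros[Double](w.length)
        for ((y, x) <- data) {
          val p = 1.0 / (1.0 + math.exp(-(w dot x)))
          f += -y * math.log(p) - (1.0 - y) * math.log(1.0 - p)
          g += x * (p - y)
        }
        (f, g)
      }
    }

    // maxIter = 100, memory m = 10, and L1 strength 0.1 are assumed values.
    val owlqn = new OWLQN[Int, DenseVector[Double]](100, 10, 0.1)
    val wOpt = owlqn.minimize(loss, DenseVector.zeros[Double](2))
    println(s"weights: $wOpt")
  }
}

This is the same division of labor the MLlib doc alludes to: the L1 term lives inside the optimizer rather than in an Updater.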
Hi Xiangrui,

A side question about MLlib. It looks like the current LBFGS in MLlib (version 1.0.2 and even v1.1) only supports L2 regularization; the doc explains it: "The L1 regularization by using L1Updater <http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.optimization.L1Updater> will not work since the soft-thresholding logic in L1Updater is designed for gradient descent."

Since the algorithm comes from Breeze, and I noticed Breeze actually supports L1 (OWLQN <http://www.scalanlp.org/api/breeze/#breeze.optimize.OWLQN>), I am wondering whether there are special considerations why the current MLlib doesn't support OWLQN, and whether there is any plan to add it.

Thanks for your time!

On Fri, Aug 22, 2014 at 12:56 PM, ZHENG, Xu-dong <dong...@gmail.com> wrote:

Update.

I just found a magic parameter *balanceSlack* in *CoalescedRDD*, which sounds like it controls locality. The default value is 0.1 (a smaller value means lower locality). I changed it to 1.0 (full locality), used approach #3, and saw a lot of improvement (20%~40%). Although the Web UI still shows the task type as 'ANY' and the input as shuffle read, the real performance is much better than before changing this parameter.

I think the benefits include:

1. This approach keeps the physical partition size small but makes each task handle multiple partitions. So the memory requested for deserialization is reduced, which also reduces GC time. That is exactly what we observed in our job.

2. This approach will not hit the 2G limitation, because it does not change the partition size.

I also think that Spark may want to change this default value, or at least expose this parameter to users (*CoalescedRDD* is a private class, and *RDD*.*coalesce* doesn't have a parameter to control it either).

On Wed, Aug 13, 2014 at 12:28 AM, Xiangrui Meng <men...@gmail.com> wrote:

Sorry, I missed #2. My suggestion is the same as #2. You need to set a bigger numPartitions to avoid hitting the integer bound or the 2G limitation, at the cost of increased shuffle size per iteration. If you use a CombineInputFormat and then cache, it will try to give you roughly the same size per partition. There will be some remote fetches from HDFS, but it is still cheaper than calling RDD.repartition().

For coalesce without shuffle, I don't know how to set the right number of partitions either ...

-Xiangrui
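A minimal sketch of the "CombineInputFormat and then cache" idea, assuming Hadoop 2.x's CombineTextInputFormat and libSVM-style text lines ("label index:value ..."); the input path, target split size, and feature count are illustrative assumptions, not values from this thread.

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

object CombineInputSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("combine-input-sketch"))

    // Ask the input format to pack co-located HDFS blocks into ~512 MB splits (assumed size).
    val conf = sc.hadoopConfiguration
    conf.setLong("mapreduce.input.fileinputformat.split.maxsize", 512L * 1024 * 1024)

    val lines = sc.newAPIHadoopFile(
      "hdfs:///data/training",           // hypothetical path
      classOf[CombineTextInputFormat],
      classOf[LongWritable],
      classOf[Text],
      conf
    ).map(_._2.toString)

    // Parse each line into a LabeledPoint; 100M features as in the original post.
    val points = lines.map { line =>
      val parts = line.split(' ')
      val (indices, values) = parts.tail.map { kv =>
        val Array(i, v) = kv.split(':')
        (i.toInt, v.toDouble)
      }.unzip
      LabeledPoint(parts.head.toDouble, Vectors.sparse(100000000, indices, values))
    }.cache()

    println(s"partitions after combining: ${points.partitions.length}")
  }
}

Each combined split packs several blocks that are mostly co-located on one node, so the number of cached partitions (and therefore tasks per iteration) drops without any single partition growing past the 2G limit.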
On Tue, Aug 12, 2014 at 6:16 AM, ZHENG, Xu-dong <dong...@gmail.com> wrote:

Hi Xiangrui,

Thanks for your reply!

Yes, our data is very sparse, but RDD.repartition invokes RDD.coalesce(numPartitions, shuffle = true) internally, so I think it has the same effect as #2, right?

For CombineInputFormat, although I haven't tried it, it sounds like it will combine multiple partitions into one large partition once I cache it, so the same issue as #1?

For coalesce, could you share some best practice on how to set the right number of partitions to avoid locality problems?

Thanks!

On Tue, Aug 12, 2014 at 3:51 PM, Xiangrui Meng <men...@gmail.com> wrote:

Assuming that your data is very sparse, I would recommend RDD.repartition. But if that is not the case and you don't want to shuffle the data, you can try a CombineInputFormat and then parse the lines into labeled points. Coalesce may cause locality problems if you didn't use the right number of partitions.

-Xiangrui

On Mon, Aug 11, 2014 at 10:39 PM, ZHENG, Xu-dong <dong...@gmail.com> wrote:

I think this has the same effect and issue as #1, right?

On Tue, Aug 12, 2014 at 1:08 PM, Jiusheng Chen <chenjiush...@gmail.com> wrote:

How about increasing the HDFS block size? For example, the current value is 128M; we could make it 512M or bigger.

On Tue, Aug 12, 2014 at 11:46 AM, ZHENG, Xu-dong <dong...@gmail.com> wrote:

Hi all,

We are trying to use Spark MLlib to train on super large data (100M features and 5B rows). The input data in HDFS has ~26K partitions. By default, MLlib will create a task for every partition at each iteration. But because our dimensionality is also very high, such a large number of tasks incurs large network overhead to transfer the weight vector. So we want to reduce the number of tasks, and we tried the approaches below:

1. Coalesce partitions without shuffling, then cache.

data.coalesce(numPartitions).cache()

This works fine for relatively small data, but when the data grows and numPartitions is fixed, the size of one partition becomes large. This introduces two issues: first, a larger partition needs larger objects and more memory at runtime, and triggers GC more frequently; second, we hit the 'Size exceeds Integer.MAX_VALUE' error, which seems to be caused by a single partition being larger than 2G (https://issues.apache.org/jira/browse/SPARK-1391).

2. Coalesce partitions with shuffling, then cache.

data.coalesce(numPartitions, true).cache()

This mitigates the second issue in #1 to some degree, but the first issue is still there, and it also introduces a large amount of shuffling.

3. Cache the data first, then coalesce partitions.

data.cache().coalesce(numPartitions)

In this way, the number of cached partitions does not change, but each task reads data from multiple partitions. However, I find that tasks lose locality this way: I see a lot of 'ANY' tasks, which means those tasks read data from other nodes and become slower than tasks that read from local memory.

I think the best approach should look like #3, but preserve locality as much as possible. Is there any way to do that? Any suggestions?

Thanks!

--
郑旭东
ZHENG, Xu-dong
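For completeness, a sketch of what approach #3 with full locality can look like, based on the balanceSlack finding mentioned earlier in the thread. It assumes the Spark 1.x constructor CoalescedRDD(prev, maxPartitions, balanceSlack); since CoalescedRDD is private[spark], this only compiles if the file is placed under the org.apache.spark.rdd package, which is a workaround rather than a supported API, and the target partition count of 500 is an assumed value.

// CoalescedRDD is private[spark]; placing this file in org.apache.spark.rdd is a
// workaround to reach it, not a supported API.
package org.apache.spark.rdd

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object BalanceSlackSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("balance-slack-sketch"))

    // Approach #3: cache at the original (small) partition size first.
    val data = sc.textFile("hdfs:///data/training").persist(StorageLevel.MEMORY_ONLY)
    data.count() // materialize the cache

    // Third argument is balanceSlack: 1.0 favors locality over evenly sized groups
    // (the default 0.10 makes the opposite trade-off).
    val coalesced = new CoalescedRDD(data, 500, 1.0)
    println(s"coalesced partitions: ${coalesced.partitions.length}")
  }
}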