To save memory, I recommend compressing the cached RDD; it will be a couple of times smaller than the original data set.
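A minimal sketch of the configuration this points at, assuming Spark 1.x: spark.rdd.compress only applies to partitions cached with a serialized storage level, so the cache must use one. The app name and input path are placeholders.

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.storage.StorageLevel

    val conf = new SparkConf()
      .setAppName("compressed-cache")              // placeholder app name
      .set("spark.rdd.compress", "true")           // compress serialized cached partitions
      // the codec used is controlled by spark.io.compression.codec
    val sc = new SparkContext(conf)

    // Cache with a serialized level so the compression actually applies.
    val data = sc.textFile("hdfs:///path/to/training/data")  // placeholder path
    data.persist(StorageLevel.MEMORY_ONLY_SER)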
Sincerely,

DB Tsai
-------------------------------------------------------
My Blog: https://www.dbtsai.com
LinkedIn: https://www.linkedin.com/in/dbtsai


On Wed, Sep 3, 2014 at 9:28 PM, Jiusheng Chen <chenjiush...@gmail.com> wrote:

> Thanks DB and Xiangrui. Glad to know you guys are actively working on it.
>
> Another thing: did we evaluate the loss of using Float to store values?
> Currently it is Double. Using fewer bits has the benefit of reducing the
> memory footprint. According to Google, they even use 16 bits (a special
> encoding scheme called q2.13)
> <http://jmlr.org/proceedings/papers/v28/golovin13.pdf> in their learner
> without measurable loss, and can save 75% memory.
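A back-of-the-envelope sketch of the saving under discussion, assuming dense weight and gradient vectors at the scale mentioned later in this thread (100M features):

    // Approximate per-vector footprint for 100M dense coefficients:
    val numFeatures = 100000000L
    val doubleBytes = numFeatures * 8L  // ~800 MB with Double
    val floatBytes  = numFeatures * 4L  // ~400 MB with Float (50% saving)
    val q213Bytes   = numFeatures * 2L  // ~200 MB with a 16-bit encoding
                                        // such as q2.13 (75% saving vs Double)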
>
> On Thu, Sep 4, 2014 at 11:02 AM, DB Tsai <dbt...@dbtsai.com> wrote:
>
>> With David's help today, we were able to implement elastic net GLM in
>> Spark. It's surprisingly easy, and with just some modification to breeze's
>> OWLQN code, it just works without further investigation.
>>
>> We did a benchmark, and the coefficients are within 0.5% difference
>> compared with R's glmnet package. I guess this is the first truly
>> distributed glmnet implementation.
>>
>> It still requires some effort to get it into MLlib; mostly API cleanup
>> work.
>>
>> 1) I'll submit a PR to breeze which implements weighted regularization in
>> OWLQN.
>> 2) This also depends on https://issues.apache.org/jira/browse/SPARK-2505,
>> for which we have an internal version requiring some cleanup for the open
>> source project.
>>
>> Sincerely,
>>
>> DB Tsai
>> -------------------------------------------------------
>> My Blog: https://www.dbtsai.com
>> LinkedIn: https://www.linkedin.com/in/dbtsai
>>
>> On Wed, Sep 3, 2014 at 7:34 PM, Xiangrui Meng <men...@gmail.com> wrote:
>>
>>> +DB & David (They implemented OWLQN on Spark today.)
>>> On Sep 3, 2014 7:18 PM, "Jiusheng Chen" <chenjiush...@gmail.com> wrote:
>>>
>>>> Hi Xiangrui,
>>>>
>>>> A side question about MLlib.
>>>> It looks like the current LBFGS in MLlib (version 1.0.2 and even v1.1)
>>>> only supports L2 regularization; the doc explains it: "The L1
>>>> regularization by using L1Updater
>>>> <http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.optimization.L1Updater>
>>>> will not work since the soft-thresholding logic in L1Updater is designed
>>>> for gradient descent."
>>>>
>>>> Since the algorithm comes from Breeze, and I noticed Breeze actually
>>>> supports L1 (OWLQN
>>>> <http://www.scalanlp.org/api/breeze/#breeze.optimize.OWLQN>), I'm
>>>> wondering whether there are special considerations why the current MLlib
>>>> doesn't support OWLQN. And is there any plan to add it?
>>>>
>>>> Thanks for your time!
>>>>
>>>> On Fri, Aug 22, 2014 at 12:56 PM, ZHENG, Xu-dong <dong...@gmail.com>
>>>> wrote:
>>>>
>>>>> Update.
>>>>>
>>>>> I just found a magic parameter *balanceSlack* in *CoalescedRDD*, which
>>>>> sounds like it could control the locality. The default value is 0.1
>>>>> (a smaller value means lower locality). I changed it to 1.0 (full
>>>>> locality) and used the #3 approach, and found a lot of improvement
>>>>> (20%~40%). Although the Web UI still shows the task type as 'ANY' and
>>>>> the input as shuffle read, the real performance is much better than
>>>>> before changing this parameter.
>>>>> [image: Inline image 1]
>>>>>
>>>>> I think the benefit includes:
>>>>>
>>>>> 1. This approach keeps the physical partition size small, but makes
>>>>> each task handle multiple partitions. So the memory requested for
>>>>> deserialization is reduced, which also reduces the GC time. That is
>>>>> exactly what we observed in our job.
>>>>>
>>>>> 2. This approach will not hit the 2G limitation, because it does not
>>>>> change the partition size.
>>>>>
>>>>> And I also think that Spark may want to change this default value, or
>>>>> at least expose this parameter to users (*CoalescedRDD* is a private
>>>>> class, and *RDD*.*coalesce* also doesn't have a parameter to control
>>>>> it).
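A minimal sketch of the workaround described above, under the caveats the email itself raises: CoalescedRDD is private[spark] in Spark 1.x, so this code would have to be compiled into an org.apache.spark.rdd package of your own, and its constructor is internal and may change. The helper name is hypothetical.

    package org.apache.spark.rdd

    import scala.reflect.ClassTag

    object LocalityCoalesce {
      // Like rdd.coalesce(n) without shuffle, but with balanceSlack raised
      // from the default 0.1 to 1.0 so locality is preferred over evenly
      // balanced partition sizes (Spark 1.x internals).
      def apply[T: ClassTag](rdd: RDD[T], numPartitions: Int): RDD[T] =
        new CoalescedRDD(rdd, numPartitions, 1.0)
    }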
>>>>>
>>>>> On Wed, Aug 13, 2014 at 12:28 AM, Xiangrui Meng <men...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Sorry, I missed #2. My suggestion is the same as #2. You need to set
>>>>>> a bigger numPartitions to avoid hitting the integer bound or the 2G
>>>>>> limitation, at the cost of increased shuffle size per iteration. If
>>>>>> you use a CombineInputFormat and then cache, it will try to give you
>>>>>> roughly the same size per partition. There will be some remote
>>>>>> fetches from HDFS, but still cheaper than calling RDD.repartition().
>>>>>>
>>>>>> For coalesce without shuffle, I don't know how to set the right
>>>>>> number of partitions either ...
>>>>>>
>>>>>> -Xiangrui
>>>>>>
>>>>>> On Tue, Aug 12, 2014 at 6:16 AM, ZHENG, Xu-dong <dong...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Xiangrui,
>>>>>>>
>>>>>>> Thanks for your reply!
>>>>>>>
>>>>>>> Yes, our data is very sparse, but RDD.repartition invokes
>>>>>>> RDD.coalesce(numPartitions, shuffle = true) internally, so I think
>>>>>>> it has the same effect as #2, right?
>>>>>>>
>>>>>>> For CombineInputFormat, although I haven't tried it, it sounds like
>>>>>>> it will combine multiple partitions into a large partition if I
>>>>>>> cache it, so the same issues as #1?
>>>>>>>
>>>>>>> For coalesce, could you share some best practice on how to set the
>>>>>>> right number of partitions to avoid locality problems?
>>>>>>>
>>>>>>> Thanks!
>>>>>>>
>>>>>>> On Tue, Aug 12, 2014 at 3:51 PM, Xiangrui Meng <men...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Assuming that your data is very sparse, I would recommend
>>>>>>>> RDD.repartition. But if that is not the case and you don't want to
>>>>>>>> shuffle the data, you can try a CombineInputFormat and then parse
>>>>>>>> the lines into labeled points. Coalesce may cause locality problems
>>>>>>>> if you don't use the right number of partitions. -Xiangrui
>>>>>>>>
>>>>>>>> On Mon, Aug 11, 2014 at 10:39 PM, ZHENG, Xu-dong <dong...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> I think this has the same effect and issue as #1, right?
>>>>>>>>>
>>>>>>>>> On Tue, Aug 12, 2014 at 1:08 PM, Jiusheng Chen
>>>>>>>>> <chenjiush...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> How about increasing the HDFS block size? E.g., the current value
>>>>>>>>>> is 128M; we could make it 512M or bigger.
>>>>>>>>>>
>>>>>>>>>> On Tue, Aug 12, 2014 at 11:46 AM, ZHENG, Xu-dong
>>>>>>>>>> <dong...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi all,
>>>>>>>>>>>
>>>>>>>>>>> We are trying to use Spark MLlib to train on super large data
>>>>>>>>>>> (100M features and 5B rows). The input data in HDFS has ~26K
>>>>>>>>>>> partitions. By default, MLlib will create a task for every
>>>>>>>>>>> partition at each iteration. But because our dimensionality is
>>>>>>>>>>> also very high, such a large number of tasks adds large network
>>>>>>>>>>> overhead to transfer the weight vector. So we want to reduce the
>>>>>>>>>>> number of tasks; we tried the ways below:
>>>>>>>>>>>
>>>>>>>>>>> 1. Coalesce partitions without shuffling, then cache:
>>>>>>>>>>>
>>>>>>>>>>> data.coalesce(numPartitions).cache()
>>>>>>>>>>>
>>>>>>>>>>> This works fine for relatively small data, but when data is
>>>>>>>>>>> increasing and numPartitions is fixed, the size of one partition
>>>>>>>>>>> will be large. This introduces two issues: first, the larger
>>>>>>>>>>> partitions need larger objects and more memory at runtime, and
>>>>>>>>>>> trigger GC more frequently; second, we hit the 'Size exceeds
>>>>>>>>>>> Integer.MAX_VALUE' error, which seems to be caused by the size of
>>>>>>>>>>> one partition being larger than 2G
>>>>>>>>>>> (https://issues.apache.org/jira/browse/SPARK-1391).
>>>>>>>>>>>
>>>>>>>>>>> 2. Coalesce partitions with shuffling, then cache:
>>>>>>>>>>>
>>>>>>>>>>> data.coalesce(numPartitions, true).cache()
>>>>>>>>>>>
>>>>>>>>>>> This could mitigate the second issue in #1 to some degree, but
>>>>>>>>>>> the first issue is still there, and it also introduces a large
>>>>>>>>>>> amount of shuffling.
>>>>>>>>>>>
>>>>>>>>>>> 3. Cache data first, then coalesce partitions:
>>>>>>>>>>>
>>>>>>>>>>> data.cache().coalesce(numPartitions)
>>>>>>>>>>>
>>>>>>>>>>> In this way, the number of cached partitions does not change, but
>>>>>>>>>>> each task reads data from multiple partitions. However, I find
>>>>>>>>>>> that tasks lose locality this way. I see a lot of 'ANY' tasks,
>>>>>>>>>>> meaning those tasks read data from other nodes and become slower
>>>>>>>>>>> than tasks reading from local memory.
>>>>>>>>>>>
>>>>>>>>>>> I think the best way should be like #3, but leveraging locality
>>>>>>>>>>> as much as possible. Is there any way to do that? Any
>>>>>>>>>>> suggestions?
>>>>>>>>>>>
>>>>>>>>>>> Thanks!
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> 郑旭东
>>>>>>>>>>> ZHENG, Xu-dong
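For reference, a sketch of the CombineInputFormat route Xiangrui suggests above, assuming Hadoop 2.x's CombineTextInputFormat and an existing SparkContext; the path and the ~1 GB split target are placeholders to tune:

    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat

    // sc: an existing SparkContext.
    // Ask Hadoop to pack many HDFS blocks into each split, up to ~1 GB,
    // so far fewer (but similarly sized) partitions come out of the read.
    sc.hadoopConfiguration.set(
      "mapreduce.input.fileinputformat.split.maxsize", (1L << 30).toString)

    val lines = sc.newAPIHadoopFile(
      "hdfs:///path/to/training/data",   // placeholder path
      classOf[CombineTextInputFormat],
      classOf[LongWritable],
      classOf[Text],
      sc.hadoopConfiguration
    ).map(_._2.toString)
    // Then parse `lines` into LabeledPoints and cache the result.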