Thanks DB. Did you mean this? spark.rdd.compress true
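[Editor's note: a minimal sketch of how that setting would be applied. spark.rdd.compress only compresses partitions that are stored in serialized form, so the persist() storage level below matters; the app name, input path, and the Kryo serializer choice are illustrative assumptions, not from the thread.]

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.storage.StorageLevel

    val conf = new SparkConf()
      .setAppName("compressed-cache")
      .set("spark.rdd.compress", "true")   // the setting asked about above
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

    val sc = new SparkContext(conf)

    // Compression applies to serialized partitions, so cache with MEMORY_ONLY_SER.
    val data = sc.textFile("hdfs:///data/train.txt")
      .persist(StorageLevel.MEMORY_ONLY_SER)

The same setting can also be placed in conf/spark-defaults.conf as "spark.rdd.compress true".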
On Thu, Sep 4, 2014 at 2:48 PM, DB Tsai <dbt...@dbtsai.com> wrote:

To save memory, I recommend compressing the cached RDD; it will be a couple of times smaller than the original data set.

Sincerely,

DB Tsai
-------------------------------------------------------
My Blog: https://www.dbtsai.com
LinkedIn: https://www.linkedin.com/in/dbtsai


On Wed, Sep 3, 2014 at 9:28 PM, Jiusheng Chen <chenjiush...@gmail.com> wrote:

Thanks DB and Xiangrui. Glad to know you guys are actively working on it.

Another thing: did we evaluate the loss from using Float to store values? Currently it is Double. Using fewer bits has the benefit of a smaller memory footprint. According to Google, they even use 16 bits (a special encoding scheme called q2.13, http://jmlr.org/proceedings/papers/v28/golovin13.pdf) in their learner without measurable loss, saving 75% of the memory.


On Thu, Sep 4, 2014 at 11:02 AM, DB Tsai <dbt...@dbtsai.com> wrote:

With David's help today, we were able to implement elastic net GLM in Spark. It's surprisingly easy: with just some modification to Breeze's OWLQN code, it just works without further investigation.

We did a benchmark, and the coefficients are within 0.5% of those from R's glmnet package. I guess this is the first truly distributed glmnet implementation.

It still requires some effort to get it into MLlib, mostly API cleanup work:

1) I'll submit a PR to Breeze which implements weighted regularization in OWLQN.
2) This also depends on https://issues.apache.org/jira/browse/SPARK-2505, for which we have an internal version requiring some cleanup for the open-source project.

Sincerely,

DB Tsai


On Wed, Sep 3, 2014 at 7:34 PM, Xiangrui Meng <men...@gmail.com> wrote:

+DB & David (They implemented OWLQN on Spark today.)


On Sep 3, 2014 7:18 PM, "Jiusheng Chen" <chenjiush...@gmail.com> wrote:

Hi Xiangrui,

A side question about MLlib. It looks like the current LBFGS in MLlib (version 1.0.2 and even v1.1) only supports L2 regularization; the doc explains it: "The L1 regularization by using L1Updater (http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.optimization.L1Updater) will not work since the soft-thresholding logic in L1Updater is designed for gradient descent."

Since the algorithm comes from Breeze, and I noticed that Breeze actually supports L1 (OWLQN, http://www.scalanlp.org/api/breeze/#breeze.optimize.OWLQN), I am wondering whether there were special considerations that kept MLlib from supporting OWLQN, and whether there is any plan to add it?

Thanks for your time!


On Fri, Aug 22, 2014 at 12:56 PM, ZHENG, Xu-dong <dong...@gmail.com> wrote:

Update.

I just found a magic parameter, balanceSlack, in CoalescedRDD, which sounds like it could control the locality. The default value is 0.1 (a smaller value means lower locality). I changed it to 1.0 (full locality) and used the #3 approach, and found a lot of improvement (20%~40%). Although the Web UI still shows the task type as 'ANY' and the input as shuffle read, the real performance is much better than before changing this parameter.

[image: Inline image 1]

I think the benefits include:

1. This approach keeps the physical partition size small but makes each task handle multiple partitions, so the memory requested for deserialization is reduced, which also reduces GC time. That is exactly what we observed in our job.

2. This approach will not hit the 2G limitation, because it does not change the partition size.

And I also think that Spark may want to change this default value, or at least expose this parameter to users (CoalescedRDD is a private class, and RDD.coalesce has no parameter to control it either).
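[Editor's note: a speculative sketch of the workaround described above, assuming a Spark 1.x code base. CoalescedRDD is a private[spark] class, so this file would have to live under an org.apache.spark.* package (or Spark itself would have to be patched); the input path, the target of 500 partitions, and the object name are illustrative assumptions.]

    package org.apache.spark.examples  // subpackage of org.apache.spark, so private[spark] is visible

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.rdd.CoalescedRDD
    import org.apache.spark.mllib.util.MLUtils

    object CoalesceWithLocality {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("coalesce-locality"))

        // Approach #3: cache first so the stored partitions stay small ...
        val data = MLUtils.loadLibSVMFile(sc, "hdfs:///data/train.libsvm").cache()

        // ... then coalesce so each task reads several cached partitions.
        // RDD.coalesce hard-codes balanceSlack = 0.1; constructing CoalescedRDD
        // directly lets us pass 1.0 (full locality), as reported above.
        val coalesced = new CoalescedRDD(data, 500, 1.0)

        println(coalesced.count())
        sc.stop()
      }
    }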
On Wed, Aug 13, 2014 at 12:28 AM, Xiangrui Meng <men...@gmail.com> wrote:

Sorry, I missed #2. My suggestion is the same as #2. You need to set a bigger numPartitions to avoid hitting the integer bound or the 2G limitation, at the cost of an increased shuffle size per iteration. If you use a CombineInputFormat and then cache, it will try to give you roughly the same size per partition. There will be some remote fetches from HDFS, but it is still cheaper than calling RDD.repartition().

For coalesce without shuffle, I don't know how to set the right number of partitions either ...

-Xiangrui


On Tue, Aug 12, 2014 at 6:16 AM, ZHENG, Xu-dong <dong...@gmail.com> wrote:

Hi Xiangrui,

Thanks for your reply!

Yes, our data is very sparse, but RDD.repartition invokes RDD.coalesce(numPartitions, shuffle = true) internally, so I think it has the same effect as #2, right?

For CombineInputFormat, although I haven't tried it, it sounds like it will combine multiple partitions into one large partition once I cache it, so the same issue as #1?

For coalesce, could you share some best practices on how to set the right number of partitions to avoid locality problems?

Thanks!


On Tue, Aug 12, 2014 at 3:51 PM, Xiangrui Meng <men...@gmail.com> wrote:

Assuming that your data is very sparse, I would recommend RDD.repartition. But if that is not the case and you don't want to shuffle the data, you can try a CombineInputFormat and then parse the lines into labeled points. Coalesce may cause locality problems if you don't use the right number of partitions. -Xiangrui


On Mon, Aug 11, 2014 at 10:39 PM, ZHENG, Xu-dong <dong...@gmail.com> wrote:

I think this has the same effect and issue as #1, right?


On Tue, Aug 12, 2014 at 1:08 PM, Jiusheng Chen <chenjiush...@gmail.com> wrote:

How about increasing the HDFS block size? The current value is 128M; we could make it 512M or bigger.
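[Editor's note: a sketch of the CombineInputFormat route Xiangrui suggests above, assuming a Hadoop 2.x client that ships CombineTextInputFormat. The input path, the ~1 GB target split size, and leaving the LabeledPoint parsing to the reader are all assumptions.]

    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat
    import org.apache.spark.{SparkConf, SparkContext}

    val sc = new SparkContext(new SparkConf().setAppName("combine-input"))

    // Upper bound on each combined split, so many small HDFS blocks are read as one split.
    sc.hadoopConfiguration.set(
      "mapreduce.input.fileinputformat.split.maxsize",
      (1024L * 1024 * 1024).toString)

    val lines = sc
      .newAPIHadoopFile[LongWritable, Text, CombineTextInputFormat]("hdfs:///data/train")
      .map { case (_, text) => text.toString }   // then parse each line into a LabeledPoint

    lines.cache()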
On Tue, Aug 12, 2014 at 11:46 AM, ZHENG, Xu-dong <dong...@gmail.com> wrote:

Hi all,

We are trying to use Spark MLlib to train super large data (100M features and 5B rows). The input data in HDFS has ~26K partitions. By default, MLlib will create a task for every partition at each iteration. But because our dimensionality is also very high, such a large number of tasks adds a lot of network overhead to transfer the weight vector. So we want to reduce the number of tasks, and we tried the approaches below:

1. Coalesce partitions without shuffling, then cache:

    data.coalesce(numPartitions).cache()

This works fine for relatively small data, but when the data grows and numPartitions stays fixed, the size of each partition becomes large. This introduces two issues: first, the larger partitions need larger objects and more memory at runtime, and trigger GC more frequently; second, we hit a 'Size exceeds Integer.MAX_VALUE' error, which seems to be caused by a single partition being larger than 2G (https://issues.apache.org/jira/browse/SPARK-1391).

2. Coalesce partitions with shuffling, then cache:

    data.coalesce(numPartitions, true).cache()

This mitigates the second issue in #1 to some degree, but the first issue is still there, and it also introduces a large amount of shuffling.

3. Cache the data first, then coalesce partitions:

    data.cache().coalesce(numPartitions)

This way, the number of cached partitions does not change, but each task reads data from multiple partitions. However, I find that tasks lose locality this way: I see a lot of 'ANY' tasks, which means those tasks read data from other nodes and become slower than tasks reading from local memory.

I think the best way should be like #3, but preserving locality as much as possible. Is there any way to do that? Any suggestions?

Thanks!

--
郑旭东
ZHENG, Xu-dong
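[Editor's note: a self-contained sketch of the three options above so they can be compared side by side; the input path and the target of 500 partitions are placeholder assumptions.]

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.mllib.util.MLUtils

    val sc = new SparkContext(new SparkConf().setAppName("partitioning-options"))
    val numPartitions = 500
    val raw = MLUtils.loadLibSVMFile(sc, "hdfs:///data/train.libsvm")

    // 1. Coalesce without shuffle, then cache: few large partitions,
    //    risks GC pressure and the >2G partition error (SPARK-1391).
    val option1 = raw.coalesce(numPartitions).cache()

    // 2. Coalesce with shuffle, then cache: partitions are rebalanced,
    //    but at the cost of shuffling the whole input.
    val option2 = raw.coalesce(numPartitions, shuffle = true).cache()

    // 3. Cache first, then coalesce: cached partitions stay small and each
    //    task reads several of them, but tasks may lose locality ("ANY").
    val option3 = raw.cache().coalesce(numPartitions)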