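Yes, but you need to store the RDD as *serialized* Java objects (for example, with StorageLevel.MEMORY_ONLY_SER), because spark.rdd.compress only compresses serialized partitions. See the section on storage levels in http://spark.apache.org/docs/latest/programming-guide.html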
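In Spark, the settings discussed here are used together: persist the RDD with a serialized level such as `rdd.persist(StorageLevel.MEMORY_ONLY_SER)` and set `spark.rdd.compress` to `true` on the `SparkConf`. The sketch below does not use Spark. It is a minimal JDK-only illustration of why the serialization step is needed: compression operates on bytes, so it can only be applied after the objects have been serialized. It assumes Java serialization for the serialized form and uses GZIP as a stand-in for whatever codec Spark is configured with. The sparse example row is hypothetical.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import java.util.zip.GZIPOutputStream

object SerializedCompressionSketch {
  /** Java-serialize a value into bytes (the form a serialized storage level keeps). */
  def serialize(value: AnyRef): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(value) finally out.close()
    bytes.toByteArray
  }

  /** GZIP-compress bytes; a stand-in for Spark's configured compression codec. */
  def gzip(data: Array[Byte]): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new GZIPOutputStream(bytes)
    try out.write(data) finally out.close()
    bytes.toByteArray
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical sparse row: 10,000 features, every 100th one non-zero.
    val row: Array[Double] = Array.tabulate(10000)(i => if (i % 100 == 0) 1.0 else 0.0)
    val serialized = serialize(row)
    val compressed = gzip(serialized)
    println(s"serialized: ${serialized.length} bytes, compressed: ${compressed.length} bytes")
  }
}
```

The actual ratio depends heavily on the data: sparse or repetitive rows compress far better than dense random values, and every access to a compressed, serialized partition pays the CPU cost of decompression and deserialization.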
Sincerely,

DB Tsai
-------------------------------------------------------
My Blog: https://www.dbtsai.com
LinkedIn: https://www.linkedin.com/in/dbtsai

On Thu, Sep 4, 2014 at 8:06 PM, Jiusheng Chen <chenjiush...@gmail.com> wrote:
> Thanks DB.
> Did you mean this?
>
> spark.rdd.compress true
>
> On Thu, Sep 4, 2014 at 2:48 PM, DB Tsai <dbt...@dbtsai.com> wrote:
>
>> To save memory, I recommend compressing the cached RDD; it will be a couple of times smaller than the original data set.
>>
>> On Wed, Sep 3, 2014 at 9:28 PM, Jiusheng Chen <chenjiush...@gmail.com> wrote:
>>
>>> Thanks DB and Xiangrui. Glad to know you guys are actively working on it.
>>>
>>> Another thing: did we evaluate the loss from using Float to store values? Currently it is Double. Using fewer bits has the benefit of reducing the memory footprint. According to Google, they even use 16 bits (a special encoding scheme called q2.13, <http://jmlr.org/proceedings/papers/v28/golovin13.pdf>) in their learner without measurable loss, which saves 75% of the memory compared with 64-bit doubles.
>>>
>>> On Thu, Sep 4, 2014 at 11:02 AM, DB Tsai <dbt...@dbtsai.com> wrote:
>>>
>>>> With David's help today, we were able to implement elastic net GLM in Spark. It was surprisingly easy: with just some modifications to Breeze's OWLQN code, it just works without further investigation.
>>>>
>>>> We ran a benchmark, and the coefficients are within 0.5% of those from R's glmnet package. I believe this is the first truly distributed glmnet implementation.
>>>>
>>>> It still requires some effort to get it into MLlib, mostly API cleanup work.
>>>>
>>>> 1) I'll submit a PR to Breeze that implements weighted regularization in OWLQN.
>>>> 2) This also depends on https://issues.apache.org/jira/browse/SPARK-2505, for which we have an internal version that needs some cleanup before it can go into the open source project.
>>>>
>>>> On Wed, Sep 3, 2014 at 7:34 PM, Xiangrui Meng <men...@gmail.com> wrote:
>>>>
>>>>> +DB & David (they implemented OWLQN on Spark today.)
>>>>> On Sep 3, 2014 7:18 PM, "Jiusheng Chen" <chenjiush...@gmail.com> wrote:
>>>>>
>>>>>> Hi Xiangrui,
>>>>>>
>>>>>> A side question about MLlib.
>>>>>> It looks like the current LBFGS in MLlib (version 1.0.2 and even v1.1) only supports L2 regularization; the doc explains: "The L1 regularization by using L1Updater <http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.optimization.L1Updater> will not work since the soft-thresholding logic in L1Updater is designed for gradient descent."
>>>>>>
>>>>>> Since the algorithm comes from Breeze, and I noticed Breeze actually supports L1 (OWLQN <http://www.scalanlp.org/api/breeze/#breeze.optimize.OWLQN>), I'm wondering whether there is some special consideration behind MLlib not supporting OWLQN. And is there any plan to add it?
>>>>>>
>>>>>> Thanks for your time!
>>>>>>
>>>>>> On Fri, Aug 22, 2014 at 12:56 PM, ZHENG, Xu-dong <dong...@gmail.com> wrote:
>>>>>>
>>>>>>> Update.
>>>>>>>
>>>>>>> I just found a magic parameter *balanceSlack* in *CoalescedRDD*, which seems to control locality. The default value is 0.1 (a smaller value means lower locality). I changed it to 1.0 (full locality) and used approach #3, and saw a large improvement (20%~40%).
>>>>>>> Although the Web UI still shows the task locality level as 'ANY' and the input as coming from shuffle read, the real performance is much better than before changing this parameter.
>>>>>>>
>>>>>>> I think the benefits include:
>>>>>>>
>>>>>>> 1. This approach keeps the physical partition size small but makes each task handle multiple partitions. So the memory requested for deserialization is reduced, which also reduces GC time. That is exactly what we observed in our job.
>>>>>>>
>>>>>>> 2. This approach will not hit the 2G limitation, because it does not change the partition size.
>>>>>>>
>>>>>>> I also think Spark could change this default value, or at least expose this parameter to users (*CoalescedRDD* is a private class, and *RDD*.*coalesce* doesn't have a parameter to control it either).
>>>>>>>
>>>>>>> On Wed, Aug 13, 2014 at 12:28 AM, Xiangrui Meng <men...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Sorry, I missed #2. My suggestion is the same as #2. You need to set a bigger numPartitions to avoid hitting the integer bound or the 2G limitation, at the cost of increased shuffle size per iteration. If you use a CombineInputFormat and then cache, it will try to give you roughly the same size per partition. There will be some remote fetches from HDFS, but it is still cheaper than calling RDD.repartition().
>>>>>>>>
>>>>>>>> For coalesce without shuffle, I don't know how to set the right number of partitions either ...
>>>>>>>>
>>>>>>>> -Xiangrui
>>>>>>>>
>>>>>>>> On Tue, Aug 12, 2014 at 6:16 AM, ZHENG, Xu-dong <dong...@gmail.com> wrote:
>>>>>>>> > Hi Xiangrui,
>>>>>>>> >
>>>>>>>> > Thanks for your reply!
>>>>>>>> >
>>>>>>>> > Yes, our data is very sparse, but RDD.repartition invokes RDD.coalesce(numPartitions, shuffle = true) internally, so I think it has the same effect as #2, right?
>>>>>>>> >
>>>>>>>> > As for CombineInputFormat, although I haven't tried it, it sounds like it will combine multiple partitions into one large partition if I cache it, so the same issues as #1?
>>>>>>>> >
>>>>>>>> > For coalesce, could you share some best practices for setting the right number of partitions to avoid locality problems?
>>>>>>>> >
>>>>>>>> > Thanks!
>>>>>>>> >
>>>>>>>> > On Tue, Aug 12, 2014 at 3:51 PM, Xiangrui Meng <men...@gmail.com> wrote:
>>>>>>>> >>
>>>>>>>> >> Assuming that your data is very sparse, I would recommend RDD.repartition. But if that is not the case and you don't want to shuffle the data, you can try a CombineInputFormat and then parse the lines into labeled points. Coalesce may cause locality problems if you don't use the right number of partitions. -Xiangrui
>>>>>>>> >>
>>>>>>>> >> On Mon, Aug 11, 2014 at 10:39 PM, ZHENG, Xu-dong <dong...@gmail.com> wrote:
>>>>>>>> >> > I think this has the same effect and issue as #1, right?
>>>>>>>> >> >
>>>>>>>> >> > On Tue, Aug 12, 2014 at 1:08 PM, Jiusheng Chen <chenjiush...@gmail.com> wrote:
>>>>>>>> >> >>
>>>>>>>> >> >> How about increasing the HDFS block size? For example, the current value is 128M; we could make it 512M or bigger.
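The exchange about bigger HDFS blocks, and Xiangrui's advice to use a bigger numPartitions to stay under the 2G limit, both come down to partition-size arithmetic. Below is a minimal sketch, assuming one input split per HDFS block (file boundaries ignored) and roughly even partitions after coalescing. The 4 TiB total and 1 GiB target are hypothetical numbers, and the helper names are illustrative. `cachedBytes` should be the in-memory size (for example as reported on the Storage page of the Spark web UI), which can be larger than the on-disk size when partitions are cached as deserialized objects.

```scala
object PartitionSizingSketch {
  // Per-partition ceiling discussed in the thread: a cached partition must
  // stay below Int.MaxValue bytes (~2 GB), see SPARK-1391.
  val MaxPartitionBytes: Long = Int.MaxValue.toLong

  /** Ceiling division for positive Longs. */
  def ceilDiv(a: Long, b: Long): Long = (a + b - 1) / b

  /** Number of input partitions if every HDFS block becomes one split. */
  def inputPartitions(totalBytes: Long, blockBytes: Long): Long =
    ceilDiv(totalBytes, blockBytes)

  /** Smallest numPartitions that keeps evenly sized partitions at or below targetBytes. */
  def minPartitions(cachedBytes: Long, targetBytes: Long): Long = {
    require(targetBytes > 0 && targetBytes <= MaxPartitionBytes,
      "targetBytes must be in (0, Int.MaxValue]")
    ceilDiv(cachedBytes, targetBytes)
  }

  def main(args: Array[String]): Unit = {
    val MiB = 1L << 20
    val GiB = 1L << 30
    val total = 4096 * GiB // hypothetical 4 TiB of input
    println(inputPartitions(total, 128 * MiB)) // 32768
    println(inputPartitions(total, 512 * MiB)) // 8192
    println(minPartitions(total, 1 * GiB))     // 4096
  }
}
```

Quadrupling the block size cuts the number of partitions, and therefore tasks per iteration, by 4x, but each partition becomes 4x larger, which is why it runs into the same memory and 2G issues as approach #1.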
>>>>>>>> >> >>
>>>>>>>> >> >> On Tue, Aug 12, 2014 at 11:46 AM, ZHENG, Xu-dong <dong...@gmail.com> wrote:
>>>>>>>> >> >>>
>>>>>>>> >> >>> Hi all,
>>>>>>>> >> >>>
>>>>>>>> >> >>> We are trying to use Spark MLlib to train on very large data (100M features and 5B rows). The input data in HDFS has ~26K partitions. By default, MLlib creates a task for every partition at each iteration. But because our dimensionality is also very high, such a large number of tasks adds a lot of network overhead for transferring the weight vector. So we want to reduce the number of tasks, and we tried the following approaches:
>>>>>>>> >> >>>
>>>>>>>> >> >>> 1. Coalesce partitions without shuffling, then cache.
>>>>>>>> >> >>>
>>>>>>>> >> >>> data.coalesce(numPartitions).cache()
>>>>>>>> >> >>>
>>>>>>>> >> >>> This works fine for relatively small data, but as the data grows with numPartitions fixed, each partition becomes large. This introduces two issues: first, a larger partition needs larger objects and more memory at runtime, and triggers GC more frequently; second, we hit the 'Size exceeds Integer.MAX_VALUE' error, which seems to be caused by a single partition being larger than 2G (https://issues.apache.org/jira/browse/SPARK-1391).
>>>>>>>> >> >>>
>>>>>>>> >> >>> 2. Coalesce partitions with shuffling, then cache.
>>>>>>>> >> >>>
>>>>>>>> >> >>> data.coalesce(numPartitions, true).cache()
>>>>>>>> >> >>>
>>>>>>>> >> >>> This mitigates the second issue of #1 to some degree, but the first issue remains, and it also introduces a large amount of shuffling.
>>>>>>>> >> >>>
>>>>>>>> >> >>> 3. Cache the data first, then coalesce partitions.
>>>>>>>> >> >>>
>>>>>>>> >> >>> data.cache().coalesce(numPartitions)
>>>>>>>> >> >>>
>>>>>>>> >> >>> This way, the number of cached partitions does not change, but each task reads data from multiple partitions. However, I found that tasks lose locality this way. I see a lot of 'ANY' tasks, which means those tasks read data from other nodes and are slower than tasks that read from local memory.
>>>>>>>> >> >>>
>>>>>>>> >> >>> I think the best way should be like #3, but exploiting locality as much as possible. Is there any way to do that? Any suggestions?
>>>>>>>> >> >>>
>>>>>>>> >> >>> Thanks!
>>>>>>>> >> >>>
>>>>>>>> >> >>> --
>>>>>>>> >> >>> ZHENG, Xu-dong
>>>>>>>> >> >
>>>>>>>> >> > --
>>>>>>>> >> > 郑旭东
>>>>>>>> >> > ZHENG, Xu-dong
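Xu-dong's update reports that raising CoalescedRDD's balanceSlack from 0.1 to 1.0 restored much of the locality lost in approach #3. The sketch below is a simplified, hypothetical model of that balance-versus-locality trade-off, not Spark's actual coalescing code. Each cached partition goes to the least-loaded coalesced task on its own host, unless the globally least-loaded task is lighter by at least `slack = balanceSlack * numberOfPartitions` partitions, in which case balance wins. The names `Part`, `Group`, `coalesce` and `localReads` are illustrative only.

```scala
import scala.collection.mutable.ArrayBuffer

object LocalityCoalesceSketch {
  // A cached partition and the host whose memory holds it (hypothetical model).
  final case class Part(id: Int, host: String)

  // One coalesced task: the host it is scheduled on and the partitions it reads.
  final class Group(val host: String) {
    val parts: ArrayBuffer[Part] = ArrayBuffer.empty[Part]
  }

  /** Assigns each partition to a group. A partition goes to the least-loaded
    * group on its own host, unless the globally least-loaded group is lighter
    * by at least slack = balanceSlack * parts.size partitions. */
  def coalesce(parts: Seq[Part], groupHosts: Seq[String], balanceSlack: Double): IndexedSeq[Group] = {
    val groups = groupHosts.map(h => new Group(h)).toIndexedSeq
    val slack = (balanceSlack * parts.size).toInt
    for (p <- parts) {
      val leastLoaded = groups.minBy(_.parts.size)
      val local = groups.filter(_.host == p.host)
      val target =
        if (local.isEmpty) leastLoaded // no task on this host: locality is impossible
        else {
          val bestLocal = local.minBy(_.parts.size)
          if (leastLoaded.parts.size + slack <= bestLocal.parts.size) leastLoaded else bestLocal
        }
      target.parts += p
    }
    groups
  }

  /** Counts partitions that are read on the host that caches them. */
  def localReads(groups: Seq[Group]): Int =
    groups.map(g => g.parts.count(_.host == g.host)).sum

  def main(args: Array[String]): Unit = {
    // 8 cached partitions, all on h1; two coalesced tasks, one per host.
    val parts = (0 until 8).map(i => Part(i, "h1"))
    val hosts = Seq("h1", "h2")
    println(localReads(coalesce(parts, hosts, balanceSlack = 1.0))) // 8: all local, but unbalanced
    println(localReads(coalesce(parts, hosts, balanceSlack = 0.0))) // 4: balanced, half remote
  }
}
```

In this toy model, a slack of 1.0 never gives up locality (one task ends up reading all eight partitions), while 0.0 balances perfectly at the cost of remote reads. Which end wins in practice depends on the job; the thread reports a 20%~40% improvement toward the locality end for theirs.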
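Jiusheng's question about storing values in fewer bits can be made concrete with a q2.13 fixed-point encoding. This is a minimal sketch, assuming the usual reading of the notation: one sign bit, 2 integer bits and 13 fractional bits packed into a 16-bit `Short`, giving a range of [-4, 4 - 2^-13] with a step of 2^-13. The linked paper combines such encodings with randomized rounding during training. This sketch uses plain round-to-nearest with clamping, so it only illustrates the storage format and its precision, not the paper's learning procedure.

```scala
object Q213Sketch {
  val FractionBits = 13
  val Scale: Double = (1 << FractionBits).toDouble // 8192, so the step is 1/8192

  /** Encodes a Double as q2.13 in a Short: round to nearest, clamp to the representable range. */
  def encode(x: Double): Short = {
    val raw = math.round(x * Scale)
    math.max(Short.MinValue.toLong, math.min(Short.MaxValue.toLong, raw)).toShort
  }

  /** Decodes a q2.13 Short back to a Double. */
  def decode(q: Short): Double = q / Scale

  def main(args: Array[String]): Unit = {
    val weights = Array(0.5, -1.25, 0.0001, 3.9, -7.0) // -7.0 is out of range and gets clamped
    for (w <- weights) {
      val back = decode(encode(w))
      println(f"$w%10.5f -> $back%10.5f (abs error ${math.abs(w - back)}%.2e)")
    }
    // 2 bytes per weight instead of 8 for a Double: a 75% saving.
    println(s"bytes for 1M weights: double=${8L * 1000000}, q2.13=${2L * 1000000}")
  }
}
```

For in-range values the rounding error is at most half a step (2^-14), while values outside [-4, 4) saturate, so an encoding like this is only suitable when the weights are known to stay small.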
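For reference, the elastic net objective that R's glmnet minimizes in the Gaussian (least-squares) case is shown below, with N training examples, overall regularization strength λ and mixing parameter α ∈ [0, 1] (α = 1 gives the lasso, α = 0 gives ridge regression); the intercept β₀ is not penalized. The ℓ2 term is smooth and can be handled by L-BFGS directly, whereas the ℓ1 term is not differentiable at zero. Handling that term is what OWL-QN is designed for, which is consistent with the MLlib documentation quoted in the thread saying that L1Updater's soft-thresholding does not carry over to LBFGS.

```latex
\min_{\beta_0,\,\beta}\;
\frac{1}{2N}\sum_{i=1}^{N}\left(y_i - \beta_0 - x_i^{\top}\beta\right)^2
+ \lambda\left[\frac{1-\alpha}{2}\,\lVert\beta\rVert_2^2 + \alpha\,\lVert\beta\rVert_1\right]
```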