Re: Sqoop on Spark

2016-04-05 Thread ayan guha
Thanks guys for feedback.

On Wed, Apr 6, 2016 at 3:44 PM, Jörn Franke  wrote:

> I do not think you can be more resource efficient. In the end you have to
> store the data on HDFS anyway. Rebuilding something like Sqoop yourself is a
> lot of development effort, especially around error handling.
> You may create a ticket with the Sqoop guys to support Spark as an
> execution engine; maybe it is less effort to plug it in there.
> If your cluster is loaded then you may want to add more machines or
> improve the existing programs.
>
> On 06 Apr 2016, at 07:33, ayan guha  wrote:
>
> One of the reasons in my mind is to avoid MapReduce applications completely
> during ingestion, if possible. Also, I can then use a Spark standalone
> cluster to ingest, even if my Hadoop cluster is heavily loaded. What do you
> guys think?
>
> On Wed, Apr 6, 2016 at 3:13 PM, Jörn Franke  wrote:
>
>> Why do you want to reimplement something which is already there?
>>
>> On 06 Apr 2016, at 06:47, ayan guha  wrote:
>>
>> Hi
>>
>> Thanks for the reply. My use case is to query ~40 tables from Oracle (using
>> indexes and incremental loads only) and add the data to existing Hive
>> tables. Also, it would be good to have an option to create the Hive table,
>> driven by job-specific configuration.
>>
>> What do you think?
>>
>> Best
>> Ayan
>>
>> On Wed, Apr 6, 2016 at 2:30 PM, Takeshi Yamamuro 
>> wrote:
>>
>>> Hi,
>>>
>>> It depends on your use case using sqoop.
>>> What's it like?
>>>
>>> // maropu
>>>
>>> On Wed, Apr 6, 2016 at 1:26 PM, ayan guha  wrote:
>>>
 Hi All

 Asking for opinions: is it possible/advisable to use Spark to replace what
 Sqoop does? Are there any existing projects along similar lines?

 --
 Best Regards,
 Ayan Guha

>>>
>>>
>>>
>>> --
>>> ---
>>> Takeshi Yamamuro
>>>
>>
>>
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>>
>
>
> --
> Best Regards,
> Ayan Guha
>
>


-- 
Best Regards,
Ayan Guha
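
For illustration only, a rough Scala sketch of the use case discussed above
(incremental pulls from Oracle into existing Hive tables) written against the
Spark 1.6-era DataFrame JDBC reader. The connection details, table names and
high-water-mark handling are placeholders, not taken from this thread:

import org.apache.spark.sql.hive.HiveContext

val sqlContext = new HiveContext(sc)   // assumes an existing SparkContext `sc`

val lastId = 123456L                   // incremental high-water mark, tracked by the job
val incremental = sqlContext.read.format("jdbc").options(Map(
  "url"      -> "jdbc:oracle:thin:@//dbhost:1521/ORCL",          // placeholder connection
  "driver"   -> "oracle.jdbc.OracleDriver",
  "dbtable"  -> s"(SELECT * FROM sales WHERE id > $lastId) t",   // index-friendly predicate
  "user"     -> "scott",
  "password" -> "tiger"
)).load()

// Append the new rows to an existing Hive table, as described in the thread.
incremental.write.insertInto("warehouse.sales")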


Sqoop on Spark

2016-04-05 Thread ayan guha
Hi All

Asking for opinions: is it possible/advisable to use Spark to replace what
Sqoop does? Are there any existing projects along similar lines?

-- 
Best Regards,
Ayan Guha


Re: Switch RDD-based MLlib APIs to maintenance mode in Spark 2.0

2016-04-05 Thread Nick Pentreath
+1 for this proposal - as you mention, I think it's the de facto current
situation anyway.

Note that from a developer view it's just the user-facing API that will be
only "ml" - the majority of the actual algorithms still operate on RDDs
under the hood currently.
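
As an illustration (not from this thread) of the user-facing difference, here
is the same logistic regression fit written against the RDD-based spark.mllib
API and the DataFrame-based spark.ml API, assuming a Spark 1.6-era `sc` and
`sqlContext`:

import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.ml.classification.LogisticRegression

// RDD-based spark.mllib (the API proposed for maintenance mode):
val points = sc.parallelize(Seq(
  LabeledPoint(1.0, Vectors.dense(0.0, 1.1)),
  LabeledPoint(0.0, Vectors.dense(2.0, 1.0))))
val mllibModel = new LogisticRegressionWithLBFGS().run(points)

// DataFrame-based spark.ml (the API recommended going forward):
val df = sqlContext.createDataFrame(Seq(
  (1.0, Vectors.dense(0.0, 1.1)),
  (0.0, Vectors.dense(2.0, 1.0)))).toDF("label", "features")
val mlModel = new LogisticRegression().setMaxIter(10).fit(df)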
On Wed, 6 Apr 2016 at 05:03, Chris Fregly  wrote:

> perhaps renaming to Spark ML would actually clear up code and
> documentation confusion?
>
> +1 for rename
>
> On Apr 5, 2016, at 7:00 PM, Reynold Xin  wrote:
>
> +1
>
> This is a no brainer IMO.
>
>
> On Tue, Apr 5, 2016 at 7:32 PM, Joseph Bradley 
> wrote:
>
>> +1  By the way, the JIRA for tracking (Scala) API parity is:
>> https://issues.apache.org/jira/browse/SPARK-4591
>>
>> On Tue, Apr 5, 2016 at 4:58 PM, Matei Zaharia 
>> wrote:
>>
>>> This sounds good to me as well. The one thing we should pay attention to
>>> is how we update the docs so that people know to start with the spark.ml
>>> classes. Right now the docs list spark.mllib first and also seem more
>>> comprehensive in that area than in spark.ml, so maybe people naturally
>>> move towards that.
>>>
>>> Matei
>>>
>>> On Apr 5, 2016, at 4:44 PM, Xiangrui Meng  wrote:
>>>
>>> Yes, DB (cc'ed) is working on porting the local linear algebra library
>>> over (SPARK-13944). There are also frequent pattern mining algorithms we
>>> need to port over in order to reach feature parity. -Xiangrui
>>>
>>> On Tue, Apr 5, 2016 at 12:08 PM Shivaram Venkataraman <
>>> shiva...@eecs.berkeley.edu> wrote:
>>>
 Overall this sounds good to me. One question I have is that in
 addition to the ML algorithms we have a number of linear algebra
 (various distributed matrices) and statistical methods in the
 spark.mllib package. Is the plan to port or move these to the spark.ml
 namespace in the 2.x series ?

 Thanks
 Shivaram

 On Tue, Apr 5, 2016 at 11:48 AM, Sean Owen  wrote:
 > FWIW, all of that sounds like a good plan to me. Developing one API is
 > certainly better than two.
 >
 > On Tue, Apr 5, 2016 at 7:01 PM, Xiangrui Meng 
 wrote:
 >> Hi all,
 >>
 >> More than a year ago, in Spark 1.2 we introduced the ML pipeline API
 built
 >> on top of Spark SQL’s DataFrames. Since then the new DataFrame-based
 API has
 >> been developed under the spark.ml package, while the old RDD-based
 API has
 >> been developed in parallel under the spark.mllib package. While it
 was
 >> easier to implement and experiment with new APIs under a new
 package, it
 >> became harder and harder to maintain as both packages grew bigger and
 >> bigger. And new users are often confused by having two sets of APIs
 with
 >> overlapped functions.
 >>
 >> We started to recommend the DataFrame-based API over the RDD-based
 API in
 >> Spark 1.5 for its versatility and flexibility, and we saw the
 development
 >> and the usage gradually shifting to the DataFrame-based API. Just
 counting
 >> the lines of Scala code, from 1.5 to the current master we added
 ~1
 >> lines to the DataFrame-based API while ~700 to the RDD-based API.
 So, to
 >> gather more resources on the development of the DataFrame-based API
 and to
 >> help users migrate over sooner, I want to propose switching
 RDD-based MLlib
 >> APIs to maintenance mode in Spark 2.0. What does it mean exactly?
 >>
 >> * We do not accept new features in the RDD-based spark.mllib
 package, unless
 >> they block implementing new features in the DataFrame-based spark.ml
 >> package.
 >> * We still accept bug fixes in the RDD-based API.
 >> * We will add more features to the DataFrame-based API in the 2.x
 series to
 >> reach feature parity with the RDD-based API.
 >> * Once we reach feature parity (possibly in Spark 2.2), we will
 deprecate
 >> the RDD-based API.
 >> * We will remove the RDD-based API from the main Spark repo in Spark
 3.0.
 >>
 >> Though the RDD-based API is already in de facto maintenance mode,
 this
 >> announcement will make it clear and hence important to both MLlib
 developers
 >> and users. So we’d greatly appreciate your feedback!
 >>
 >> (As a side note, people sometimes use “Spark ML” to refer to the
 >> DataFrame-based API or even the entire MLlib component. This also
 causes
 >> confusion. To be clear, “Spark ML” is not an official name and there
 are no
 >> plans to rename MLlib to “Spark ML” at this time.)
 >>
 >> Best,
 >> Xiangrui
 >
 > -
 > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 > For additional commands, e-mail: user-h...@spark.apache.org
 >

>>>
>>>

Re: Switch RDD-based MLlib APIs to maintenance mode in Spark 2.0

2016-04-05 Thread Chris Fregly
perhaps renaming to Spark ML would actually clear up code and documentation 
confusion?

+1 for rename 

> On Apr 5, 2016, at 7:00 PM, Reynold Xin  wrote:
> 
> +1
> 
> This is a no brainer IMO.
> 
> 
>> On Tue, Apr 5, 2016 at 7:32 PM, Joseph Bradley  wrote:
>> +1  By the way, the JIRA for tracking (Scala) API parity is: 
>> https://issues.apache.org/jira/browse/SPARK-4591
>> 
>>> On Tue, Apr 5, 2016 at 4:58 PM, Matei Zaharia  
>>> wrote:
>>> This sounds good to me as well. The one thing we should pay attention to is 
>>> how we update the docs so that people know to start with the spark.ml 
>>> classes. Right now the docs list spark.mllib first and also seem more 
>>> comprehensive in that area than in spark.ml, so maybe people naturally move 
>>> towards that.
>>> 
>>> Matei
>>> 
 On Apr 5, 2016, at 4:44 PM, Xiangrui Meng  wrote:
 
 Yes, DB (cc'ed) is working on porting the local linear algebra library 
 over (SPARK-13944). There are also frequent pattern mining algorithms we 
 need to port over in order to reach feature parity. -Xiangrui
 
> On Tue, Apr 5, 2016 at 12:08 PM Shivaram Venkataraman 
>  wrote:
> Overall this sounds good to me. One question I have is that in
> addition to the ML algorithms we have a number of linear algebra
> (various distributed matrices) and statistical methods in the
> spark.mllib package. Is the plan to port or move these to the spark.ml
> namespace in the 2.x series ?
> 
> Thanks
> Shivaram
> 
> On Tue, Apr 5, 2016 at 11:48 AM, Sean Owen  wrote:
> > FWIW, all of that sounds like a good plan to me. Developing one API is
> > certainly better than two.
> >
> > On Tue, Apr 5, 2016 at 7:01 PM, Xiangrui Meng  wrote:
> >> Hi all,
> >>
> >> More than a year ago, in Spark 1.2 we introduced the ML pipeline API 
> >> built
> >> on top of Spark SQL’s DataFrames. Since then the new DataFrame-based 
> >> API has
> >> been developed under the spark.ml package, while the old RDD-based API 
> >> has
> >> been developed in parallel under the spark.mllib package. While it was
> >> easier to implement and experiment with new APIs under a new package, 
> >> it
> >> became harder and harder to maintain as both packages grew bigger and
> >> bigger. And new users are often confused by having two sets of APIs 
> >> with
> >> overlapped functions.
> >>
> >> We started to recommend the DataFrame-based API over the RDD-based API 
> >> in
> >> Spark 1.5 for its versatility and flexibility, and we saw the 
> >> development
> >> and the usage gradually shifting to the DataFrame-based API. Just 
> >> counting
> >> the lines of Scala code, from 1.5 to the current master we added ~1
> >> lines to the DataFrame-based API while ~700 to the RDD-based API. So, 
> >> to
> >> gather more resources on the development of the DataFrame-based API 
> >> and to
> >> help users migrate over sooner, I want to propose switching RDD-based 
> >> MLlib
> >> APIs to maintenance mode in Spark 2.0. What does it mean exactly?
> >>
> >> * We do not accept new features in the RDD-based spark.mllib package, 
> >> unless
> >> they block implementing new features in the DataFrame-based spark.ml
> >> package.
> >> * We still accept bug fixes in the RDD-based API.
> >> * We will add more features to the DataFrame-based API in the 2.x 
> >> series to
> >> reach feature parity with the RDD-based API.
> >> * Once we reach feature parity (possibly in Spark 2.2), we will 
> >> deprecate
> >> the RDD-based API.
> >> * We will remove the RDD-based API from the main Spark repo in Spark 
> >> 3.0.
> >>
> >> Though the RDD-based API is already in de facto maintenance mode, this
> >> announcement will make it clear and hence important to both MLlib 
> >> developers
> >> and users. So we’d greatly appreciate your feedback!
> >>
> >> (As a side note, people sometimes use “Spark ML” to refer to the
> >> DataFrame-based API or even the entire MLlib component. This also 
> >> causes
> >> confusion. To be clear, “Spark ML” is not an official name and there 
> >> are no
> >> plans to rename MLlib to “Spark ML” at this time.)
> >>
> >> Best,
> >> Xiangrui
> >
> > -
> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> > For additional commands, e-mail: user-h...@spark.apache.org
> >
> 


Re: Switch RDD-based MLlib APIs to maintenance mode in Spark 2.0

2016-04-05 Thread Reynold Xin
+1

This is a no brainer IMO.


On Tue, Apr 5, 2016 at 7:32 PM, Joseph Bradley 
wrote:

> +1  By the way, the JIRA for tracking (Scala) API parity is:
> https://issues.apache.org/jira/browse/SPARK-4591
>
> On Tue, Apr 5, 2016 at 4:58 PM, Matei Zaharia 
> wrote:
>
>> This sounds good to me as well. The one thing we should pay attention to
>> is how we update the docs so that people know to start with the spark.ml
>> classes. Right now the docs list spark.mllib first and also seem more
>> comprehensive in that area than in spark.ml, so maybe people naturally
>> move towards that.
>>
>> Matei
>>
>> On Apr 5, 2016, at 4:44 PM, Xiangrui Meng  wrote:
>>
>> Yes, DB (cc'ed) is working on porting the local linear algebra library
>> over (SPARK-13944). There are also frequent pattern mining algorithms we
>> need to port over in order to reach feature parity. -Xiangrui
>>
>> On Tue, Apr 5, 2016 at 12:08 PM Shivaram Venkataraman <
>> shiva...@eecs.berkeley.edu> wrote:
>>
>>> Overall this sounds good to me. One question I have is that in
>>> addition to the ML algorithms we have a number of linear algebra
>>> (various distributed matrices) and statistical methods in the
>>> spark.mllib package. Is the plan to port or move these to the spark.ml
>>> namespace in the 2.x series ?
>>>
>>> Thanks
>>> Shivaram
>>>
>>> On Tue, Apr 5, 2016 at 11:48 AM, Sean Owen  wrote:
>>> > FWIW, all of that sounds like a good plan to me. Developing one API is
>>> > certainly better than two.
>>> >
>>> > On Tue, Apr 5, 2016 at 7:01 PM, Xiangrui Meng 
>>> wrote:
>>> >> Hi all,
>>> >>
>>> >> More than a year ago, in Spark 1.2 we introduced the ML pipeline API
>>> built
>>> >> on top of Spark SQL’s DataFrames. Since then the new DataFrame-based
>>> API has
>>> >> been developed under the spark.ml package, while the old RDD-based
>>> API has
>>> >> been developed in parallel under the spark.mllib package. While it was
>>> >> easier to implement and experiment with new APIs under a new package,
>>> it
>>> >> became harder and harder to maintain as both packages grew bigger and
>>> >> bigger. And new users are often confused by having two sets of APIs
>>> with
>>> >> overlapped functions.
>>> >>
>>> >> We started to recommend the DataFrame-based API over the RDD-based
>>> API in
>>> >> Spark 1.5 for its versatility and flexibility, and we saw the
>>> development
>>> >> and the usage gradually shifting to the DataFrame-based API. Just
>>> counting
>>> >> the lines of Scala code, from 1.5 to the current master we added
>>> ~1
>>> >> lines to the DataFrame-based API while ~700 to the RDD-based API. So,
>>> to
>>> >> gather more resources on the development of the DataFrame-based API
>>> and to
>>> >> help users migrate over sooner, I want to propose switching RDD-based
>>> MLlib
>>> >> APIs to maintenance mode in Spark 2.0. What does it mean exactly?
>>> >>
>>> >> * We do not accept new features in the RDD-based spark.mllib package,
>>> unless
>>> >> they block implementing new features in the DataFrame-based spark.ml
>>> >> package.
>>> >> * We still accept bug fixes in the RDD-based API.
>>> >> * We will add more features to the DataFrame-based API in the 2.x
>>> series to
>>> >> reach feature parity with the RDD-based API.
>>> >> * Once we reach feature parity (possibly in Spark 2.2), we will
>>> deprecate
>>> >> the RDD-based API.
>>> >> * We will remove the RDD-based API from the main Spark repo in Spark
>>> 3.0.
>>> >>
>>> >> Though the RDD-based API is already in de facto maintenance mode, this
>>> >> announcement will make it clear and hence important to both MLlib
>>> developers
>>> >> and users. So we’d greatly appreciate your feedback!
>>> >>
>>> >> (As a side note, people sometimes use “Spark ML” to refer to the
>>> >> DataFrame-based API or even the entire MLlib component. This also
>>> causes
>>> >> confusion. To be clear, “Spark ML” is not an official name and there
>>> are no
>>> >> plans to rename MLlib to “Spark ML” at this time.)
>>> >>
>>> >> Best,
>>> >> Xiangrui
>>> >
>>> > -
>>> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> > For additional commands, e-mail: user-h...@spark.apache.org
>>> >
>>>
>>
>>
>


Re: lost executor due to large shuffle spill memory

2016-04-05 Thread Michael Slavitch
Do you have enough disk space for the spill?  It seems it has lots of memory 
reserved but not enough for the spill. You will need a disk that can handle the 
entire data partition for each host. Compression of the spilled data saves 
about 50% in most if not all cases.

Given the large data set I would consider a 1TB SATA flash drive, formatted as
EXT4 or XFS, and give Spark exclusive access to it as spark.local.dir. It will
slow things down but it won't stop. There are alternatives if you want to
discuss offline.
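
For concreteness, a hedged Scala sketch of the kind of settings being suggested
here; the mount point is a placeholder and the spill-compression settings shown
are already the defaults:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("als-remap")                           // illustrative app name
  .set("spark.local.dir", "/mnt/flash1/spark-tmp")   // dedicated flash/SSD mount, assumed
  .set("spark.shuffle.compress", "true")             // defaults, shown for clarity
  .set("spark.shuffle.spill.compress", "true")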


> On Apr 5, 2016, at 6:37 PM, l  wrote:
> 
> I have a task to remap the index to the actual uuid in ALS prediction results.
> But it consistently fails due to lost executors. I noticed there's a lot of
> shuffle spill memory but I don't know how to improve it.
> 
>  
> 
> I've tried to reduce the number of executors while assigning each to have
> bigger memory. 
>  
> 
> But it still doesn't seem big enough. I don't know what to do. 
> 
> Below is my code:
> user = load_user()
> product = load_product()
> user.cache()
> product.cache()
> model = load_model(model_path)
> all_pairs = user.map(lambda x: x[1]).cartesian(product.map(lambda x: x[1]))
> all_prediction = model.predictAll(all_pairs)
> user_reverse = user.map(lambda r: (r[1], r[0]))
> product_reverse = product.map(lambda r: (r[1], r[0]))
> user_reversed = all_prediction.map(lambda u: (u[0], (u[1],
> u[2]))).join(user_reverse).map(lambda r: (r[1][0][0], (r[1][1],
> r[1][0][1])))
> both_reversed = user_reversed.join(product_reverse).map(lambda r:
> (r[1][0][0], r[1][1], r[1][0][1]))
> both_reversed.map(lambda x: '{}|{}|{}'.format(x[0], x[1],
> x[2])).saveAsTextFile(recommendation_path)
> 
> Both user and products are (uuid, index) tuples. 
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/lost-executor-due-to-large-shuffle-spill-memory-tp26683.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 




Re: Switch RDD-based MLlib APIs to maintenance mode in Spark 2.0

2016-04-05 Thread Holden Karau
I'm very much in favor of this, the less porting work there is the better :)

On Tue, Apr 5, 2016 at 5:32 PM, Joseph Bradley 
wrote:

> +1  By the way, the JIRA for tracking (Scala) API parity is:
> https://issues.apache.org/jira/browse/SPARK-4591
>
> On Tue, Apr 5, 2016 at 4:58 PM, Matei Zaharia 
> wrote:
>
>> This sounds good to me as well. The one thing we should pay attention to
>> is how we update the docs so that people know to start with the spark.ml
>> classes. Right now the docs list spark.mllib first and also seem more
>> comprehensive in that area than in spark.ml, so maybe people naturally
>> move towards that.
>>
>> Matei
>>
>> On Apr 5, 2016, at 4:44 PM, Xiangrui Meng  wrote:
>>
>> Yes, DB (cc'ed) is working on porting the local linear algebra library
>> over (SPARK-13944). There are also frequent pattern mining algorithms we
>> need to port over in order to reach feature parity. -Xiangrui
>>
>> On Tue, Apr 5, 2016 at 12:08 PM Shivaram Venkataraman <
>> shiva...@eecs.berkeley.edu> wrote:
>>
>>> Overall this sounds good to me. One question I have is that in
>>> addition to the ML algorithms we have a number of linear algebra
>>> (various distributed matrices) and statistical methods in the
>>> spark.mllib package. Is the plan to port or move these to the spark.ml
>>> namespace in the 2.x series ?
>>>
>>> Thanks
>>> Shivaram
>>>
>>> On Tue, Apr 5, 2016 at 11:48 AM, Sean Owen  wrote:
>>> > FWIW, all of that sounds like a good plan to me. Developing one API is
>>> > certainly better than two.
>>> >
>>> > On Tue, Apr 5, 2016 at 7:01 PM, Xiangrui Meng 
>>> wrote:
>>> >> Hi all,
>>> >>
>>> >> More than a year ago, in Spark 1.2 we introduced the ML pipeline API
>>> built
>>> >> on top of Spark SQL’s DataFrames. Since then the new DataFrame-based
>>> API has
>>> >> been developed under the spark.ml package, while the old RDD-based
>>> API has
>>> >> been developed in parallel under the spark.mllib package. While it was
>>> >> easier to implement and experiment with new APIs under a new package,
>>> it
>>> >> became harder and harder to maintain as both packages grew bigger and
>>> >> bigger. And new users are often confused by having two sets of APIs
>>> with
>>> >> overlapped functions.
>>> >>
>>> >> We started to recommend the DataFrame-based API over the RDD-based
>>> API in
>>> >> Spark 1.5 for its versatility and flexibility, and we saw the
>>> development
>>> >> and the usage gradually shifting to the DataFrame-based API. Just
>>> counting
>>> >> the lines of Scala code, from 1.5 to the current master we added
>>> ~1
>>> >> lines to the DataFrame-based API while ~700 to the RDD-based API. So,
>>> to
>>> >> gather more resources on the development of the DataFrame-based API
>>> and to
>>> >> help users migrate over sooner, I want to propose switching RDD-based
>>> MLlib
>>> >> APIs to maintenance mode in Spark 2.0. What does it mean exactly?
>>> >>
>>> >> * We do not accept new features in the RDD-based spark.mllib package,
>>> unless
>>> >> they block implementing new features in the DataFrame-based spark.ml
>>> >> package.
>>> >> * We still accept bug fixes in the RDD-based API.
>>> >> * We will add more features to the DataFrame-based API in the 2.x
>>> series to
>>> >> reach feature parity with the RDD-based API.
>>> >> * Once we reach feature parity (possibly in Spark 2.2), we will
>>> deprecate
>>> >> the RDD-based API.
>>> >> * We will remove the RDD-based API from the main Spark repo in Spark
>>> 3.0.
>>> >>
>>> >> Though the RDD-based API is already in de facto maintenance mode, this
>>> >> announcement will make it clear and hence important to both MLlib
>>> developers
>>> >> and users. So we’d greatly appreciate your feedback!
>>> >>
>>> >> (As a side note, people sometimes use “Spark ML” to refer to the
>>> >> DataFrame-based API or even the entire MLlib component. This also
>>> causes
>>> >> confusion. To be clear, “Spark ML” is not an official name and there
>>> are no
>>> >> plans to rename MLlib to “Spark ML” at this time.)
>>> >>
>>> >> Best,
>>> >> Xiangrui
>>> >
>>> > -
>>> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> > For additional commands, e-mail: user-h...@spark.apache.org
>>> >
>>>
>>
>>
>


-- 
Cell : 425-233-8271
Twitter: https://twitter.com/holdenkarau


Re: Switch RDD-based MLlib APIs to maintenance mode in Spark 2.0

2016-04-05 Thread Joseph Bradley
+1  By the way, the JIRA for tracking (Scala) API parity is:
https://issues.apache.org/jira/browse/SPARK-4591

On Tue, Apr 5, 2016 at 4:58 PM, Matei Zaharia 
wrote:

> This sounds good to me as well. The one thing we should pay attention to
> is how we update the docs so that people know to start with the spark.ml
> classes. Right now the docs list spark.mllib first and also seem more
> comprehensive in that area than in spark.ml, so maybe people naturally
> move towards that.
>
> Matei
>
> On Apr 5, 2016, at 4:44 PM, Xiangrui Meng  wrote:
>
> Yes, DB (cc'ed) is working on porting the local linear algebra library
> over (SPARK-13944). There are also frequent pattern mining algorithms we
> need to port over in order to reach feature parity. -Xiangrui
>
> On Tue, Apr 5, 2016 at 12:08 PM Shivaram Venkataraman <
> shiva...@eecs.berkeley.edu> wrote:
>
>> Overall this sounds good to me. One question I have is that in
>> addition to the ML algorithms we have a number of linear algebra
>> (various distributed matrices) and statistical methods in the
>> spark.mllib package. Is the plan to port or move these to the spark.ml
>> namespace in the 2.x series ?
>>
>> Thanks
>> Shivaram
>>
>> On Tue, Apr 5, 2016 at 11:48 AM, Sean Owen  wrote:
>> > FWIW, all of that sounds like a good plan to me. Developing one API is
>> > certainly better than two.
>> >
>> > On Tue, Apr 5, 2016 at 7:01 PM, Xiangrui Meng  wrote:
>> >> Hi all,
>> >>
>> >> More than a year ago, in Spark 1.2 we introduced the ML pipeline API
>> built
>> >> on top of Spark SQL’s DataFrames. Since then the new DataFrame-based
>> API has
>> >> been developed under the spark.ml package, while the old RDD-based
>> API has
>> >> been developed in parallel under the spark.mllib package. While it was
>> >> easier to implement and experiment with new APIs under a new package,
>> it
>> >> became harder and harder to maintain as both packages grew bigger and
>> >> bigger. And new users are often confused by having two sets of APIs
>> with
>> >> overlapped functions.
>> >>
>> >> We started to recommend the DataFrame-based API over the RDD-based API
>> in
>> >> Spark 1.5 for its versatility and flexibility, and we saw the
>> development
>> >> and the usage gradually shifting to the DataFrame-based API. Just
>> counting
>> >> the lines of Scala code, from 1.5 to the current master we added ~1
>> >> lines to the DataFrame-based API while ~700 to the RDD-based API. So,
>> to
>> >> gather more resources on the development of the DataFrame-based API
>> and to
>> >> help users migrate over sooner, I want to propose switching RDD-based
>> MLlib
>> >> APIs to maintenance mode in Spark 2.0. What does it mean exactly?
>> >>
>> >> * We do not accept new features in the RDD-based spark.mllib package,
>> unless
>> >> they block implementing new features in the DataFrame-based spark.ml
>> >> package.
>> >> * We still accept bug fixes in the RDD-based API.
>> >> * We will add more features to the DataFrame-based API in the 2.x
>> series to
>> >> reach feature parity with the RDD-based API.
>> >> * Once we reach feature parity (possibly in Spark 2.2), we will
>> deprecate
>> >> the RDD-based API.
>> >> * We will remove the RDD-based API from the main Spark repo in Spark
>> 3.0.
>> >>
>> >> Though the RDD-based API is already in de facto maintenance mode, this
>> >> announcement will make it clear and hence important to both MLlib
>> developers
>> >> and users. So we’d greatly appreciate your feedback!
>> >>
>> >> (As a side note, people sometimes use “Spark ML” to refer to the
>> >> DataFrame-based API or even the entire MLlib component. This also
>> causes
>> >> confusion. To be clear, “Spark ML” is not an official name and there
>> are no
>> >> plans to rename MLlib to “Spark ML” at this time.)
>> >>
>> >> Best,
>> >> Xiangrui
>> >
>> > -
>> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> > For additional commands, e-mail: user-h...@spark.apache.org
>> >
>>
>
>


Re: Switch RDD-based MLlib APIs to maintenance mode in Spark 2.0

2016-04-05 Thread Matei Zaharia
This sounds good to me as well. The one thing we should pay attention to is how 
we update the docs so that people know to start with the spark.ml classes. 
Right now the docs list spark.mllib first and also seem more comprehensive in 
that area than in spark.ml, so maybe people naturally move towards that.

Matei

> On Apr 5, 2016, at 4:44 PM, Xiangrui Meng  wrote:
> 
> Yes, DB (cc'ed) is working on porting the local linear algebra library over 
> (SPARK-13944). There are also frequent pattern mining algorithms we need to 
> port over in order to reach feature parity. -Xiangrui
> 
> On Tue, Apr 5, 2016 at 12:08 PM Shivaram Venkataraman 
> > wrote:
> Overall this sounds good to me. One question I have is that in
> addition to the ML algorithms we have a number of linear algebra
> (various distributed matrices) and statistical methods in the
> spark.mllib package. Is the plan to port or move these to the spark.ml 
> 
> namespace in the 2.x series ?
> 
> Thanks
> Shivaram
> 
> On Tue, Apr 5, 2016 at 11:48 AM, Sean Owen  > wrote:
> > FWIW, all of that sounds like a good plan to me. Developing one API is
> > certainly better than two.
> >
> > On Tue, Apr 5, 2016 at 7:01 PM, Xiangrui Meng  > > wrote:
> >> Hi all,
> >>
> >> More than a year ago, in Spark 1.2 we introduced the ML pipeline API built
> >> on top of Spark SQL’s DataFrames. Since then the new DataFrame-based API 
> >> has
> >> been developed under the spark.ml  package, while the 
> >> old RDD-based API has
> >> been developed in parallel under the spark.mllib package. While it was
> >> easier to implement and experiment with new APIs under a new package, it
> >> became harder and harder to maintain as both packages grew bigger and
> >> bigger. And new users are often confused by having two sets of APIs with
> >> overlapped functions.
> >>
> >> We started to recommend the DataFrame-based API over the RDD-based API in
> >> Spark 1.5 for its versatility and flexibility, and we saw the development
> >> and the usage gradually shifting to the DataFrame-based API. Just counting
> >> the lines of Scala code, from 1.5 to the current master we added ~1
> >> lines to the DataFrame-based API while ~700 to the RDD-based API. So, to
> >> gather more resources on the development of the DataFrame-based API and to
> >> help users migrate over sooner, I want to propose switching RDD-based MLlib
> >> APIs to maintenance mode in Spark 2.0. What does it mean exactly?
> >>
> >> * We do not accept new features in the RDD-based spark.mllib package, 
> >> unless
> >> they block implementing new features in the DataFrame-based spark.ml 
> >> 
> >> package.
> >> * We still accept bug fixes in the RDD-based API.
> >> * We will add more features to the DataFrame-based API in the 2.x series to
> >> reach feature parity with the RDD-based API.
> >> * Once we reach feature parity (possibly in Spark 2.2), we will deprecate
> >> the RDD-based API.
> >> * We will remove the RDD-based API from the main Spark repo in Spark 3.0.
> >>
> >> Though the RDD-based API is already in de facto maintenance mode, this
> >> announcement will make it clear and hence important to both MLlib 
> >> developers
> >> and users. So we’d greatly appreciate your feedback!
> >>
> >> (As a side note, people sometimes use “Spark ML” to refer to the
> >> DataFrame-based API or even the entire MLlib component. This also causes
> >> confusion. To be clear, “Spark ML” is not an official name and there are no
> >> plans to rename MLlib to “Spark ML” at this time.)
> >>
> >> Best,
> >> Xiangrui
> >
> > -
> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
> > 
> > For additional commands, e-mail: user-h...@spark.apache.org 
> > 
> >



Re: I want to unsubscribe

2016-04-05 Thread Jakob Odersky
to unsubscribe, send an email to user-unsubscr...@spark.apache.org

On Tue, Apr 5, 2016 at 4:50 PM, Ranjana Rajendran
 wrote:
> I get to see the threads in the public mailing list. I don't want so many
> messages in my inbox. I want to unsubscribe.



I want to unsubscribe

2016-04-05 Thread Ranjana Rajendran
I get to see the threads in the public mailing list. I don't want so many
messages in my inbox. I want to unsubscribe.


Re: Saving Spark streaming RDD with saveAsTextFiles ends up creating empty files on HDFS

2016-04-05 Thread Andy Davidson
Based on my experience


if you use "simple" streaming (i.e. you do not use windows), then after every
mini batch you will "save". This will create a dir in HDFS with the timestamp
as part of the path. Within the dir, a separate part file will be created
for each partition. If you used windowing you could probably write several
mini batches at one time. (I have not used windows so I am speculating what
the behavior would be.)

If you are running batch processing you could easily merge many small files
if needed.



From:  Mich Talebzadeh 
Date:  Tuesday, April 5, 2016 at 4:17 PM
To:  Andrew Davidson 
Cc:  "user @spark" 
Subject:  Re: Saving Spark streaming RDD with saveAsTextFiles ends up
creating empty files on HDFS

> I agree every time an OS file is created, it requires a context switch plus a
> file descriptor. It is probably more time consuming to open and close these
> files than actually doing the work.
> 
> I always wondered about performance implication of Spark streaming and
> although there are some early days results. I have yet to see any concrete P
> on this. 
> 
> My issue is that I want to use Spark streaming with Complex Event Processing
> by developing adaptors (a combination of filters and mappers) to distinguish
> signal from pedestal in real terms and only Save data to persistent storage
> (HDFS) if they are of value.
> 
> I am using Kafka upstream and that does a good job. Now I am trying to
> experiment with saving data to HDFS in one form or shape. Basically this is
> just immutable data so the lesser partition the better. I am happy to store
> the data in text, parquet or (ORC format in Hive) as long as it works.
> 
> Regards
> 
> 
> Dr Mich Talebzadeh
> 
>  
> 
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8
> Pw 
> 
>  
> 
> http://talebzadehmich.wordpress.com 
> 
>  
> 
> On 5 April 2016 at 23:59, Andy Davidson  wrote:
>> In my experience my streaming I was getting tens of thousands of empty files
>> created in HDFS. This was crushing my systems performance when my batch jobs
>> ran over the data sets. There is a lot of over head open and closing empty
>> files.
>> 
>> I think creating empty files or keeping empty partitions around is probably a
>> bug how ever I never filed a bug report. Please file a bug report. Please
>> copy me on the Jira
>> 
>> There is also a related performance issue. I use reparation() to ensure CSV
>> files have a max number of rows. (it an product requirement to make csv files
>> more user friendly). In my experience if I do not reparation a partitions
>> with a single row of data would cause a separate part-* file to be created. I
>> wound out with large number of very small files. I have always wonder how to
>> configure partitions to get better performance. I would think we are better
>> off with a few very large partitions in most cases. I.E. Keep more stuff in
>> memory with less overhead. I was really hoping Spark would automatically
>> handle this for me
>> 
>> Andy
>> 
>> From:  Mich Talebzadeh 
>> Date:  Tuesday, April 5, 2016 at 3:49 PM
>> To:  Andrew Davidson 
>> Cc:  "user @spark" 
>> Subject:  Re: Saving Spark streaming RDD with saveAsTextFiles ends up
>> creating empty files on HDFS
>> 
>>> Thanks Andy.
>>> 
>>> Do we know if this is a known bug or simply a feature that on the face of it
>>> Spark cannot save RDD output to a text file?
>>> 
>>> 
>>> 
>>> Dr Mich Talebzadeh
>>> 
>>>  
>>> 
>>> LinkedIn  
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUr
>>> V8Pw 
>>> 
>>>  
>>> 
>>> http://talebzadehmich.wordpress.com 
>>> 
>>>  
>>> 
>>> On 5 April 2016 at 23:35, Andy Davidson 
>>> wrote:
 Hi Mich
 
 Yup I was surprised to find empty files. Its easy to work around. Note I
 should probably use coalesce() and not repartition()
 
 In general I found I almost always need to reparation. I was getting
 thousands of empty partitions. It was really slowing my system down.
 
private static void save(JavaDStream json, String outputURIBase)
 {
 
 /*
 
 using saveAsTestFiles will cause lots of empty directories to be
 created.
 
 DStream data = json.dstream();
 
 data.saveAsTextFiles(outputURI, null);
 
 */
 
 
 
 jsonTweets.foreachRDD(new VoidFunction2() {
 
 private static final long serialVersionUID = 1L;
 

Re: Saving Spark streaming RDD with saveAsTextFiles ends up creating empty files on HDFS

2016-04-05 Thread Mich Talebzadeh
I agree: every time an OS file is created, it requires a context switch plus
a file descriptor. It is probably more time consuming to open and close
these files than actually doing the work.

I have always wondered about the performance implications of Spark streaming
and, although there are some early-days results, I have yet to see any
concrete P on this.

My issue is that I want to use Spark streaming with Complex Event
Processing by developing adaptors (a combination of filters and mappers) to
distinguish signal from pedestal in real terms, and only save data to
persistent storage (HDFS) if it is of value.

I am using Kafka upstream and that does a good job. Now I am trying to
experiment with saving data to HDFS in one form or shape. Basically this is
just immutable data, so the fewer partitions the better. I am happy to store
the data in text, parquet, or ORC format in Hive, as long as it works.

Regards


Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 5 April 2016 at 23:59, Andy Davidson 
wrote:

> In my experience my streaming I was getting tens of thousands of empty
> files created in HDFS. This was crushing my systems performance when my
> batch jobs ran over the data sets. There is a lot of over head open and
> closing empty files.
>
> I think creating empty files or keeping empty partitions around is
> probably a bug how ever I never filed a bug report. Please file a bug
> report. Please copy me on the Jira
>
> There is also a related performance issue. I use reparation() to ensure
> CSV files have a max number of rows. (it an product requirement to make csv
> files more user friendly). In my experience if I do not reparation
> a partitions with a single row of data would cause a separate part-* file
> to be created. I wound out with large number of very small files. I have
> always wonder how to configure partitions to get better performance. I
> would think we are better off with a few very large partitions in most
> cases. I.E. Keep more stuff in memory with less overhead. I was really
> hoping Spark would automatically handle this for me
>
> Andy
>
> From: Mich Talebzadeh 
> Date: Tuesday, April 5, 2016 at 3:49 PM
> To: Andrew Davidson 
> Cc: "user @spark" 
> Subject: Re: Saving Spark streaming RDD with saveAsTextFiles ends up
> creating empty files on HDFS
>
> Thanks Andy.
>
> Do we know if this is a known bug or simply a feature that on the face of
> it Spark cannot save RDD output to a text file?
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 5 April 2016 at 23:35, Andy Davidson 
> wrote:
>
>> Hi Mich
>>
>> Yup I was surprised to find empty files. Its easy to work around. Note I
>> should probably use coalesce() and not repartition()
>>
>> In general I found I almost always need to reparation. I was getting
>> thousands of empty partitions. It was really slowing my system down.
>>
>>private static void save(JavaDStream json, String
>> outputURIBase) {
>>
>> /*
>>
>> using saveAsTestFiles will cause lots of empty directories to be
>> created.
>>
>> DStream data = json.dstream();
>>
>> data.saveAsTextFiles(outputURI, null);
>>
>> */
>>
>>
>>
>> jsonTweets.foreachRDD(new VoidFunction2()
>> {
>>
>> private static final long serialVersionUID = 1L;
>>
>> @Override
>>
>> public void call(JavaRDD rdd, Time time) throws
>> Exception {
>>
>> Long count = rdd.count();
>>
>> //if(!rdd.isEmpty()) {
>>
>> if(count > 0) {
>>
>> rdd = repartition(rdd, count.intValue());
>>
>> long milliSeconds = time.milliseconds();
>>
>> String date = Utils.convertMillisecondsToDateStr(
>> milliSeconds);
>>
>> String dirPath = outputURIBase
>>
>> + File.separator +  date
>>
>> + File.separator + "tweet-" + time
>> .milliseconds();
>>
>> rdd.saveAsTextFile(dirPath);
>>
>> }
>>
>> }
>>
>>
>>
>> final int maxNumRowsPerFile = 200;
>>
>> JavaRDD repartition(JavaRDD rdd, int count) {
>>
>>
>> long numPartisions = count / maxNumRowsPerFile + 1;
>>
>> Long tmp = numPartisions;
>>
>> rdd = rdd.repartition(tmp.intValue());
>>
>> return rdd;
>>
>> }
>>
>>   

Re: Saving Spark streaming RDD with saveAsTextFiles ends up creating empty files on HDFS

2016-04-05 Thread Andy Davidson
In my experience with my streaming job, I was getting tens of thousands of
empty files created in HDFS. This was crushing my system's performance when
my batch jobs ran over the data sets. There is a lot of overhead in opening
and closing empty files.

I think creating empty files or keeping empty partitions around is probably
a bug; however, I never filed a bug report. Please file a bug report. Please
copy me on the Jira.

There is also a related performance issue. I use repartition() to ensure CSV
files have a max number of rows (it's a product requirement to make the csv
files more user friendly). In my experience, if I do not repartition, a
partition with a single row of data will cause a separate part-* file to
be created, and I wound up with a large number of very small files. I have
always wondered how to configure partitions to get better performance. I
would think we are better off with a few very large partitions in most
cases, i.e. keep more stuff in memory with less overhead. I was really hoping
Spark would automatically handle this for me.

Andy

From:  Mich Talebzadeh 
Date:  Tuesday, April 5, 2016 at 3:49 PM
To:  Andrew Davidson 
Cc:  "user @spark" 
Subject:  Re: Saving Spark streaming RDD with saveAsTextFiles ends up
creating empty files on HDFS

> Thanks Andy.
> 
> Do we know if this is a known bug or simply a feature that on the face of it
> Spark cannot save RDD output to a text file?
> 
> 
> 
> Dr Mich Talebzadeh
> 
>  
> 
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8
> Pw 
> 
>  
> 
> http://talebzadehmich.wordpress.com 
> 
>  
> 
> On 5 April 2016 at 23:35, Andy Davidson  wrote:
>> Hi Mich
>> 
>> Yup I was surprised to find empty files. Its easy to work around. Note I
>> should probably use coalesce() and not repartition()
>> 
>> In general I found I almost always need to reparation. I was getting
>> thousands of empty partitions. It was really slowing my system down.
>> 
>>private static void save(JavaDStream json, String outputURIBase) {
>> 
>> /*
>> 
>> using saveAsTestFiles will cause lots of empty directories to be
>> created.
>> 
>> DStream data = json.dstream();
>> 
>> data.saveAsTextFiles(outputURI, null);
>> 
>> */  
>> 
>> 
>> 
>> jsonTweets.foreachRDD(new VoidFunction2() {
>> 
>> private static final long serialVersionUID = 1L;
>> 
>> @Override
>> 
>> public void call(JavaRDD rdd, Time time) throws Exception
>> {
>> 
>> Long count = rdd.count();
>> 
>> //if(!rdd.isEmpty()) {
>> 
>> if(count > 0) {
>> 
>> rdd = repartition(rdd, count.intValue());
>> 
>> long milliSeconds = time.milliseconds();
>> 
>> String date =
>> Utils.convertMillisecondsToDateStr(milliSeconds);
>> 
>> String dirPath = outputURIBase
>> 
>> + File.separator +  date
>> 
>> + File.separator + "tweet-" +
>> time.milliseconds();
>> 
>> rdd.saveAsTextFile(dirPath);
>> 
>> }
>> 
>> }
>> 
>> 
>> 
>> final int maxNumRowsPerFile = 200;
>> 
>> JavaRDD repartition(JavaRDD rdd, int count) {
>> 
>> long numPartisions = count / maxNumRowsPerFile + 1;
>> 
>> Long tmp = numPartisions;
>> 
>> rdd = rdd.repartition(tmp.intValue());
>> 
>> return rdd;
>> 
>> }
>> 
>> });
>> 
>>  
>> 
>> }
>> 
>> 
>> 
>> 
>> From:  Mich Talebzadeh 
>> Date:  Tuesday, April 5, 2016 at 3:06 PM
>> To:  "user @spark" 
>> Subject:  Saving Spark streaming RDD with saveAsTextFiles ends up creating
>> empty files on HDFS
>> 
>>> Spark 1.6.1
>>> 
>>> The following creates empty files. It prints lines OK with println
>>> 
>>> val result = lines.filter(_.contains("ASE 15")).flatMap(line =>
>>> line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _)
>>> result.saveAsTextFiles("/tmp/rdd_stuff")
>>> 
>>> I am getting zero length files
>>> 
>>> drwxr-xr-x   - hduser supergroup  0 2016-04-05 23:19
>>> /tmp/rdd_stuff-1459894755000
>>> drwxr-xr-x   - hduser supergroup  0 2016-04-05 23:20
>>> /tmp/rdd_stuff-145989481
>>> 
>>> Any ideas?
>>> 
>>> Thanks,
>>> 
>>> Dr Mich Talebzadeh
>>> 
>>>  
>>> 
>>> LinkedIn  
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUr
>>> V8Pw 
>>> 
>>>  
>>> 
>>> http://talebzadehmich.wordpress.com 

Re: Saving Spark streaming RDD with saveAsTextFiles ends up creating empty files on HDFS

2016-04-05 Thread Mich Talebzadeh
Thanks Andy.

Do we know if this is a known bug or simply a feature that on the face of
it Spark cannot save RDD output to a text file?



Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 5 April 2016 at 23:35, Andy Davidson 
wrote:

> Hi Mich
>
> Yup I was surprised to find empty files. Its easy to work around. Note I
> should probably use coalesce() and not repartition()
>
> In general I found I almost always need to reparation. I was getting
> thousands of empty partitions. It was really slowing my system down.
>
>private static void save(JavaDStream json, String outputURIBase)
> {
>
> /*
>
> using saveAsTestFiles will cause lots of empty directories to be
> created.
>
> DStream data = json.dstream();
>
> data.saveAsTextFiles(outputURI, null);
>
> */
>
>
>
> jsonTweets.foreachRDD(new VoidFunction2() {
>
> private static final long serialVersionUID = 1L;
>
> @Override
>
> public void call(JavaRDD rdd, Time time) throws
> Exception {
>
> Long count = rdd.count();
>
> //if(!rdd.isEmpty()) {
>
> if(count > 0) {
>
> rdd = repartition(rdd, count.intValue());
>
> long milliSeconds = time.milliseconds();
>
> String date = Utils.convertMillisecondsToDateStr(
> milliSeconds);
>
> String dirPath = outputURIBase
>
> + File.separator +  date
>
> + File.separator + "tweet-" + time
> .milliseconds();
>
> rdd.saveAsTextFile(dirPath);
>
> }
>
> }
>
>
>
> final int maxNumRowsPerFile = 200;
>
> JavaRDD repartition(JavaRDD rdd, int count) {
>
>
> long numPartisions = count / maxNumRowsPerFile + 1;
>
> Long tmp = numPartisions;
>
> rdd = rdd.repartition(tmp.intValue());
>
> return rdd;
>
> }
>
> });
>
>
>
> }
>
>
>
> From: Mich Talebzadeh 
> Date: Tuesday, April 5, 2016 at 3:06 PM
> To: "user @spark" 
> Subject: Saving Spark streaming RDD with saveAsTextFiles ends up creating
> empty files on HDFS
>
> Spark 1.6.1
>
> The following creates empty files. It prints lines OK with println
>
> val result = lines.filter(_.contains("ASE 15")).flatMap(line =>
> line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _)
> result.saveAsTextFiles("/tmp/rdd_stuff")
>
> I am getting zero length files
>
> drwxr-xr-x   - hduser supergroup  0 2016-04-05 23:19
> /tmp/rdd_stuff-1459894755000
> drwxr-xr-x   - hduser supergroup  0 2016-04-05 23:20
> /tmp/rdd_stuff-145989481
>
> Any ideas?
>
> Thanks,
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
>


lost executor due to large shuffle spill memory

2016-04-05 Thread lllll
I have a task to remap the index to the actual uuid in ALS prediction results.
But it consistently fails due to lost executors. I noticed there's a lot of
shuffle spill memory but I don't know how to improve it.

 

I've tried to reduce the number of executors while assigning each to have
bigger memory. 
 

But it still doesn't seem big enough. I don't know what to do. 

Below is my code:
user = load_user()
product = load_product()
user.cache()
product.cache()
model = load_model(model_path)
all_pairs = user.map(lambda x: x[1]).cartesian(product.map(lambda x: x[1]))
all_prediction = model.predictAll(all_pairs)
user_reverse = user.map(lambda r: (r[1], r[0]))
product_reverse = product.map(lambda r: (r[1], r[0]))
user_reversed = all_prediction.map(lambda u: (u[0], (u[1],
u[2]))).join(user_reverse).map(lambda r: (r[1][0][0], (r[1][1],
r[1][0][1])))
both_reversed = user_reversed.join(product_reverse).map(lambda r:
(r[1][0][0], r[1][1], r[1][0][1]))
both_reversed.map(lambda x: '{}|{}|{}'.format(x[0], x[1],
x[2])).saveAsTextFile(recommendation_path)

Both user and products are (uuid, index) tuples. 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/lost-executor-due-to-large-shuffle-spill-memory-tp26683.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Saving Spark streaming RDD with saveAsTextFiles ends up creating empty files on HDFS

2016-04-05 Thread Andy Davidson
Hi Mich

Yup, I was surprised to find empty files. It's easy to work around. Note I
should probably use coalesce() and not repartition().

In general I found I almost always need to repartition. I was getting
thousands of empty partitions. It was really slowing my system down.

    private static void save(JavaDStream<String> json, String outputURIBase) {

        /*
         using saveAsTextFiles will cause lots of empty directories to be created:

         DStream<String> data = json.dstream();
         data.saveAsTextFiles(outputURI, null);
        */

        jsonTweets.foreachRDD(new VoidFunction2<JavaRDD<String>, Time>() {

            private static final long serialVersionUID = 1L;

            @Override
            public void call(JavaRDD<String> rdd, Time time) throws Exception {
                Long count = rdd.count();
                //if(!rdd.isEmpty()) {
                if (count > 0) {
                    rdd = repartition(rdd, count.intValue());
                    long milliSeconds = time.milliseconds();
                    String date = Utils.convertMillisecondsToDateStr(milliSeconds);
                    String dirPath = outputURIBase
                            + File.separator + date
                            + File.separator + "tweet-" + time.milliseconds();
                    rdd.saveAsTextFile(dirPath);
                }
            }

            final int maxNumRowsPerFile = 200;

            JavaRDD<String> repartition(JavaRDD<String> rdd, int count) {
                long numPartitions = count / maxNumRowsPerFile + 1;
                Long tmp = numPartitions;
                rdd = rdd.repartition(tmp.intValue());
                return rdd;
            }
        });
    }




From:  Mich Talebzadeh 
Date:  Tuesday, April 5, 2016 at 3:06 PM
To:  "user @spark" 
Subject:  Saving Spark streaming RDD with saveAsTextFiles ends up creating
empty files on HDFS

> Spark 1.6.1
> 
> The following creates empty files. It prints lines OK with println
> 
> val result = lines.filter(_.contains("ASE 15")).flatMap(line =>
> line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _)
> result.saveAsTextFiles("/tmp/rdd_stuff")
> 
> I am getting zero length files
> 
> drwxr-xr-x   - hduser supergroup  0 2016-04-05 23:19
> /tmp/rdd_stuff-1459894755000
> drwxr-xr-x   - hduser supergroup  0 2016-04-05 23:20
> /tmp/rdd_stuff-145989481
> 
> Any ideas?
> 
> Thanks,
> 
> Dr Mich Talebzadeh
> 
>  
> 
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8
> Pw 
> 
>  
> 
> http://talebzadehmich.wordpress.com 
> 
>  




Saving Spark streaming RDD with saveAsTextFiles ends up creating empty files on HDFS

2016-04-05 Thread Mich Talebzadeh
Spark 1.6.1

The following creates empty files. It prints lines OK with println

val result = lines.filter(_.contains("ASE 15")).flatMap(line =>
line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _)
result.saveAsTextFiles("/tmp/rdd_stuff")

I am getting zero length files

drwxr-xr-x   - hduser supergroup  0 2016-04-05 23:19
/tmp/rdd_stuff-1459894755000
drwxr-xr-x   - hduser supergroup  0 2016-04-05 23:20
/tmp/rdd_stuff-145989481

Any ideas?

Thanks,

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com
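
A minimal Scala sketch of the workaround discussed in the replies to this post
(skip empty micro-batches and coalesce before writing); it reuses the `lines`
DStream from above and the output path is illustrative:

val result = lines.filter(_.contains("ASE 15"))
  .flatMap(_.split("\n,"))
  .map(word => (word, 1))
  .reduceByKey(_ + _)

result.foreachRDD { (rdd, time) =>
  if (!rdd.isEmpty()) {                  // nothing is written for empty batches
    rdd.coalesce(1)                      // small batches: one part file per batch
      .saveAsTextFile(s"/tmp/rdd_stuff-${time.milliseconds}")
  }
}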


Re: Spark error with checkpointing

2016-04-05 Thread Cody Koeninger
http://spark.apache.org/docs/latest/streaming-programming-guide.html#accumulators-and-broadcast-variables

On Tue, Apr 5, 2016 at 3:51 PM, Akhilesh Pathodia
 wrote:
> Hi,
>
> I am running spark jobs on yarn in cluster mode. The job reads the messages
> from kafka direct stream. I am using broadcast variables and checkpointing
> every 30 seconds. When I start the job first time it runs fine without any
> issue. If I kill the job and restart it throws below exception in executor
> upon receiving a message from kafka:
>
> java.io.IOException: org.apache.spark.SparkException: Failed to get
> broadcast_1_piece0 of broadcast_1
>   at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1178)
>   at
> org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:165)
>   at
> org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
>   at
> org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
>   at
> org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:88)
>   at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
>   at
> net.juniper.spark.stream.LogDataStreamProcessor$2.call(LogDataStreamProcessor.java:177)
>   at
> net.juniper.spark.stream.LogDataStreamProcessor$2.call(LogDataStreamProcessor.java:1)
>   at
> org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$fn$1$1.apply(JavaDStreamLike.scala:172)
>   at
> org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$fn$1$1.apply(JavaDStreamLike.scala:172)
>   at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>   at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
>   at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>   at 
> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>   at
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>   at 
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>   at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
>   at scala.collection.AbstractIterator.to(Iterator.scala:1157)
>   at
> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>   at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
>   at
> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
>   at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
>   at
> org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1298)
>   at
> org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1298)
>   at
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1850)
>   at
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1850)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>   at org.apache.spark.scheduler.Task.run(Task.scala:88)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>   at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:745)
>
> If I delete the checkpoint directory hand restart the job, it runs without
> any issue.
> Does anyone have idea how to resolve this error?
>
> Spark version: 1.5.0
> CDH 5.5.1
>
> Thanks,
> AKhilesh
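
The guide section Cody links recommends lazily instantiated singletons for
broadcast variables so they can be re-created after the driver restarts from a
checkpoint, instead of holding a stale reference inside the checkpointed
closure. A minimal Scala sketch of that pattern; the object name, broadcast
contents and the `dstream` it is applied to are illustrative, not from this job:

import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast

object KeywordBroadcast {
  @volatile private var instance: Broadcast[Set[String]] = null

  // Lazily (re)create the broadcast per JVM instead of relying on a
  // reference captured inside the checkpointed closure.
  def getInstance(sc: SparkContext): Broadcast[Set[String]] = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          instance = sc.broadcast(Set("ERROR", "WARN"))   // illustrative payload
        }
      }
    }
    instance
  }
}

dstream.foreachRDD { rdd =>
  val keywords = KeywordBroadcast.getInstance(rdd.sparkContext)
  rdd.filter(line => keywords.value.exists(line.contains)).count()
}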



Re: multiple splits fails

2016-04-05 Thread Mich Talebzadeh
If I go through each RDD  I get

val result = lines.filter(_.contains("Sending messages")).flatMap(line =>
line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _)
scala> result.foreachRDD( rdd => {
 | for(item <- rdd.collect().toArray) {
 | println(item);
 | }
 | })

Rather than println(item), I want to store the data temporarily and then
flush the results to an HDFS file as an append.

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 5 April 2016 at 14:02, Mich Talebzadeh  wrote:

> This is the idea I have in mind
>
> I want to go through every line
>
> result.foreachRDD(rdd => rdd.foreach(println))
>
> rather than print each line I want to save them temporarily and then
> add/append the result set (lines in RDD ) to a table for further analyses.
> It could be a Parquet or Hive table.
>
> So only interested in lines of interest that saves me space on the disk.
>
> I am aware that I can simply write them to text files
>
> result.saveAsTextFiles("/user/hduser/tmp/keep")
>
>
> Any ideas will be appreciated
>
> Thanks
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 5 April 2016 at 09:32, Sachin Aggarwal 
> wrote:
>
>> Sure,
>>
>> this will be helpful, try this:
>>
>>
>> https://databricks.gitbooks.io/databricks-spark-reference-applications/content/logs_analyzer/chapter3/save_an_rdd_to_a_database.html
>>
>> On Tue, Apr 5, 2016 at 1:56 PM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> Thanks Sachin. Will test it
>>>
>>> I guess I can modify it to save the output to a Hive table as opposed to
>>> terminal
>>>
>>> Regards
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> *
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>>
>>> On 5 April 2016 at 09:06, Sachin Aggarwal 
>>> wrote:
>>>
 Hey,

 I have changed your example itself; try this, it should work in the
 terminal:

 val result = lines.filter(_.contains("ASE 15")).filter(_ contains("UPDATE 
 INDEX STATISTICS")).flatMap(line => line.split("\n,")).map(word => (word, 
 1)).reduceByKey(_ + _)
 result.foreachRDD(rdd => rdd.foreach(println))


 On Tue, Apr 5, 2016 at 1:00 PM, Mich Talebzadeh <
 mich.talebza...@gmail.com> wrote:

> Thanks.
>
> Currently this is what I am doing
>
> // Get the lines
> val lines = messages.map(_._2)
> // Check for the message
> val showResults = lines.filter(_.contains("Sending messages"))
>   .flatMap(line => line.split("\n,"))
>   .map(word => (word, 1))
>   .reduceByKey(_ + _)
>   .print(1000)
>
> So it prints a maximum of 1000 lines to the terminal after the filter and map. Can
> this be done as suggested?
>
>
>
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 5 April 2016 at 06:53, Sachin Aggarwal 
> wrote:
>
>> Hi
>>
>> Instead of using print() directly on the DStream, I suggest you use foreachRDD
>> if you want to materialize all rows; an example is shown here:
>>
>>
>> https://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams
>>
>> dstream.foreachRDD(rdd => {
>>   val connection = createNewConnection()  // executed at the driver
>>   rdd.foreach(record => {
>>     connection.send(record) // executed at the worker
>>   })
>> })
>>
>>
>> On Mon, Apr 4, 2016 at 12:59 AM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> I am afraid print(Integer.MAX_VALUE) does not return any lines!
>>> However, print(1000) does
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> *
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>>
>>> On 3 April 2016 at 19:46, Ted Yu  wrote:
>>>
 

RE: Partition pruning in spark 1.5.2

2016-04-05 Thread Yong Zhang
Hi, Michael:
I would like to ask the same question: if the DF is hash partitioned and then cached,
and we now query/filter by the column it was hash partitioned on, will Spark be smart
enough to do the partition pruning in this case, instead of depending on
Parquet's partition pruning? I think that is the original question.
Thanks
Yong

From: mich...@databricks.com
Date: Tue, 5 Apr 2016 13:28:46 -0700
Subject: Re: Partition pruning in spark 1.5.2
To: darshan.m...@gmail.com
CC: user@spark.apache.org

The following should ensure partition pruning happens:
df.write.partitionBy("country").save("/path/to/data")
sqlContext.read.load("/path/to/data").where("country = 'UK'")
On Tue, Apr 5, 2016 at 1:13 PM, Darshan Singh  wrote:
Thanks for the reply.
Now I saved the part_movies as parquet file.
Then created new dataframe from the saved parquet file and I did not persist 
it. The i ran the same query. It still read all 20 partitions and this time 
from hdfs.
So what will be exact scenario when it will prune partitions. I am bit confused 
now. Isnt there a way to see the exact partition pruning?
Thanks
On Tue, Apr 5, 2016 at 8:59 PM, Michael Armbrust  wrote:
For the in-memory cache, we still launch tasks, we just skip blocks when 
possible using statistics about those blocks.
On Tue, Apr 5, 2016 at 12:14 PM, Darshan Singh  wrote:
Thanks. It is not my exact scenario but I have tried to reproduce it. I have 
used 1.5.2.
I have a part-movies data-frame which has 20 partitions 1 each for a movie.
I created following query

val part_sql = sqlContext.sql("select * from part_movies where movie = 10")
part_sql.count()

I expect that this should read from just 1 partition, i.e. partition 10. For the other
partitions it should at most read metadata and not the data.

Here is the physical plan. I can see the filter, but from here I cannot say whether
this filter is causing any partition pruning. If pruning is actually happening
I would like to see an operator which mentions it.

== Physical Plan ==
TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[count#75L])
 TungstenExchange SinglePartition
  TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[currentCount#93L])
   Project
    Filter (movie#33 = 10)
     InMemoryColumnarTableScan [movie#33], [(movie#33 = 10)], (InMemoryRelation [movie#33,title#34,genres#35], true, 1, StorageLevel(true, true, false, true, 1), (Scan PhysicalRDD[movie#33,title#34,genres#35]), None)

However, my assumption that the partition is not pruned is not based on the above
plan but on the job and its stages: I could see that it has read the
full data of the dataframe. I should see around 65KB, as that is roughly the average
size of each partition.
Aggregated Metrics by Executor

Executor ID  Address          Task Time  Total Tasks  Failed Tasks  Succeeded Tasks  Input Size / Records  Shuffle Write Size / Records
driver       localhost:53247  0.4 s      20           0             20               1289.0 KB / 20        840.0 B / 20

Task details, only the first 7 shown. Here I expect that, except for the 1 task which
accesses the partition's data, all others should read either 0 KB or just the metadata,
after which that partition is discarded as its data is not needed. But I could
see that all the partitions are read.
This is a small example so it doesn't make much difference, but for a large dataframe
reading all the data, even in memory, takes time.
Tasks

0 27 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 39 ms 12 ms 9 ms 0 ms 0 ms 0.0 B 66.2 KB (memory) / 1 42.0 B / 1
1 28 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 41 ms 9 ms 7 ms 0 ms 0 ms 0.0 B 63.9 KB (memory) / 1 1 ms 42.0 B / 1
2 29 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 40 ms 7 ms 7 ms 0 ms 0 ms 0.0 B 65.9 KB (memory) / 1 1 ms 42.0 B / 1
3 30 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 6 ms 3 ms 5 ms 0 ms 0 ms 0.0 B 62.0 KB (memory) / 1 42.0 B / 1
4 31 0 SUCCESS PROCESS_LOCAL

Re: Switch RDD-based MLlib APIs to maintenance mode in Spark 2.0

2016-04-05 Thread Xiangrui Meng
Yes, DB (cc'ed) is working on porting the local linear algebra library over
(SPARK-13944). There are also frequent pattern mining algorithms we need to
port over in order to reach feature parity. -Xiangrui
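If the port lands as a parallel org.apache.spark.ml.linalg package (the package name here is an assumption based on SPARK-13944), user code migration would mostly be an import change, roughly:

import org.apache.spark.mllib.linalg.{Vectors => MLlibVectors}  // current RDD-based package
import org.apache.spark.ml.linalg.Vectors                       // assumed new location per SPARK-13944

val vOld = MLlibVectors.dense(1.0, 2.0, 3.0)
val vNew = Vectors.dense(1.0, 2.0, 3.0)   // same factory methods, new namespace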

On Tue, Apr 5, 2016 at 12:08 PM Shivaram Venkataraman <
shiva...@eecs.berkeley.edu> wrote:

> Overall this sounds good to me. One question I have is that in
> addition to the ML algorithms we have a number of linear algebra
> (various distributed matrices) and statistical methods in the
> spark.mllib package. Is the plan to port or move these to the spark.ml
> namespace in the 2.x series ?
>
> Thanks
> Shivaram
>
> On Tue, Apr 5, 2016 at 11:48 AM, Sean Owen  wrote:
> > FWIW, all of that sounds like a good plan to me. Developing one API is
> > certainly better than two.
> >
> > On Tue, Apr 5, 2016 at 7:01 PM, Xiangrui Meng  wrote:
> >> Hi all,
> >>
> >> More than a year ago, in Spark 1.2 we introduced the ML pipeline API
> built
> >> on top of Spark SQL’s DataFrames. Since then the new DataFrame-based
> API has
> >> been developed under the spark.ml package, while the old RDD-based API
> has
> >> been developed in parallel under the spark.mllib package. While it was
> >> easier to implement and experiment with new APIs under a new package, it
> >> became harder and harder to maintain as both packages grew bigger and
> >> bigger. And new users are often confused by having two sets of APIs with
> >> overlapped functions.
> >>
> >> We started to recommend the DataFrame-based API over the RDD-based API
> in
> >> Spark 1.5 for its versatility and flexibility, and we saw the
> development
> >> and the usage gradually shifting to the DataFrame-based API. Just
> counting
> >> the lines of Scala code, from 1.5 to the current master we added ~1
> >> lines to the DataFrame-based API while ~700 to the RDD-based API. So, to
> >> gather more resources on the development of the DataFrame-based API and
> to
> >> help users migrate over sooner, I want to propose switching RDD-based
> MLlib
> >> APIs to maintenance mode in Spark 2.0. What does it mean exactly?
> >>
> >> * We do not accept new features in the RDD-based spark.mllib package,
> unless
> >> they block implementing new features in the DataFrame-based spark.ml
> >> package.
> >> * We still accept bug fixes in the RDD-based API.
> >> * We will add more features to the DataFrame-based API in the 2.x
> series to
> >> reach feature parity with the RDD-based API.
> >> * Once we reach feature parity (possibly in Spark 2.2), we will
> deprecate
> >> the RDD-based API.
> >> * We will remove the RDD-based API from the main Spark repo in Spark
> 3.0.
> >>
> >> Though the RDD-based API is already in de facto maintenance mode, this
> >> announcement will make it clear and hence important to both MLlib
> developers
> >> and users. So we’d greatly appreciate your feedback!
> >>
> >> (As a side note, people sometimes use “Spark ML” to refer to the
> >> DataFrame-based API or even the entire MLlib component. This also causes
> >> confusion. To be clear, “Spark ML” is not an official name and there
> are no
> >> plans to rename MLlib to “Spark ML” at this time.)
> >>
> >> Best,
> >> Xiangrui
> >
> > -
> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> > For additional commands, e-mail: user-h...@spark.apache.org
> >
>


Re: Partition pruning in spark 1.5.2

2016-04-05 Thread Darshan Singh
Thanks a lot. I will try this one  as well.

On Tue, Apr 5, 2016 at 9:28 PM, Michael Armbrust 
wrote:

> The following should ensure partition pruning happens:
>
> df.write.partitionBy("country").save("/path/to/data")
> sqlContext.read.load("/path/to/data").where("country = 'UK'")
>
> On Tue, Apr 5, 2016 at 1:13 PM, Darshan Singh 
> wrote:
>
>> Thanks for the reply.
>>
>> Now I saved the part_movies as parquet file.
>>
>> Then created new dataframe from the saved parquet file and I did not
>> persist it. The i ran the same query. It still read all 20 partitions and
>> this time from hdfs.
>>
>> So what will be exact scenario when it will prune partitions. I am bit
>> confused now. Isnt there a way to see the exact partition pruning?
>>
>> Thanks
>>
>> On Tue, Apr 5, 2016 at 8:59 PM, Michael Armbrust 
>> wrote:
>>
>>> For the in-memory cache, we still launch tasks, we just skip blocks when
>>> possible using statistics about those blocks.
>>>
>>> On Tue, Apr 5, 2016 at 12:14 PM, Darshan Singh 
>>> wrote:
>>>
 Thanks. It is not my exact scenario but I have tried to reproduce it. I
 have used 1.5.2.

 I have a part-movies data-frame which has 20 partitions 1 each for a
 movie.

 I created following query


 val part_sql = sqlContext.sql("select * from part_movies where movie =
 10")
 part_sql.count()

 I expect that this should just read from 1 partition i.e. partition 10.
 Other partitions it should max read metadata and not the data.

 here is physical plan. I could see the filter. From here i can not say
 whether this filter is causing any partition pruning. If actually pruning
 is happening i would like to see a operator which mentions the same.

 == Physical Plan ==
 TungstenAggregate(key=[], 
 functions=[(count(1),mode=Final,isDistinct=false)], output=[count#75L])
  TungstenExchange SinglePartition
   TungstenAggregate(key=[], 
 functions=[(count(1),mode=Partial,isDistinct=false)], 
 output=[currentCount#93L])
Project
 Filter (movie#33 = 10)
  InMemoryColumnarTableScan [movie#33], [(movie#33 = 10)], 
 (InMemoryRelation [movie#33,title#34,genres#35], true, 1, 
 StorageLevel(true, true, false, true, 1), (Scan 
 PhysicalRDD[movie#33,title#34,genres#35]), None)


 However, my assumption that partition is not pruned is not based on the
 above plan but when I look at the job and its stages. I could see that it
 has read full data of the dataframe.  I should see around 65KB as that is
 almost average size of each partition.

 Aggregated Metrics by Executor
 Executor ID Address Task Time Total Tasks Failed Tasks Succeeded Tasks 
 Input
 Size / Records Shuffle Write Size / Records
 driver localhost:53247 0.4 s 20 0 20 1289.0 KB / 20 840.0 B / 20


 Task details only first 7. Here I expect that except 1 task(which
 access the partitions data) all others should be either 0 KB or just the
 size of metadata after which it discarded that partition as its data was
 not needed. But i could see that all the partitions are read.

 This is small example so it doesnt make diff but for a large dataframe
 reading all the data even that in memory takes time.

 Tasks

 0 27 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 39
 ms 12 ms 9 ms 0 ms 0 ms 0.0 B 66.2 KB (memory) / 1 42.0 B / 1
 1 28 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 41
 ms 9 ms 7 ms 0 ms 0 ms 0.0 B 63.9 KB (memory) / 1 1 ms 42.0 B / 1
 2 29 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 40
 ms 7 ms 7 ms 0 ms 0 ms 0.0 B 65.9 KB (memory) / 1 1 ms 42.0 B / 1
 3 30 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 6
 ms 3 ms 5 ms 0 ms 0 ms 0.0 B 62.0 KB (memory) / 1 42.0 B / 1
 4 31 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 4
 ms 4 ms 6 ms 1 ms 0 ms 0.0 B 69.2 KB (memory) / 1 42.0 B / 1
 5 32 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 5
 ms 2 ms 5 ms 0 ms 0 ms 0.0 B 60.3 KB (memory) / 1 42.0 B / 1
 6 33 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 5
 ms 3 ms 4 ms 0 ms 0 ms 0.0 B 70.3 KB (memory) / 1 42.0 B / 1
 7 34 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 4
 ms 5 ms 4 ms 0 ms 0 ms 0.0 B 59.7 KB (memory) / 1 42.0 B / 1

 Let me know if you need anything else.

 Thanks




 On Tue, Apr 5, 2016 at 7:29 PM, Michael Armbrust <
 mich...@databricks.com> wrote:

> Can you show your full code.  How are you partitioning the data? How
> are you reading it?  What is the resulting query plan (run 

Re: Stress testing hdfs with Spark

2016-04-05 Thread Jan Holmberg
Yes, I realize that there's a standard way and then there's the way where the
client asks 'how fast can it write the data'. That is what I'm trying to figure
out. At the moment I'm far from the disks' theoretical write speed even when combining
all the disks together.
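One rough way to get that number from Spark itself (a sketch only: the sizes and output path are illustrative, sc is the spark-shell context, and the figure ignores replication and scheduling overheads): generate the data lazily per partition instead of materialising a 1 GB string, write it, and time the write.

val numPartitions = 16                       // e.g. one per executor core in the cluster
val bytesPerPartition = 1024L * 1024 * 1024  // roughly 1 GB per partition
val line = "x" * 1024                        // ~1 KB per line
val linesPerPartition = (bytesPerPartition / (line.length + 1)).toInt  // +1 for the newline

val data = sc.parallelize(1 to numPartitions, numPartitions)
  .mapPartitions(_ => Iterator.fill(linesPerPartition)(line))  // lazy, never builds a giant string

val start = System.nanoTime()
data.saveAsTextFile("hdfs:///tmp/hdfs_write_test")
val secs = (System.nanoTime() - start) / 1e9
val gbWritten = numPartitions * bytesPerPartition / 1e9
println(f"wrote ~$gbWritten%.1f GB in $secs%.1f s => ${gbWritten / secs}%.2f GB/s")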

On 05 Apr 2016, at 23:21, Mich Talebzadeh 
> wrote:

so that throughput per second. You can try Spark streaming saving it to HDFS 
and increase the throttle.

The general accepted form is to measure service time which is the average 
service time for IO requests in ms


Dr Mich Talebzadeh



LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 5 April 2016 at 20:56, Jan Holmberg 
> wrote:
I'm trying to get rough estimate how much data I can write within certain time 
period (GB/sec).
-jan

On 05 Apr 2016, at 22:49, Mich Talebzadeh 
> wrote:

Hi Jan,

What is the definition of stress test in here? What are the matrices? 
Throughput of data, latency, velocity, volume?

HTH


Dr Mich Talebzadeh



LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 5 April 2016 at 20:42, Jan Holmberg 
> wrote:
Hi,
I'm trying to figure out how to write lots of data from each worker. I tried 
rdd.saveAsTextFile but got OOM when generating 1024MB string for a worker. 
Increasing worker memory would mean that I should drop the number of workers.
Soo, any idea how to write ex. 1gb file from each worker?

cheers,
-jan
-
To unsubscribe, e-mail: 
user-unsubscr...@spark.apache.org
For additional commands, e-mail: 
user-h...@spark.apache.org





Re: Partition pruning in spark 1.5.2

2016-04-05 Thread Michael Armbrust
The following should ensure partition pruning happens:

df.write.partitionBy("country").save("/path/to/data")
sqlContext.read.load("/path/to/data").where("country = 'UK'")
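A quick way to sanity-check that the pruning actually kicks in (path and column names are illustrative; df and sqlContext are as in the thread) is to look at the plan and at the input size of the resulting job:

df.write.partitionBy("country").parquet("/path/to/data")

val uk = sqlContext.read.parquet("/path/to/data").where("country = 'UK'")
uk.explain(true)   // inspect the physical plan for the scan
uk.count()         // if pruning happened, the Input Size reported in the web UI
                   // for this job should cover only the country=UK partition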

On Tue, Apr 5, 2016 at 1:13 PM, Darshan Singh 
wrote:

> Thanks for the reply.
>
> Now I saved the part_movies as parquet file.
>
> Then created new dataframe from the saved parquet file and I did not
> persist it. The i ran the same query. It still read all 20 partitions and
> this time from hdfs.
>
> So what will be exact scenario when it will prune partitions. I am bit
> confused now. Isnt there a way to see the exact partition pruning?
>
> Thanks
>
> On Tue, Apr 5, 2016 at 8:59 PM, Michael Armbrust 
> wrote:
>
>> For the in-memory cache, we still launch tasks, we just skip blocks when
>> possible using statistics about those blocks.
>>
>> On Tue, Apr 5, 2016 at 12:14 PM, Darshan Singh 
>> wrote:
>>
>>> Thanks. It is not my exact scenario but I have tried to reproduce it. I
>>> have used 1.5.2.
>>>
>>> I have a part-movies data-frame which has 20 partitions 1 each for a
>>> movie.
>>>
>>> I created following query
>>>
>>>
>>> val part_sql = sqlContext.sql("select * from part_movies where movie =
>>> 10")
>>> part_sql.count()
>>>
>>> I expect that this should just read from 1 partition i.e. partition 10.
>>> Other partitions it should max read metadata and not the data.
>>>
>>> here is physical plan. I could see the filter. From here i can not say
>>> whether this filter is causing any partition pruning. If actually pruning
>>> is happening i would like to see a operator which mentions the same.
>>>
>>> == Physical Plan ==
>>> TungstenAggregate(key=[], 
>>> functions=[(count(1),mode=Final,isDistinct=false)], output=[count#75L])
>>>  TungstenExchange SinglePartition
>>>   TungstenAggregate(key=[], 
>>> functions=[(count(1),mode=Partial,isDistinct=false)], 
>>> output=[currentCount#93L])
>>>Project
>>> Filter (movie#33 = 10)
>>>  InMemoryColumnarTableScan [movie#33], [(movie#33 = 10)], 
>>> (InMemoryRelation [movie#33,title#34,genres#35], true, 1, 
>>> StorageLevel(true, true, false, true, 1), (Scan 
>>> PhysicalRDD[movie#33,title#34,genres#35]), None)
>>>
>>>
>>> However, my assumption that partition is not pruned is not based on the
>>> above plan but when I look at the job and its stages. I could see that it
>>> has read full data of the dataframe.  I should see around 65KB as that is
>>> almost average size of each partition.
>>>
>>> Aggregated Metrics by Executor
>>> Executor ID Address Task Time Total Tasks Failed Tasks Succeeded Tasks Input
>>> Size / Records Shuffle Write Size / Records
>>> driver localhost:53247 0.4 s 20 0 20 1289.0 KB / 20 840.0 B / 20
>>>
>>>
>>> Task details only first 7. Here I expect that except 1 task(which access
>>> the partitions data) all others should be either 0 KB or just the size of
>>> metadata after which it discarded that partition as its data was not
>>> needed. But i could see that all the partitions are read.
>>>
>>> This is small example so it doesnt make diff but for a large dataframe
>>> reading all the data even that in memory takes time.
>>>
>>> Tasks
>>>
>>> 0 27 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 39
>>> ms 12 ms 9 ms 0 ms 0 ms 0.0 B 66.2 KB (memory) / 1 42.0 B / 1
>>> 1 28 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 41
>>> ms 9 ms 7 ms 0 ms 0 ms 0.0 B 63.9 KB (memory) / 1 1 ms 42.0 B / 1
>>> 2 29 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 40
>>> ms 7 ms 7 ms 0 ms 0 ms 0.0 B 65.9 KB (memory) / 1 1 ms 42.0 B / 1
>>> 3 30 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 6 ms 3
>>> ms 5 ms 0 ms 0 ms 0.0 B 62.0 KB (memory) / 1 42.0 B / 1
>>> 4 31 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 4 ms 4
>>> ms 6 ms 1 ms 0 ms 0.0 B 69.2 KB (memory) / 1 42.0 B / 1
>>> 5 32 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 5 ms 2
>>> ms 5 ms 0 ms 0 ms 0.0 B 60.3 KB (memory) / 1 42.0 B / 1
>>> 6 33 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 5 ms 3
>>> ms 4 ms 0 ms 0 ms 0.0 B 70.3 KB (memory) / 1 42.0 B / 1
>>> 7 34 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 4 ms 5
>>> ms 4 ms 0 ms 0 ms 0.0 B 59.7 KB (memory) / 1 42.0 B / 1
>>>
>>> Let me know if you need anything else.
>>>
>>> Thanks
>>>
>>>
>>>
>>>
>>> On Tue, Apr 5, 2016 at 7:29 PM, Michael Armbrust >> > wrote:
>>>
 Can you show your full code.  How are you partitioning the data? How
 are you reading it?  What is the resulting query plan (run explain() or
 EXPLAIN).

 On Tue, Apr 5, 2016 at 10:02 AM, dsing001 
 wrote:

> HI,
>
> I am using 1.5.2. I have a dataframe which is partitioned based on the
> country. So I have around 150 partition in the 

Re: Stress testing hdfs with Spark

2016-04-05 Thread Jan Holmberg
Yep,
I used DFSIO and also TeraGen, but I would like to experiment with an ad-hoc Spark
program.
-jan

On 05 Apr 2016, at 23:13, Sebastian Piu 
> wrote:


You could they using TestDFSIO for raw hdfs performance, but we found it not 
very relevant

Another way could be to either generate a file and then read it and write it 
back. For some of our use cases we are populated a Kafka queue on the cluster 
(on different disks) and used spark streaming to do a simple transformation and 
write back.

You can use graphite+grafana for the IO monitoring

On Tue, 5 Apr 2016, 20:56 Jan Holmberg, 
> wrote:
I'm trying to get rough estimate how much data I can write within certain time 
period (GB/sec).
-jan

On 05 Apr 2016, at 22:49, Mich Talebzadeh 
> wrote:

Hi Jan,

What is the definition of stress test in here? What are the matrices? 
Throughput of data, latency, velocity, volume?

HTH


Dr Mich Talebzadeh



LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 5 April 2016 at 20:42, Jan Holmberg 
> wrote:
Hi,
I'm trying to figure out how to write lots of data from each worker. I tried 
rdd.saveAsTextFile but got OOM when generating 1024MB string for a worker. 
Increasing worker memory would mean that I should drop the number of workers.
Soo, any idea how to write ex. 1gb file from each worker?

cheers,
-jan
-
To unsubscribe, e-mail: 
user-unsubscr...@spark.apache.org
For additional commands, e-mail: 
user-h...@spark.apache.org




Re: Stress testing hdfs with Spark

2016-04-05 Thread Mich Talebzadeh
So that is throughput per second. You can try Spark Streaming saving it to
HDFS and increase the throttle.

The generally accepted form is to measure service time, which is the average
service time for IO requests in ms.

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 5 April 2016 at 20:56, Jan Holmberg  wrote:

> I'm trying to get rough estimate how much data I can write within certain
> time period (GB/sec).
> -jan
>
> On 05 Apr 2016, at 22:49, Mich Talebzadeh 
> wrote:
>
> Hi Jan,
>
> What is the definition of stress test in here? What are the matrices?
> Throughput of data, latency, velocity, volume?
>
> HTH
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 5 April 2016 at 20:42, Jan Holmberg  wrote:
>
>> Hi,
>> I'm trying to figure out how to write lots of data from each worker. I
>> tried rdd.saveAsTextFile but got OOM when generating 1024MB string for a
>> worker. Increasing worker memory would mean that I should drop the number
>> of workers.
>> Soo, any idea how to write ex. 1gb file from each worker?
>>
>> cheers,
>> -jan
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: Partition pruning in spark 1.5.2

2016-04-05 Thread Darshan Singh
Thanks for the reply.

Now I saved part_movies as a parquet file.

Then I created a new dataframe from the saved parquet file and did not
persist it. Then I ran the same query. It still read all 20 partitions, and
this time from HDFS.

So what is the exact scenario in which it will prune partitions? I am a bit
confused now. Isn't there a way to see the exact partition pruning?

Thanks

On Tue, Apr 5, 2016 at 8:59 PM, Michael Armbrust 
wrote:

> For the in-memory cache, we still launch tasks, we just skip blocks when
> possible using statistics about those blocks.
>
> On Tue, Apr 5, 2016 at 12:14 PM, Darshan Singh 
> wrote:
>
>> Thanks. It is not my exact scenario but I have tried to reproduce it. I
>> have used 1.5.2.
>>
>> I have a part-movies data-frame which has 20 partitions 1 each for a
>> movie.
>>
>> I created following query
>>
>>
>> val part_sql = sqlContext.sql("select * from part_movies where movie =
>> 10")
>> part_sql.count()
>>
>> I expect that this should just read from 1 partition i.e. partition 10.
>> Other partitions it should max read metadata and not the data.
>>
>> here is physical plan. I could see the filter. From here i can not say
>> whether this filter is causing any partition pruning. If actually pruning
>> is happening i would like to see a operator which mentions the same.
>>
>> == Physical Plan ==
>> TungstenAggregate(key=[], 
>> functions=[(count(1),mode=Final,isDistinct=false)], output=[count#75L])
>>  TungstenExchange SinglePartition
>>   TungstenAggregate(key=[], 
>> functions=[(count(1),mode=Partial,isDistinct=false)], 
>> output=[currentCount#93L])
>>Project
>> Filter (movie#33 = 10)
>>  InMemoryColumnarTableScan [movie#33], [(movie#33 = 10)], 
>> (InMemoryRelation [movie#33,title#34,genres#35], true, 1, 
>> StorageLevel(true, true, false, true, 1), (Scan 
>> PhysicalRDD[movie#33,title#34,genres#35]), None)
>>
>>
>> However, my assumption that partition is not pruned is not based on the
>> above plan but when I look at the job and its stages. I could see that it
>> has read full data of the dataframe.  I should see around 65KB as that is
>> almost average size of each partition.
>>
>> Aggregated Metrics by Executor
>> Executor ID Address Task Time Total Tasks Failed Tasks Succeeded Tasks Input
>> Size / Records Shuffle Write Size / Records
>> driver localhost:53247 0.4 s 20 0 20 1289.0 KB / 20 840.0 B / 20
>>
>>
>> Task details only first 7. Here I expect that except 1 task(which access
>> the partitions data) all others should be either 0 KB or just the size of
>> metadata after which it discarded that partition as its data was not
>> needed. But i could see that all the partitions are read.
>>
>> This is small example so it doesnt make diff but for a large dataframe
>> reading all the data even that in memory takes time.
>>
>> Tasks
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> 0 27 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 39 ms 12
>> ms 9 ms 0 ms 0 ms 0.0 B 66.2 KB (memory) / 1 42.0 B / 1
>> 1 28 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 41 ms 9
>> ms 7 ms 0 ms 0 ms 0.0 B 63.9 KB (memory) / 1 1 ms 42.0 B / 1
>> 2 29 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 40 ms 7
>> ms 7 ms 0 ms 0 ms 0.0 B 65.9 KB (memory) / 1 1 ms 42.0 B / 1
>> 3 30 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 6 ms 3
>> ms 5 ms 0 ms 0 ms 0.0 B 62.0 KB (memory) / 1 42.0 B / 1
>> 4 31 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 4 ms 4
>> ms 6 ms 1 ms 0 ms 0.0 B 69.2 KB (memory) / 1 42.0 B / 1
>> 5 32 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 5 ms 2
>> ms 5 ms 0 ms 0 ms 0.0 B 60.3 KB (memory) / 1 42.0 B / 1
>> 6 33 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 5 ms 3
>> ms 4 ms 0 ms 0 ms 0.0 B 70.3 KB (memory) / 1 42.0 B / 1
>> 7 34 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 4 ms 5
>> ms 4 ms 0 ms 0 ms 0.0 B 59.7 KB (memory) / 1 42.0 B / 1
>>
>> Let me know if you need anything else.
>>
>> Thanks
>>
>>
>>
>>
>> On Tue, Apr 5, 2016 at 7:29 PM, Michael Armbrust 
>> wrote:
>>
>>> Can you show your full code.  How are you partitioning the data? How are
>>> you reading it?  What is the resulting query plan (run explain() or
>>> EXPLAIN).
>>>
>>> On Tue, Apr 5, 2016 at 10:02 AM, dsing001 
>>> wrote:
>>>
 HI,

 I am using 1.5.2. I have a dataframe which is partitioned based on the
 country. So I have around 150 partition in the dataframe. When I run
 sparksql and use country = 'UK' it still reads all partitions and not
 able
 to prune other partitions. Thus all the queries run for similar times
 independent of what country I pass. Is it desired?

 Is there a way to fix this in 1.5.2 by using some parameter or is it
 fixed
 in latest versions?

 Thanks



 --
 View 

Re: Stress testing hdfs with Spark

2016-04-05 Thread Sebastian Piu
You could try using TestDFSIO for raw HDFS performance, but we found it
not very relevant.

Another way could be to generate a file and then read it and write
it back. For some of our use cases we populated a Kafka queue on the
cluster (on different disks) and used Spark Streaming to do a simple
transformation and write back.

You can use Graphite + Grafana for the IO monitoring.

On Tue, 5 Apr 2016, 20:56 Jan Holmberg,  wrote:

> I'm trying to get rough estimate how much data I can write within certain
> time period (GB/sec).
> -jan
>
> On 05 Apr 2016, at 22:49, Mich Talebzadeh 
> wrote:
>
> Hi Jan,
>
> What is the definition of stress test in here? What are the matrices?
> Throughput of data, latency, velocity, volume?
>
> HTH
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 5 April 2016 at 20:42, Jan Holmberg  wrote:
>
>> Hi,
>> I'm trying to figure out how to write lots of data from each worker. I
>> tried rdd.saveAsTextFile but got OOM when generating 1024MB string for a
>> worker. Increasing worker memory would mean that I should drop the number
>> of workers.
>> Soo, any idea how to write ex. 1gb file from each worker?
>>
>> cheers,
>> -jan
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: Partition pruning in spark 1.5.2

2016-04-05 Thread Michael Armbrust
For the in-memory cache, we still launch tasks, we just skip blocks when
possible using statistics about those blocks.

On Tue, Apr 5, 2016 at 12:14 PM, Darshan Singh 
wrote:

> Thanks. It is not my exact scenario but I have tried to reproduce it. I
> have used 1.5.2.
>
> I have a part-movies data-frame which has 20 partitions 1 each for a movie.
>
> I created following query
>
>
> val part_sql = sqlContext.sql("select * from part_movies where movie = 10")
> part_sql.count()
>
> I expect that this should just read from 1 partition i.e. partition 10.
> Other partitions it should max read metadata and not the data.
>
> here is physical plan. I could see the filter. From here i can not say
> whether this filter is causing any partition pruning. If actually pruning
> is happening i would like to see a operator which mentions the same.
>
> == Physical Plan ==
> TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], 
> output=[count#75L])
>  TungstenExchange SinglePartition
>   TungstenAggregate(key=[], 
> functions=[(count(1),mode=Partial,isDistinct=false)], 
> output=[currentCount#93L])
>Project
> Filter (movie#33 = 10)
>  InMemoryColumnarTableScan [movie#33], [(movie#33 = 10)], 
> (InMemoryRelation [movie#33,title#34,genres#35], true, 1, 
> StorageLevel(true, true, false, true, 1), (Scan 
> PhysicalRDD[movie#33,title#34,genres#35]), None)
>
>
> However, my assumption that partition is not pruned is not based on the
> above plan but when I look at the job and its stages. I could see that it
> has read full data of the dataframe.  I should see around 65KB as that is
> almost average size of each partition.
>
> Aggregated Metrics by Executor
> Executor ID Address Task Time Total Tasks Failed Tasks Succeeded Tasks Input
> Size / Records Shuffle Write Size / Records
> driver localhost:53247 0.4 s 20 0 20 1289.0 KB / 20 840.0 B / 20
>
>
> Task details only first 7. Here I expect that except 1 task(which access
> the partitions data) all others should be either 0 KB or just the size of
> metadata after which it discarded that partition as its data was not
> needed. But i could see that all the partitions are read.
>
> This is small example so it doesnt make diff but for a large dataframe
> reading all the data even that in memory takes time.
>
> Tasks
>
> 0 27 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 39 ms 12
> ms 9 ms 0 ms 0 ms 0.0 B 66.2 KB (memory) / 1 42.0 B / 1
> 1 28 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 41 ms 9
> ms 7 ms 0 ms 0 ms 0.0 B 63.9 KB (memory) / 1 1 ms 42.0 B / 1
> 2 29 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 40 ms 7
> ms 7 ms 0 ms 0 ms 0.0 B 65.9 KB (memory) / 1 1 ms 42.0 B / 1
> 3 30 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 6 ms 3
> ms 5 ms 0 ms 0 ms 0.0 B 62.0 KB (memory) / 1 42.0 B / 1
> 4 31 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 4 ms 4
> ms 6 ms 1 ms 0 ms 0.0 B 69.2 KB (memory) / 1 42.0 B / 1
> 5 32 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 5 ms 2
> ms 5 ms 0 ms 0 ms 0.0 B 60.3 KB (memory) / 1 42.0 B / 1
> 6 33 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 5 ms 3
> ms 4 ms 0 ms 0 ms 0.0 B 70.3 KB (memory) / 1 42.0 B / 1
> 7 34 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 4 ms 5
> ms 4 ms 0 ms 0 ms 0.0 B 59.7 KB (memory) / 1 42.0 B / 1
>
> Let me know if you need anything else.
>
> Thanks
>
>
>
>
> On Tue, Apr 5, 2016 at 7:29 PM, Michael Armbrust 
> wrote:
>
>> Can you show your full code.  How are you partitioning the data? How are
>> you reading it?  What is the resulting query plan (run explain() or
>> EXPLAIN).
>>
>> On Tue, Apr 5, 2016 at 10:02 AM, dsing001  wrote:
>>
>>> HI,
>>>
>>> I am using 1.5.2. I have a dataframe which is partitioned based on the
>>> country. So I have around 150 partition in the dataframe. When I run
>>> sparksql and use country = 'UK' it still reads all partitions and not
>>> able
>>> to prune other partitions. Thus all the queries run for similar times
>>> independent of what country I pass. Is it desired?
>>>
>>> Is there a way to fix this in 1.5.2 by using some parameter or is it
>>> fixed
>>> in latest versions?
>>>
>>> Thanks
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Partition-pruning-in-spark-1-5-2-tp26682.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>


Re: Stress testing hdfs with Spark

2016-04-05 Thread Jan Holmberg
I'm trying to get a rough estimate of how much data I can write within a certain
time period (GB/sec).
-jan

On 05 Apr 2016, at 22:49, Mich Talebzadeh 
> wrote:

Hi Jan,

What is the definition of stress test in here? What are the matrices? 
Throughput of data, latency, velocity, volume?

HTH


Dr Mich Talebzadeh



LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 5 April 2016 at 20:42, Jan Holmberg 
> wrote:
Hi,
I'm trying to figure out how to write lots of data from each worker. I tried 
rdd.saveAsTextFile but got OOM when generating 1024MB string for a worker. 
Increasing worker memory would mean that I should drop the number of workers.
Soo, any idea how to write ex. 1gb file from each worker?

cheers,
-jan
-
To unsubscribe, e-mail: 
user-unsubscr...@spark.apache.org
For additional commands, e-mail: 
user-h...@spark.apache.org




Re: SPARK-13900 - Join with simple OR conditions take too long

2016-04-05 Thread Mich Talebzadeh
Hi Ashok. I forgot to mention that that test of mine was from Sybase ASE. So I
would say that if the hash join starts spilling to disk then the performance will
start degrading. My hunch is that either the optimizer does not cater for it, or the
optimizer decides that it would be cheaper to use a Nested Loop Join as
opposed to a Hash Join.
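For reference, the union rewrite Ashok mentions looks roughly like this in DataFrame terms (the DataFrame names sales and sales2 are hypothetical handles over the SALES tables shown below, and the key columns follow that example). Each branch is a plain equi-join, so the optimizer can use a hash join for it; distinct() then removes rows matched by more than one condition, assuming the joined rows are otherwise unique:

val j1 = sales.join(sales2, sales("CUST_ID") === sales2("CUST_ID"))
val j2 = sales.join(sales2, sales("TIME_ID") === sales2("TIME_ID"))

// roughly equivalent to joining on (CUST_ID matches OR TIME_ID matches), deduplicated
val orJoin = j1.unionAll(j2).distinct()
orJoin.count()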

HTH

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 5 April 2016 at 17:16, ashokkumar rajendran <
ashokkumar.rajend...@gmail.com> wrote:

> Hi Mich,
>
> Yes, Optimizer just chooses this on its own. The point of concern here is,
> this optimization does not work good in Large set Vs Small Set case. Nested
> Join is almost 10 times costlier than Hashed join or union join of 3
> conditions. So is this a bug on optimizer or is this a request to add SQL
> hints?
>
> Regards
> Ashok
>
> On Tue, Apr 5, 2016 at 1:33 AM, Mich Talebzadeh  > wrote:
>
>> Actually this may not be a bug. It just the Optimizer decides to do a
>> nested loop join over Hash Join when more that two OR joins are involved
>>
>> With one equality predicate Hash JOin is chosen
>>
>> 4> SELECT COUNT(SALES.PROD_ID) from SALES, SALES2
>> 5> WHERE SALES.CUST_ID = SALES2.CUST_ID
>> 6> go
>> QUERY PLAN FOR STATEMENT 1 (at line 4).
>> Optimized using Parallel Mode
>>
>> STEP 1
>> The type of query is SELECT.
>> 4 operator(s) under root
>>|ROOT:EMIT Operator (VA = 4)
>>|
>>|   |SCALAR AGGREGATE Operator (VA = 3)
>>|   |  Evaluate Ungrouped COUNT AGGREGATE.
>>|   |
>>|   |   |*HASH JOIN Operator* (VA = 2) (Join Type: Inner Join)
>>|   |   | Using Worktable1 for internal storage.
>>|   |   |  Key Count: 1
>>|   |   |
>>|   |   |   |SCAN Operator (VA = 0)
>>|   |   |   |  FROM TABLE
>>|   |   |   |  SALES2
>>|   |   |   |  Table Scan.
>>|   |   |   |  Forward Scan.
>>|   |   |   |  Positioning at start of table.
>>|   |   |   |  Using I/O Size 64 Kbytes for data pages.
>>|   |   |   |  With LRU Buffer Replacement Strategy for data pages.
>>|   |   |
>>|   |   |   |SCAN Operator (VA = 1)
>>|   |   |   |  FROM TABLE
>>|   |   |   |  SALES
>>|   |   |   |  Table Scan.
>>|   |   |   |  Forward Scan.
>>|   |   |   |  Positioning at start of table.
>>|   |   |   |  Using I/O Size 64 Kbytes for data pages.
>>|   |   |   |  With MRU Buffer Replacement Strategy for data pages.
>>
>> Total estimated I/O cost for statement 1 (at line 4): 783206.
>>
>> Now if I chose two predicates it reverts to Nested Loop Join
>>
>> 4> SELECT COUNT(SALES.PROD_ID) from SALES, SALES2
>> 5> WHERE SALES.CUST_ID = SALES2.CUST_ID
>> 6> OR
>> 7> SALES.TIME_ID = SALES2.TIME_ID
>> 8> go
>>
>> STEP 1
>> The type of query is SET OPTION ON.
>> Total estimated I/O cost for statement 3 (at line 3): 0.
>>
>> QUERY PLAN FOR STATEMENT 4 (at line 4).
>> Optimized using Parallel Mode
>>
>> STEP 1
>> The type of query is SELECT.
>> 5 operator(s) under root
>>|ROOT:EMIT Operator (VA = 5)
>>|
>>|   |SCALAR AGGREGATE Operator (VA = 4)
>>|   |  Evaluate Ungrouped COUNT AGGREGATE.
>>|   |
>>|   |   |*NESTED LOOP JOIN* Operator (VA = 3) (Join Type: Inner
>> Join)
>>|   |   |
>>|   |   |   |SCAN Operator (VA = 0)
>>|   |   |   |  FROM TABLE
>>|   |   |   |  SALES
>>|   |   |   |  Table Scan.
>>|   |   |   |  Forward Scan.
>>|   |   |   |  Positioning at start of table.
>>|   |   |   |  Using I/O Size 64 Kbytes for data pages.
>>|   |   |   |  With MRU Buffer Replacement Strategy for data pages.
>>|   |   |
>>|   |   |   |RESTRICT Operator (VA = 2)(0)(0)(0)(7)(0)
>>|   |   |   |
>>|   |   |   |   |SCAN Operator (VA = 1)
>>|   |   |   |   |  FROM TABLE
>>|   |   |   |   |  SALES2
>>|   |   |   |   |  Table Scan.
>>|   |   |   |   |  Forward Scan.
>>|   |   |   |   |  Positioning at start of table.
>>|   |   |   |   |  Using I/O Size 64 Kbytes for data pages.
>>|   |   |   |   |  With LRU Buffer Replacement Strategy for data
>> pages.
>>
>> Total estimated I/O cost for statement 4 (at line 4): 2147483647.
>>
>> HTH
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 1 April 2016 at 13:19, ashokkumar rajendran <
>> ashokkumar.rajend...@gmail.com> wrote:
>>
>>> I agree with Hemant's comment. 

Re: Stress testing hdfs with Spark

2016-04-05 Thread Mich Talebzadeh
Hi Jan,

What is the definition of a stress test here? What are the metrics?
Throughput of data, latency, velocity, volume?

HTH

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 5 April 2016 at 20:42, Jan Holmberg  wrote:

> Hi,
> I'm trying to figure out how to write lots of data from each worker. I
> tried rdd.saveAsTextFile but got OOM when generating 1024MB string for a
> worker. Increasing worker memory would mean that I should drop the number
> of workers.
> Soo, any idea how to write ex. 1gb file from each worker?
>
> cheers,
> -jan
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Stress testing hdfs with Spark

2016-04-05 Thread Jan Holmberg
Hi,
I'm trying to figure out how to write lots of data from each worker. I tried
rdd.saveAsTextFile but got an OOM when generating a 1024 MB string per worker.
Increasing worker memory would mean that I should drop the number of workers.
So, any idea how to write e.g. a 1 GB file from each worker?

cheers,
-jan
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: how to use custom properties in spark app

2016-04-05 Thread Haroon Rasheed
Hi,

You can have a custom properties file with Map-like entries, i.e. key/value
pairs such as "URL" -> "IPaddress:port/user/", and put this file on HDFS or
any location that Spark can access. Read the file as an RDD, turn it into a Map, and read
the values in the program.
You can also broadcast it in the program if you need to access it on all worker
nodes. A minimal sketch follows.
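The file path, key names, and format (plain key=value lines) below are illustrative:

// parse a key=value properties file from HDFS into a Map on the driver
val props: Map[String, String] = sc.textFile("hdfs:///config/myapp.properties")
  .map(_.trim)
  .filter(l => l.nonEmpty && !l.startsWith("#") && l.contains("="))
  .map { l => val Array(k, v) = l.split("=", 2); (k.trim, v.trim) }
  .collect()
  .toMap

val propsBc = sc.broadcast(props)   // make it available on every executor

val jdbcUrl = props("jdbc.url")                           // use directly on the driver
// rdd.map { x => propsBc.value("kafka.group.id") ... }   // use the broadcast inside tasks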

Regards,
Haroon Syed.

On 5 April 2016 at 07:15, yaoxiaohua  wrote:

> Hi bro,
>
> I am new to Spark application development, and I need to develop two
> apps running on a Spark cluster.
>
> Now I have some arguments for the applications.
>
> I can pass them as program arguments at
> spark-submit time, but I want to find a better way.
>
> I have some arguments such as the JDBC URL, Elasticsearch
> nodes, and the Kafka group id,
>
> so I want to know whether there is a best practice for doing
> this.
>
> Can I read these from a custom properties file?
>
>
>
> Another, similar question: I will read some rules
> to analyse data.
>
> Right now I store the rules in MySQL, and before I use them I read
> them from MySQL.
>
> Is there a better way to do this?
>
> Thanks in advance.
>
>
>
> Best Regards,
>
> Evan Yao
>


Re: Partition pruning in spark 1.5.2

2016-04-05 Thread Darshan Singh
Thanks. It is not my exact scenario but I have tried to reproduce it. I
have used 1.5.2.

I have a part_movies data-frame which has 20 partitions, 1 for each movie.

I created the following query:


val part_sql = sqlContext.sql("select * from part_movies where movie = 10")
part_sql.count()

I expect that this should read from just 1 partition, i.e. partition 10.
For the other partitions it should at most read metadata and not the data.

Here is the physical plan. I can see the filter, but from here I cannot say
whether this filter is causing any partition pruning. If pruning is actually
happening I would like to see an operator which mentions it.

== Physical Plan ==
TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[count#75L])
 TungstenExchange SinglePartition
  TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[currentCount#93L])
   Project
    Filter (movie#33 = 10)
     InMemoryColumnarTableScan [movie#33], [(movie#33 = 10)], (InMemoryRelation [movie#33,title#34,genres#35], true, 1, StorageLevel(true, true, false, true, 1), (Scan PhysicalRDD[movie#33,title#34,genres#35]), None)


However, my assumption that the partition is not pruned is not based on the
above plan but on the job and its stages: I could see that it
has read the full data of the dataframe. I should see around 65KB, as that is
roughly the average size of each partition.

Aggregated Metrics by Executor
Executor ID Address Task Time Total Tasks Failed Tasks Succeeded Tasks Input
Size / Records Shuffle Write Size / Records
driver localhost:53247 0.4 s 20 0 20 1289.0 KB / 20 840.0 B / 20


Task details, only the first 7 shown. Here I expect that, except for the 1 task which
accesses the partition's data, all others should read either 0 KB or just the
metadata, after which that partition is discarded as its data is not
needed. But I could see that all the partitions are read.

This is a small example so it doesn't make much difference, but for a large dataframe
reading all the data, even in memory, takes time.

Tasks

0 27 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 39 ms 12
ms 9 ms 0 ms 0 ms 0.0 B 66.2 KB (memory) / 1 42.0 B / 1
1 28 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 41 ms 9
ms 7 ms 0 ms 0 ms 0.0 B 63.9 KB (memory) / 1 1 ms 42.0 B / 1
2 29 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 40 ms 7
ms 7 ms 0 ms 0 ms 0.0 B 65.9 KB (memory) / 1 1 ms 42.0 B / 1
3 30 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 6 ms 3
ms 5 ms 0 ms 0 ms 0.0 B 62.0 KB (memory) / 1 42.0 B / 1
4 31 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 4 ms 4
ms 6 ms 1 ms 0 ms 0.0 B 69.2 KB (memory) / 1 42.0 B / 1
5 32 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 5 ms 2
ms 5 ms 0 ms 0 ms 0.0 B 60.3 KB (memory) / 1 42.0 B / 1
6 33 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 5 ms 3
ms 4 ms 0 ms 0 ms 0.0 B 70.3 KB (memory) / 1 42.0 B / 1
7 34 0 SUCCESS PROCESS_LOCAL driver / localhost 2016/04/05 19:01:03 4 ms 5
ms 4 ms 0 ms 0 ms 0.0 B 59.7 KB (memory) / 1 42.0 B / 1

Let me know if you need anything else.

Thanks




On Tue, Apr 5, 2016 at 7:29 PM, Michael Armbrust 
wrote:

> Can you show your full code.  How are you partitioning the data? How are
> you reading it?  What is the resulting query plan (run explain() or
> EXPLAIN).
>
> On Tue, Apr 5, 2016 at 10:02 AM, dsing001  wrote:
>
>> HI,
>>
>> I am using 1.5.2. I have a dataframe which is partitioned based on the
>> country. So I have around 150 partition in the dataframe. When I run
>> sparksql and use country = 'UK' it still reads all partitions and not able
>> to prune other partitions. Thus all the queries run for similar times
>> independent of what country I pass. Is it desired?
>>
>> Is there a way to fix this in 1.5.2 by using some parameter or is it fixed
>> in latest versions?
>>
>> Thanks
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Partition-pruning-in-spark-1-5-2-tp26682.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: Switch RDD-based MLlib APIs to maintenance mode in Spark 2.0

2016-04-05 Thread Shivaram Venkataraman
Overall this sounds good to me. One question I have is that in
addition to the ML algorithms we have a number of linear algebra
(various distributed matrices) and statistical methods in the
spark.mllib package. Is the plan to port or move these to the spark.ml
namespace in the 2.x series ?

Thanks
Shivaram

On Tue, Apr 5, 2016 at 11:48 AM, Sean Owen  wrote:
> FWIW, all of that sounds like a good plan to me. Developing one API is
> certainly better than two.
>
> On Tue, Apr 5, 2016 at 7:01 PM, Xiangrui Meng  wrote:
>> Hi all,
>>
>> More than a year ago, in Spark 1.2 we introduced the ML pipeline API built
>> on top of Spark SQL’s DataFrames. Since then the new DataFrame-based API has
>> been developed under the spark.ml package, while the old RDD-based API has
>> been developed in parallel under the spark.mllib package. While it was
>> easier to implement and experiment with new APIs under a new package, it
>> became harder and harder to maintain as both packages grew bigger and
>> bigger. And new users are often confused by having two sets of APIs with
>> overlapped functions.
>>
>> We started to recommend the DataFrame-based API over the RDD-based API in
>> Spark 1.5 for its versatility and flexibility, and we saw the development
>> and the usage gradually shifting to the DataFrame-based API. Just counting
>> the lines of Scala code, from 1.5 to the current master we added ~1
>> lines to the DataFrame-based API while ~700 to the RDD-based API. So, to
>> gather more resources on the development of the DataFrame-based API and to
>> help users migrate over sooner, I want to propose switching RDD-based MLlib
>> APIs to maintenance mode in Spark 2.0. What does it mean exactly?
>>
>> * We do not accept new features in the RDD-based spark.mllib package, unless
>> they block implementing new features in the DataFrame-based spark.ml
>> package.
>> * We still accept bug fixes in the RDD-based API.
>> * We will add more features to the DataFrame-based API in the 2.x series to
>> reach feature parity with the RDD-based API.
>> * Once we reach feature parity (possibly in Spark 2.2), we will deprecate
>> the RDD-based API.
>> * We will remove the RDD-based API from the main Spark repo in Spark 3.0.
>>
>> Though the RDD-based API is already in de facto maintenance mode, this
>> announcement will make it clear and hence important to both MLlib developers
>> and users. So we’d greatly appreciate your feedback!
>>
>> (As a side note, people sometimes use “Spark ML” to refer to the
>> DataFrame-based API or even the entire MLlib component. This also causes
>> confusion. To be clear, “Spark ML” is not an official name and there are no
>> plans to rename MLlib to “Spark ML” at this time.)
>>
>> Best,
>> Xiangrui
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Switch RDD-based MLlib APIs to maintenance mode in Spark 2.0

2016-04-05 Thread Sean Owen
FWIW, all of that sounds like a good plan to me. Developing one API is
certainly better than two.

On Tue, Apr 5, 2016 at 7:01 PM, Xiangrui Meng  wrote:
> Hi all,
>
> More than a year ago, in Spark 1.2 we introduced the ML pipeline API built
> on top of Spark SQL’s DataFrames. Since then the new DataFrame-based API has
> been developed under the spark.ml package, while the old RDD-based API has
> been developed in parallel under the spark.mllib package. While it was
> easier to implement and experiment with new APIs under a new package, it
> became harder and harder to maintain as both packages grew bigger and
> bigger. And new users are often confused by having two sets of APIs with
> overlapped functions.
>
> We started to recommend the DataFrame-based API over the RDD-based API in
> Spark 1.5 for its versatility and flexibility, and we saw the development
> and the usage gradually shifting to the DataFrame-based API. Just counting
> the lines of Scala code, from 1.5 to the current master we added ~1
> lines to the DataFrame-based API while ~700 to the RDD-based API. So, to
> gather more resources on the development of the DataFrame-based API and to
> help users migrate over sooner, I want to propose switching RDD-based MLlib
> APIs to maintenance mode in Spark 2.0. What does it mean exactly?
>
> * We do not accept new features in the RDD-based spark.mllib package, unless
> they block implementing new features in the DataFrame-based spark.ml
> package.
> * We still accept bug fixes in the RDD-based API.
> * We will add more features to the DataFrame-based API in the 2.x series to
> reach feature parity with the RDD-based API.
> * Once we reach feature parity (possibly in Spark 2.2), we will deprecate
> the RDD-based API.
> * We will remove the RDD-based API from the main Spark repo in Spark 3.0.
>
> Though the RDD-based API is already in de facto maintenance mode, this
> announcement will make it clear and hence important to both MLlib developers
> and users. So we’d greatly appreciate your feedback!
>
> (As a side note, people sometimes use “Spark ML” to refer to the
> DataFrame-based API or even the entire MLlib component. This also causes
> confusion. To be clear, “Spark ML” is not an official name and there are no
> plans to rename MLlib to “Spark ML” at this time.)
>
> Best,
> Xiangrui

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Can spark somehow help with this usecase?

2016-04-05 Thread Marco Mistroni
Many thanks for the suggestion, Andy!
Kr
Marco
On 5 Apr 2016 7:25 pm, "Andy Davidson" 
wrote:

> Hi Marco
>
> You might consider setting up some sort of ELT pipe line. One of your
> stages might be to create a file of all the FTP URL.  You could then write
> a spark app that just fetches the urls and stores the data in some sort of
> data base or on the file system (hdfs?)
>
> My guess would be to maybe use the map() transform to make the FTP call.
> If you are using Java or scala take a look at the apache commons FTP Client
>
> I assume that each ftp get is independent. *Maybe some one know more
> about how to control the amount of concurrency*. I think it will be based
> on the number of partitions, works, and cores?
>
> Andy
>
> From: Marco Mistroni 
> Date: Tuesday, April 5, 2016 at 9:13 AM
> To: "user @spark" 
> Subject: Can spark somehow help with this usecase?
>
> Hi
> I m currently using spark to process a file containing a million of
> rows(edgar quarterly filings files)
> Each row contains some infos plus a location of a remote file which I need
> to retrieve using FTP and then process it's content.
> I want to do all 3 operations ( process filing file, fetch remote files
> and process them in ) in one go.
> I want to avoid doing the first step (processing the million row file) in
> spark and the rest (_fetching FTP and process files) offline.
> Does spark has anything that can help with the FTP fetch?
>
> Thanks in advance and rgds
> Marco
>
>


Re: dataframe sorting and find the index of the maximum element

2016-04-05 Thread Michael Armbrust
You should generally think of a DataFrame as unordered, unless you are
explicitly asking for an order. One way to order and assign an index is
with window functions.
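A sketch of that approach, assuming the single column of the DataFrame df is named count (note that before Spark 2.0 window functions require a HiveContext, and a global ordering pulls everything into one partition, which is fine for small tables):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number   // spelled rowNumber() in Spark 1.4/1.5

val w = Window.orderBy(df("count").desc)
val indexed = df.withColumn("idx", row_number().over(w))

indexed.show()                        // every value with its position by descending count
indexed.where(indexed("idx") === 1)   // the row holding the maximum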

On Tue, Apr 5, 2016 at 4:17 AM, Angel Angel  wrote:

> Hello,
>
> I am writing a Spark application in which I need the index of the maximum
> element.
>
> My table has only one column and I want the index of the maximum element.
>
> MAX(count)
> 23
> 32
> 3
>
> Here is my code; the data type is
> org.apache.spark.sql.DataFrame.
>
>
> Thanks in advance.
> Please also suggest another way to do it, if there is one.
>
> [image: Inline image 1]
>


Re: Partition pruning in spark 1.5.2

2016-04-05 Thread Michael Armbrust
Can you show your full code.  How are you partitioning the data? How are
you reading it?  What is the resulting query plan (run explain() or
EXPLAIN).

On Tue, Apr 5, 2016 at 10:02 AM, dsing001  wrote:

> HI,
>
> I am using 1.5.2. I have a dataframe which is partitioned based on the
> country. So I have around 150 partition in the dataframe. When I run
> sparksql and use country = 'UK' it still reads all partitions and not able
> to prune other partitions. Thus all the queries run for similar times
> independent of what country I pass. Is it desired?
>
> Is there a way to fix this in 1.5.2 by using some parameter or is it fixed
> in latest versions?
>
> Thanks
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Partition-pruning-in-spark-1-5-2-tp26682.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Can spark somehow help with this usecase?

2016-04-05 Thread Andy Davidson
Hi Marco

You might consider setting up some sort of ELT pipe line. One of your stages
might be to create a file of all the FTP URL.  You could then write a spark
app that just fetches the urls and stores the data in some sort of data base
or on the file system (hdfs?)

My guess would be to maybe use the map() transform to make the FTP call. If
you are using Java or scala take a look at the apache commons FTP Client

I assume that each ftp get is independent. Maybe someone knows more about
how to control the amount of concurrency. I think it will be based on the
number of partitions, workers, and cores?
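
A minimal (untested) sketch of that idea, assuming the filing file has already been
parsed into an RDD of (info, ftpUrl) pairs called rows, and assuming the servers
allow anonymous FTP:

import org.apache.commons.net.ftp.FTPClient
import scala.io.Source

// rows: RDD[(String, String)] where the second element is an ftp:// URL
val fetched = rows.map { case (info, url) =>
  val uri = new java.net.URI(url)
  val ftp = new FTPClient()                        // one connection per file
  ftp.connect(uri.getHost)
  ftp.login("anonymous", "anonymous@example.com")
  ftp.enterLocalPassiveMode()
  val in = ftp.retrieveFileStream(uri.getPath)
  val body = Source.fromInputStream(in).mkString   // the remote file's content
  in.close()
  ftp.completePendingCommand()
  ftp.logout()
  ftp.disconnect()
  (info, body)
}
// Concurrency is bounded by the number of tasks running at once (roughly your
// total cores), so repartition() the input to tune how many FTP connections are
// open simultaneously.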

Andy

From:  Marco Mistroni 
Date:  Tuesday, April 5, 2016 at 9:13 AM
To:  "user @spark" 
Subject:  Can spark somehow help with this usecase?

> 
> Hi 
> I am currently using spark to process a file containing a million
> rows (EDGAR quarterly filings files).
> Each row contains some info plus the location of a remote file which I need to
> retrieve using FTP and then process its content.
> I want to do all 3 operations (process the filing file, fetch the remote files and
> process them) in one go.
> I want to avoid doing the first step (processing the million-row file) in
> spark and the rest (fetching via FTP and processing the files) offline.
> Does spark have anything that can help with the FTP fetch?
> 
> Thanks in advance and rgds
>  Marco




Switch RDD-based MLlib APIs to maintenance mode in Spark 2.0

2016-04-05 Thread Xiangrui Meng
Hi all,

More than a year ago, in Spark 1.2 we introduced the ML pipeline API built
on top of Spark SQL’s DataFrames. Since then the new DataFrame-based API
has been developed under the spark.ml package, while the old RDD-based API
has been developed in parallel under the spark.mllib package. While it was
easier to implement and experiment with new APIs under a new package, it
became harder and harder to maintain as both packages grew bigger and
bigger. And new users are often confused by having two sets of APIs with
overlapped functions.

We started to recommend the DataFrame-based API over the RDD-based API in
Spark 1.5 for its versatility and flexibility, and we saw the development
and the usage gradually shifting to the DataFrame-based API. Just counting
the lines of Scala code, from 1.5 to the current master we added ~1
lines to the DataFrame-based API while ~700 to the RDD-based API. So, to
gather more resources on the development of the DataFrame-based API and to
help users migrate over sooner, I want to propose switching RDD-based MLlib
APIs to maintenance mode in Spark 2.0. What does it mean exactly?

* We do not accept new features in the RDD-based spark.mllib package,
unless they block implementing new features in the DataFrame-based spark.ml
package.
* We still accept bug fixes in the RDD-based API.
* We will add more features to the DataFrame-based API in the 2.x series to
reach feature parity with the RDD-based API.
* Once we reach feature parity (possibly in Spark 2.2), we will deprecate
the RDD-based API.
* We will remove the RDD-based API from the main Spark repo in Spark 3.0.

Though the RDD-based API is already in de facto maintenance mode, this
announcement will make it clear and hence important to both MLlib
developers and users. So we’d greatly appreciate your feedback!

(As a side note, people sometimes use “Spark ML” to refer to the
DataFrame-based API or even the entire MLlib component. This also causes
confusion. To be clear, “Spark ML” is not an official name and there are no
plans to rename MLlib to “Spark ML” at this time.)

Best,
Xiangrui


RE: Plan issue with spark 1.5.2

2016-04-05 Thread Yong Zhang
You need to show us the execution plan, so we can understand what your issue is.
Use the spark shell code to show how your DF is built, how you partition them, 
then use explain(true) on your join DF, and show the output here, so we can 
better help you.
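
Something along these lines (just a sketch; substitute your real tables and join
keys):

val df1 = sqlContext.table("t1")      // hypothetical table names
val df2 = sqlContext.table("t2")

val joined = df1.join(df2, df1("country") === df2("country") && df1("id") === df2("id"))

// Prints the parsed, analyzed, optimized and physical plans; the physical plan
// shows whether an Exchange (shuffle) or a broadcast join is being used.
joined.explain(true)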
Yong

> Date: Tue, 5 Apr 2016 09:46:59 -0700
> From: darshan.m...@gmail.com
> To: user@spark.apache.org
> Subject: Plan issue with spark 1.5.2
> 
> 
> I am using spark 1.5.2. I have a question regarding plan generated by spark.
> I have 3 data-frames which have the data for different countries. I have
> around 150 countries and the data is skewed.
>
> 95% of my queries will have country as a criterion. However, I have seen issues
> with the plans generated for queries which have country as a join column.
>
> Data-frames are partitioned based on the country. Not only are these dataframes
> co-partitioned, they are co-located as well. E.g. data for the UK in
> data-frames df1, df2 and df3 will be on the same hdfs datanode.
>
> Then when I join these 3 tables and country is one of the join columns, I
> assume that the join should be a map-side join, but it shuffles the data
> from the 3 dataframes and then joins using the shuffled data. Apart from country
> there are other columns in the join.
> 
> Is this correct behavior? If it is an issue is it fixed in latest versions?
> 
> Thanks
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Plan-issue-with-spark-1-5-2-tp26681.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 
  

Re: dataframe sorting and find the index of the maximum element

2016-04-05 Thread Ted Yu
The error was due to the REPL expecting an integer (an index into the Array) whereas
"MAX(count)" was a String.

What do you want to achieve ?

On Tue, Apr 5, 2016 at 4:17 AM, Angel Angel  wrote:

> Hello,
>
> i am writing a spark application in which i need the index of the maximum
> element.
>
> My table has one column only and i want the index of the maximum element.
>
> MAX(count)
> 23
> 32
> 3
> Here is my code the data type of the array is
> org.apache.spark.sql.Dataframe.
>
>
> Thanks in advance.
> Also, please suggest another way to do it.
>
> [image: Inline image 1]
>


Partition pruning in spark 1.5.2

2016-04-05 Thread dsing001
HI,

I am using 1.5.2. I have a dataframe which is partitioned based on the
country. So I have around 150 partitions in the dataframe. When I run
sparksql and use country = 'UK' it still reads all partitions and is not able
to prune the other partitions. Thus all the queries run for similar times
independent of what country I pass. Is this desired?

Is there a way to fix this in 1.5.2 by using some parameter or is it fixed
in latest versions?
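
(For reference, a minimal sketch of the layout where pruning is expected to kick
in, assuming the data is written as a partitioned parquet data source; the path
and names are made up:)

import org.apache.spark.sql.functions.col

// Write the data as a partitioned data source: one directory per country.
df.write.partitionBy("country").parquet("/data/sales_by_country")

// Read it back and filter on the partition column.
val uk = sqlContext.read.parquet("/data/sales_by_country").filter(col("country") === "UK")

// If pruning kicks in, the physical plan should reference only the
// country=UK partition directory.
uk.explain(true)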

Thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Partition-pruning-in-spark-1-5-2-tp26682.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Fwd: Facing Unusual Behavior with the executors in spark streaming

2016-04-05 Thread Abhishek Anand
Hi ,

Needed inputs for a couple of issues that I am facing in my production
environment.

I am using spark version 1.4.0 spark streaming.

1) It so happens that the worker is lost on a machine but the executor
still shows up in the executors tab in the UI.

Even when I kill a worker using the kill -9 command, the worker and executor
both die on that machine but the executor still shows up in the executors tab
on the UI. The number of active tasks sometimes shows a negative value on that
executor and my job keeps failing with the following exception.

This usually happens when a job is running. When no computation is taking
place on the cluster (i.e. suppose a 1 min batch gets completed in 20 secs)
and I kill the worker, the executor entry is also gone from the UI, but when
I kill the worker while a job is still running I always run into this issue.


16/04/01 23:54:20 WARN TaskSetManager: Lost task 141.0 in stage 19859.0
(TID 190333, 192.168.33.96): java.io.IOException: Failed to connect to /
192.168.33.97:63276
at
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:193)
at
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
at
org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:88)
at
org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
at
org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
at
org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170)
at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.ConnectException: Connection refused: /
192.168.33.97:63276
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at
sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:716)
at
io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:208)
at
io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:287)
at
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
... 1 more



 When I relaunch the worker new executors are added but the dead one's
entry is still there until the application is killed.

 2) Another issue is that when the disk becomes full on one of the workers, the
executor becomes unresponsive and the job gets stuck at a particular stage. The
exception that I can see in the executor logs is


 16/03/29 10:49:00 ERROR DiskBlockObjectWriter: Uncaught exception while
reverting partial writes to file
/data/spark-e2fc248f-a212-4a99-9d6c-4e52d6a69070/executor-37679a6c-cb96-451e-a284-64d6b4fe9910/blockmgr-f8ca72f4-f329-468b-8e65-ef97f8fb285c/38/temp_shuffle_8f266d70-3fc6-41e5-bbaa-c413a7b08ea4
java.io.IOException: No space left on device
at java.io.FileOutputStream.writeBytes(Native Method)
at java.io.FileOutputStream.write(FileOutputStream.java:315)
at
org.apache.spark.storage.TimeTrackingOutputStream.write(TimeTrackingOutputStream.java:58)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
at org.xerial.snappy.SnappyOutputStream.flush(SnappyOutputStream.java:274)


As a workaround I have to kill the executor, clear the space on disk, and a
new executor is relaunched by the worker and the failed stages are
recomputed. But is it really the case that when the space is full on one
machine my application gets stuck?




This is really becoming a bottleneck and leads to instability of my
production stack.

Please share your insights on this.


Thanks,
Abhi


Plan issue with spark 1.5.2

2016-04-05 Thread dsing001

I am using spark 1.5.2. I have a question regarding plan generated by spark.
I have 3 data-frames which have the data for different countries. I have
around 150 countries and the data is skewed.

95% of my queries will have country as a criterion. However, I have seen issues
with the plans generated for queries which have country as a join column.

Data-frames are partitioned based on the country. Not only are these dataframes
co-partitioned, they are co-located as well. E.g. data for the UK in
data-frames df1, df2 and df3 will be on the same hdfs datanode.

Then when I join these 3 tables and country is one of the join columns, I
assume that the join should be a map-side join, but it shuffles the data
from the 3 dataframes and then joins using the shuffled data. Apart from country
there are other columns in the join.

Is this correct behavior? If it is an issue is it fixed in latest versions?
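
(For what it's worth, a sketch of one workaround when one of the sides is small
enough to fit in executor memory: the broadcast hint, available since Spark 1.5.
Column names are made up.)

import org.apache.spark.sql.functions.broadcast

// Broadcasting the small side avoids shuffling the large dataframes at all.
val joined = df1.join(broadcast(df3),
  df1("country") === df3("country") && df1("other_col") === df3("other_col"))
joined.explain(true)   // should show a BroadcastHashJoin instead of a shuffle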

Thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Plan-issue-with-spark-1-5-2-tp26681.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: --packages configuration equivalent item name?

2016-04-05 Thread Russell Jurney
Thanks! These aren't in the docs, I will make a JIRA to add them.
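
For anyone searching later, a sketch of what that looks like in
conf/spark-defaults.conf (using the same package coordinates as the original
command):

spark.jars.packages   com.databricks:spark-csv_2.10:1.4.0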

On Monday, April 4, 2016, Saisai Shao  wrote:

> spark.jars.ivy, spark.jars.packages, and spark.jars.excludes are the
> configurations you can use.
>
> Thanks
> Saisai
>
> On Sun, Apr 3, 2016 at 1:59 AM, Russell Jurney  > wrote:
>
>> Thanks, Andy!
>>
>> On Mon, Mar 28, 2016 at 8:44 AM, Andy Davidson <
>> a...@santacruzintegration.com
>> > wrote:
>>
>>> Hi Russell
>>>
>>> I use Jupyter python notebooks a lot. Here is how I start the server
>>>
>>> set -x # turn debugging on
>>>
>>> #set +x # turn debugging off
>>>
>>>
>>> # https://github.com/databricks/spark-csv
>>>
>>> # http://spark-packages.org/package/datastax/spark-cassandra-connector
>>>
>>> #
>>> https://github.com/datastax/spark-cassandra-connector/blob/master/doc/15_python.md
>>>
>>> #
>>> https://github.com/datastax/spark-cassandra-connector/blob/master/doc/15_python.md#pyspark-with-data-frames
>>>
>>>
>>> # packages are ',' seperate with no white space
>>>
>>> extraPkgs="--packages
>>> com.databricks:spark-csv_2.11:1.3.0,datastax:spark-cassandra-connector:1.6.0-M1-s_2.10"
>>>
>>>
>>> export PYSPARK_PYTHON=python3
>>>
>>> export PYSPARK_DRIVER_PYTHON=python3
>>>
>>> IPYTHON_OPTS=notebook $SPARK_ROOT/bin/pyspark $extraPkgs --conf
>>> spark.cassandra.connection.host=
>>> ec2-54-153-102-232.us-west-1.compute.amazonaws.com $*
>>>
>>>
>>>
>>> From: Russell Jurney >> >
>>> Date: Sunday, March 27, 2016 at 7:22 PM
>>> To: "user @spark" >> >
>>> Subject: --packages configuration equivalent item name?
>>>
>>> I run PySpark with CSV support like so: IPYTHON=1 pyspark --packages
>>> com.databricks:spark-csv_2.10:1.4.0
>>>
>>> I don't want to type this --packages argument each time. Is there a
>>> config item for --packages? I can't find one in the reference at
>>> http://spark.apache.org/docs/latest/configuration.html
>>>
>>> If there is no way to do this, please let me know so I can make a JIRA
>>> for this feature.
>>>
>>> Thanks!
>>> --
>>> Russell Jurney twitter.com/rjurney russell.jur...@gmail.com
>>>  relato.io
>>>
>>>
>>
>>
>> --
>> Russell Jurney twitter.com/rjurney russell.jur...@gmail.com
>>  relato.io
>>
>
>

-- 
Russell Jurney twitter.com/rjurney russell.jur...@gmail.com relato.io


Re: SPARK-13900 - Join with simple OR conditions take too long

2016-04-05 Thread ashokkumar rajendran
Hi Mich,

Yes, the Optimizer just chooses this on its own. The point of concern here is
that this optimization does not work well in the Large set vs Small set case. A nested
loop join is almost 10 times costlier than a hash join or a union of the 3
conditions. So is this a bug in the optimizer, or is this a request to add SQL
hints?

Regards
Ashok

On Tue, Apr 5, 2016 at 1:33 AM, Mich Talebzadeh 
wrote:

> Actually this may not be a bug. It is just that the Optimizer decides to do a
> nested loop join over a Hash Join when more than two OR joins are involved.
>
> With one equality predicate a Hash Join is chosen
>
> 4> SELECT COUNT(SALES.PROD_ID) from SALES, SALES2
> 5> WHERE SALES.CUST_ID = SALES2.CUST_ID
> 6> go
> QUERY PLAN FOR STATEMENT 1 (at line 4).
> Optimized using Parallel Mode
>
> STEP 1
> The type of query is SELECT.
> 4 operator(s) under root
>|ROOT:EMIT Operator (VA = 4)
>|
>|   |SCALAR AGGREGATE Operator (VA = 3)
>|   |  Evaluate Ungrouped COUNT AGGREGATE.
>|   |
>|   |   |*HASH JOIN Operator* (VA = 2) (Join Type: Inner Join)
>|   |   | Using Worktable1 for internal storage.
>|   |   |  Key Count: 1
>|   |   |
>|   |   |   |SCAN Operator (VA = 0)
>|   |   |   |  FROM TABLE
>|   |   |   |  SALES2
>|   |   |   |  Table Scan.
>|   |   |   |  Forward Scan.
>|   |   |   |  Positioning at start of table.
>|   |   |   |  Using I/O Size 64 Kbytes for data pages.
>|   |   |   |  With LRU Buffer Replacement Strategy for data pages.
>|   |   |
>|   |   |   |SCAN Operator (VA = 1)
>|   |   |   |  FROM TABLE
>|   |   |   |  SALES
>|   |   |   |  Table Scan.
>|   |   |   |  Forward Scan.
>|   |   |   |  Positioning at start of table.
>|   |   |   |  Using I/O Size 64 Kbytes for data pages.
>|   |   |   |  With MRU Buffer Replacement Strategy for data pages.
>
> Total estimated I/O cost for statement 1 (at line 4): 783206.
>
> Now if I chose two predicates it reverts to Nested Loop Join
>
> 4> SELECT COUNT(SALES.PROD_ID) from SALES, SALES2
> 5> WHERE SALES.CUST_ID = SALES2.CUST_ID
> 6> OR
> 7> SALES.TIME_ID = SALES2.TIME_ID
> 8> go
>
> STEP 1
> The type of query is SET OPTION ON.
> Total estimated I/O cost for statement 3 (at line 3): 0.
>
> QUERY PLAN FOR STATEMENT 4 (at line 4).
> Optimized using Parallel Mode
>
> STEP 1
> The type of query is SELECT.
> 5 operator(s) under root
>|ROOT:EMIT Operator (VA = 5)
>|
>|   |SCALAR AGGREGATE Operator (VA = 4)
>|   |  Evaluate Ungrouped COUNT AGGREGATE.
>|   |
>|   |   |*NESTED LOOP JOIN* Operator (VA = 3) (Join Type: Inner
> Join)
>|   |   |
>|   |   |   |SCAN Operator (VA = 0)
>|   |   |   |  FROM TABLE
>|   |   |   |  SALES
>|   |   |   |  Table Scan.
>|   |   |   |  Forward Scan.
>|   |   |   |  Positioning at start of table.
>|   |   |   |  Using I/O Size 64 Kbytes for data pages.
>|   |   |   |  With MRU Buffer Replacement Strategy for data pages.
>|   |   |
>|   |   |   |RESTRICT Operator (VA = 2)(0)(0)(0)(7)(0)
>|   |   |   |
>|   |   |   |   |SCAN Operator (VA = 1)
>|   |   |   |   |  FROM TABLE
>|   |   |   |   |  SALES2
>|   |   |   |   |  Table Scan.
>|   |   |   |   |  Forward Scan.
>|   |   |   |   |  Positioning at start of table.
>|   |   |   |   |  Using I/O Size 64 Kbytes for data pages.
>|   |   |   |   |  With LRU Buffer Replacement Strategy for data
> pages.
>
> Total estimated I/O cost for statement 4 (at line 4): 2147483647.
>
> HTH
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 1 April 2016 at 13:19, ashokkumar rajendran <
> ashokkumar.rajend...@gmail.com> wrote:
>
>> I agree with Hemant's comment. But it does not give good results for
>> simple usecases like 2 OR conditions. Ultimately we need good results from
>> Spark for end users. Shall we consider this as a request to support SQL
>> hints then? Is there any plan to support SQL hints in Spark in an upcoming
>> release?
>>
>> Regards
>> Ashok
>>
>> On Fri, Apr 1, 2016 at 5:04 PM, Robin East 
>> wrote:
>>
>>> Yes and even today CBO (e.g. in Oracle) will still require hints in some
>>> cases so I think it is more like:
>>>
>>> RBO -> RBO + Hints -> CBO + Hints. Most relational databases meet
>>> significant numbers of corner cases where CBO plans simply don’t do what
>>> you would want. I don’t know enough about Spark SQL to comment on whether
>>> the same problems would afflict Spark.
>>>
>>>
>>>
>>>

Can spark somehow help with this usecase?

2016-04-05 Thread Marco Mistroni
Hi
I am currently using spark to process a file containing a million
rows (EDGAR quarterly filings files).
Each row contains some info plus the location of a remote file which I need
to retrieve using FTP and then process its content.
I want to do all 3 operations (process the filing file, fetch the remote files and
process them) in one go.
I want to avoid doing the first step (processing the million-row file) in
spark and the rest (fetching via FTP and processing the files) offline.
Does spark have anything that can help with the FTP fetch?

Thanks in advance and rgds
Marco


Re: RDD Partitions not distributed evenly to executors

2016-04-05 Thread Khaled Ammar
I have a similar experience.

Using 32 machines, I can see that the number of tasks (partitions) assigned to
executors (machines) is not even. Moreover, the distribution changes every
stage (iteration).

I wonder why Spark needs to move partitions around anyway; should not the
scheduler reduce network (and other IO) overhead by reducing such
relocation?

Thanks,
-Khaled




On Mon, Apr 4, 2016 at 10:57 PM, Koert Kuipers  wrote:

> can you try:
> spark.shuffle.reduceLocality.enabled=false
>
> On Mon, Apr 4, 2016 at 8:17 PM, Mike Hynes <91m...@gmail.com> wrote:
>
>> Dear all,
>>
>> Thank you for your responses.
>>
>> Michael Slavitch:
>> > Just to be sure:  Has spark-env.sh and spark-defaults.conf been
>> correctly propagated to all nodes?  Are they identical?
>> Yes; these files are stored on a shared memory directory accessible to
>> all nodes.
>>
>> Koert Kuipers:
>> > we ran into similar issues and it seems related to the new memory
>> > management. can you try:
>> > spark.memory.useLegacyMode = true
>> I reran the exact same code with a restarted cluster using this
>> modification, and did not observe any difference. The partitioning is
>> still imbalanced.
>>
>> Ted Yu:
>> > If the changes can be ported over to 1.6.1, do you mind reproducing the
>> issue there ?
>> Since the spark.memory.useLegacyMode setting did not impact my code
>> execution, I will have to change the Spark dependency back to earlier
>> versions to see if the issue persists and get back to you.
>>
>> Meanwhile, if anyone else has any other ideas or experience, please let
>> me know.
>>
>> Mike
>>
>> On 4/4/16, Koert Kuipers  wrote:
>> > we ran into similar issues and it seems related to the new memory
>> > management. can you try:
>> > spark.memory.useLegacyMode = true
>> >
>> > On Mon, Apr 4, 2016 at 9:12 AM, Mike Hynes <91m...@gmail.com> wrote:
>> >
>> >> [ CC'ing dev list since nearly identical questions have occurred in
>> >> user list recently w/o resolution;
>> >> c.f.:
>> >>
>> >>
>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-work-distribution-among-execs-tt26502.html
>> >>
>> >>
>> http://apache-spark-user-list.1001560.n3.nabble.com/Partitions-are-get-placed-on-the-single-node-tt26597.html
>> >> ]
>> >>
>> >> Hello,
>> >>
>> >> In short, I'm reporting a problem concerning load imbalance of RDD
>> >> partitions across a standalone cluster. Though there are 16 cores
>> >> available per node, certain nodes will have >16 partitions, and some
>> >> will correspondingly have <16 (and even 0).
>> >>
>> >> In more detail: I am running some scalability/performance tests for
>> >> vector-type operations. The RDDs I'm considering are simple block
>> >> vectors of type RDD[(Int,Vector)] for a Breeze vector type. The RDDs
>> >> are generated with a fixed number of elements given by some multiple
>> >> of the available cores, and subsequently hash-partitioned by their
>> >> integer block index.
>> >>
>> >> I have verified that the hash partitioning key distribution, as well
>> >> as the keys themselves, are both correct; the problem is truly that
>> >> the partitions are *not* evenly distributed across the nodes.
>> >>
>> >> For instance, here is a representative output for some stages and
>> >> tasks in an iterative program. This is a very simple test with 2
>> >> nodes, 64 partitions, 32 cores (16 per node), and 2 executors. Two
>> >> example stages from the stderr log are stages 7 and 9:
>> >> 7,mapPartitions at DummyVector.scala:113,64,1459771364404,1459771365272
>> >> 9,mapPartitions at DummyVector.scala:113,64,1459771364431,1459771365639
>> >>
>> >> When counting the location of the partitions on the compute nodes from
>> >> the stderr logs, however, you can clearly see the imbalance. Example
>> >> lines are:
>> >> 13627 task 0.0 in stage 7.0 (TID 196,
>> >> himrod-2, partition 0,PROCESS_LOCAL, 3987 bytes)&
>> >> 13628 task 1.0 in stage 7.0 (TID 197,
>> >> himrod-2, partition 1,PROCESS_LOCAL, 3987 bytes)&
>> >> 13629 task 2.0 in stage 7.0 (TID 198,
>> >> himrod-2, partition 2,PROCESS_LOCAL, 3987 bytes)&
>> >>
>> >> Grep'ing the full set of above lines for each hostname, himrod-?,
>> >> shows the problem occurs in each stage. Below is the output, where the
>> >> number of partitions stored on each node is given alongside its
>> >> hostname as in (himrod-?,num_partitions):
>> >> Stage 7: (himrod-1,0) (himrod-2,64)
>> >> Stage 9: (himrod-1,16) (himrod-2,48)
>> >> Stage 12: (himrod-1,0) (himrod-2,64)
>> >> Stage 14: (himrod-1,16) (himrod-2,48)
>> >> The imbalance is also visible when the executor ID is used to count
>> >> the partitions operated on by executors.
>> >>
>> >> I am working off a fairly recent modification of 2.0.0-SNAPSHOT branch
>> >> (but the modifications do not touch the scheduler, and are irrelevant
>> >> for these particular tests). Has something changed radically in 1.6+
>> >> that would make a previously (<=1.5) correct configuration go haywire?
>> >> Have new 

Question around spark on EMR

2016-04-05 Thread Natu Lauchande
Hi,

I am setting up a Scala spark streaming app in EMR. I wonder if anyone on
the list can help me with the following questions:

1. What's the approach that you guys have been using to submit, in an EMR
job step, environment variables that will be needed by the Spark application?

2. Can I have multiple streaming apps in EMR?

3. Is there any tool recommended for configuration management (something
like Consul)?


Thanks,
Natu


GraphX replication factor

2016-04-05 Thread Khaled Ammar
Hi,

I wonder if it is possible to figure out the replication factor used in
GraphX partitioning from its log files.

-- 
Thanks,
-Khaled


Re: dataframe sorting and find the index of the maximum element

2016-04-05 Thread Ted Yu
Did you define idxmax() method yourself ?

Thanks

On Tue, Apr 5, 2016 at 4:17 AM, Angel Angel  wrote:

> Hello,
>
> i am writing a spark application in which i need the index of the maximum
> element.
>
> My table has one column only and i want the index of the maximum element.
>
> MAX(count)
> 23
> 32
> 3
> Here is my code the data type of the array is
> org.apache.spark.sql.Dataframe.
>
>
> Thanks in advance.
> Also, please suggest another way to do it.
>
> [image: Inline image 1]
>


HiveContext unable to recognize the delimiter of Hive table in textfile partitioned by date

2016-04-05 Thread Shiva Achari
Hi,

I have created a hive external table stored as textfile partitioned by
event_date Date.

How do we specify a specific csv format while reading in spark
from a Hive table?

The environment is

 1. Spark 1.5.0 - cdh5.5.1, using Scala version 2.10.4 (Java
HotSpot(TM) 64-Bit Server VM, Java 1.7.0_67)
 2. Hive 1.1, CDH 5.5.1

scala script

sqlContext.setConf("hive.exec.dynamic.partition", "true")
sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")

val distData = sc.parallelize(Array((1, 1, 1), (2, 2, 2), (3, 3,
3))).toDF
val distData_1 = distData.withColumn("event_date", current_date())
distData_1: org.apache.spark.sql.DataFrame = [_1: int, _2: int, _3:
int, event_date: date]

scala > distData_1.show
+ ---+---+---+--+
|_1 |_2 |_3 | event_date |
| 1 | 1 | 1 | 2016-03-25 |
| 2 | 2 | 2 | 2016-03-25 |
| 3 | 3 | 3 | 2016-03-25 |


distData_1.write.mode("append").partitionBy("event_date").saveAsTable("part_table")


scala > sqlContext.sql("select * from part_table").show
| a| b| c| event_date |
|1,1,1 | null | null | 2016-03-25 |
|2,2,2 | null | null | 2016-03-25 |
|3,3,3 | null | null | 2016-03-25 |



Hive table

create external table part_table (a String, b int, c bigint)
partitioned by (event_date Date)
row format delimited fields terminated by ','
stored as textfile  LOCATION "/user/hdfs/hive/part_table";

select * from part_table shows
| part_table.a | part_table.b | part_table.c | part_table.event_date |
| 1            | 1            | 1            | 2016-03-25            |
| 2            | 2            | 2            | 2016-03-25            |
| 3            | 3            | 3            | 2016-03-25            |


Looking at the hdfs


The path has 2 part files
/user/hdfs/hive/part_table/event_date=2016-03-25
part-0
part-1

  part-0 content
1,1,1
  part-1 content
2,2,2
3,3,3


P.S. If we store the table as orc, it writes and reads the data as expected.


Re: Spark Streaming - NotSerializableException: Methods & Closures:

2016-04-05 Thread Mayur Pawashe
Hi. I am using 2.10.4 for Scala and 1.6.0 for Spark-related dependencies. I am
also using spark-streaming-kafka and including kafka (0.8.1.1), which apparently
is needed for the deserializers.
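
(For reference, a hedged sketch of the workaround usually suggested for the issue
quoted below: move the function into a standalone object, or assign it to a local
val, so the closure no longer drags in the enclosing class that holds the DStream.
Names are made up.)

object Transforms extends Serializable {
  def transform(x: String): String = x + " foo "
}

def run(): Unit = {
  // Referencing Transforms.transform does not capture the enclosing class,
  // unlike stream.map(this.transform _).
  val newStream = stream.map(Transforms.transform _)
  // ...
}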

> On Apr 4, 2016, at 6:18 PM, Ted Yu  wrote:
> 
> bq. I'm on version 2.10 for spark 
> 
> The above is Scala version.
> Can you give us the Spark version ?
> 
> Thanks
> 
>> On Mon, Apr 4, 2016 at 2:36 PM, mpawashe  wrote:
>> Hi all,
>> 
>> I am using Spark Streaming API (I'm on version 2.10 for spark and
>> streaming), and I am running into a function serialization issue that I do
>> not run into when using Spark in batch (non-streaming) mode.
>> 
>> If I wrote code like this:
>> 
>> def run(): Unit = {
>> val newStream = stream.map(x => { x + " foo " })
>> // ...
>> }
>> 
>> everything works fine.. But if I try it like this:
>> 
>> def transform(x: String): String = { x + " foo " }
>> 
>> def run(): Unit = {
>> val newStream = stream.map(transform)
>> // ...
>> }
>> 
>> ..the program fails, being unable to serialize the closure (a method passed
>> to a function that expects a closure should be auto-converted, to my
>> understanding).
>> 
>> However it works fine if I declare a closure inside run() and use that like
>> so:
>> 
>> val transform = (x: String) => { x + " foo " }
>> 
>> If it's declared outside of run(), however, it will also crash.
>> 
>> This is an example stack trace of the error I'm running into. This can be a
>> hassle to debug so I hope I wouldn't have to get around this by having to
>> use a local closure/function every time. Thanks for any help in advance.
>> 
>> org.apache.spark.SparkException: Task not serializable
>> at
>> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
>> at
>> org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
>> at 
>> org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
>> at org.apache.spark.SparkContext.clean(SparkContext.scala:2030)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:528)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:528)
>> at
>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
>> at
>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
>> at org.apache.spark.SparkContext.withScope(SparkContext.scala:709)
>> at
>> org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala:266)
>> at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:527)
>> at com.my.cool.app.MyClass.run(MyClass.scala:90)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
>> at java.lang.reflect.Method.invoke(Unknown Source)
>> at
>> org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:58)
>> at 
>> org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
>> Caused by: java.io.NotSerializableException: Graph is unexpectedly null when
>> DStream is being serialized.
>> Serialization stack:
>> 
>> at
>> org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
>> at
>> org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
>> at
>> org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:84)
>> at
>> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
>> ... 20 more
>> 
>> 
>> 
>> --
>> View this message in context: 
>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-NotSerializableException-Methods-Closures-tp26672.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> 
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
> 


how to use custom properties in spark app

2016-04-05 Thread yaoxiaohua
Hi bro,

I am new to spark application development; I need to develop two
apps running on a spark cluster.

Now I have some arguments for the application.

I can pass them as program arguments when running spark-submit,
but I want to find a better way.

I have some arguments such as the jdbc url, elastic search
nodes, and kafka group id.

So I want to know whether there is a best practice for doing
this.

Can I read these from a custom properties file?
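
(One common pattern, sketched here with made-up names: ship a properties file
alongside the job with --files and read it with java.util.Properties.)

// submit with:  spark-submit --files app.properties --class ... your.jar
import java.io.FileInputStream
import java.util.Properties

val props = new Properties()
// On YARN, --files places the file in the container's working directory;
// adjust the path for other deploy modes.
props.load(new FileInputStream("app.properties"))

val jdbcUrl      = props.getProperty("jdbc.url")
val esNodes      = props.getProperty("es.nodes")
val kafkaGroupId = props.getProperty("kafka.group.id")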



Another similar question: I read some rules to
analyse the data.

Currently I store the rules in mysql, and before I use them I read them
from mysql.

Is there a better way to do this?

Thanks in advance.

 

Best Regards,

Evan Yao



Re: Could not load shims in class org.apache.hadoop.hive.schshim.FairSchedulerShim

2016-04-05 Thread ram kumar
I am facing this same issue.
Can any1 help me with this

Thanks

On Mon, Dec 7, 2015 at 9:14 AM, Shige Song  wrote:

> Hard to tell.
>
> On Mon, Dec 7, 2015 at 11:35 AM, zhangjp <592426...@qq.com> wrote:
>
>> Hi all,
>>
>> I'm using the spark prebuilt version 1.5.2+hadoop2.6 and the hadoop version is
>> 2.6.2. When I use the java jdbc client to execute sql, there are some issues.
>>
>> java.lang.RuntimeException: Could not load shims in class
>> org.apache.hadoop.hive.schshim.FairSchedulerShim
>> at
>> org.apache.hadoop.hive.shims.ShimLoader.createShim(ShimLoader.java:149)
>> at
>> org.apache.hadoop.hive.shims.ShimLoader.getSchedulerShims(ShimLoader.java:133)
>> at
>> org.apache.hadoop.hive.shims.Hadoop23Shims.refreshDefaultQueue(Hadoop23Shims.java:296)
>> at
>> org.apache.hive.service.cli.session.HiveSessionImpl.(HiveSessionImpl.java:109)
>> at
>> org.apache.hive.service.cli.session.SessionManager.openSession(SessionManager.java:253)
>> at
>> org.apache.spark.sql.hive.thriftserver.SparkSQLSessionManager.openSession(SparkSQLSessionManager.scala:65)
>> at
>> org.apache.hive.service.cli.CLIService.openSession(CLIService.java:194)
>> at
>> org.apache.hive.service.cli.thrift.ThriftCLIService.getSessionHandle(ThriftCLIService.java:405)
>> at
>> org.apache.hive.service.cli.thrift.ThriftCLIService.OpenSession(ThriftCLIService.java:297)
>> at
>> org.apache.hive.service.cli.thrift.TCLIService$Processor$OpenSession.getResult(TCLIService.java:1253)
>> at
>> org.apache.hive.service.cli.thrift.TCLIService$Processor$OpenSession.getResult(TCLIService.java:1238)
>> at
>> org.apache.thrift.ProcessFunction.process(ProcessFunction.java:39)
>> at
>> org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39)
>> at
>> org.apache.hive.service.auth.TSetIpAddressProcessor.process(TSetIpAddressProcessor.java:56)
>> at
>> org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:285)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.lang.ClassNotFoundException:
>> org.apache.hadoop.hive.schshim.FairSchedulerShim
>> at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>> at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>> at java.security.AccessController.doPrivileged(Native Method)
>> at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>> at java.lang.Class.forName0(Native Method)
>> at java.lang.Class.forName(Class.java:195)
>> at
>> org.apache.hadoop.hive.shims.ShimLoader.createShim(ShimLoader.java:146)
>> ... 17 more
>>
>>
>
>


Re: multiple splits fails

2016-04-05 Thread Mich Talebzadeh
This is the idea I have in mind

I want to go through every line

result.foreachRDD(rdd => rdd.foreach(println))

Rather than print each line, I want to save them temporarily and then
add/append the result set (the lines in the RDD) to a table for further analysis.
It could be a Parquet or Hive table.

So I am only interested in the lines of interest, which saves me space on the disk.

I am aware that I can simply write them to text files

result.saveAsTextFiles("/user/hduser/tmp/keep")
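
A minimal sketch of appending each micro-batch to a table instead (assuming
sqlContext is a HiveContext and the target table keep_lines already exists; the
names are made up):

import sqlContext.implicits._

result.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    // result holds the (word, count) pairs left after the filter/map/reduceByKey
    val df = rdd.toDF("line", "cnt")
    df.write.mode("append").saveAsTable("keep_lines")
    // or: df.write.mode("append").parquet("/user/hduser/tmp/keep_parquet")
  }
}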


Any ideas will be appreciated

Thanks

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 5 April 2016 at 09:32, Sachin Aggarwal 
wrote:

> sure,
>
> this will be help full try this
>
>
> https://databricks.gitbooks.io/databricks-spark-reference-applications/content/logs_analyzer/chapter3/save_an_rdd_to_a_database.html
>
> On Tue, Apr 5, 2016 at 1:56 PM, Mich Talebzadeh  > wrote:
>
>> Thanks Sachin. Will test it
>>
>> I guess I can modify it to save the output to a Hive table as opposed to
>> terminal
>>
>> Regards
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 5 April 2016 at 09:06, Sachin Aggarwal 
>> wrote:
>>
>>> Hey ,
>>>
>>> I have changed your example itself try this , it should work in terminal
>>>
>>> val result = lines.filter(_.contains("ASE 15")).filter(_ contains("UPDATE 
>>> INDEX STATISTICS")).flatMap(line => line.split("\n,")).map(word => (word, 
>>> 1)).reduceByKey(_ + _)
>>> result.foreachRDD(rdd => rdd.foreach(println))
>>>
>>>
>>> On Tue, Apr 5, 2016 at 1:00 PM, Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
 Thanks.

 Currently this is what I am doing

 // Get the lines
 //
 val lines = messages.map(_._2)
 // Check for message
 val showResults = lines.filter(_.contains("Sending
 messages")).flatMap(line => line.split("\n,")).map(word => (word,
 1)).reduceByKey(_ + _).print(1000)

 So it prints max of 1000 lines to terminal after filter and map. Can
 this be done as suggested?






 Dr Mich Talebzadeh



 LinkedIn * 
 https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
 *



 http://talebzadehmich.wordpress.com



 On 5 April 2016 at 06:53, Sachin Aggarwal 
 wrote:

> Hi
>
> Instead of using print() directly on Dstream, I will suggest you use 
> foreachRDD
> if you  wanted to materialize all rows , example shown here:-
>
>
> https://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams
>
> dstream.foreachRDD(rdd => {
>   val connection = createNewConnection()  // executed at the driver
>   rdd.foreach(record => {
>   connection.send(record) // executed at the worker
>   })
>   })
>
>
> On Mon, Apr 4, 2016 at 12:59 AM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> I am afraid print(Integer.MAX_VALUE) does not return any lines!
>> However, print(1000) does
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 3 April 2016 at 19:46, Ted Yu  wrote:
>>
>>> Since num is an Int, you can specify Integer.MAX_VALUE
>>>
>>> FYI
>>>
>>> On Sun, Apr 3, 2016 at 10:58 AM, Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
 Thanks Ted.

 As I see print() materializes the first 10 rows. On the other hand
 print(n) will materialise n rows.

 How about if I wanted to materialize all rows?

 Cheers

 Dr Mich Talebzadeh



 LinkedIn * 
 https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
 *



 http://talebzadehmich.wordpress.com



 On 3 April 2016 at 18:05, Ted Yu  wrote:

> Mich:
> See the 

Re: Can't able to access temp table via jdbc client

2016-04-05 Thread ram kumar
Thanks for your input.

But, the jdbc client should be something like this,

{{{
$ *./bin/beeline*
Beeline version 1.5.2 by Apache Hive
beeline>*!connect jdbc:hive2://ip:1*
*show tables;*
++--+--+
| tableName  | isTemporary  |
++--+--+
| check  | false|
++--+--+
5 rows selected (0.126 seconds)
>
}}}
The isTemporary column is not seen.
I suppose the thrift server should be started from spark home

On Tue, Apr 5, 2016 at 1:08 PM, Mich Talebzadeh 
wrote:

> Hi
>
> temp tables are session specific and private to the session. You will not
> be able to see temp tables created by another session in HiveContext.
>
> Likewise creating a table in Hive using a syntax similar to below
>
> CREATE TEMPORARY TABLE tmp AS
> SELECT t.calendar_month_desc, c.channel_desc, SUM(s.amount_sold) AS
> TotalSales
>
> will only be visible to that session and is created under /tmp/hive/hduser.
>
> HTH
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 5 April 2016 at 05:52, ram kumar  wrote:
>
>> HI,
>>
>> I started a hive thrift server from hive home,
>> ./bin/hiveserver2
>>
>> opened jdbc client,
>> ./bin/beeline
>> connected to thrift server,
>>
>> 0: > show tables;
>> ++--+
>> |tab_name|
>> ++--+
>> | check  |
>> | people |
>> ++--+
>> 4 rows selected (2.085 seconds)
>> 0:>
>>
>> I can't see the temp tables which i registered with HiveContext
>> from pyspark.sql import HiveContext, Row
>> schemaPeople.registerTempTable("testtemp")
>> Can some1 help me with it
>>
>> Thanks
>>
>>
>


dataframe sorting and find the index of the maximum element

2016-04-05 Thread Angel Angel
Hello,

i am writing a spark application in which i need the index of the maximum
element.

My table has one column only and i want the index of the maximum element.

MAX(count)
23
32
3
Here is my code; the data type of the array is
org.apache.spark.sql.DataFrame.


Thanks in advance.
Also, please suggest another way to do it.

[image: Inline image 1]


RE: [MARKETING] Timeout in mapWithState

2016-04-05 Thread Iain Cundy
Hi Abhi

The concept is what you want – if you set StateSpec timeout to a Duration of 10 
minutes then any keys not seen for more than 10 minutes will be deleted.
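
(For reference, a minimal sketch of wiring that in, assuming a simple
count-per-key mapping function on a keyed DStream; names are made up.)

import org.apache.spark.streaming.{Minutes, State, StateSpec}

// Hypothetical mapping function keeping a running count per key.
def trackCount(key: String, value: Option[Int], state: State[Long]): Option[(String, Long)] = {
  if (state.isTimingOut()) {
    None                                    // key unseen for 10 minutes; it is being dropped
  } else {
    val newCount = state.getOption.getOrElse(0L) + value.getOrElse(0)
    state.update(newCount)
    Some((key, newCount))
  }
}

val spec = StateSpec.function(trackCount _).timeout(Minutes(10))
val stateStream = keyedStream.mapWithState(spec)   // keyedStream: DStream[(String, Int)]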

However you did say “exactly” and specifically mention “removed from memory” in 
which case you may be interested in the much more complicated actual semantics. 
This is based on my empirical experience and on attempting to read code in scala,
which is not a language I know!


1.   The timeout check is only performed when the state is checkpointed. 
This seems to occur once every 10 micro-batches (it is possible that it changes 
based upon something, but I don’t see a good way to configure it since the 
checkpoint call is internal to mapWithState)

2.   When the timeout check is performed two things happen

a.   call gets invoked for the key – a little care is required because 
trying to update the state or remove causes an exception

b.  the key is marked as deleted, which means it will no longer appear in 
snapshots

3.   Note that I didn’t say the key is removed from memory! That only 
happens on some checkpoints when the code decides to “compact” the state data. 
This happens when the chain of delta maps is at least
“spark.streaming.sessionByKey.deltaChainThreshold”.

a.   What adds a new delta map? – I think it is every checkpoint, but not 
absolutely certain it is that simple

b.  The default seems to be 20, which means deleted keys only get deleted 
from memory once every 190 or 200 micro-batches. The setting above isn’t 
documented anywhere I can find, however you can set it using spark-submit 
–conf. Setting it to 2 does seem to get keys removed from memory when the 
checkpoint deletes them.

Any clarifications or corrections are welcome!

Cheers
Iain

From: Abhishek Anand [mailto:abhis.anan...@gmail.com]
Sent: 05 April 2016 06:40
To: user
Subject: [MARKETING] Timeout in mapWithState

What exactly is timeout in mapWithState ?

I want the keys to get removed from memory if there is no data received on
that key for 10 minutes.

How can I acheive this in mapWithState ?

Regards,
Abhi

This message and the information contained herein is proprietary and 
confidential and subject to the Amdocs policy statement,
you may review at http://www.amdocs.com/email_disclaimer.asp


Profiling a spark job

2016-04-05 Thread Dmitry Olshansky
Hi list,

I'm curious as to what the best practices are for profiling spark apps. So far I
tried following this guide with hprof and/or yourkit but the profile looks 
strange:
https://cwiki.apache.org/confluence/display/SPARK/Profiling+Spark+Applications+Using+YourKit

 55% of the time is spent in EPollWait. However, I'm using standalone mode with a local
master without starting a separate daemon (could it be that I should?)

---
Dmitry Olshansky
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Registering Metrics Source in Spark

2016-04-05 Thread Gideon
Hi,

I don't have a specific solution to your problem, but I was having some
problems writing my own metrics with Spark a few months back.
I don't know if it helps, but you can try and look at this thread.

When I had that issue I wasn't running Spark on Yarn so it might be a
different issue but who knows



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Registering-Metrics-Source-in-Spark-tp26673p26679.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: multiple splits fails

2016-04-05 Thread Sachin Aggarwal
sure,

this will be help full try this

https://databricks.gitbooks.io/databricks-spark-reference-applications/content/logs_analyzer/chapter3/save_an_rdd_to_a_database.html

On Tue, Apr 5, 2016 at 1:56 PM, Mich Talebzadeh 
wrote:

> Thanks Sachin. Will test it
>
> I guess I can modify it to save the output to a Hive table as opposed to
> terminal
>
> Regards
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 5 April 2016 at 09:06, Sachin Aggarwal 
> wrote:
>
>> Hey ,
>>
>> I have changed your example itself try this , it should work in terminal
>>
>> val result = lines.filter(_.contains("ASE 15")).filter(_ contains("UPDATE 
>> INDEX STATISTICS")).flatMap(line => line.split("\n,")).map(word => (word, 
>> 1)).reduceByKey(_ + _)
>> result.foreachRDD(rdd => rdd.foreach(println))
>>
>>
>> On Tue, Apr 5, 2016 at 1:00 PM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> Thanks.
>>>
>>> Currently this is what I am doing
>>>
>>> // Get the lines
>>> //
>>> val lines = messages.map(_._2)
>>> // Check for message
>>> val showResults = lines.filter(_.contains("Sending
>>> messages")).flatMap(line => line.split("\n,")).map(word => (word,
>>> 1)).reduceByKey(_ + _).print(1000)
>>>
>>> So it prints max of 1000 lines to terminal after filter and map. Can
>>> this be done as suggested?
>>>
>>>
>>>
>>>
>>>
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> *
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>>
>>> On 5 April 2016 at 06:53, Sachin Aggarwal 
>>> wrote:
>>>
 Hi

 Instead of using print() directly on Dstream, I will suggest you use 
 foreachRDD
 if you  wanted to materialize all rows , example shown here:-


 https://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams

 dstream.foreachRDD(rdd => {
   val connection = createNewConnection()  // executed at the driver
   rdd.foreach(record => {
   connection.send(record) // executed at the worker
   })
   })


 On Mon, Apr 4, 2016 at 12:59 AM, Mich Talebzadeh <
 mich.talebza...@gmail.com> wrote:

> I am afraid print(Integer.MAX_VALUE) does not return any lines!
> However, print(1000) does
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 3 April 2016 at 19:46, Ted Yu  wrote:
>
>> Since num is an Int, you can specify Integer.MAX_VALUE
>>
>> FYI
>>
>> On Sun, Apr 3, 2016 at 10:58 AM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> Thanks Ted.
>>>
>>> As I see print() materializes the first 10 rows. On the other hand
>>> print(n) will materialise n rows.
>>>
>>> How about if I wanted to materialize all rows?
>>>
>>> Cheers
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> *
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>>
>>> On 3 April 2016 at 18:05, Ted Yu  wrote:
>>>
 Mich:
 See the following method of DStream:

* Print the first num elements of each RDD generated in this
 DStream. This is an output
* operator, so this DStream will be registered as an output
 stream and there materialized.
*/
   def print(num: Int): Unit = ssc.withScope {

 On Sun, Apr 3, 2016 at 9:32 AM, Mich Talebzadeh <
 mich.talebza...@gmail.com> wrote:

> However this works. I am checking the logic to see if it does what
> I asked it to do
>
> val v = lines.filter(_.contains("ASE 15")).filter(_
> contains("UPDATE INDEX STATISTICS")).flatMap(line =>
> line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _).print
>
> scala> ssc.start()
>
> scala> ---
> Time: 1459701925000 ms
> ---
> (* Check that you have run 

Re: multiple splits fails

2016-04-05 Thread Mich Talebzadeh
Thanks Sachin. Will test it

I guess I can modify it to save the output to a Hive table as opposed to
terminal

Regards

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 5 April 2016 at 09:06, Sachin Aggarwal 
wrote:

> Hey ,
>
> I have changed your example itself try this , it should work in terminal
>
> val result = lines.filter(_.contains("ASE 15")).filter(_ contains("UPDATE 
> INDEX STATISTICS")).flatMap(line => line.split("\n,")).map(word => (word, 
> 1)).reduceByKey(_ + _)
> result.foreachRDD(rdd => rdd.foreach(println))
>
>
> On Tue, Apr 5, 2016 at 1:00 PM, Mich Talebzadeh  > wrote:
>
>> Thanks.
>>
>> Currently this is what I am doing
>>
>> // Get the lines
>> //
>> val lines = messages.map(_._2)
>> // Check for message
>> val showResults = lines.filter(_.contains("Sending
>> messages")).flatMap(line => line.split("\n,")).map(word => (word,
>> 1)).reduceByKey(_ + _).print(1000)
>>
>> So it prints max of 1000 lines to terminal after filter and map. Can this
>> be done as suggested?
>>
>>
>>
>>
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 5 April 2016 at 06:53, Sachin Aggarwal 
>> wrote:
>>
>>> Hi
>>>
>>> Instead of using print() directly on Dstream, I will suggest you use 
>>> foreachRDD
>>> if you  wanted to materialize all rows , example shown here:-
>>>
>>>
>>> https://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams
>>>
>>> dstream.foreachRDD(rdd => {
>>>   val connection = createNewConnection()  // executed at the driver
>>>   rdd.foreach(record => {
>>>   connection.send(record) // executed at the worker
>>>   })
>>>   })
>>>
>>>
>>> On Mon, Apr 4, 2016 at 12:59 AM, Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
 I am afraid print(Integer.MAX_VALUE) does not return any lines!
 However, print(1000) does

 Dr Mich Talebzadeh



 LinkedIn * 
 https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
 *



 http://talebzadehmich.wordpress.com



 On 3 April 2016 at 19:46, Ted Yu  wrote:

> Since num is an Int, you can specify Integer.MAX_VALUE
>
> FYI
>
> On Sun, Apr 3, 2016 at 10:58 AM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> Thanks Ted.
>>
>> As I see print() materializes the first 10 rows. On the other hand
>> print(n) will materialise n rows.
>>
>> How about if I wanted to materialize all rows?
>>
>> Cheers
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 3 April 2016 at 18:05, Ted Yu  wrote:
>>
>>> Mich:
>>> See the following method of DStream:
>>>
>>>* Print the first num elements of each RDD generated in this
>>> DStream. This is an output
>>>* operator, so this DStream will be registered as an output
>>> stream and there materialized.
>>>*/
>>>   def print(num: Int): Unit = ssc.withScope {
>>>
>>> On Sun, Apr 3, 2016 at 9:32 AM, Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
 However this works. I am checking the logic to see if it does what
 I asked it to do

 val v = lines.filter(_.contains("ASE 15")).filter(_
 contains("UPDATE INDEX STATISTICS")).flatMap(line =>
 line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _).print

 scala> ssc.start()

 scala> ---
 Time: 1459701925000 ms
 ---
 (* Check that you have run UPDATE INDEX STATISTICS on all ASE 15
 databases,27)
 (o You can try UPDATE INDEX STATISTICS WITH SAMPLING in ASE 15
 OR,27)
 (Once databases are loaded to ASE 15, then you will need to
 maintain them the way you maintain your PROD. For example run UPDATE 
 INDEX
 STATISTICS and REORG COMPACT as necessary. One of the frequent mistakes
 that people do is NOT pruning data from 

Re: multiple splits fails

2016-04-05 Thread Sachin Aggarwal
Hey ,

I have changed your example itself try this , it should work in terminal

val result = lines.filter(_.contains("ASE 15")).filter(_
contains("UPDATE INDEX STATISTICS")).flatMap(line =>
line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _)
result.foreachRDD(rdd => rdd.foreach(println))


On Tue, Apr 5, 2016 at 1:00 PM, Mich Talebzadeh 
wrote:

> Thanks.
>
> Currently this is what I am doing
>
> // Get the lines
> //
> val lines = messages.map(_._2)
> // Check for message
> val showResults = lines.filter(_.contains("Sending
> messages")).flatMap(line => line.split("\n,")).map(word => (word,
> 1)).reduceByKey(_ + _).print(1000)
>
> So it prints max of 1000 lines to terminal after filter and map. Can this
> be done as suggested?
>
>
>
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 5 April 2016 at 06:53, Sachin Aggarwal 
> wrote:
>
>> Hi
>>
>> Instead of using print() directly on Dstream, I will suggest you use 
>> foreachRDD
>> if you  wanted to materialize all rows , example shown here:-
>>
>>
>> https://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams
>>
>> dstream.foreachRDD(rdd => {
>>   val connection = createNewConnection()  // executed at the driver
>>   rdd.foreach(record => {
>>   connection.send(record) // executed at the worker
>>   })
>>   })
>>
>>
>> On Mon, Apr 4, 2016 at 12:59 AM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> I am afraid print(Integer.MAX_VALUE) does not return any lines!
>>> However, print(1000) does
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> *
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>>
>>> On 3 April 2016 at 19:46, Ted Yu  wrote:
>>>
 Since num is an Int, you can specify Integer.MAX_VALUE

 FYI

 On Sun, Apr 3, 2016 at 10:58 AM, Mich Talebzadeh <
 mich.talebza...@gmail.com> wrote:

> Thanks Ted.
>
> As I see print() materializes the first 10 rows. On the other hand
> print(n) will materialise n rows.
>
> How about if I wanted to materialize all rows?
>
> Cheers
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 3 April 2016 at 18:05, Ted Yu  wrote:
>
>> Mich:
>> See the following method of DStream:
>>
>>* Print the first num elements of each RDD generated in this
>> DStream. This is an output
>>* operator, so this DStream will be registered as an output stream
>> and there materialized.
>>*/
>>   def print(num: Int): Unit = ssc.withScope {
>>
>> On Sun, Apr 3, 2016 at 9:32 AM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> However this works. I am checking the logic to see if it does what I
>>> asked it to do
>>>
>>> val v = lines.filter(_.contains("ASE 15")).filter(_ contains("UPDATE
>>> INDEX STATISTICS")).flatMap(line => line.split("\n,")).map(word => 
>>> (word,
>>> 1)).reduceByKey(_ + _).print
>>>
>>> scala> ssc.start()
>>>
>>> scala> ---
>>> Time: 1459701925000 ms
>>> ---
>>> (* Check that you have run UPDATE INDEX STATISTICS on all ASE 15
>>> databases,27)
>>> (o You can try UPDATE INDEX STATISTICS WITH SAMPLING in ASE 15 OR,27)
>>> (Once databases are loaded to ASE 15, then you will need to maintain
>>> them the way you maintain your PROD. For example run UPDATE INDEX
>>> STATISTICS and REORG COMPACT as necessary. One of the frequent mistakes
>>> that people do is NOT pruning data from daily log tables in ASE 15 etc 
>>> as
>>> they do it in PROD. This normally results in slower performance on ASE 
>>> 15
>>> databases as test cycles continue. Use MDA readings to measure daily DML
>>> activities on ASE 15 tables and compare them with those of PROD. A 24 
>>> hour
>>> cycle measurement should be good. If you notice that certain tables have
>>> different DML hits (insert/update/delete) compared to PROD you will know
>>> that either ASE 15 is not doing everything in terms of batch activity 
>>> (some
>>> jobs are missing), or there is something inconsistent somewhere. ,27)

DataSet with Array member

2016-04-05 Thread JH P
Hi everyone. I have a class like this:

case class DistinctValues(statType: Int, dataType: Int, _id: Int, values: 
Array[(String, Long)], category: String) extends Serializable {

I think this class won't work when DistinctValues.values.length > Int.MaxValue.

Moreover, I instantiate this class with:
  .mapGroups { (cdid, itr) =>
val arr = itr.toArray
val values = arr.flatMap(x => Array(x._2._1)).toArray.distinct
val category = arr.head._2._2

DistinctValues(GV.ACC_STAT, GV.STRING_TYPE, cdid, values, category)
  }

I am also concerned that the above code won't work when itr.length > Int.MaxValue.

How should I fix the DistinctValues class?
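
A possible direction (just a sketch; the GV constants and field names come from the snippet above, and the element type is assumed from the class declaration) is to emit one row per value with flatMapGroups instead of packing everything into a single Array, since a Scala Array is Int-indexed and cannot hold more than Int.MaxValue elements:

// Sketch only -- assumes `grouped` is the grouped Dataset that .mapGroups was
// called on, each element being (key, ((String, Long), category)), and that the
// usual implicits (import sqlContext.implicits._) are in scope for the encoder.
case class DistinctValueRow(statType: Int, dataType: Int, _id: Int,
                            value: String, count: Long, category: String)

val rows = grouped.flatMapGroups { (cdid, itr) =>
  itr.map { x =>
    val (v, n) = x._2._1   // the (String, Long) value pair
    DistinctValueRow(GV.ACC_STAT, GV.STRING_TYPE, cdid, v, n, x._2._2)
  }
}.toDF().dropDuplicates(Seq("_id", "value", "count"))

Deduplication then happens in Spark rather than in a per-key Array built in executor memory.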

Re: Detecting application restart when running in supervised cluster mode

2016-04-05 Thread Saisai Shao
Hi Deepak,

I don't think supervise works with YARN; it is a standalone- and Mesos-specific
feature.
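
Since the original question mentions polling, one crude option for standalone deployments is to poll the master's /json endpoint and alert when the driver ID stops being listed. This is only a rough sketch: the host name and driver ID below are placeholders, and the JSON layout is not a stable API, so check what your master actually returns before relying on it.

// Rough sketch: poll the standalone master's JSON endpoint and alert when a
// given driver ID is no longer reported. Host, port and driver ID are placeholders.
import scala.io.Source

val masterJsonUrl = "http://spark-master:8080/json"   // hypothetical master host
val driverId = "driver-20160405120000-0001"           // hypothetical driver ID

def masterState(): String = {
  val src = Source.fromURL(masterJsonUrl)
  try src.mkString finally src.close()
}

while (true) {
  if (!masterState().contains(driverId)) {
    println(s"ALERT: $driverId is no longer reported by the master")
  }
  Thread.sleep(60 * 1000)   // poll once a minute
}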

Thanks
Saisai

On Tue, Apr 5, 2016 at 3:23 PM, Deepak Sharma  wrote:

> Hi Rafael
> If you are using yarn as the engine , you can always use RM UI to see the
> application progress.
>
> Thanks
> Deepak
>
> On Tue, Apr 5, 2016 at 12:18 PM, Rafael Barreto  > wrote:
>
>> Hello,
>>
>> I have a driver deployed using `spark-submit` in supervised cluster mode.
>> Sometimes my application would die for some transient problem and the
>> restart works perfectly. However, it would be useful to get alerted when
>> that happens. Is there any out-of-the-box way of doing that? Perhaps a hook
>> that I can use to catch an event? I guess I could poll my application state
>> using Spark REST API, but if there was something more elegant, I would
>> rather use it.
>>
>> Thanks in advance,
>> Rafael Barreto
>>
>
>
>
> --
> Thanks
> Deepak
> www.bigdatabig.com
> www.keosha.net
>


Re: Can't able to access temp table via jdbc client

2016-04-05 Thread Mich Talebzadeh
Hi

Temporary tables are session-specific and private to the session that created
them. You will not be able to see temp tables registered by another session's
HiveContext.

Likewise, a table created in Hive with syntax similar to the following

CREATE TEMPORARY TABLE tmp AS
SELECT t.calendar_month_desc, c.channel_desc, SUM(s.amount_sold) AS TotalSales

will only be visible to that session and is created under /tmp/hive/hduser.
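
If the goal is to query a Spark temp table from beeline, one hedged way (shown in Scala here; the pyspark wiring would differ) is to start the Thrift server inside the same Spark application that registers the table, so the JDBC client shares that session. The JSON source path below is hypothetical; treat this as an illustration rather than a recommendation.

// Sketch: expose a temp table over JDBC by running the Thrift server in-process.
// Requires the spark-hive-thriftserver module on the classpath.
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2

val hiveContext = new HiveContext(sc)           // sc: existing SparkContext
val df = hiveContext.read.json("people.json")   // hypothetical source
df.registerTempTable("testtemp")

HiveThriftServer2.startWithContext(hiveContext) // beeline (default port 10000) now sees testtemp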

HTH

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 5 April 2016 at 05:52, ram kumar  wrote:

> Hi,
>
> I started a hive thrift server from hive home,
> ./bin/hiveserver2
>
> opened jdbc client,
> ./bin/beeline
> connected to thrift server,
>
> 0: > show tables;
> +-----------+--+
> | tab_name  |
> +-----------+--+
> | check     |
> | people    |
> +-----------+--+
> 4 rows selected (2.085 seconds)
> 0:>
>
> I can't see the temp tables which I registered with HiveContext:
>
> from pyspark.sql import HiveContext, Row
> schemaPeople.registerTempTable("testtemp")
>
> Can someone help me with it?
>
> Thanks
>
>


Re: multiple splits fails

2016-04-05 Thread Mich Talebzadeh
Thanks.

Currently this is what I am doing

// Get the lines
//
val lines = messages.map(_._2)
// Check for message
val showResults = lines.filter(_.contains("Sending messages"))
  .flatMap(line => line.split("\n,"))
  .map(word => (word, 1))
  .reduceByKey(_ + _)
  .print(1000)

So it prints a maximum of 1000 lines to the terminal after the filter and map.
Can this be done as suggested?
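
If the intent is to keep every matching record rather than echo a sample to the terminal, a sketch of one alternative is to write the per-batch counts out instead of printing them; the output prefix below is a hypothetical HDFS path.

// Sketch, not a drop-in change: persist the counts for each batch interval.
val counts = lines.filter(_.contains("Sending messages"))
  .flatMap(_.split("\n,"))
  .map(word => (word, 1))
  .reduceByKey(_ + _)

// one directory per batch, named like /tmp/msg_counts-<time>.txt
counts.saveAsTextFiles("/tmp/msg_counts", "txt")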






Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 5 April 2016 at 06:53, Sachin Aggarwal 
wrote:

> Hi
>
> Instead of using print() directly on the DStream, I suggest you use foreachRDD
> if you want to materialize all rows; an example is shown here:
>
>
> https://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams
>
> dstream.foreachRDD(rdd => {
>   val connection = createNewConnection()  // executed at the driver
>   rdd.foreach(record => {
>   connection.send(record) // executed at the worker
>   })
>   })
>
>
> On Mon, Apr 4, 2016 at 12:59 AM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> I am afraid print(Integer.MAX_VALUE) does not return any lines!  However,
>> print(1000) does
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 3 April 2016 at 19:46, Ted Yu  wrote:
>>
>>> Since num is an Int, you can specify Integer.MAX_VALUE
>>>
>>> FYI
>>>
>>> On Sun, Apr 3, 2016 at 10:58 AM, Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
 Thanks Ted.

 As I see print() materializes the first 10 rows. On the other hand
 print(n) will materialise n rows.

 How about if I wanted to materialize all rows?

 Cheers

 Dr Mich Talebzadeh



 LinkedIn * 
 https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
 *



 http://talebzadehmich.wordpress.com



 On 3 April 2016 at 18:05, Ted Yu  wrote:

> Mich:
> See the following method of DStream:
>
>* Print the first num elements of each RDD generated in this
> DStream. This is an output
>* operator, so this DStream will be registered as an output stream
> and there materialized.
>*/
>   def print(num: Int): Unit = ssc.withScope {
>
> On Sun, Apr 3, 2016 at 9:32 AM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> However this works. I am checking the logic to see if it does what I
>> asked it to do
>>
>> val v = lines.filter(_.contains("ASE 15")).filter(_ contains("UPDATE
>> INDEX STATISTICS")).flatMap(line => line.split("\n,")).map(word => (word,
>> 1)).reduceByKey(_ + _).print
>>
>> scala> ssc.start()
>>
>> scala> ---
>> Time: 1459701925000 ms
>> ---
>> (* Check that you have run UPDATE INDEX STATISTICS on all ASE 15
>> databases,27)
>> (o You can try UPDATE INDEX STATISTICS WITH SAMPLING in ASE 15 OR,27)
>> (Once databases are loaded to ASE 15, then you will need to maintain
>> them the way you maintain your PROD. For example run UPDATE INDEX
>> STATISTICS and REORG COMPACT as necessary. One of the frequent mistakes
>> that people do is NOT pruning data from daily log tables in ASE 15 etc as
>> they do it in PROD. This normally results in slower performance on ASE 15
>> databases as test cycles continue. Use MDA readings to measure daily DML
>> activities on ASE 15 tables and compare them with those of PROD. A 24 
>> hour
>> cycle measurement should be good. If you notice that certain tables have
>> different DML hits (insert/update/delete) compared to PROD you will know
>> that either ASE 15 is not doing everything in terms of batch activity 
>> (some
>> jobs are missing), or there is something inconsistent somewhere. ,27)
>> (* Make sure that you have enough tempdb system segment space for
>> UPDATE INDEX STATISTICS. It is always advisable to gauge the tempdb size
>> required in ASE 15 QA and expand the tempdb database in production
>> accordingly. The last thing you want is to blow up tempdb over the
>> migration weekend.,27)
>> (o In ASE 15 you can subdivide the task by running parallel UPDATE
>> INDEX STATISTICS on different tables in the same database at the same 
>> time.
>> Watch tempdb segment growth though! OR,27)
>>

Re: Detecting application restart when running in supervised cluster mode

2016-04-05 Thread Deepak Sharma
Hi Rafael
If you are using YARN as the engine, you can always use the RM UI to see the
application's progress.
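
For reference, the same check can be scripted against the YARN ResourceManager's REST API instead of the UI. The host below is a placeholder; ws/v1/cluster/apps is part of Hadoop's RM REST interface.

// Rough sketch: list running YARN applications via the RM REST API.
import scala.io.Source

val runningApps = {
  val src = Source.fromURL("http://rm-host:8088/ws/v1/cluster/apps?states=RUNNING")
  try src.mkString finally src.close()
}
println(runningApps)   // JSON listing of running applications, including Spark apps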

Thanks
Deepak

On Tue, Apr 5, 2016 at 12:18 PM, Rafael Barreto 
wrote:

> Hello,
>
> I have a driver deployed using `spark-submit` in supervised cluster mode.
> Sometimes my application would die for some transient problem and the
> restart works perfectly. However, it would be useful to get alerted when
> that happens. Is there any out-of-the-box way of doing that? Perhaps a hook
> that I can use to catch an event? I guess I could poll my application state
> using Spark REST API, but if there was something more elegant, I would
> rather use it.
>
> Thanks in advance,
> Rafael Barreto
>



-- 
Thanks
Deepak
www.bigdatabig.com
www.keosha.net