Re: Dataframe Partitioning

2016-03-01 Thread yash datta
+1
This is one of the most common problems we encounter in our flow. Mark, I
am happy to help if you would like to share some of the workload.

Best
Yash

On Wednesday 2 March 2016, Mark Hamstra  wrote:

> I don't entirely agree.  You're best off picking the right size :).
> That's almost impossible, though, since at the input end of the query
> processing you often want a large number of partitions to get sufficient
> parallelism for both performance and to avoid spilling or OOM, while at the
> output end of the query processing (after all the pruning and filtering)
> you often have only a few result rows, which means that splitting those few
> rows across many partitions in order to do a sort or similar is actually
> pretty silly and inefficient. I'll frequently see sorts where the
> per-partition sorts have only one or two records and it would have been
> quicker and more efficient to sort using a small number of partitions
> rather than using RangePartitioning to split the few rows across many
> partitions, then doing a degenerate/trivial form of sort on each of those
> partitions with their one or two rows, and finally merging all those tiny
> partitions back in order to produce the final results.
>
> Since the optimum number of shuffle partitions is different at different
> points in the query processing flow, it's really impossible to pick a
> static best number of shuffle partitions.  Using spark.sql.adaptive.enabled
> to turn on ExchangeCoordinator and dynamically set the number of shuffle
> partitions mostly works pretty well, but it still has at least a couple of
> issues.  One is that it makes things worse in the case of data skew since
> it doesn't stop coalescing partitions until after the coalesced partition
> size exceeds a target value; so if you've got some big ugly partitions that
> exceed the target size all on their own, they'll often be even bigger and
> uglier after the ExchangeCoordinator is done merging them with a few
> smaller partitions.  The other issue is that adaptive partitioning doesn't
> even try to do anything currently with any partitioning other than
> HashPartitioning, so you've still got the sorting problem using
> RangePartitioning that I just got done describing.
>
> I've actually started working on addressing each of those problems.
>
> On Tue, Mar 1, 2016 at 3:43 PM, Michael Armbrust  > wrote:
>
>> If you have to pick a number, it's better to overestimate than
>> underestimate, since task launching in Spark is relatively cheap compared to
>> spilling to disk or OOMing (now much less likely due to Tungsten).
>> Eventually, we plan to make this dynamic, but you should tune for your
>> particular workload.
>>
>> On Tue, Mar 1, 2016 at 3:19 PM, Teng Liao > > wrote:
>>
>>> Hi,
>>>
>>> I was wondering what the rationale behind defaulting all repartitioning
>>> to spark.sql.shuffle.partitions is. I’m seeing a huge overhead when running
>>> a job whose input partitions is 2 and, using the default value for
>>> spark.sql.shuffle.partitions, this is now 200. Thanks.
>>>
>>> -Teng Fei Liao
>>>
>>
>>
>

-- 
When events unfold with calm and ease
When the winds that blow are merely breeze
Learn from nature, from birds and bees
Live your life in love, and let joy not cease.


HashedRelation Memory Pressure on Broadcast Joins

2016-03-01 Thread Matt Cheah
Hi everyone,

I had a quick question regarding our implementation of UnsafeHashedRelation and
HashedRelation. It appears that we copy the rows that we've collected into memory
upon inserting them into the hash table in UnsafeHashedRelation#apply(). I was
wondering: why are we copying the rows every time? I can't imagine these rows
being mutable in this scenario.

The context is that I’m looking into a case where a small data frame should fit 
in the driver’s memory, but my driver ran out of memory after I increased the 
autoBroadcastJoinThreshold. YourKit is indicating that this logic is consuming 
more memory than my driver can handle.

Thanks,

-Matt Cheah
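
One plausible reason for the copy, sketched below: many Spark row iterators reuse a
single mutable UnsafeRow, so inserting the reference as-is would leave every entry
in the table pointing at whatever the shared row object holds last. The builder
below is only an illustration of that hazard, not the actual UnsafeHashedRelation
code, and the keyOf parameter is hypothetical:

import scala.collection.mutable
import org.apache.spark.sql.catalyst.expressions.UnsafeRow

// Illustrative only: build a map from an iterator that may reuse its row object.
def buildTable(rows: Iterator[UnsafeRow],
               keyOf: UnsafeRow => Long): mutable.HashMap[Long, UnsafeRow] = {
  val table = mutable.HashMap.empty[Long, UnsafeRow]
  while (rows.hasNext) {
    val row = rows.next()              // may be the same object on every call
    table.put(keyOf(row), row.copy())  // copy() pins the row's current bytes
  }
  table
}

If the upstream iterator really does hand out fresh rows, the copies are pure
overhead, which would explain the extra memory pressure observed here.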


Re: Dataframe Partitioning

2016-03-01 Thread Mark Hamstra
I don't entirely agree.  You're best off picking the right size :).  That's
almost impossible, though, since at the input end of the query processing
you often want a large number of partitions to get sufficient parallelism
for both performance and to avoid spilling or OOM, while at the output end
of the query processing (after all the pruning and filtering) you often
have only a few result rows, which means that splitting those few rows
across many partitions in order to do a sort or similar is actually pretty
silly and inefficient. I'll frequently see sorts where the per-partition
sorts have only one or two records and it would have been quicker and more
efficient to sort using a small number of partitions rather than using
RangePartitioning to split the few rows across many partitions, then doing
a degenerate/trivial form of sort on each of those partitions with their
one or two rows, and finally merging all those tiny partitions back in
order to produce the final results.

Since the optimum number of shuffle partitions is different at different
points in the query processing flow, it's really impossible to pick a
static best number of shuffle partitions.  Using spark.sql.adaptive.enabled
to turn on ExchangeCoordinator and dynamically set the number of shuffle
partitions mostly works pretty well, but it still has at least a couple of
issues.  One is that it makes things worse in the case of data skew since
it doesn't stop coalescing partitions until after the coalesced partition
size exceeds a target value; so if you've got some big ugly partitions that
exceed the target size all on their own, they'll often be even bigger and
uglier after the ExchangeCoordinator is done merging them with a few
smaller partitions.  The other issue is that adaptive partitioning doesn't
even try to do anything currently with any partitioning other than
HashPartitioning, so you've still got the sorting problem using
RangePartitioning that I just got done describing.

I've actually started working on addressing each of those problems.
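
To make the knobs in this discussion concrete, here is a minimal spark-shell-style
sketch (Spark 1.6-era property names; the values are examples only and df is
assumed to be an existing DataFrame):

// Static: one partition count for every shuffle in the query
sqlContext.setConf("spark.sql.shuffle.partitions", "64")

// Adaptive: let the ExchangeCoordinator coalesce shuffle partitions at runtime
sqlContext.setConf("spark.sql.adaptive.enabled", "true")

// A global sort like this is exactly where RangePartitioning across too many
// partitions degenerates into many tiny per-partition sorts plus a merge
val sorted = df.sort("key")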

On Tue, Mar 1, 2016 at 3:43 PM, Michael Armbrust 
wrote:

> If you have to pick a number, it's better to overestimate than
> underestimate, since task launching in Spark is relatively cheap compared to
> spilling to disk or OOMing (now much less likely due to Tungsten).
> Eventually, we plan to make this dynamic, but you should tune for your
> particular workload.
>
> On Tue, Mar 1, 2016 at 3:19 PM, Teng Liao  wrote:
>
>> Hi,
>>
>> I was wondering what the rationale behind defaulting all repartitioning
>> to spark.sql.shuffle.partitions is. I’m seeing a huge overhead when running
>> a job whose input partitions is 2 and, using the default value for
>> spark.sql.shuffle.partitions, this is now 200. Thanks.
>>
>> -Teng Fei Liao
>>
>
>


Dataframe Partitioning

2016-03-01 Thread Teng Liao
Hi,

I was wondering what the rationale is behind defaulting all repartitioning to
spark.sql.shuffle.partitions. I'm seeing a huge overhead when running a job
whose input has 2 partitions and, with the default value of
spark.sql.shuffle.partitions, the shuffle now uses 200. Thanks.

-Teng Fei Liao


Re: SPARK-SQL: Pattern Detection on Live Event or Archived Event Data

2016-03-01 Thread Jerry Lam
Hi Reynold,

You are right. It is about the audience. For instance, in many of my cases,
the SQL style is very attractive, if not mandatory, for people with minimal
programming knowledge. SQL has its place for communication. The last time I
showed someone the Spark DataFrame style, they immediately said it was too
difficult to use. When I changed it to SQL, they were suddenly happy and asked
how to do it. It sounds stupid, but that's how it is for now.

The following example will make some banks happy (copied from the Oracle
solution):

SELECT *
FROM Ticker MATCH_RECOGNIZE (
 PARTITION BY symbol
 ORDER BY tstamp
 MEASURES  STRT.tstamp AS start_tstamp,
   LAST(DOWN.tstamp) AS bottom_tstamp,
   LAST(UP.tstamp) AS end_tstamp
 ONE ROW PER MATCH
 AFTER MATCH SKIP TO LAST UP
 PATTERN (STRT DOWN+ UP+)
 DEFINE
DOWN AS DOWN.price < PREV(DOWN.price),
UP AS UP.price > PREV(UP.price)
 ) MR
ORDER BY MR.symbol, MR.start_tstamp;

Basically, this query finds all cases where a stock price dipped to a bottom
price and then rose (the popular V-shape). It might be confusing at first,
but it is still readable for many users who know SQL. Note that the PATTERN
clause is interesting: it is a regular expression over the defined symbols
(DOWN and UP; STRT is not defined, so it matches any event).

Most CEP solutions have a SQL-like interface.

Best Regards,

Jerry
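
For comparison with the MATCH_RECOGNIZE query above, here is a rough sketch of how
part of the same logic might look with Spark 1.6 DataFrame window functions; it only
labels each tick as UP/DOWN/FLAT and is not a full pattern matcher. The DataFrame
name ticker and its columns (symbol, tstamp, price) are assumed from the query above:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val w = Window.partitionBy("symbol").orderBy("tstamp")
val labelled = ticker
  .withColumn("prev_price", lag("price", 1).over(w))
  .withColumn("direction",
    when(col("price") < col("prev_price"), "DOWN")
      .when(col("price") > col("prev_price"), "UP")
      .otherwise("FLAT"))

// Detecting the full STRT DOWN+ UP+ sequence from `direction` still requires the
// kind of sequential matching that MATCH_RECOGNIZE (or Scala pattern matching
// over ordered rows) expresses directly.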



On Tue, Mar 1, 2016 at 4:44 PM, Reynold Xin  wrote:

> There are definitely pros and cons for Scala vs SQL-style CEP. Scala might
> be more powerful, but the target audience is very different.
>
> How much usage is there for a CEP style SQL syntax in practice? I've never
> seen it coming up so far.
>
>
>
> On Tue, Mar 1, 2016 at 9:35 AM, Alex Kozlov  wrote:
>
>> Looked at the paper: while we can argue on the performance side, I think
>> semantically the Scala pattern matching is much more expressive.  The time
>> will decide.
>>
>> On Tue, Mar 1, 2016 at 9:07 AM, Jerry Lam  wrote:
>>
>>> Hi Alex,
>>>
>>> We went through this path already :) This is the reason we tried other
>>> approaches. The recursion makes it very inefficient for some cases.
>>> For details, this paper describes it very well:
>>> https://people.cs.umass.edu/%7Eyanlei/publications/sase-sigmod08.pdf
>>> which is the same paper referenced in the Flink ticket.
>>>
>>> Please let me know if I overlook something. Thank you for sharing this!
>>>
>>> Best Regards,
>>>
>>> Jerry
>>>
>>> On Tue, Mar 1, 2016 at 11:58 AM, Alex Kozlov  wrote:
>>>
 For the purpose of full disclosure, I think Scala offers a much more
 efficient pattern matching paradigm.  Using nPath is like using assembler
 to program distributed systems.  Cannot tell much here today, but the
 pattern would look like:

 def matchSessions(h: Seq[Session[PageView]], id: String,
                   p: Seq[PageView]): Seq[Session[PageView]] = {
   p match {
     case Nil => Nil
     case PageView(ts1, "company.com>homepage") ::
          PageView(ts2, "company.com>plus>products landing") :: tail
          if ts2 > ts1 + 600 =>
       matchSessions(h, id, tail).+:(new Session(id, p))
     case _ => matchSessions(h, id, p.tail)
   }
 }

 Look for Scala case statements with guards and upcoming book releases.

 http://docs.scala-lang.org/tutorials/tour/pattern-matching

 https://www.safaribooksonline.com/library/view/scala-cookbook/9781449340292/ch03s14.html

 On Tue, Mar 1, 2016 at 8:34 AM, Henri Dubois-Ferriere <
 henr...@gmail.com> wrote:

> fwiw Apache Flink just added CEP. Queries are constructed
> programmatically rather than in SQL, but the underlying functionality is
> similar.
>
> https://issues.apache.org/jira/browse/FLINK-3215
>
> On 1 March 2016 at 08:19, Jerry Lam  wrote:
>
>> Hi Herman,
>>
>> Thank you for your reply!
>> This functionality usually finds its place in financial services
>> which use CEP (complex event processing) for correlation and pattern
>> matching. Many commercial products have this including Oracle and 
>> Teradata
>> Aster Data MR Analytics. I do agree the syntax a bit awkward but after 
>> you
>> understand it, it is actually very compact for expressing something that 
>> is
>> very complex. Esper has this feature partially implemented (
>> http://www.espertech.com/esper/release-5.1.0/esper-reference/html/match-recognize.html
>> ).
>>
>> I found the Teradata Analytics documentation best to describe the
>> usage of it. For example (note npath is similar to match_recognize):
>>
>> SELECT last_pageid, MAX( count_page80 )
>>  FROM nPath(
>>  ON ( SELECT * FROM clicks WHERE category >= 0 )

Re: [Proposal] Enabling time series analysis on spark metrics

2016-03-01 Thread Reynold Xin
Is the suggestion just to use a different config (and maybe fall back to the
appId) in order to publish metrics? Seems reasonable.
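
A minimal sketch of that fallback on the driver side (the property name used here
is hypothetical, not an existing Spark configuration):

// Prefer a user-supplied key for publishing metrics, otherwise fall back to the appId
val metricsKey = sc.getConf
  .getOption("spark.metrics.reportingKey")   // hypothetical user-set property
  .getOrElse(sc.applicationId)
// metricsKey would then be used as the prefix when reporting to Graphite, etc.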


On Tue, Mar 1, 2016 at 8:17 AM, Karan Kumar 
wrote:

> +dev mailing list
>
> Time series analysis on metrics becomes quite useful when running spark
> jobs using a workflow manager like oozie.
>
> Would love to take this up if the community thinks it's worthwhile.
>
> On Tue, Feb 23, 2016 at 2:59 PM, Karan Kumar 
> wrote:
>
>> HI
>>
>> Spark at the moment uses the application ID to report metrics. I was thinking
>> that we could create an option to export metrics under a user-controlled key.
>> This will allow us to do time series analysis on counters by dumping these
>> counters into a DB such as Graphite.
>>
>> One of the approaches I had in mind was allowing a user to set a property
>> via the Spark client. If that property is set, use the property value to
>> report metrics; else use the current implementation of reporting metrics
>> on the appid.
>>
>> Thoughts?
>>
>> --
>> Thanks
>> Karan
>>
>
>
>
> --
> Thanks
> Karan
>


Re: SPARK-SQL: Pattern Detection on Live Event or Archived Event Data

2016-03-01 Thread Reynold Xin
There are definitely pros and cons for Scala vs SQL-style CEP. Scala might
be more powerful, but the target audience is very different.

How much usage is there for a CEP-style SQL syntax in practice? I've never
seen it come up so far.



On Tue, Mar 1, 2016 at 9:35 AM, Alex Kozlov  wrote:

> Looked at the paper: while we can argue on the performance side, I think
> semantically the Scala pattern matching is much more expressive.  The time
> will decide.
>
> On Tue, Mar 1, 2016 at 9:07 AM, Jerry Lam  wrote:
>
>> Hi Alex,
>>
>> We went through this path already :) This is the reason we tried other
>> approaches. The recursion makes it very inefficient for some cases.
>> For details, this paper describes it very well:
>> https://people.cs.umass.edu/%7Eyanlei/publications/sase-sigmod08.pdf
>> which is the same paper referenced in the Flink ticket.
>>
>> Please let me know if I overlook something. Thank you for sharing this!
>>
>> Best Regards,
>>
>> Jerry
>>
>> On Tue, Mar 1, 2016 at 11:58 AM, Alex Kozlov  wrote:
>>
>>> For the purpose of full disclosure, I think Scala offers a much more
>>> efficient pattern matching paradigm.  Using nPath is like using assembler
>>> to program distributed systems.  Cannot tell much here today, but the
>>> pattern would look like:
>>>
>>> def matchSessions(h: Seq[Session[PageView]], id: String,
>>>                   p: Seq[PageView]): Seq[Session[PageView]] = {
>>>   p match {
>>>     case Nil => Nil
>>>     case PageView(ts1, "company.com>homepage") ::
>>>          PageView(ts2, "company.com>plus>products landing") :: tail
>>>          if ts2 > ts1 + 600 =>
>>>       matchSessions(h, id, tail).+:(new Session(id, p))
>>>     case _ => matchSessions(h, id, p.tail)
>>>   }
>>> }
>>>
>>> Look for Scala case statements with guards and upcoming book releases.
>>>
>>> http://docs.scala-lang.org/tutorials/tour/pattern-matching
>>>
>>> https://www.safaribooksonline.com/library/view/scala-cookbook/9781449340292/ch03s14.html
>>>
>>> On Tue, Mar 1, 2016 at 8:34 AM, Henri Dubois-Ferriere >> > wrote:
>>>
 fwiw Apache Flink just added CEP. Queries are constructed
 programmatically rather than in SQL, but the underlying functionality is
 similar.

 https://issues.apache.org/jira/browse/FLINK-3215

 On 1 March 2016 at 08:19, Jerry Lam  wrote:

> Hi Herman,
>
> Thank you for your reply!
> This functionality usually finds its place in financial services which
> use CEP (complex event processing) for correlation and pattern matching.
> Many commercial products have this including Oracle and Teradata Aster 
> Data
> MR Analytics. I do agree the syntax a bit awkward but after you understand
> it, it is actually very compact for expressing something that is very
> complex. Esper has this feature partially implemented (
> http://www.espertech.com/esper/release-5.1.0/esper-reference/html/match-recognize.html
> ).
>
> I found the Teradata Analytics documentation best to describe the
> usage of it. For example (note npath is similar to match_recognize):
>
> SELECT last_pageid, MAX( count_page80 )
>  FROM nPath(
>  ON ( SELECT * FROM clicks WHERE category >= 0 )
>  PARTITION BY sessionid
>  ORDER BY ts
>  PATTERN ( 'A.(B|C)*' )
>  MODE ( OVERLAPPING )
>  SYMBOLS ( pageid = 50 AS A,
>pageid = 80 AS B,
>pageid <> 80 AND category IN (9,10) AS C )
>  RESULT ( LAST ( pageid OF ANY ( A,B,C ) ) AS last_pageid,
>   COUNT ( * OF B ) AS count_page80,
>   COUNT ( * OF ANY ( A,B,C ) ) AS count_any )
>  )
>  WHERE count_any >= 5
>  GROUP BY last_pageid
>  ORDER BY MAX( count_page80 )
>
> The above means:
> Find user click-paths starting at pageid 50 and passing exclusively
> through either pageid 80 or pages in category 9 or category 10. Find the
> pageid of the last page in the path and count the number of times page 80
> was visited. Report the maximum count for each last page, and sort the
> output by the latter. Restrict to paths containing at least 5 pages. 
> Ignore
> pages in the sequence with category < 0.
>
> If this query is written in pure SQL (if possible at all), it requires
> several self-joins. The interesting thing about this feature is that it
> integrates SQL+Streaming+ML in one (perhaps potentially graph too).
>
> Best Regards,
>
> Jerry
>
>
> On Tue, Mar 1, 2016 at 9:39 AM, Herman van Hövell tot Westerflier <
> hvanhov...@questtec.nl> wrote:
>
>> Hi Jerry,
>>
>> This is not on any roadmap. I (shortly) browsed through this; and
>> this looks like some sort of a window function with very awkward syntax. 
>> I
>> think spark 

Re: SPARK-SQL: Pattern Detection on Live Event or Archived Event Data

2016-03-01 Thread Jerry Lam
Hi Henri,

Finally, there is a good reason for me to use Flink! Thanks for sharing
this information. This is exactly the solution I'm looking for, especially
since the ticket references a paper I was reading a week ago. It would be nice
if Flink added SQL support, because that would give business analysts (traders
as well) a way to express it.

Best Regards,

Jerry

On Tue, Mar 1, 2016 at 11:34 AM, Henri Dubois-Ferriere 
wrote:

> fwiw Apache Flink just added CEP. Queries are constructed programmatically
> rather than in SQL, but the underlying functionality is similar.
>
> https://issues.apache.org/jira/browse/FLINK-3215
>
> On 1 March 2016 at 08:19, Jerry Lam  wrote:
>
>> Hi Herman,
>>
>> Thank you for your reply!
>> This functionality usually finds its place in financial services which
>> use CEP (complex event processing) for correlation and pattern matching.
>> Many commercial products have this including Oracle and Teradata Aster Data
>> MR Analytics. I do agree the syntax a bit awkward but after you understand
>> it, it is actually very compact for expressing something that is very
>> complex. Esper has this feature partially implemented (
>> http://www.espertech.com/esper/release-5.1.0/esper-reference/html/match-recognize.html
>> ).
>>
>> I found the Teradata Analytics documentation best to describe the usage
>> of it. For example (note npath is similar to match_recognize):
>>
>> SELECT last_pageid, MAX( count_page80 )
>>  FROM nPath(
>>  ON ( SELECT * FROM clicks WHERE category >= 0 )
>>  PARTITION BY sessionid
>>  ORDER BY ts
>>  PATTERN ( 'A.(B|C)*' )
>>  MODE ( OVERLAPPING )
>>  SYMBOLS ( pageid = 50 AS A,
>>pageid = 80 AS B,
>>pageid <> 80 AND category IN (9,10) AS C )
>>  RESULT ( LAST ( pageid OF ANY ( A,B,C ) ) AS last_pageid,
>>   COUNT ( * OF B ) AS count_page80,
>>   COUNT ( * OF ANY ( A,B,C ) ) AS count_any )
>>  )
>>  WHERE count_any >= 5
>>  GROUP BY last_pageid
>>  ORDER BY MAX( count_page80 )
>>
>> The above means:
>> Find user click-paths starting at pageid 50 and passing exclusively
>> through either pageid 80 or pages in category 9 or category 10. Find the
>> pageid of the last page in the path and count the number of times page 80
>> was visited. Report the maximum count for each last page, and sort the
>> output by the latter. Restrict to paths containing at least 5 pages. Ignore
>> pages in the sequence with category < 0.
>>
>> If this query is written in pure SQL (if possible at all), it requires
>> several self-joins. The interesting thing about this feature is that it
>> integrates SQL+Streaming+ML in one (perhaps potentially graph too).
>>
>> Best Regards,
>>
>> Jerry
>>
>>
>> On Tue, Mar 1, 2016 at 9:39 AM, Herman van Hövell tot Westerflier <
>> hvanhov...@questtec.nl> wrote:
>>
>>> Hi Jerry,
>>>
>>> This is not on any roadmap. I (shortly) browsed through this; and this
>>> looks like some sort of a window function with very awkward syntax. I think
>>> spark provided better constructs for this using dataframes/datasets/nested
>>> data...
>>>
>>> Feel free to submit a PR.
>>>
>>> Kind regards,
>>>
>>> Herman van Hövell
>>>
>>> 2016-03-01 15:16 GMT+01:00 Jerry Lam :
>>>
 Hi Spark developers,

 Will you consider to add support for implementing "Pattern matching in
 sequences of rows"? More specifically, I'm referring to this:
 http://web.cs.ucla.edu/classes/fall15/cs240A/notes/temporal/row-pattern-recogniton-11.pdf

 This is a very cool/useful feature to pattern matching over live
 stream/archived data. It is sorted of related to machine learning because
 this is usually used in clickstream analysis or path analysis. Also it is
 related to streaming because of the nature of the processing (time series
 data mostly). It is SQL because there is a good way to express and optimize
 the query.

 Best Regards,

 Jerry

>>>
>>>
>>
>


Re: SPARK-SQL: Pattern Detection on Live Event or Archived Event Data

2016-03-01 Thread Henri Dubois-Ferriere
fwiw Apache Flink just added CEP. Queries are constructed programmatically
rather than in SQL, but the underlying functionality is similar.

https://issues.apache.org/jira/browse/FLINK-3215

On 1 March 2016 at 08:19, Jerry Lam  wrote:

> Hi Herman,
>
> Thank you for your reply!
> This functionality usually finds its place in financial services which use
> CEP (complex event processing) for correlation and pattern matching. Many
> commercial products have this including Oracle and Teradata Aster Data MR
> Analytics. I do agree the syntax a bit awkward but after you understand it,
> it is actually very compact for expressing something that is very complex.
> Esper has this feature partially implemented (
> http://www.espertech.com/esper/release-5.1.0/esper-reference/html/match-recognize.html
> ).
>
> I found the Teradata Analytics documentation best to describe the usage of
> it. For example (note npath is similar to match_recognize):
>
> SELECT last_pageid, MAX( count_page80 )
>  FROM nPath(
>  ON ( SELECT * FROM clicks WHERE category >= 0 )
>  PARTITION BY sessionid
>  ORDER BY ts
>  PATTERN ( 'A.(B|C)*' )
>  MODE ( OVERLAPPING )
>  SYMBOLS ( pageid = 50 AS A,
>pageid = 80 AS B,
>pageid <> 80 AND category IN (9,10) AS C )
>  RESULT ( LAST ( pageid OF ANY ( A,B,C ) ) AS last_pageid,
>   COUNT ( * OF B ) AS count_page80,
>   COUNT ( * OF ANY ( A,B,C ) ) AS count_any )
>  )
>  WHERE count_any >= 5
>  GROUP BY last_pageid
>  ORDER BY MAX( count_page80 )
>
> The above means:
> Find user click-paths starting at pageid 50 and passing exclusively
> through either pageid 80 or pages in category 9 or category 10. Find the
> pageid of the last page in the path and count the number of times page 80
> was visited. Report the maximum count for each last page, and sort the
> output by the latter. Restrict to paths containing at least 5 pages. Ignore
> pages in the sequence with category < 0.
>
> If this query is written in pure SQL (if possible at all), it requires
> several self-joins. The interesting thing about this feature is that it
> integrates SQL+Streaming+ML in one (perhaps potentially graph too).
>
> Best Regards,
>
> Jerry
>
>
> On Tue, Mar 1, 2016 at 9:39 AM, Herman van Hövell tot Westerflier <
> hvanhov...@questtec.nl> wrote:
>
>> Hi Jerry,
>>
>> This is not on any roadmap. I (shortly) browsed through this; and this
>> looks like some sort of a window function with very awkward syntax. I think
>> spark provided better constructs for this using dataframes/datasets/nested
>> data...
>>
>> Feel free to submit a PR.
>>
>> Kind regards,
>>
>> Herman van Hövell
>>
>> 2016-03-01 15:16 GMT+01:00 Jerry Lam :
>>
>>> Hi Spark developers,
>>>
>>> Will you consider to add support for implementing "Pattern matching in
>>> sequences of rows"? More specifically, I'm referring to this:
>>> http://web.cs.ucla.edu/classes/fall15/cs240A/notes/temporal/row-pattern-recogniton-11.pdf
>>>
>>> This is a very cool/useful feature to pattern matching over live
>>> stream/archived data. It is sorted of related to machine learning because
>>> this is usually used in clickstream analysis or path analysis. Also it is
>>> related to streaming because of the nature of the processing (time series
>>> data mostly). It is SQL because there is a good way to express and optimize
>>> the query.
>>>
>>> Best Regards,
>>>
>>> Jerry
>>>
>>
>>
>


Re: SPARK-SQL: Pattern Detection on Live Event or Archived Event Data

2016-03-01 Thread Jerry Lam
Hi Herman,

Thank you for your reply!
This functionality usually finds its place in financial services, which use
CEP (complex event processing) for correlation and pattern matching. Many
commercial products have this, including Oracle and Teradata Aster Data MR
Analytics. I do agree the syntax is a bit awkward, but once you understand it,
it is actually very compact for expressing something that is very complex.
Esper has this feature partially implemented (
http://www.espertech.com/esper/release-5.1.0/esper-reference/html/match-recognize.html
).

I found the Teradata Analytics documentation describes its usage best.
For example (note that nPath is similar to MATCH_RECOGNIZE):

SELECT last_pageid, MAX( count_page80 )
 FROM nPath(
 ON ( SELECT * FROM clicks WHERE category >= 0 )
 PARTITION BY sessionid
 ORDER BY ts
 PATTERN ( 'A.(B|C)*' )
 MODE ( OVERLAPPING )
 SYMBOLS ( pageid = 50 AS A,
   pageid = 80 AS B,
   pageid <> 80 AND category IN (9,10) AS C )
 RESULT ( LAST ( pageid OF ANY ( A,B,C ) ) AS last_pageid,
  COUNT ( * OF B ) AS count_page80,
  COUNT ( * OF ANY ( A,B,C ) ) AS count_any )
 )
 WHERE count_any >= 5
 GROUP BY last_pageid
 ORDER BY MAX( count_page80 )

The above means:
Find user click-paths starting at pageid 50 and passing exclusively through
either pageid 80 or pages in category 9 or category 10. Find the pageid of
the last page in the path and count the number of times page 80 was
visited. Report the maximum count for each last page, and sort the output
by the latter. Restrict to paths containing at least 5 pages. Ignore pages
in the sequence with category < 0.

If this query is written in pure SQL (if possible at all), it requires
several self-joins. The interesting thing about this feature is that it
integrates SQL+Streaming+ML in one (perhaps potentially graph too).

Best Regards,

Jerry


On Tue, Mar 1, 2016 at 9:39 AM, Herman van Hövell tot Westerflier <
hvanhov...@questtec.nl> wrote:

> Hi Jerry,
>
> This is not on any roadmap. I (shortly) browsed through this; and this
> looks like some sort of a window function with very awkward syntax. I think
> spark provided better constructs for this using dataframes/datasets/nested
> data...
>
> Feel free to submit a PR.
>
> Kind regards,
>
> Herman van Hövell
>
> 2016-03-01 15:16 GMT+01:00 Jerry Lam :
>
>> Hi Spark developers,
>>
>> Will you consider to add support for implementing "Pattern matching in
>> sequences of rows"? More specifically, I'm referring to this:
>> http://web.cs.ucla.edu/classes/fall15/cs240A/notes/temporal/row-pattern-recogniton-11.pdf
>>
>> This is a very cool/useful feature to pattern matching over live
>> stream/archived data. It is sorted of related to machine learning because
>> this is usually used in clickstream analysis or path analysis. Also it is
>> related to streaming because of the nature of the processing (time series
>> data mostly). It is SQL because there is a good way to express and optimize
>> the query.
>>
>> Best Regards,
>>
>> Jerry
>>
>
>


Re: [Proposal] Enabling time series analysis on spark metrics

2016-03-01 Thread Karan Kumar
+dev mailing list

Time series analysis on metrics becomes quite useful when running spark
jobs using a workflow manager like oozie.

Would love to take this up if the community thinks it's worthwhile.

On Tue, Feb 23, 2016 at 2:59 PM, Karan Kumar 
wrote:

> HI
>
> Spark at the moment uses the application ID to report metrics. I was thinking
> that we could create an option to export metrics under a user-controlled key.
> This will allow us to do time series analysis on counters by dumping these
> counters into a DB such as Graphite.
>
> One of the approaches I had in mind was allowing a user to set a property
> via the Spark client. If that property is set, use the property value to
> report metrics; else use the current implementation of reporting metrics
> on the appid.
>
> Thoughts?
>
> --
> Thanks
> Karan
>



-- 
Thanks
Karan


Re: SPARK-SQL: Pattern Detection on Live Event or Archived Event Data

2016-03-01 Thread Herman van Hövell tot Westerflier
Hi Jerry,

This is not on any roadmap. I (briefly) browsed through this, and it
looks like some sort of window function with very awkward syntax. I think
Spark provides better constructs for this using DataFrames/Datasets/nested
data...

Feel free to submit a PR.

Kind regards,

Herman van Hövell

2016-03-01 15:16 GMT+01:00 Jerry Lam :

> Hi Spark developers,
>
> Will you consider to add support for implementing "Pattern matching in
> sequences of rows"? More specifically, I'm referring to this:
> http://web.cs.ucla.edu/classes/fall15/cs240A/notes/temporal/row-pattern-recogniton-11.pdf
>
> This is a very cool/useful feature to pattern matching over live
> stream/archived data. It is sorted of related to machine learning because
> this is usually used in clickstream analysis or path analysis. Also it is
> related to streaming because of the nature of the processing (time series
> data mostly). It is SQL because there is a good way to express and optimize
> the query.
>
> Best Regards,
>
> Jerry
>


SPARK-SQL: Pattern Detection on Live Event or Archived Event Data

2016-03-01 Thread Jerry Lam
Hi Spark developers,

Would you consider adding support for "Pattern matching in
sequences of rows"? More specifically, I'm referring to this:
http://web.cs.ucla.edu/classes/fall15/cs240A/notes/temporal/row-pattern-recogniton-11.pdf

This is a very cool/useful feature for pattern matching over live-stream or
archived data. It is sort of related to machine learning, because it is
usually used in clickstream or path analysis. It is also related to streaming
because of the nature of the processing (mostly time-series data). And it is
SQL because there is a good way to express and optimize the query.

Best Regards,

Jerry


Re: What should be spark.local.dir in spark on yarn?

2016-03-01 Thread Jeff Zhang
You are using yarn-client mode; the driver is not a YARN container, so it
cannot use yarn.nodemanager.local-dirs and has to use spark.local.dir, which
is /tmp by default. But the driver usually doesn't use much disk, so it should
be fine to use /tmp on the driver side.
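
A minimal sketch of the driver-side setting being described (paths are just the
ones from this thread; executors running in YARN containers keep using
yarn.nodemanager.local-dirs regardless):

// yarn-client mode: spark.local.dir only matters for the driver process,
// so list only the disks that exist on the driver host.
val conf = new org.apache.spark.SparkConf()
  .set("spark.local.dir", "/data01/tmp")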

On Tue, Mar 1, 2016 at 4:57 PM, Alexander Pivovarov 
wrote:

> spark 1.6.0 uses /tmp in the following places
> # spark.local.dir is not set
> yarn.nodemanager.local-dirs=/data01/yarn/nm,/data02/yarn/nm
>
> 1. spark-shell on start
> 16/03/01 08:33:48 INFO storage.DiskBlockManager: Created local directory
> at /tmp/blockmgr-ffd3143d-b47f-4844-99fd-2d51c6a05d05
>
> 2. spark-shell on start
> 16/03/01 08:33:50 INFO yarn.Client: Uploading resource
> file:/tmp/spark-456184c9-d59f-48f4-a9b0-560b7d310655/__spark_conf__6943938018805427428.zip
> ->
> hdfs://ip-10-101-124-30:8020/user/hadoop/.sparkStaging/application_1456776184284_0047/__spark_conf__6943938018805427428.zip
>
> 3. spark-shell spark-sql (Hive) on start
> 16/03/01 08:34:06 INFO session.SessionState: Created local directory:
> /tmp/01705299-a384-4e85-923b-e858017cf351_resources
> 16/03/01 08:34:06 INFO session.SessionState: Created HDFS directory:
> /tmp/hive/hadoop/01705299-a384-4e85-923b-e858017cf351
> 16/03/01 08:34:06 INFO session.SessionState: Created local directory:
> /tmp/hadoop/01705299-a384-4e85-923b-e858017cf351
> 16/03/01 08:34:06 INFO session.SessionState: Created HDFS directory:
> /tmp/hive/hadoop/01705299-a384-4e85-923b-e858017cf351/_tmp_space.db
>
> 4. Spark Executor container uses hadoop.tmp.dir /data01/tmp/hadoop-${
> user.name} for s3 output
>
> scala> sc.parallelize(1 to
> 10).saveAsTextFile("s3n://my_bucket/test/p10_13");
>
> 16/03/01 08:41:13 INFO s3native.NativeS3FileSystem: OutputStream for key 
> 'test/p10_13/part-0' writing to tempfile 
> '/data01/tmp/hadoop-hadoop/s3/output-7399167152756918334.tmp'
>
>
> --
>
> if I set spark.local.dir=/data01/tmp then #1 and #2 uses /data01/tmp instead 
> of /tmp
>
> --
>
>
> 1. 16/03/01 08:47:03 INFO storage.DiskBlockManager: Created local directory 
> at /data01/tmp/blockmgr-db88dbd2-0ef4-433a-95ea-b33392bbfb7f
>
>
> 2. 16/03/01 08:47:05 INFO yarn.Client: Uploading resource 
> file:/data01/tmp/spark-aa3e619c-a368-4f95-bd41-8448a78ae456/__spark_conf__368426817234224667.zip
>  -> 
> hdfs://ip-10-101-124-30:8020/user/hadoop/.sparkStaging/application_1456776184284_0050/__spark_conf__368426817234224667.zip
>
>
> 3. spark-sql (hive) still uses /tmp
>
> 16/03/01 08:47:20 INFO session.SessionState: Created local directory: 
> /tmp/d315926f-39d7-4dcb-b3fa-60e9976f7197_resources
> 16/03/01 08:47:20 INFO session.SessionState: Created HDFS directory: 
> /tmp/hive/hadoop/d315926f-39d7-4dcb-b3fa-60e9976f7197
> 16/03/01 08:47:20 INFO session.SessionState: Created local directory: 
> /tmp/hadoop/d315926f-39d7-4dcb-b3fa-60e9976f7197
> 16/03/01 08:47:20 INFO session.SessionState: Created HDFS directory: 
> /tmp/hive/hadoop/d315926f-39d7-4dcb-b3fa-60e9976f7197/_tmp_space.db
>
>
> 4. executor uses hadoop.tmp.dir for s3 output
>
> 16/03/01 08:50:01 INFO s3native.NativeS3FileSystem: OutputStream for key 
> 'test/p10_16/_SUCCESS' writing to tempfile 
> '/data01/tmp/hadoop-hadoop/s3/output-2541604454681305094.tmp'
>
>
> 5. /data0X/yarn/nm used for usercache
>
> 16/03/01 08:41:12 INFO storage.DiskBlockManager: Created local directory at 
> /data01/yarn/nm/usercache/hadoop/appcache/application_1456776184284_0047/blockmgr-af5
>
>
>
> On Mon, Feb 29, 2016 at 3:44 PM, Jeff Zhang  wrote:
>
>> In yarn mode, spark.local.dir is yarn.nodemanager.local-dirs for shuffle
>> data and block manager disk data. What do you mean "But output files to
>> upload to s3 still created in /tmp on slaves" ? You should have control on
>> where to store your output data if that means your job's output.
>>
>> On Tue, Mar 1, 2016 at 3:12 AM, Alexander Pivovarov > > wrote:
>>
>>> I have Spark on yarn
>>>
>>> I defined yarn.nodemanager.local-dirs to be
>>> /data01/yarn/nm,/data02/yarn/nm
>>>
>>> when I look at yarn executor container log I see that blockmanager files
>>> created in /data01/yarn/nm,/data02/yarn/nm
>>>
>>> But output files to upload to s3 still created in /tmp on slaves
>>>
>>> I do not want Spark write heavy files to /tmp because /tmp is only 5GB
>>>
>>> spark slaves have two big additional disks /disk01 and /disk02 attached
>>>
>>> Probably I can set spark.local.dir to be /data01/tmp,/data02/tmp
>>>
>>> But spark master also writes some files to spark.local.dir
>>> But my master box has only one additional disk /data01
>>>
>>> So, what should I use for  spark.local.dir the
>>> spark.local.dir=/data01/tmp
>>> or
>>> spark.local.dir=/data01/tmp,/data02/tmp
>>>
>>> ?
>>>
>>
>>
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>
>
>


-- 
Best Regards

Jeff Zhang


Re: Is spark.driver.maxResultSize used correctly ?

2016-03-01 Thread Jeff Zhang
I checked the code again. It looks like currently the task result will be loaded
into memory whether it is a DirectTaskResult or an IndirectTaskResult.
Previously I thought an IndirectTaskResult could be loaded into memory later,
which could save memory; RDD#collectAsIterator is what I had in mind that might
save memory.
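
For reference, a small sketch of the existing RDD.toLocalIterator, which fetches one
partition per job and so keeps only one partition's rows on the driver at a time; it
is a rough stand-in for the collectAsIterator idea mentioned above, not a fix for the
executor-side check being discussed:

val rdd = sc.parallelize(1 to 1000000, numSlices = 100)
// One job per partition; the driver never materializes the full result set at once.
rdd.toLocalIterator.foreach { x =>
  // process each element on the driver here
}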

On Tue, Mar 1, 2016 at 5:00 PM, Reynold Xin  wrote:

> How big of a deal is this though? If I am reading your email correctly,
> either way this job will fail. You simply want it to fail earlier in the
> executor side, rather than collecting it and fail on the driver side?
>
>
> On Sunday, February 28, 2016, Jeff Zhang  wrote:
>
>> data skew might be possible, but not the common case. I think we should
>> design for the common case; for the skew case, we may be able to add some
>> fraction parameter to allow users to tune it.
>>
>> On Sat, Feb 27, 2016 at 4:51 PM, Reynold Xin  wrote:
>>
>>> But sometimes you might have skew and almost all the result data are in
>>> one or a few tasks though.
>>>
>>>
>>> On Friday, February 26, 2016, Jeff Zhang  wrote:
>>>

 My job gets this exception very easily even when I set a large value of
 spark.driver.maxResultSize. After checking the spark code, I found
 spark.driver.maxResultSize is also used on the executor side to decide whether
 a DirectTaskResult or an IndirectTaskResult is sent. This doesn't make sense to me.
 Using spark.driver.maxResultSize / taskNum might be more proper. Because if
 spark.driver.maxResultSize is 1g and we have 10 tasks, each with 200m of output,
 then even though the output of each task is less than spark.driver.maxResultSize
 (so a DirectTaskResult will be sent to the driver), the total result size is 2g,
 which will cause an exception on the driver side.


 16/02/26 10:10:49 INFO DAGScheduler: Job 4 failed: treeAggregate at
 LogisticRegression.scala:283, took 33.796379 s

 Exception in thread "main" org.apache.spark.SparkException: Job aborted
 due to stage failure: Total size of serialized results of 1 tasks (1085.0
 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)


 --
 Best Regards

 Jeff Zhang

>>>
>>
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>
>


-- 
Best Regards

Jeff Zhang


Re: Is spark.driver.maxResultSize used correctly ?

2016-03-01 Thread Reynold Xin
How big of a deal is this, though? If I am reading your email correctly,
either way this job will fail. You simply want it to fail earlier on the
executor side, rather than collecting the results and failing on the driver side?

On Sunday, February 28, 2016, Jeff Zhang  wrote:

> data skew might be possible, but not the common case. I think we should
> design for the common case; for the skew case, we may be able to add some
> fraction parameter to allow users to tune it.
>
> On Sat, Feb 27, 2016 at 4:51 PM, Reynold Xin  > wrote:
>
>> But sometimes you might have skew and almost all the result data are in
>> one or a few tasks though.
>>
>>
>> On Friday, February 26, 2016, Jeff Zhang > > wrote:
>>
>>>
>>> My job gets this exception very easily even when I set a large value of
>>> spark.driver.maxResultSize. After checking the spark code, I found
>>> spark.driver.maxResultSize is also used on the executor side to decide whether
>>> a DirectTaskResult or an IndirectTaskResult is sent. This doesn't make sense to me.
>>> Using spark.driver.maxResultSize / taskNum might be more proper. Because if
>>> spark.driver.maxResultSize is 1g and we have 10 tasks, each with 200m of output,
>>> then even though the output of each task is less than spark.driver.maxResultSize
>>> (so a DirectTaskResult will be sent to the driver), the total result size is 2g,
>>> which will cause an exception on the driver side.
>>>
>>>
>>> 16/02/26 10:10:49 INFO DAGScheduler: Job 4 failed: treeAggregate at
>>> LogisticRegression.scala:283, took 33.796379 s
>>>
>>> Exception in thread "main" org.apache.spark.SparkException: Job aborted
>>> due to stage failure: Total size of serialized results of 1 tasks (1085.0
>>> MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
>>>
>>>
>>> --
>>> Best Regards
>>>
>>> Jeff Zhang
>>>
>>
>
>
> --
> Best Regards
>
> Jeff Zhang
>


Re: What should be spark.local.dir in spark on yarn?

2016-03-01 Thread Alexander Pivovarov
spark 1.6.0 uses /tmp in the following places
# spark.local.dir is not set
yarn.nodemanager.local-dirs=/data01/yarn/nm,/data02/yarn/nm

1. spark-shell on start
16/03/01 08:33:48 INFO storage.DiskBlockManager: Created local directory at
/tmp/blockmgr-ffd3143d-b47f-4844-99fd-2d51c6a05d05

2. spark-shell on start
16/03/01 08:33:50 INFO yarn.Client: Uploading resource
file:/tmp/spark-456184c9-d59f-48f4-a9b0-560b7d310655/__spark_conf__6943938018805427428.zip
->
hdfs://ip-10-101-124-30:8020/user/hadoop/.sparkStaging/application_1456776184284_0047/__spark_conf__6943938018805427428.zip

3. spark-shell spark-sql (Hive) on start
16/03/01 08:34:06 INFO session.SessionState: Created local directory:
/tmp/01705299-a384-4e85-923b-e858017cf351_resources
16/03/01 08:34:06 INFO session.SessionState: Created HDFS directory:
/tmp/hive/hadoop/01705299-a384-4e85-923b-e858017cf351
16/03/01 08:34:06 INFO session.SessionState: Created local directory:
/tmp/hadoop/01705299-a384-4e85-923b-e858017cf351
16/03/01 08:34:06 INFO session.SessionState: Created HDFS directory:
/tmp/hive/hadoop/01705299-a384-4e85-923b-e858017cf351/_tmp_space.db

4. Spark Executor container uses hadoop.tmp.dir /data01/tmp/hadoop-${user.name} for s3 output

scala> sc.parallelize(1 to
10).saveAsTextFile("s3n://my_bucket/test/p10_13");

16/03/01 08:41:13 INFO s3native.NativeS3FileSystem: OutputStream for
key 'test/p10_13/part-0' writing to tempfile
'/data01/tmp/hadoop-hadoop/s3/output-7399167152756918334.tmp'


--

if I set spark.local.dir=/data01/tmp, then #1 and #2 use /data01/tmp
instead of /tmp

--


1. 16/03/01 08:47:03 INFO storage.DiskBlockManager: Created local
directory at /data01/tmp/blockmgr-db88dbd2-0ef4-433a-95ea-b33392bbfb7f


2. 16/03/01 08:47:05 INFO yarn.Client: Uploading resource
file:/data01/tmp/spark-aa3e619c-a368-4f95-bd41-8448a78ae456/__spark_conf__368426817234224667.zip
-> 
hdfs://ip-10-101-124-30:8020/user/hadoop/.sparkStaging/application_1456776184284_0050/__spark_conf__368426817234224667.zip


3. spark-sql (hive) still uses /tmp

16/03/01 08:47:20 INFO session.SessionState: Created local directory:
/tmp/d315926f-39d7-4dcb-b3fa-60e9976f7197_resources
16/03/01 08:47:20 INFO session.SessionState: Created HDFS directory:
/tmp/hive/hadoop/d315926f-39d7-4dcb-b3fa-60e9976f7197
16/03/01 08:47:20 INFO session.SessionState: Created local directory:
/tmp/hadoop/d315926f-39d7-4dcb-b3fa-60e9976f7197
16/03/01 08:47:20 INFO session.SessionState: Created HDFS directory:
/tmp/hive/hadoop/d315926f-39d7-4dcb-b3fa-60e9976f7197/_tmp_space.db


4. executor uses hadoop.tmp.dir for s3 output

16/03/01 08:50:01 INFO s3native.NativeS3FileSystem: OutputStream for
key 'test/p10_16/_SUCCESS' writing to tempfile
'/data01/tmp/hadoop-hadoop/s3/output-2541604454681305094.tmp'


5. /data0X/yarn/nm used for usercache

16/03/01 08:41:12 INFO storage.DiskBlockManager: Created local
directory at 
/data01/yarn/nm/usercache/hadoop/appcache/application_1456776184284_0047/blockmgr-af5



On Mon, Feb 29, 2016 at 3:44 PM, Jeff Zhang  wrote:

> In yarn mode, spark.local.dir is yarn.nodemanager.local-dirs for shuffle
> data and block manager disk data. What do you mean "But output files to
> upload to s3 still created in /tmp on slaves" ? You should have control on
> where to store your output data if that means your job's output.
>
> On Tue, Mar 1, 2016 at 3:12 AM, Alexander Pivovarov 
> wrote:
>
>> I have Spark on yarn
>>
>> I defined yarn.nodemanager.local-dirs to be
>> /data01/yarn/nm,/data02/yarn/nm
>>
>> when I look at yarn executor container log I see that blockmanager files
>> created in /data01/yarn/nm,/data02/yarn/nm
>>
>> But output files to upload to s3 still created in /tmp on slaves
>>
>> I do not want Spark write heavy files to /tmp because /tmp is only 5GB
>>
>> spark slaves have two big additional disks /disk01 and /disk02 attached
>>
>> Probably I can set spark.local.dir to be /data01/tmp,/data02/tmp
>>
>> But spark master also writes some files to spark.local.dir
>> But my master box has only one additional disk /data01
>>
>> So, what should I use for  spark.local.dir the
>> spark.local.dir=/data01/tmp
>> or
>> spark.local.dir=/data01/tmp,/data02/tmp
>>
>> ?
>>
>
>
>
> --
> Best Regards
>
> Jeff Zhang
>


Re: Support virtualenv in PySpark

2016-03-01 Thread Jeff Zhang
I may not have expressed it clearly. This method creates the virtualenv
before the Python worker starts, and the virtualenv is application-scoped:
after the Spark application finishes, the virtualenv is cleaned up. And the
virtualenvs don't need to be at the same path on each node (in my POC, it is
the YARN container working directory). That means users don't need to
manually install packages on each node (sometimes you can't even install
packages on the cluster for security reasons). This is the biggest benefit
and the whole purpose: users can create a virtualenv on demand without touching
each node, even when they are not administrators. The con is the extra cost of
installing the required packages before the Python worker starts. But if the
application runs for several hours, the extra cost can be ignored.
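
A conceptual sketch of the per-container setup described above (this is not the
actual POC code; the commands, paths, and file names are illustrative, and it
assumes the native virtualenv type with virtualenv/pip available on the node):

import scala.sys.process._

// Create an application-scoped env inside the container's working directory,
// then install the declared requirements before the Python worker starts.
val envDir = "./pyspark_virtualenv"
Seq("virtualenv", envDir).!
Seq(s"$envDir/bin/pip", "install", "-r", "requirements.txt").!
// The worker would then be launched with s"$envDir/bin/python"; the directory
// is removed along with the container when the application finishes.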

On Tue, Mar 1, 2016 at 4:15 PM, Mohannad Ali  wrote:

> Hello Jeff,
>
> Well this would also mean that you have to manage the same virtualenv
> (same path) on all nodes and install your packages to it the same way you
> would if you would install the packages to the default python path.
>
> In any case at the moment you can already do what you proposed by creating
> identical virtualenvs on all nodes on the same path and change the spark
> python path to point to the virtualenv.
>
> Best Regards,
> Mohannad
> On Mar 1, 2016 06:07, "Jeff Zhang"  wrote:
>
>> I have created a JIRA for this feature; comments and feedback are welcome
>> on how to improve it and whether it's valuable for users.
>>
>> https://issues.apache.org/jira/browse/SPARK-13587
>>
>>
>> Here's some background info and status of this work.
>>
>>
>> Currently, it's not easy for users to add third-party Python packages in
>> PySpark.
>>
>>- One way is to use --py-files (suitable for simple dependencies, but
>>not for complicated ones, especially with transitive
>>dependencies)
>>- Another way is to install packages manually on each node (time
>>consuming, and not easy to switch to a different environment)
>>
>> Python now has two different virtualenv implementations. One is the native
>> virtualenv; the other is through conda.
>>
>> I have implemented POC for this features. Here's one simple command for
>> how to use virtualenv in pyspark
>>
>> bin/spark-submit --master yarn --deploy-mode client --conf 
>> "spark.pyspark.virtualenv.enabled=true" --conf 
>> "spark.pyspark.virtualenv.type=conda" --conf 
>> "spark.pyspark.virtualenv.requirements=/Users/jzhang/work/virtualenv/conda.txt"
>>  --conf "spark.pyspark.virtualenv.path=/Users/jzhang/anaconda/bin/conda"  
>> ~/work/virtualenv/spark.py
>>
>> There're 4 properties needs to be set
>>
>>- spark.pyspark.virtualenv.enabled (enable virtualenv)
>>- spark.pyspark.virtualenv.type (native/conda are supported, default
>>is native)
>>- spark.pyspark.virtualenv.requirements (requirement file for the
>>dependencies)
>>- spark.pyspark.virtualenv.path (path to the executable for for
>>virtualenv/conda)
>>
>>
>>
>>
>>
>>
>> Best Regards
>>
>> Jeff Zhang
>>
>


-- 
Best Regards

Jeff Zhang