RE: how does isDistinct work on expressions

2016-11-13 Thread assaf.mendelson
Thanks for the pointer. It makes more sense now.
Assaf.

From: Herman van Hövell tot Westerflier-2 [via Apache Spark Developers List] 
[mailto:ml-node+s1001551n19842...@n3.nabble.com]
Sent: Sunday, November 13, 2016 10:03 PM
To: Mendelson, Assaf
Subject: Re: how does isDistinct work on expressions

Hi,

You should take a look at 
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala

Spark SQL does not directly support aggregation over multiple distinct 
groups. For example, select count(distinct a), count(distinct b) from tbl_x 
contains two distinct groups, a & b. The RewriteDistinctAggregates rule 
rewrites this into two aggregates: the first aggregate takes care of 
deduplication and the second aggregate does the actual aggregation.
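
A minimal way to see the rewrite in action (a sketch, assuming a running
SparkSession named spark; the table and column names are illustrative):

  // Two distinct groups (a and b) trigger RewriteDistinctAggregates.
  val df = spark.range(100).selectExpr("id % 10 AS a", "id % 7 AS b")
  df.createOrReplaceTempView("tbl_x")

  // The optimized plan should show an Expand node feeding two Aggregate nodes:
  // the first de-duplicates per distinct group, the second does the final counts.
  spark.sql("SELECT count(DISTINCT a), count(DISTINCT b) FROM tbl_x").explain(true)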

HTH

On Sun, Nov 13, 2016 at 11:46 AM, Jacek Laskowski <[hidden 
email]> wrote:
Hi,

I might not have been there yet, but since I'm with the code every day
I might be close...

When you say "aggregate functions", are you asking about typed or untyped
ones? Just today I reviewed the typed ones and honestly it took me some
time to figure out what belongs where. Are you creating a new UDAF?
What have you done already? GitHub perhaps?

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2.0 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Sun, Nov 13, 2016 at 12:03 PM, assaf.mendelson
<[hidden email]> wrote:
> Hi,
>
> I am trying to understand how aggregate functions are implemented
> internally.
>
> I see that the expression is wrapped using toAggregateExpression with
> isDistinct.
>
> I can’t figure out where the code that makes the data distinct is located. I
> am trying to figure out how the input data is converted into a distinct
> version.
>
> Thanks,
>
> Assaf.
>
>








Re: statistics collection and propagation for cost-based optimizer

2016-11-13 Thread Reynold Xin
One additional note: in terms of size, the size of a count-min sketch with
eps = 0.1% and confidence 0.87, uncompressed, is 48k bytes.

To look up what that means, see
http://spark.apache.org/docs/latest/api/java/org/apache/spark/util/sketch/CountMinSketch.html
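
A rough way to check that figure against the implementation already in Spark
(a sketch; the depth/width formulas below are the standard ones, width =
ceil(2 / eps) and depth = ceil(-ln(1 - confidence) / ln 2), with 8-byte
counters, which is consistent with the 48k number above):

  import java.io.ByteArrayOutputStream
  import org.apache.spark.util.sketch.CountMinSketch

  val sketch = CountMinSketch.create(0.001, 0.87, 42) // eps = 0.1%, confidence = 0.87
  // width = ceil(2 / 0.001) = 2000, depth = ceil(-ln(0.13) / ln 2) = 3
  // => 3 * 2000 * 8 = 48,000 bytes of counters, plus a small header
  val out = new ByteArrayOutputStream()
  sketch.writeTo(out)
  println(s"depth=${sketch.depth()} width=${sketch.width()} serialized=${out.size()} bytes")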





On Sun, Nov 13, 2016 at 5:30 PM, Reynold Xin  wrote:

> I want to bring this discussion to the dev list to gather broader
> feedback, as there have been some discussions that happened over multiple
> JIRA tickets (SPARK-16026
> , etc) and GitHub pull
> requests about what statistics to collect and how to use them.
>
> There are some basic statistics on columns that are obvious to use and we
> don't need to debate these: estimated size (in bytes), row count, min, max,
> number of nulls, number of distinct values, average column length, max
> column length.
>
> In addition, we want to be able to estimate selectivity for equality and
> range predicates better, especially taking into account skewed values and
> outliers.
>
> Before I dive into the different options, let me first explain count-min
> sketch: Count-min sketch is a common sketch algorithm that tracks frequency
> counts. It has the following nice properties:
> - sublinear space
> - can be generated in one-pass in a streaming fashion
> - can be incrementally maintained (i.e. for appending new data)
> - it's already implemented in Spark
> - more accurate for frequent values, and less accurate for less-frequent
> values, i.e. it tracks skewed values well.
> - easy to compute inner product, i.e. trivial to compute the count-min
> sketch of a join given two count-min sketches of the join tables
>
>
> Proposal 1 is to use a combination of count-min sketch and equi-height
> histograms. In this case, count-min sketch will be used for selectivity
> estimation on equality predicates, and histogram will be used on range
> predicates.
>
> Proposal 2 is to just use count-min sketch on equality predicates, and
> then simple selected_range / (max - min) will be used for range predicates.
> This will be less accurate than using histogram, but simpler because we
> don't need to collect histograms.
>
> Proposal 3 is a variant of proposal 2, and takes into account that skewed
> values can impact selectivity heavily. In 3, we track the list of heavy
> hitters (HH, most frequent items) along with count-min sketch on the
> column. Then:
> - use count-min sketch on equality predicates
> - for range predicates, estimatedFreq =  sum(freq(HHInRange)) + range /
> (max - min)
>
> Proposal 4 is to not use any sketch, and use histogram for high
> cardinality columns, and exact (value, frequency) pairs for low cardinality
> columns (e.g. num distinct value <= 255).
>
> Proposal 5 is a variant of proposal 4, and adapts it to track exact
> (value, frequency) pairs for the most frequent values only, so we can still
> have that for high cardinality columns. This is actually very similar to
> count-min sketch, but might use less space, although requiring two passes
> to compute the initial value, and more difficult to compute the inner
> product for joins.
>
>
>
>


statistics collection and propagation for cost-based optimizer

2016-11-13 Thread Reynold Xin
I want to bring this discussion to the dev list to gather broader feedback,
as there have been some discussions that happened over multiple JIRA
tickets (SPARK-16026 ,
etc) and GitHub pull requests about what statistics to collect and how to
use them.

There are some basic statistics on columns that are obvious to use and we
don't need to debate these: estimated size (in bytes), row count, min, max,
number of nulls, number of distinct values, average column length, max
column length.

In addition, we want to be able to estimate selectivity for equality and
range predicates better, especially taking into account skewed values and
outliers.

Before I dive into the different options, let me first explain count-min
sketch: Count-min sketch is a common sketch algorithm that tracks frequency
counts. It has the following nice properties:
- sublinear space
- can be generated in one-pass in a streaming fashion
- can be incrementally maintained (i.e. for appending new data)
- it's already implemented in Spark
- more accurate for frequent values, and less accurate for less-frequent
values, i.e. it tracks skewed values well.
- easy to compute inner product, i.e. trivial to compute the count-min
sketch of a join given two count-min sketches of the join tables
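
As a small illustration of the "incrementally maintained" point, a sketch
using the CountMinSketch implementation already in Spark (the data and
parameters are illustrative):

  import org.apache.spark.util.sketch.CountMinSketch

  // Sketch over the existing data.
  val existing = CountMinSketch.create(0.001, 0.87, 42)
  (1 to 1000).foreach(i => existing.addLong(i % 100))

  // Sketch over newly appended data, built independently...
  val appended = CountMinSketch.create(0.001, 0.87, 42)
  (1 to 500).foreach(i => appended.addLong(i % 100))

  // ...and folded into the existing sketch without rescanning the old data.
  existing.mergeInPlace(appended)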


Proposal 1 is to use a combination of count-min sketch and equi-height
histograms. In this case, count-min sketch will be used for selectivity
estimation on equality predicates, and histogram will be used on range
predicates.

Proposal 2 is to just use count-min sketch on equality predicates, and then
simple selected_range / (max - min) will be used for range predicates. This
will be less accurate than using histogram, but simpler because we don't
need to collect histograms.
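
To make proposals 1/2 concrete, a hedged sketch of how the two estimates
could be computed with what exists in Spark today (df, the column name and
the predicate constants are illustrative; df.stat.countMinSketch is the
existing DataFrame API):

  import org.apache.spark.util.sketch.CountMinSketch

  val df = spark.range(1000000L).selectExpr("id % 100 AS a")
  val cms: CountMinSketch = df.stat.countMinSketch("a", 0.001, 0.87, 42)

  // Equality predicate a = 42: estimated frequency over total row count.
  val eqSelectivity = cms.estimateCount(42L).toDouble / cms.totalCount()

  // Range predicate 10 <= a <= 20 under proposal 2: selected_range / (max - min),
  // using the column's min/max statistics (0 and 99 for this column).
  val (lo, hi, min, max) = (10.0, 20.0, 0.0, 99.0)
  val rangeSelectivity = (hi - lo) / (max - min)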

Proposal 3 is a variant of proposal 2, and takes into account that skewed
values can impact selectivity heavily. In 3, we track the list of heavy
hitters (HH, most frequent items) along with count-min sketch on the
column. Then:
- use count-min sketch on equality predicates
- for range predicates, estimatedFreq =  sum(freq(HHInRange)) + range /
(max - min)

Proposal 4 is to not use any sketch, and use histogram for high cardinality
columns, and exact (value, frequency) pairs for low cardinality columns
(e.g. num distinct value <= 255).

Proposal 5 is a variant of proposal 4, and adapts it to track exact (value,
frequency) pairs for the most frequent values only, so we can still have
that for high cardinality columns. This is actually very similar to
count-min sketch, but might use less space, although requiring two passes
to compute the initial value, and more difficult to compute the inner
product for joins.


Re: Component naming in the PR title

2016-11-13 Thread Hyukjin Kwon
I see. I was just curious as I find myself hesitating when I open a PR from
time to time.

Thank you both for the responses!

On 14 Nov 2016 5:02 a.m., "Sean Owen"  wrote:

> Yes, they really correspond to, if anything, the categories at
> spark-prs.appspot.com. They aren't that consistently used, however, and
> there isn't really a definitive list. It is mostly useful because it tags
> emails in a way people can filter semi-effectively. So I think we have left
> it as an informal mechanism rather than make yet another list of categories
> to maintain.
>
> On Sat, Nov 12, 2016, 18:27 Hyukjin Kwon  wrote:
>
>> Hi all,
>>
>>
>> First of all, this might be minor, but I have been curious about the
>> component part of PR titles. So I looked through the Spark wiki again and
>> found that the description does not quite match the actual PRs.
>>
>> The wiki says:
>> Pull Request
>>
>> ...
>>
>>1.
>>
>>   The PR title should be of the form [SPARK-] [COMPONENT] Title,
>>   where SPARK- is the relevant JIRA number, COMPONENT is one of the 
>> PR
>>   categories shown at https://spark-prs.appspot.com/ and Title may
>>   be the JIRA's title or a more specific title describing the PR itself.
>>
>>
>> ...
>>
>> So it seems the component should be one of:
>>
>> SQL, MLlib, Core, Python, Scheduler, Build, Docs, Streaming, Mesos, Web
>> UI, YARN, GraphX, R.
>>
>> If the component refers to the ones specified in JIRA, then it seems it
>> should be one of:
>>
>> Block Manager, Build, Deploy, Documentation, DStream, EC2, Examples,
>> GraphX, Input/Output, Java API, Mesos, ML, MLlib, Optimizer, Project Infra,
>> PySpark, Scheduler, Shuffle, Spark Core, Spark Shell, Spark Submit, SparkR,
>> SQL, Structured Streaming, Tests, Web UI, Windows, YARN
>>
>> It seems they are a bit different from the PRs. I hope this can be
>> clarified in more detail (including whether the component should be all
>> upper-cased or simply match the component name).
>>
>>
>> Thanks.
>>
>


Re: does The Design of spark consider the scala parallelize collections?

2016-11-13 Thread Reynold Xin
Some places in Spark do use it:

> git grep "\\.par\\."
mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala:
 val models = Range(0, numClasses).par.map { index =>
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala:
   (0 until 10).par.foreach { _ =>
sql/core/src/test/scala/org/apache/spark/sql/execution/SQLExecutionSuite.scala:
 (1 to 100).par.foreach { _ =>
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala:
   (1 to 100).par.map { i =>
streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala:
 inputStreams.par.foreach(_.start())
streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala:
 inputStreams.par.foreach(_.stop())


Most of the usage is in tests, not the actual execution path. Parallel
collections are fairly complicated and difficult to manage (implicit thread
pools). They are fine for basic thread management, but Spark itself has
much more sophisticated parallelization built in.
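
For context, a minimal sketch of the implicit-thread-pool issue (assuming
Scala 2.11, which Spark used at the time): by default .par runs tasks on the
JVM-wide fork-join pool, so a dedicated TaskSupport is usually needed to
bound the parallelism.

  import scala.collection.parallel.ForkJoinTaskSupport
  import scala.concurrent.forkjoin.ForkJoinPool

  val indices = (0 until 10).par
  // Without this line, the parallel range shares the global default pool.
  indices.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(4)) // cap at 4 threads

  val results = indices.map(i => i * i) // runs on the dedicated 4-thread pool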


On Sat, Nov 12, 2016 at 5:57 AM, WangJianfei <
wangjianfe...@otcaix.iscas.ac.cn> wrote:

> Hi devs:
>    According to the Scala docs, Scala has parallel collections, and in my
> experience they can accelerate operations such as map. So I want to know:
> does Spark use Scala parallel collections, and will Spark even consider
> them? Thank you!
>
>
>
>


Re: Component naming in the PR title

2016-11-13 Thread Sean Owen
Yes, they really correspond to, if anything, the categories at
spark-prs.appspot.com. They aren't that consistently used, however, and
there isn't really a definitive list. It is mostly useful because it tags
emails in a way people can filter semi-effectively. So I think we have left
it as an informal mechanism rather than make yet another list of categories
to maintain.

On Sat, Nov 12, 2016, 18:27 Hyukjin Kwon  wrote:

> Hi all,
>
>
> First of all, this might be minor, but I have been curious about the
> component part of PR titles. So I looked through the Spark wiki again and
> found that the description does not quite match the actual PRs.
>
> The wiki says:
> Pull Request
>
> ...
>
>1.
>
>   The PR title should be of the form [SPARK-] [COMPONENT] Title,
>   where SPARK- is the relevant JIRA number, COMPONENT is one of the PR
>   categories shown at https://spark-prs.appspot.com/ and Title may be
>   the JIRA's title or a more specific title describing the PR itself.
>
>
> ...
>
> So it seems the component should be one of:
>
> SQL, MLlib, Core, Python, Scheduler, Build, Docs, Streaming, Mesos, Web
> UI, YARN, GraphX, R.
>
> If the component refers to the ones specified in JIRA, then it seems it
> should be one of:
>
> Block Manager, Build, Deploy, Documentation, DStream, EC2, Examples,
> GraphX, Input/Output, Java API, Mesos, ML, MLlib, Optimizer, Project Infra,
> PySpark, Scheduler, Shuffle, Spark Core, Spark Shell, Spark Submit, SparkR,
> SQL, Structured Streaming, Tests, Web UI, Windows, YARN
>
> It seems they are a bit different from the PRs. I hope this can be
> clarified in more detail (including whether the component should be all
> upper-cased or simply match the component name).
>
>
> Thanks.
>


On the use of catalyst.dsl package and deserialize vs CatalystSerde.deserialize

2016-11-13 Thread Jacek Laskowski
Hi,

It's just a (minor?) example of how to use the catalyst.dsl package [1],
but I am currently reviewing deserialize [2] and have a question.

CatalystSerde.deserialize [3] is exactly the deserialize operator referred
to above, and CatalystSerde.deserialize is used in a few places, e.g.
Dataset.rdd [4], as follows:

val deserialized = CatalystSerde.deserialize[T](logicalPlan)

I'm wondering why the deserialize dsl operator is not used instead, which
would make the line:

val deserialized = deserialize(logicalPlan)

which looks so much nicer to my eyes.

Any reason for using CatalystSerde.deserialize[T](logicalPlan) instead
of this seemingly simpler deserialize operator? Is this because it's
just a matter of find-and-replace and no one had time for this?

Please help me understand this area better. Thanks!

[1] 
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
[2] 
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala#L304
[3] 
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala#L32
[4] 
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L2498

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2.0 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski




Converting spark types and standard scala types

2016-11-13 Thread assaf.mendelson
Hi,
I am trying to write a new aggregate function 
(https://issues.apache.org/jira/browse/SPARK-17691) and I wanted it to support 
all ordered types.
I have several issues though:

1.   How to convert the type of the child expression to a standard Scala
type (e.g. I need an Array[Int] for IntegerType and an Array[Double] for
DoubleType). The only method I found so far is to do a match for each of the
types. Is there a better way? (A sketch of such a per-type mapping follows
after this list.)

2.   What would be the corresponding Scala type for DecimalType,
TimestampType, DateType and BinaryType? I also couldn't figure out how to do
a case for DecimalType. Do I need a specific case for each of its internal
types?

3.   Should BinaryType be a legal type for such a function?

4.   I need to serialize the relevant array of the type (i.e. turn it into
an Array[Byte] for working with TypedImperativeAggregate). Currently, I use
java.io.{ByteArrayOutputStream, ByteArrayInputStream, ObjectInputStream,
ObjectOutputStream}. Is there a more standard way (e.g. a "serialize"
function that knows whether to use Java serialization, Kryo serialization,
etc. based on the Spark configuration)?
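
Regarding (1) and (2), here is a sketch of the internal (InternalRow)
representations as I understand them; this is illustrative, not an
exhaustive or authoritative mapping:

  import org.apache.spark.sql.types._

  // Internal Catalyst representation per DataType, i.e. what the
  // aggregation buffer actually sees for each column type.
  def internalScalaType(dt: DataType): String = dt match {
    case IntegerType    => "Int"
    case LongType       => "Long"
    case FloatType      => "Float"
    case DoubleType     => "Double"
    case StringType     => "org.apache.spark.unsafe.types.UTF8String"
    case BinaryType     => "Array[Byte]"
    case DateType       => "Int (days since epoch)"
    case TimestampType  => "Long (microseconds since epoch)"
    case d: DecimalType => s"org.apache.spark.sql.types.Decimal(precision=${d.precision}, scale=${d.scale})"
    case other          => other.simpleString
  }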
Thanks,
Assaf.






how does isDistinct work on expressions

2016-11-13 Thread assaf.mendelson
Hi,
I am trying to understand how aggregate functions are implemented internally.
I see that the expression is wrapped using toAggregateExpression with
isDistinct.
I can't figure out where the code that makes the data distinct is located. I am 
trying to figure out how the input data is converted into a distinct version.
Thanks,
Assaf.



