RE: Shuffle intermediate results not being cached

2016-12-27 Thread assaf.mendelson
I understand the actual dataframe is different, but the underlying partitions 
are not (hence the importance of Mark's response). The code you suggested would 
not work, as allDF and x would have different schemas (x has the original 
schema and allDF takes on the grouped one).
I can do something like this:
  var totalTime: Long = 0
  var allDF: DataFrame = null
  for {
    x <- dataframes
  } {
    val timeLen = time {
      val grouped = x.groupBy("cat1", "cat2").agg(sum($"valToAdd").alias("v"))
      allDF = if (allDF == null) grouped else {
        allDF.union(grouped).groupBy("cat1", "cat2").agg(sum($"v").alias("v"))
      }
      val grouped2 = allDF.groupBy("cat1").agg(sum($"v"), count($"cat2"))
      grouped2.show()
    }
    totalTime += timeLen
    println(s"Took $timeLen milliseconds")
  }
  println(s"Overall time was $totalTime milliseconds")

and this indeed improves performance (I actually tried a couple more variants), but:

1. This still gives poor performance: for 167 slices I get a throughput 10 
times lower than batch, even after tuning (including caching and coalescing).

2. This works because the aggregation here is a sum and we never "forget" old 
data. For more general aggregations we would have to join the partial results 
together (impossible for count distinct, for example), and we would need to 
"forget" frames as they move out of the window (we can subtract a sum, but not 
a max); a sketch of the subtraction idea follows below.
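
To make the subtraction idea concrete, here is a minimal sketch (mine, purely 
illustrative; all names are made up). It keeps the per-slice partial sums in a 
queue and "forgets" an expired slice by unioning it back in with negated 
values. Again, this only works for invertible aggregates such as sum:

import scala.collection.mutable
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

val slices = mutable.Queue.empty[DataFrame] // per-slice partial sums
var windowAgg: DataFrame = null             // running aggregate of the window

def advanceWindow(newSlice: DataFrame, windowSize: Int): DataFrame = {
  val sliceAgg = newSlice.groupBy("cat1", "cat2").agg(sum("valToAdd").alias("v"))
  slices.enqueue(sliceAgg)
  // Fold the new slice into the running aggregate.
  windowAgg = if (windowAgg == null) sliceAgg
    else windowAgg.union(sliceAgg).groupBy("cat1", "cat2").agg(sum("v").alias("v"))
  if (slices.size > windowSize) {
    // "Forget" the expired slice by adding it back with negated values --
    // valid for sum, impossible for max or count distinct.
    val expired = slices.dequeue().withColumn("v", -col("v"))
    windowAgg = windowAgg.union(expired).groupBy("cat1", "cat2").agg(sum("v").alias("v"))
  }
  windowAgg
}

(One catch: groups whose sum returns to zero linger as zero-valued rows, so a 
filter or periodic recomputation would be needed to drop them.)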

The best solution I found so far (performance-wise) was to write a custom UDAF 
that handles the window internally. Even so, throughput was still 8 times lower 
than batch, it required a lot of coding, and it is not a general solution (a 
minimal skeleton of the UDAF API is sketched below for reference).
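
For reference, the general shape of such a UDAF under the Spark 2.x API. This 
minimal version is a plain sum; the windowed variant described above would 
additionally need to keep per-slice state in the buffer:

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

class SumUDAF extends UserDefinedAggregateFunction {
  override def inputSchema: StructType = StructType(StructField("valToAdd", LongType) :: Nil)
  override def bufferSchema: StructType = StructType(StructField("total", LongType) :: Nil)
  override def dataType: DataType = LongType
  override def deterministic: Boolean = true
  // Start each aggregation buffer at zero.
  override def initialize(buffer: MutableAggregationBuffer): Unit = buffer(0) = 0L
  // Fold one input row into the buffer.
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit =
    if (!input.isNullAt(0)) buffer(0) = buffer.getLong(0) + input.getLong(0)
  // Combine two partial buffers (e.g. from different partitions).
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit =
    buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
  override def evaluate(buffer: Row): Any = buffer.getLong(0)
}

It can then be registered with spark.udf.register and called like any built-in 
aggregate.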

I am looking for an approach that improves performance further (preferably on 
par with batch, or at least a low constant factor that does not grow as the 
number of slices rises), and that includes the option to "forget" frames.

Assaf.




From: Liang-Chi Hsieh [via Apache Spark Developers List] 
[mailto:ml-node+s1001551n20371...@n3.nabble.com]
Sent: Wednesday, December 28, 2016 3:59 AM
To: Mendelson, Assaf
Subject: RE: Shuffle intermediate results not being cached


Hi,

In every iteration, the data you run the aggregation on is different. As I 
showed in my previous reply:

1st iteration: aggregation(x1 union x2)
2nd iteration: aggregation(x3 union (x1 union x2))
3rd iteration: aggregation(x4 union(x3 union (x1 union x2)))

In the 1st you run the aggregation on the data of x1 and x2. In the 2nd the 
data is x1, x2 and x3. Even if you work on the same RDD, you won't see reuse of 
the shuffle data, because the shuffle data is different.

In your second example, I think the way to reduce the computation is something 
like this:

var totalTime: Long = 0
var allDF: org.apache.spark.sql.DataFrame = null
for {
  x <- dataframes
} {
  val timeLen = time {
    // Union the previous aggregation summary with the new dataframe in this window
    allDF = if (allDF == null) x else allDF.union(x)
    val grouped = allDF.groupBy("cat1", "cat2").agg(sum($"valToAdd").alias("v"))
    val grouped2 = grouped.groupBy("cat1").agg(sum($"v"), count($"cat2"))
    grouped2.show()
    allDF = grouped // Replace the unioned data with the aggregated summary
  }
  totalTime += timeLen
  println(s"Took $timeLen milliseconds")
}
println(s"Total time was $totalTime milliseconds")

You don't need to recompute the aggregation over the previous dataframes in 
each iteration. You just need to take the summary and union it with the new 
dataframe to compute the new aggregation summary in the next iteration. This is 
more like the streaming case: I don't think you can/should recompute all the 
data since the beginning of a stream.

assaf.mendelson wrote
The reason I thought some operations would be reused is that Spark 
automatically caches shuffle data, which means the partial aggregation for 
previous dataframes would be saved. Unfortunately, as Mark Hamstra explained, 
this is not the case, because this is considered a new RDD and therefore the 
previous data is lost.

I am still wondering if there is any way to do high-performance streaming of 
SQL. Basically this is not far from what DStream would do for a sliding window 
(e.g. 24 hours, sliding every 15 minutes): we would be doing a foreachRDD which 
would do the joining behind the scenes.
The problem is that any attempt at streaming like this results in performance 
which is hundreds of times slower than batch.
Is there a correct way to do such an aggregation on streaming data (using 
dataframes rather than RDD operations)?
Assaf.



From: Liang-Chi Hsieh [via Apache Spark Developers List] [mailto:[hidden email]]
Sent: Monday, December 26, 2016 5:42 PM
To: Mendelson, Assaf
Subject: Re: Shuffle intermediate results not being cached


Hi,

Let me quote your example codes:

var totalTime: Long = 0
var allDF: org.apache.spark.sql.DataFrame = null
for {
  x <- dataframes
} {
  val timeLen = time {
    allDF = if (allDF == null) x else allDF.union(x)
    val grouped = allDF.groupBy("cat1", 

Re: What is mainly different between a UDT and a Spark internal type that ExpressionEncoder recognizes?

2016-12-27 Thread dragonly
Thanks for your reply!

Here's my *understanding*:
Basic types that ScalaReflection understands are encoded into the tungsten
binary format, while UDTs are encoded into a GenericInternalRow, which stores
the JVM objects in an Array[Any] under the hood, and thus lose the
memory-footprint and CPU-cache efficiency provided by tungsten encoding.

If the above is correct, then here are my *further questions*:
Are SparkPlan nodes (those ending with Exec) all code-generated before actually
running the toRdd logic? I know there are some non-codegenable nodes which
implement the trait CodegenFallback, but there's also a doGenCode method in
that trait, so the actual calling convention really puzzles me. I've tried to
trace the calling flow for a few days but found it scattered everywhere; I
cannot build a big graph of the method-calling order even with the help of
IntelliJ.

Let me rephrase this: how does the SparkSQL engine call the codegen APIs to do
the job of producing RDDs? And what are the eval methods in Expressions for,
given that there's already a doGenCode right next to them?
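
(Not an answer, but a concrete way to at least watch this machinery in action, 
assuming Spark 2.x: the sql.execution.debug package dumps the generated Java 
source for a plan, which makes the WholeStageCodegen / CodegenFallback split 
visible. The query below is made up for illustration.)

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.debug._ // adds debugCodegen() to Dataset

val spark = SparkSession.builder.master("local[*]").appName("codegen-demo").getOrCreate()
val df = spark.range(100)
  .selectExpr("id % 3 AS cat", "id AS value")
  .groupBy("cat").sum("value")

df.explain()      // physical plan; whole-stage-codegened operators are starred
df.debugCodegen() // prints the Java source generated for each codegen stage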






Re: unsubscribe

2016-12-27 Thread Minikek
Once you are in, there is no way out… :-)

> On Dec 27, 2016, at 7:37 PM, Kyle Kelley  wrote:
> 
> You are now in position 238 for unsubscription. If you wish for your
> subscription to occur immediately, please email
> dev-unsubscr...@spark.apache.org
> 
> Best wishes.
> 





Re: What is mainly different between a UDT and a Spark internal type that ExpressionEncoder recognizes?

2016-12-27 Thread Michael Armbrust
An encoder uses reflection to generate expressions that can extract data out of
an object (by calling methods on the object) and encode its contents directly
into the tungsten binary row format (and vice versa).  We generate bytecode
that evaluates these expressions in the same way that we generate code for
normal expression evaluation in query processing.  However, this reflection
only works for simple ADTs.  Another key thing to realize is that we do this
reflection / code generation at runtime, so we aren't constrained by binary
compatibility across versions of Spark.

UDTs let you write custom code that translates an object into a generic row,
which we can then translate into Spark's internal format (using a RowEncoder).
Unlike expressions and tungsten binary encoding, the Row type that you return
here is a stable public API that hasn't changed since Spark 1.3.

So to summarize, if encoders don't work for your specific types you can use
UDTs, but they probably won't be as efficient. I'd love to unify these code
paths more, but it's actually a fair amount of work to come up with a good
stable public API that doesn't sacrifice performance.
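
As a small illustration of the encoder path (a sketch of mine, assuming Spark 
2.x; Point is a made-up type): a simple case class gets a product encoder 
derived by reflection, and the serializer expressions that write its fields 
into the internal binary row can be inspected directly.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

case class Point(x: Double, y: Double)

val spark = SparkSession.builder.master("local[*]").appName("encoders").getOrCreate()
import spark.implicits._

val ds = Seq(Point(1.0, 2.0), Point(3.0, 4.0)).toDS() // encoder derived by reflection
ds.printSchema() // flat struct<x: double, y: double>, stored as tungsten binary rows

val enc = ExpressionEncoder[Point]() // internal API, handy for poking around
enc.serializer.foreach(println)      // expressions extracting x and y into an InternalRow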

On Tue, Dec 27, 2016 at 6:32 AM, dragonly  wrote:

> I've recently been reading the source code of the SparkSQL project, and
> found some interesting databricks blogs about the tungsten project. I've
> roughly read through the encoder and unsafe representation part of the
> tungsten project (I haven't read the algorithm part, such as cache-friendly
> hashmap algorithms).
> Now there's a big puzzle in front of me about the codegen of SparkSQL and
> how the codegen utilizes the tungsten encoding between JVM objects and
> unsafe bits.
> So can anyone tell me what's the main difference between situations where I
> write a UDT like ExamplePointUDT in SparkSQL and situations where I just
> create an ArrayType which can be handled by the tungsten encoder? I'll
> really appreciate it if you can go through some concrete code examples.
> Thanks a lot!


RE: Shuffle intermediate results not being cached

2016-12-27 Thread Liang-Chi Hsieh

Hi,

In every iteration, the data you run the aggregation on is different. As I
showed in my previous reply:

1st iteration: aggregation(x1 union x2)
2nd iteration: aggregation(x3 union (x1 union x2))
3rd iteration: aggregation(x4 union(x3 union (x1 union x2)))

In the 1st you run the aggregation on the data of x1 and x2. In the 2nd the
data is x1, x2 and x3. Even if you work on the same RDD, you won't see reuse of
the shuffle data, because the shuffle data is different.

In your second example, I think the way to reduce the computation is something
like this:

var totalTime: Long = 0
var allDF: org.apache.spark.sql.DataFrame = null
for {
  x <- dataframes
} {
  val timeLen = time {
    // Union the previous aggregation summary with the new dataframe in this window
    allDF = if (allDF == null) x else allDF.union(x)
    val grouped = allDF.groupBy("cat1", "cat2").agg(sum($"valToAdd").alias("v"))
    val grouped2 = grouped.groupBy("cat1").agg(sum($"v"), count($"cat2"))
    grouped2.show()
    allDF = grouped // Replace the unioned data with the aggregated summary
  }
  totalTime += timeLen
  println(s"Took $timeLen milliseconds")
}
println(s"Total time was $totalTime milliseconds")

You don't need to recompute the aggregation over the previous dataframes in
each iteration. You just need to take the summary and union it with the new
dataframe to compute the new aggregation summary in the next iteration. This is
more like the streaming case: I don't think you can/should recompute all the
data since the beginning of a stream.
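
One more thing worth trying on top of this (my suggestion, a sketch assuming 
Spark 2.x): because each summary is rebuilt from the previous one, the lineage 
still grows by one union per iteration, so persisting the summary (and 
occasionally checkpointing it; Dataset.checkpoint exists since Spark 2.1) keeps 
each iteration's work bounded.

// Inside the loop above, names unchanged:
allDF = if (allDF == null) x else allDF.union(x)
val grouped = allDF.groupBy("cat1", "cat2").agg(sum($"valToAdd").alias("v"))
grouped.cache() // materialize the summary so the next iteration reuses it
allDF = grouped
// Every few iterations, checkpointing can truncate the lineage:
// spark.sparkContext.setCheckpointDir("/tmp/checkpoints") // illustrative path
// allDF = allDF.checkpoint()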



assaf.mendelson wrote
> The reason I thought some operations would be reused is that Spark
> automatically caches shuffle data, which means the partial aggregation for
> previous dataframes would be saved. Unfortunately, as Mark Hamstra
> explained, this is not the case, because this is considered a new RDD and
> therefore the previous data is lost.
> 
> I am still wondering if there is any way to do high-performance streaming
> of SQL. Basically this is not far from what DStream would do for a sliding
> window (e.g. 24 hours, sliding every 15 minutes): we would be doing a
> foreachRDD which would do the joining behind the scenes.
> The problem is that any attempt at streaming like this results in
> performance which is hundreds of times slower than batch.
> Is there a correct way to do such an aggregation on streaming data (using
> dataframes rather than RDD operations)?
> Assaf.
> 
> 
> 
> From: Liang-Chi Hsieh [via Apache Spark Developers List]
> [mailto:ml-node+s1001551n20361h80@.nabble]
> Sent: Monday, December 26, 2016 5:42 PM
> To: Mendelson, Assaf
> Subject: Re: Shuffle intermediate results not being cached
> 
> 
> Hi,
> 
> Let me quote your example code:
> 
> var totalTime: Long = 0
> var allDF: org.apache.spark.sql.DataFrame = null
> for {
>   x <- dataframes
> } {
>   val timeLen = time {
>     allDF = if (allDF == null) x else allDF.union(x)
>     val grouped = allDF.groupBy("cat1", "cat2").agg(sum($"valToAdd").alias("v"))
>     val grouped2 = grouped.groupBy("cat1").agg(sum($"v"), count($"cat2"))
>     grouped2.show()
>   }
>   totalTime += timeLen
>   println(s"Took $timeLen milliseconds")
> }
> println(s"Total time was $totalTime milliseconds")
> 
> 
> Basically what you do is union some dataframes in each iteration and run
> the aggregation on this unioned data. I don't see any reused operations.
> 
> 1st iteration: aggregation(x1 union x2)
> 2nd iteration: aggregation(x3 union (x1 union x2))
> 3rd iteration: aggregation(x4 union(x3 union (x1 union x2)))
> ...
> 
> Your first example just does two aggregation operations, but your second
> example above does these aggregation operations in each iteration, so the
> time of the second example grows as the iterations increase.
> 
> Liang-Chi Hsieh | @viirya
> Spark Technology Center
> http://www.spark.tc/





-
Liang-Chi Hsieh | @viirya 
Spark Technology Center 
http://www.spark.tc/ 

What is mainly different between a UDT and a Spark internal type that ExpressionEncoder recognizes?

2016-12-27 Thread dragonly
I've recently been reading the source code of the SparkSQL project, and found
some interesting databricks blogs about the tungsten project. I've roughly read
through the encoder and unsafe representation part of the tungsten project (I
haven't read the algorithm part, such as cache-friendly hashmap algorithms).
Now there's a big puzzle in front of me about the codegen of SparkSQL and how
the codegen utilizes the tungsten encoding between JVM objects and unsafe bits.
So can anyone tell me what's the main difference between situations where I
write a UDT like ExamplePointUDT in SparkSQL and situations where I just create
an ArrayType which can be handled by the tungsten encoder? I'll really
appreciate it if you can go through some concrete code examples. Thanks a lot!


