Re: sparkSQL thread safe?

2014-07-13 Thread Reynold Xin
Ian,

The LZFOutputStream's large byte buffer is sort of annoying. It is much
smaller if you use the Snappy one. The downside of the Snappy one is
slightly less compression (I've seen 10 - 20% larger sizes).
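
For anyone wanting to try it out, the codec can be switched in the job
config; roughly (assuming a 1.0.x-style SparkConf setup):

  // Switch the block/shuffle compression codec from LZF to Snappy to avoid
  // LZF's large per-stream buffer, at the cost of slightly larger output.
  val conf = new org.apache.spark.SparkConf()
    .set("spark.io.compression.codec", "org.apache.spark.io.SnappyCompressionCodec")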

If we can find a compression scheme implementation that doesn't allocate
very large buffers, that'd be a good idea too ... let me know if you have
any suggestions.

In the future, we plan to make the shuffle write to fewer streams at the
same time.



Re: sparkSQL thread safe?

2014-07-12 Thread Ian O'Connell
Thanks for the response, Michael.

On the first, I'm following the JIRA now, thanks; it's not a blocker for me
but it would be great to see fixed.

I opened up a PR with the resource pool usage around it. I didn't include
it in the PR, but there are a few classes we should probably register with
Kryo for good performance/size:
classOf[org.apache.spark.sql.catalyst.expressions.GenericRow],
classOf[org.apache.spark.sql.catalyst.expressions.GenericMutableRow],
classOf[org.apache.spark.sql.catalyst.expressions.Row],
classOf[Array[Object]],
scala.collection.immutable.Nil.getClass,
scala.collection.immutable.::.getClass,
classOf[scala.collection.immutable.::[Any]]
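
For reference, here is roughly what that registration could look like with
a standard KryoRegistrator (the registrator class name is just illustrative;
it would be wired up via spark.kryo.registrator, or folded into
SparkSqlSerializer's own setup):

  import com.esotericsoftware.kryo.Kryo
  import org.apache.spark.serializer.KryoRegistrator

  // Illustrative registrator: register the Catalyst row classes and the
  // Scala collection classes above so Kryo writes compact class ids instead
  // of fully-qualified class names.
  class SqlKryoRegistrator extends KryoRegistrator {
    override def registerClasses(kryo: Kryo) {
      kryo.register(classOf[org.apache.spark.sql.catalyst.expressions.GenericRow])
      kryo.register(classOf[org.apache.spark.sql.catalyst.expressions.GenericMutableRow])
      kryo.register(classOf[org.apache.spark.sql.catalyst.expressions.Row])
      kryo.register(classOf[Array[Object]])
      kryo.register(scala.collection.immutable.Nil.getClass)
      kryo.register(scala.collection.immutable.::.getClass)
      kryo.register(classOf[scala.collection.immutable.::[Any]])
    }
  }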

Thanks for adding that distinct optimization, by the way; great to have it
scale better.

On the last point, I opened the JIRA, thanks.

Also, more of a Spark core thing that you might already be aware of, but I
haven't seen it mentioned anywhere and it was hitting me (also, if any part
of this seems wrong to you, I'd love to know):

I was getting out-of-memory errors doing a bunch of ops against medium
(~1TB compressed) input sizes with simple things that should spill nicely
(distinct, reduceByKey(_ + _)).

Anyway, here is what I came back with (copied from an internal email):

I looked through some heap dumps from the OOMs in Spark and found there
were 10k instances of DiskBlockObjectWriter per active task, each of which
was up to 300kb in size. At up to 12 concurrent tasks per host, that tops
out at about 33gb of space per executor. The nodes of course were failing
before this (max mem on our ts cluster per JVM is 25gb).

The memory usage primarily comes from two places: a byte array in
LZFOutputStream and a byte array in BufferedOutputStream. These are both
output buffers on the way to disk (so when we are using the former we can
turn down/disable the latter). They are configured to be 65kb and 100kb
respectively by default. The former is not a configurable option but is
hard-coded in that library.
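
For the BufferedOutputStream side, the buffer is tunable; a sketch,
assuming the 1.0.x-era config key (the LZF buffer, as noted, is not):

  // Shrink the per-shuffle-file BufferedOutputStream buffer from its 100kb
  // default; the 65kb LZFOutputStream buffer is fixed inside the library.
  val conf = new org.apache.spark.SparkConf()
    .set("spark.shuffle.file.buffer.kb", "32")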

These come from the ShuffleBlockWriter; that is, we get input data split
into 10k chunks. When we do operations which require partitioning (say
distinct, reduceByKey, etc.), the existing partition count is maintained.
So each task basically opens 10k files, and each of those file handles has
these buffers in place for that task to write to.

Solution put in place (maybe there's a better one?):

Given:
X: the heap size for an executor's JVM
Y: the number of threads/cores allowed for concurrent execution per host
Z: the expected overhead of these output streams (currently estimated as
65kb plus the size of the output buffer, times 1.1 for overhead)
K: the fraction of memory allowed to be used for this overhead
(configurable parameter, default @ 0.2)

Then the number of partitions: P = (X / Y / Z) * K

Then inside some of our root sources now:
- After assembling the RDD, if numPartitions > P,
- coalesce to P.
This won't trigger another shuffle phase, so it can easily sit inline in
source definitions.
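
A rough sketch of the idea (the helper name and exact config plumbing are
made up for illustration; the constants mirror the estimates above):

  import org.apache.spark.rdd.RDD

  // Illustrative helper: cap the partition count so the per-partition
  // shuffle-write buffers fit in a fraction K of each task's share of the
  // executor heap, then coalesce if the RDD exceeds that cap.
  def capPartitions[T](rdd: RDD[T],
                       heapBytes: Long,       // X: executor JVM heap
                       tasksPerHost: Int,     // Y: concurrent tasks per host
                       bufferBytes: Long,     // size of the output buffer
                       fraction: Double = 0.2 // K
                      ): RDD[T] = {
    val perStreamOverhead = ((65 * 1024 + bufferBytes) * 1.1).toLong // Z
    val p = ((heapBytes / tasksPerHost / perStreamOverhead) * fraction).toInt
    if (rdd.partitions.size > p) rdd.coalesce(p) else rdd
  }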

The only real downside of this approach I've seen is that it limits the
number of tasks in this initial map phase, which may not be ideal for
parallelism when loading a large dataset and then filtering heavily. It
would be more efficient to pass P into the first distinct/reduceByKey
call, but the user code would have to reference P.
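
For comparison, a sketch of passing the cap straight into the first shuffle
instead of coalescing (lines and p are placeholders for the source RDD and
the P computed above):

  import org.apache.spark.SparkContext._  // for the PairRDDFunctions implicits

  // Keeps full map-side parallelism while still bounding the number of
  // shuffle output partitions (and hence open writers) per task.
  val counts = lines.map(word => (word, 1L)).reduceByKey(_ + _, p)
  val uniques = lines.distinct(p)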





Re: sparkSQL thread safe?

2014-07-10 Thread Michael Armbrust
Hey Ian,

Thanks for bringing these up!  Responses in-line:

Just wondering if right now spark sql is expected to be thread safe on
 master?
 doing a simple hadoop file -> RDD -> schema RDD -> write parquet
 will fail in reflection code if I run these in a thread pool.


You are probably hitting SPARK-2178
https://issues.apache.org/jira/browse/SPARK-2178 which is caused by
SI-6240 https://issues.scala-lang.org/browse/SI-6240.  We have a plan to
fix this by moving the schema introspection to compile time, using macros.
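
Until that lands, one stopgap people use for SI-6240-style issues is to
serialize the reflective schema inference across threads; a minimal sketch,
with a made-up lock object and record type, assuming the failure happens
inside createSchemaRDD:

  import org.apache.spark.rdd.RDD
  import org.apache.spark.sql.SQLContext

  case class MyRecord(key: String, value: Long) // placeholder schema
  object ReflectionLock                         // guards 2.10 runtime reflection

  def writeParquet(sqlContext: SQLContext, rdd: RDD[MyRecord], path: String) {
    // Only the schema inference is serialized; the parquet write itself
    // still runs concurrently across threads.
    val schemaRdd = ReflectionLock.synchronized { sqlContext.createSchemaRDD(rdd) }
    schemaRdd.saveAsParquetFile(path)
  }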


 The SparkSqlSerializer seems to create a new Kryo instance each time it
 wants to serialize anything. I got a huge speedup for any non-primitive
 type in my SchemaRDD by using the ResourcePools from Chill to provide the
 KryoSerializer to it. (I can open an RB if there is some reason not to
 re-use them?)


Sounds like SPARK-2102 https://issues.apache.org/jira/browse/SPARK-2102.
 There is no reason AFAIK to not reuse the instance. A PR would be greatly
appreciated!
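
For reference, the reuse idea is just a small bounded pool around Kryo
instances, something along these lines (a plain sketch, not Chill's actual
ResourcePool API):

  import java.util.concurrent.ArrayBlockingQueue
  import com.esotericsoftware.kryo.Kryo

  // Borrow a Kryo instance if one is available, otherwise create a fresh
  // one, and return it to the pool when the serialization call finishes.
  class PooledKryo(size: Int, newKryo: () => Kryo) {
    private val pool = new ArrayBlockingQueue[Kryo](size)

    def withKryo[T](f: Kryo => T): T = {
      val kryo = Option(pool.poll()).getOrElse(newKryo())
      try f(kryo) finally pool.offer(kryo)
    }
  }

SparkSqlSerializer would then grab an instance per call instead of
constructing a new Kryo (and re-registering everything) every time.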


 With the Distinct Count operator there are no map-side operations, and a
 test to check for this. Is there any reason not to do a map-side combine
 into a set and then merge the sets later? (similar to the approximate
 distinct count operator)


That's just an optimization that we hadn't implemented yet... but I've
just done it here https://github.com/apache/spark/pull/1366 and it'll be
in master soon :)
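
For intuition, the RDD-level version of that trick looks roughly like this
(illustrative only; the actual operator works on Catalyst rows): build a
set per partition on the map side and merge only the sets.

  import org.apache.spark.rdd.RDD

  // Map-side combine for a distinct count: each partition contributes one
  // set of values, and only those (much smaller) sets get merged.
  def distinctCount[T](rdd: RDD[T]): Long =
    rdd.mapPartitions(iter => Iterator(iter.toSet))
       .reduce(_ ++ _)
       .size.toLong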


 Another thing while I'm mailing: the 1.0.1 docs have a section like:
 
 // Note: Case classes in Scala 2.10 can support only up to 22 fields. To
 work around this limit, // you can use custom classes that implement the
 Product interface.
 

 Which sounds great; we have lots of data in Thrift, so via Scrooge
 (https://github.com/twitter/scrooge) we ultimately end up with instances
 of traits which implement Product. Though the reflection code appears to
 look for the constructor of the class and base the types on those
 parameters?


Yeah, it's true that we only look at the constructor at the moment, but I
don't think there is a really good reason for that (other than, I guess, we
will need to add code to make sure we skip built-in object methods). If
you want to open a JIRA, we can try fixing this.
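
For anyone hitting the 22-field limit in the meantime, a sketch of the
custom-class route from the docs (field names made up, and trimmed to three
fields for brevity), keeping the fields as constructor parameters since
that is where the reflection currently looks:

  // A non-case class implementing Product; with the fields declared as
  // constructor parameters it should line up with the constructor-based
  // reflection described above.
  class WideRecord(val id: Long, val name: String, val score: Double)
      extends Product with Serializable {
    def canEqual(that: Any): Boolean = that.isInstanceOf[WideRecord]
    def productArity: Int = 3
    def productElement(n: Int): Any = n match {
      case 0 => id
      case 1 => name
      case 2 => score
      case _ => throw new IndexOutOfBoundsException(n.toString)
    }
  }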

Michael