Re: sparkSQL thread safe?

2014-07-13 Thread Reynold Xin
Ian,

The LZFOutputStream's large byte buffer is sort of annoying. It is much
smaller if you use the Snappy one. The downside of the Snappy one is
slightly less compression (I've seen 10 - 20% larger sizes).
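
For reference, a minimal sketch of switching codecs through SparkConf (the config keys are the Spark 1.0 ones; the block size value is just illustrative):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      // Swap the default LZF codec for Snappy to shrink per-stream buffers.
      .set("spark.io.compression.codec", "org.apache.spark.io.SnappyCompressionCodec")
      // Snappy's block buffer is tunable (bytes); smaller trades ratio for memory.
      .set("spark.io.compression.snappy.block.size", "8192")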

If we can find a compression scheme implementation that doesn't do very
large buffers, that'd be a good idea too ... let me know if you have any
suggestions.

In the future, we plan to make shuffle write to fewer streams at the same time.



On Sat, Jul 12, 2014 at 7:59 PM, Ian O'Connell i...@ianoconnell.com wrote:

 Thanks for the response Michael

 On the first, I'm following the JIRA now, thanks. It's not a blocker for me
 but would be great to see.

 I opened up a PR with the resource pool usage around it. I didn't include
 it in the PR, but here are a few classes we should probably register with
 Kryo for good perf/size:
 classOf[org.apache.spark.sql.catalyst.expressions.GenericRow],
 classOf[org.apache.spark.sql.catalyst.expressions.GenericMutableRow],
 classOf[org.apache.spark.sql.catalyst.expressions.Row],
 classOf[Array[Object]],
 scala.collection.immutable.Nil.getClass,
 scala.collection.immutable.::.getClass,
 classOf[scala.collection.immutable.::[Any]]
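
 A minimal sketch of wiring those registrations into a KryoRegistrator (the registrator class name here is made up; the config keys are Spark's standard Kryo ones):

    import com.esotericsoftware.kryo.Kryo
    import org.apache.spark.serializer.KryoRegistrator

    // Hypothetical registrator covering the classes listed above.
    class SqlKryoRegistrator extends KryoRegistrator {
      override def registerClasses(kryo: Kryo) {
        kryo.register(classOf[org.apache.spark.sql.catalyst.expressions.GenericRow])
        kryo.register(classOf[org.apache.spark.sql.catalyst.expressions.GenericMutableRow])
        kryo.register(classOf[org.apache.spark.sql.catalyst.expressions.Row])
        kryo.register(classOf[Array[Object]])
        kryo.register(scala.collection.immutable.Nil.getClass)
        kryo.register(scala.collection.immutable.::.getClass)
        kryo.register(classOf[scala.collection.immutable.::[Any]])
      }
    }

    // Enabled via:
    //   conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    //   conf.set("spark.kryo.registrator", "SqlKryoRegistrator")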

 Thanks for adding that distinct btw, great to have it scale more.

 On the last, I opened the JIRA, thanks.

 Also, more of a Spark core thing that you might already be aware of, but I
 haven't seen it mentioned anywhere and it was hitting me (if any part of
 this seems wrong to you, I'd love to know):

 I was getting out-of-memory errors doing a bunch of ops against medium
 (~1TB compressed) input sizes with simple things that should spill nicely
 (distinct, reduceByKey(_ + _)).

 Anyway, here is what I came back with (copied from an internal email):

 I looked through some heap dumps from the OOMs in Spark and found there
 were 10k instances of DiskBlockObjectWriter, each of which was up to 300KB
 in size, per active task. At up to 12 concurrent tasks per host that tops
 out at about 33GB of space (12 x 10,000 x 300KB = ~34GB). The nodes of
 course were failing before this (max mem on our ts cluster per JVM is 25GB).

 The memory usage primarily comes from two places: a byte array in
 LZFOutputStream and a byte array in BufferedOutputStream. These are both
 output buffers along the way to disk (so when we are using the former we can
 turn down/disable the latter). They are configured to be 65KB and 100KB
 respectively by default. The former is not a configurable option but is
 hard-coded in that library.
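
 The latter is tunable, though. A hedged mitigation under these defaults is shrinking the per-writer buffer (config key as of Spark 1.0; it was renamed in later releases):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      // Shrink each shuffle writer's BufferedOutputStream from the
      // default 100KB down to 32KB (value here is illustrative).
      .set("spark.shuffle.file.buffer.kb", "32")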

 These come from the ShuffleBlockWriter; that is, we get an input with 10k
 chunks (partitions). When we do operations which require partitioning (say
 distinct, reduceByKey, etc.), Spark maintains the existing partition count,
 so each task basically opens 10k files, each file handle of which has these
 buffers in place for that task to write to.

 Solution put in place (maybe there's a better one?):

 Given:
 X: the heap size of an executor's JVM
 Y: the number of threads/cores allowed for concurrent execution per host
 Z: the expected overhead of these output streams (currently estimated at
 (65KB + the output buffer size) * 1.1 for overheads)
 K: the fraction of memory allowed to be used for this overhead (configurable
 parameter, default 0.2)

 Then the number of partitions: P = (X / Y / Z) * K

 Then inside some of our root sources now:
 - After assembling the RDD, if numPartitions > P, coalesce to P.
 This won't trigger another shuffle phase, so it can easily sit inline in
 source definitions.
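
 A minimal Scala sketch of that heuristic (the function and value names are hypothetical, not the exact internal code; rdd stands for the assembled input RDD):

    // P = (X / Y / Z) * K, per the definitions above.
    def maxPartitions(heapBytes: Long,          // X
                      concurrentTasks: Int,     // Y
                      outputBufferBytes: Long,  // feeds into Z
                      memFraction: Double = 0.2 // K
                     ): Int = {
      val perStreamOverhead = ((65 * 1024 + outputBufferBytes) * 1.1).toLong // Z
      ((heapBytes / concurrentTasks / perStreamOverhead) * memFraction).toInt
    }

    val p = maxPartitions(25L * 1024 * 1024 * 1024, 12, 100 * 1024)
    // Coalesce (no shuffle) only when the source has too many partitions.
    val bounded = if (rdd.partitions.size > p) rdd.coalesce(p) else rdd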

 The only real downside of this approach I've seen is that it limits the
 number of tasks in the initial map phase, which may not be ideal for
 parallelism when loading a large dataset and then filtering heavily. It
 would be more efficient to pass P into the first distinct/reduceByKey call,
 but the user code would have to reference P.
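
 For example, the same bound applied at the first wide operation instead of a coalesce (p from the sketch above; pairRdd stands in for a key-value RDD):

    val deduped = rdd.distinct(numPartitions = p)
    val summed  = pairRdd.reduceByKey(_ + _, numPartitions = p)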





 On Thu, Jul 10, 2014 at 4:50 PM, Michael Armbrust mich...@databricks.com
 wrote:

  Hey Ian,
 
  Thanks for bringing these up!  Responses in-line:
 
  Just wondering if right now Spark SQL is expected to be thread safe on
   master? Doing a simple hadoop file -> RDD -> schema RDD -> write parquet
   will fail in reflection code if I run these in a thread pool.
  
 
  You are probably hitting SPARK-2178
  (https://issues.apache.org/jira/browse/SPARK-2178), which is caused by
  SI-6240 (https://issues.scala-lang.org/browse/SI-6240). We have a plan to
  fix this by moving the schema introspection to compile time, using macros.
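
  For what it's worth, a minimal sketch of the pattern that trips this (Spark 1.0-era API; sc is an existing SparkContext, and the Record case class and paths are made up):

    import java.util.concurrent.Executors
    import org.apache.spark.sql.SQLContext

    case class Record(key: String, value: Int) // hypothetical schema

    val sqlContext = new SQLContext(sc)
    import sqlContext.createSchemaRDD // implicit RDD -> SchemaRDD conversion

    val threadPool = Executors.newFixedThreadPool(4)
    (1 to 4).foreach { i =>
      threadPool.submit(new Runnable {
        def run() {
          // Concurrent schema inference exercises Scala runtime
          // reflection, which is not thread safe (SI-6240).
          sc.textFile(s"/hypothetical/in/part-$i")
            .map(_.split("\t"))
            .map(a => Record(a(0), a(1).toInt))
            .saveAsParquetFile(s"/hypothetical/out/part-$i")
        }
      })
    }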
 
 
   The SparkSqlSerializer seems to create a new Kryo instance each time it
   wants to serialize anything. I got a huge speedup, whenever I had any
   non-primitive type in my SchemaRDD, by using the ResourcePools from Chill
   to provide the KryoSerializer to it. (I can open an RB if there is some
   reason not to re-use them?)
  
 
  Sounds like SPARK-2102 (https://issues.apache.org/jira/browse/SPARK-2102).
  There is no reason AFAIK not to reuse the instance. A PR would be greatly
  appreciated!
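
  A hedged sketch of what the reuse could look like with chill's KryoPool (chill-java API; the exact integration into SparkSqlSerializer would differ, and the pool size here is arbitrary):

    import com.twitter.chill.{KryoPool, ScalaKryoInstantiator}

    // One shared pool of Kryo instances instead of a fresh one per call.
    val kryoPool: KryoPool =
      KryoPool.withByteArrayOutputStream(10, new ScalaKryoInstantiator)

    def serialize(obj: AnyRef): Array[Byte] = kryoPool.toBytesWithClass(obj)
    def deserialize(bytes: Array[Byte]): AnyRef = kryoPool.fromBytes(bytes)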
 
 
   With 

Re: on shark, is tachyon less efficient than memory_only cache strategy ?

2014-07-13 Thread Haoyuan Li
Qingyang,

Are you asking about Spark or Shark? (The first email was about Shark, the
last email was about Spark.)

Best,

Haoyuan


On Wed, Jul 9, 2014 at 7:40 PM, qingyang li liqingyang1...@gmail.com
wrote:

 could I set some cache policy to let Spark load data from Tachyon only one
 time for all sql queries? For example by using CacheAllPolicy,
 FIFOCachePolicy, or LRUCachePolicy. But I have tried those three policies;
 they are not useful.
 I think if Spark always loads data for each sql query, it will impact the
 query speed; it will take more time than the case where data are managed by
 Spark itself.




 2014-07-09 1:19 GMT+08:00 Haoyuan Li haoyuan...@gmail.com:

  Yes. For Shark, the two modes shark.cache=tachyon and shark.cache=memory
  have the same ser/de overhead. Shark loads data from outside of the process
  in Tachyon mode, with the following benefits:

  - In-memory data sharing across multiple Shark instances (i.e. stronger
    isolation)
  - Instant recovery of in-memory tables
  - Reduced heap size => faster GC in Shark
  - If the table is larger than the memory size, only the hot columns will
    be cached in memory

  from http://tachyon-project.org/master/Running-Shark-on-Tachyon.html and
  https://github.com/amplab/shark/wiki/Running-Shark-with-Tachyon
 
  Haoyuan
 
 
  On Tue, Jul 8, 2014 at 9:58 AM, Aaron Davidson ilike...@gmail.com
 wrote:
 
   Shark's in-memory format is already serialized (it's compressed and
   column-based).
  
  
   On Tue, Jul 8, 2014 at 9:50 AM, Mridul Muralidharan mri...@gmail.com
   wrote:
  
You are ignoring serde costs :-)
   
- Mridul
   
On Tue, Jul 8, 2014 at 8:48 PM, Aaron Davidson ilike...@gmail.com
   wrote:
  Tachyon should only be marginally less performant than memory_only,
  because we mmap the data from Tachyon's ramdisk. We do not have to, say,
  transfer the data over a pipe from Tachyon; we can directly read from the
  buffers in the same way that Shark reads from its in-memory columnar format.
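
  For intuition only, a generic java.nio illustration of why the mmap path is cheap; this is not Shark's actual code and the ramdisk path is made up:

    import java.io.RandomAccessFile
    import java.nio.channels.FileChannel

    // Map a block file from Tachyon's ramdisk straight into the address
    // space; reads then hit the page cache with no extra copy over a pipe.
    val channel = new RandomAccessFile("/mnt/ramdisk/tachyon/block_0", "r").getChannel
    val buf = channel.map(FileChannel.MapMode.READ_ONLY, 0, channel.size())
    val firstByte = buf.get(0)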



 On Tue, Jul 8, 2014 at 1:18 AM, qingyang li 
  liqingyang1...@gmail.com
 wrote:

  hi, when I create a table, I can specify the cache strategy using
  shark.cache. I think shark.cache=memory_only means data are managed by
  Spark and data are in the same JVM as the executor, while
  shark.cache=tachyon means data are managed by Tachyon, which is off heap,
  and data are not in the same JVM as the executor. So Spark will load data
  from Tachyon for each sql query; is Tachyon therefore less efficient than
  the memory_only cache strategy?
  If yes, can we let Spark load all data once from Tachyon for all sql
  queries, if I want to use the Tachyon cache strategy since Tachyon is more
  HA than memory_only?

   
  
 
 
 
  --
  Haoyuan Li
  AMPLab, EECS, UC Berkeley
  http://www.cs.berkeley.edu/~haoyuan/
 




-- 
Haoyuan Li
AMPLab, EECS, UC Berkeley
http://www.cs.berkeley.edu/~haoyuan/


Re: EC2 clusters ready in launch time + 30 seconds

2014-07-13 Thread Shivaram Venkataraman
It should be possible to improve cluster launch time if we are careful
about what commands we run during setup. One way to do this would be to
walk down the list of things we do for cluster initialization and see if
there is anything we can do make things faster. Unfortunately this might be
pretty time consuming, but I don't know of a better strategy. The place to
start would be the setup.sh file at
https://github.com/mesos/spark-ec2/blob/v3/setup.sh

Here are some things that take a lot of time and could be improved:
1. Creating swap partitions on all machines. We could check if there is a
way to get EC2 to always mount a swap partition.
2. Copying / syncing things across slaves. The copy-dir script is called
too many times right now, and each time it pauses for a few milliseconds
between slaves [1]. This could be improved by removing unnecessary copies.
3. We could make less frequently used modules, like Tachyon and persistent
HDFS, not part of the default setup.

[1] https://github.com/mesos/spark-ec2/blob/v3/copy-dir.sh#L42

Thanks
Shivaram




On Sat, Jul 12, 2014 at 7:02 PM, Nicholas Chammas 
nicholas.cham...@gmail.com wrote:

 On Thu, Jul 10, 2014 at 8:10 PM, Nate D'Amico n...@reactor8.com wrote:

  Starting to work through some automation/config stuff for the Spark stack
  on EC2 with a project. I will be focusing the work through the Apache
  Bigtop effort to start, and can then share with the Spark community
  directly as things progress, if people are interested.


 Let us know how that goes. I'm definitely interested in hearing more.

 Nick



Re: on shark, is tachyon less efficient than memory_only cache strategy ?

2014-07-13 Thread qingyang li
Haoyuan, thanks for replying.
Let me clarify my question again.
--
I create a table using: create table xxx1
tblproperties(shark.cache=tachyon) as select * from xxx2.
When executing some sql (for example, select * from xxx1) using Shark,
Shark will read data into Shark's memory from Tachyon's memory.
I think if Shark always loads data from Tachyon each time we execute sql,
it is less efficient.
Could we use some cache policy (such as CacheAllPolicy, FIFOCachePolicy, or
LRUCachePolicy) to cache data and avoid reading from Tachyon for each sql
query?
--



2014-07-14 2:47 GMT+08:00 Haoyuan Li haoyuan...@gmail.com:

 Qingyang,

 Are you asking about Spark or Shark? (The first email was about Shark, the
 last email was about Spark.)

 Best,

 Haoyuan