Re: sparkSQL thread safe?
Ian,

The LZFOutputStream's large byte buffer is sort of annoying. It is much smaller if you use the Snappy one. The downside of the Snappy one is slightly less compression (I've seen 10 - 20% larger sizes). If we can find a compression scheme implementation that doesn't use very large buffers, that'd be a good idea too ... let me know if you have any suggestions. In the future, we plan to make shuffle write to a smaller number of streams at the same time.

On Sat, Jul 12, 2014 at 7:59 PM, Ian O'Connell i...@ianoconnell.com wrote:

Thanks for the response Michael.

On the first, I'm following the JIRA now, thanks. Not a blocker for me but it would be great to see.

I opened up a PR with the resource pool usage around it. I didn't include it in the PR, but a few classes we should probably register in Kryo for good perf/size:

  classOf[org.apache.spark.sql.catalyst.expressions.GenericRow]
  classOf[org.apache.spark.sql.catalyst.expressions.GenericMutableRow]
  classOf[org.apache.spark.sql.catalyst.expressions.Row]
  classOf[Array[Object]]
  scala.collection.immutable.Nil.getClass
  scala.collection.immutable.::.getClass
  classOf[scala.collection.immutable.::[Any]]

Thanks for adding that distinct btw, great to have it scale better.

On the last, opened the JIRA, thanks.

Also, more of a Spark core thing that you might already be aware of, but I haven't seen it mentioned anywhere and it was hitting me (also, if any part of this seems wrong to you I'd love to know): I was getting out-of-memory errors doing a bunch of ops against medium (~1 TB compressed) input sizes with simple things that should spill nicely (distinct, reduceByKey(_ + _)). Anyway, here is what I came back with (copied from an internal email):

I looked through some heap dumps from the OOMs in Spark and found there were 10k instances of DiskBlockObjectWriter, each of which was up to 300 KB in size, per active executor. At up to 12 concurrent tasks per host that tops out at about 33 GB of space. The nodes of course were failing before this (max memory per JVM on our ts cluster is 25 GB).

The memory usage primarily comes from two places: a byte array in LZFOutputStream and a byte array in BufferedOutputStream. These are both output buffers along the way to disk (so when we are using the former we can turn down/disable the latter). They are configured to be 65 KB and 100 KB respectively by default. The former is not a configurable option but is hard-coded in that library. These come from the ShuffleBlockWriter: our input arrives in 10k chunks, and operations which require partitioning (say distinct, reduceByKey, etc.) maintain the existing partition count. So each task basically opens 10k files, each file handle of which has these buffers in place for that task to write to.

Solution put in place (maybe there's a better one?). Given:

  X: the heap size for an executor's JVM
  Y: the number of threads/cores allowed for concurrent execution per host
  Z: the expected overhead of these output streams (currently estimated at 65 KB + the size of the output buffer, times 1.1 for overheads)
  K: the fraction of memory to allow to be used for this overhead (configurable parameter, default 0.2)

then the number of partitions is P = (X / Y / Z) * K. Then, inside some of our root sources now:

  - After assembling the RDD, if numPartitions > P, coalesce to P. This won't trigger another shuffle phase, so it can easily sit inline in source definitions.
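For concreteness, a minimal sketch of that heuristic, with made-up numbers for X, Y, Z and K and a hypothetical input path; it illustrates the formula rather than reproducing the exact code that was put in place:

    import org.apache.spark.{SparkConf, SparkContext}

    object PartitionCapExample {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("partition-cap").setMaster("local[2]"))

        // X: executor heap, Y: concurrent tasks per host, Z: per-open-stream overhead, K: heap fraction
        val heapBytes       = 25L * 1024 * 1024 * 1024                   // X, e.g. a 25 GB executor JVM
        val concurrentTasks = 12                                         // Y
        val bufferBytes     = 100 * 1024                                 // BufferedOutputStream buffer (100 KB default)
        val perStreamBytes  = ((65 * 1024 + bufferBytes) * 1.1).toLong   // Z: + compression buffer, * 1.1 overhead
        val fraction        = 0.2                                        // K

        val maxPartitions = ((heapBytes / concurrentTasks / perStreamBytes) * fraction).toInt  // P

        val rdd = sc.textFile("hdfs:///some/large/input")                // hypothetical input
        // coalesce only narrows the partition count, so this does not add another shuffle
        val capped = if (rdd.partitions.length > maxPartitions) rdd.coalesce(maxPartitions) else rdd
        println(s"P = $maxPartitions, using ${capped.partitions.length} partitions")

        sc.stop()
      }
    }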
The only real downside of this approach I've seen is that it limits the number of tasks in the initial map phase, which may not be ideal for parallelism when loading a large dataset and then filtering heavily. It would be more efficient to pass P into the first distinct/reduceByKey call, but then the user code would have to reference P.

On Thu, Jul 10, 2014 at 4:50 PM, Michael Armbrust mich...@databricks.com wrote:

Hey Ian,

Thanks for bringing these up! Responses in-line:

> Just wondering if right now spark sql is expected to be thread safe on master? Doing a simple hadoop file -> RDD -> schema RDD -> write parquet will fail in reflection code if I run these in a thread pool.

You are probably hitting SPARK-2178 https://issues.apache.org/jira/browse/SPARK-2178, which is caused by SI-6240 https://issues.scala-lang.org/browse/SI-6240. We have a plan to fix this by moving the schema introspection to compile time, using macros.

> The SparkSqlSerializer seems to create a new Kryo instance each time it wants to serialize anything. I got a huge speedup when I had any non-primitive type in my SchemaRDD by using the ResourcePools from Chill to provide the KryoSerializer to it. (I can open an RB if there is some reason not to re-use them?)

Sounds like SPARK-2102 https://issues.apache.org/jira/browse/SPARK-2102. There is no reason AFAIK to not reuse the instance. A PR would be greatly appreciated!
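To make the thread-safety report concrete, here is a sketch of the "hadoop file -> RDD -> schema RDD -> write parquet" pipeline in the 1.0-era API. The case class, its fields, and the paths are hypothetical; the point is only that the schema comes from runtime reflection over the case class, which is where SPARK-2178 / SI-6240 bites when several threads do it concurrently.

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext

    // Hypothetical record type; the schema is derived from it by runtime reflection.
    case class LogLine(ip: String, status: Int)

    object ParquetWriteExample {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("parquet-write").setMaster("local[2]"))
        val sqlContext = new SQLContext(sc)
        import sqlContext.createSchemaRDD   // implicit RDD[Product] -> SchemaRDD conversion

        // hadoop file -> RDD -> SchemaRDD -> Parquet (paths are made up)
        val lines = sc.textFile("hdfs:///logs/input")
          .map(_.split("\t"))
          .map(fields => LogLine(fields(0), fields(1).toInt))

        // Running many of these concurrently from a thread pool is where the
        // reflection failure described above shows up.
        lines.saveAsParquetFile("hdfs:///logs/output.parquet")

        sc.stop()
      }
    }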
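And a rough sketch of the Kryo reuse Ian describes, using Chill's pooled instantiators instead of building a fresh Kryo per call, plus the registrations listed earlier in the thread. This is not the actual SparkSqlSerializer patch; the pool size of 10 is arbitrary, and the helper names are worth double-checking against the Chill version in use.

    import com.esotericsoftware.kryo.Kryo
    import com.twitter.chill.{IKryoRegistrar, KryoPool, ScalaKryoInstantiator}

    object PooledKryoExample {
      // Registrar adding the classes listed above.
      val sqlRegistrar: IKryoRegistrar = new IKryoRegistrar {
        override def apply(k: Kryo): Unit = {
          k.register(classOf[org.apache.spark.sql.catalyst.expressions.GenericRow])
          k.register(classOf[org.apache.spark.sql.catalyst.expressions.GenericMutableRow])
          k.register(classOf[org.apache.spark.sql.catalyst.expressions.Row])
          k.register(classOf[Array[Object]])
          k.register(scala.collection.immutable.Nil.getClass)
          k.register(scala.collection.immutable.::.getClass)
          k.register(classOf[scala.collection.immutable.::[Any]])
        }
      }

      // A pool of reusable Kryo instances rather than one new Kryo per serialize call.
      val pool: KryoPool =
        KryoPool.withByteArrayOutputStream(10, (new ScalaKryoInstantiator).withRegistrar(sqlRegistrar))

      def main(args: Array[String]): Unit = {
        val bytes = pool.toBytesWithClass(Seq(1, 2, 3))
        println(pool.fromBytes(bytes))
      }
    }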
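Finally, related to the LZF-vs-Snappy exchange at the top of the thread: switching the shuffle compression codec and shrinking the per-writer output buffer are plain configuration changes. The key names below are from the Spark 1.0-era configuration and should be checked against the version actually deployed.

    import org.apache.spark.SparkConf

    // spark.io.compression.codec picks the codec used for shuffle output and spills;
    // spark.shuffle.file.buffer.kb is the BufferedOutputStream size (100 KB by default)
    // that sits in front of every open shuffle file.
    val conf = new SparkConf()
      .setAppName("snappy-shuffle")
      .set("spark.io.compression.codec", "org.apache.spark.io.SnappyCompressionCodec")
      .set("spark.shuffle.file.buffer.kb", "32")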
Re: on shark, is tachyon less efficient than memory_only cache strategy ?
Qingyang,

Are you asking about Spark or Shark? (The first email was about Shark, the last email was about Spark.)

Best,

Haoyuan

On Wed, Jul 9, 2014 at 7:40 PM, qingyang li liqingyang1...@gmail.com wrote:

Could I set some cache policy to let Spark load data from Tachyon only once for all SQL queries? For example by using CacheAllPolicy, FIFOCachePolicy or LRUCachePolicy. I have tried those three policies, but they did not help. I think that if Spark always loads the data for each SQL query, it will impact query speed; it will take more time than the case where the data are managed by Spark itself.

2014-07-09 1:19 GMT+08:00 Haoyuan Li haoyuan...@gmail.com:

Yes. For Shark, the two modes, shark.cache=tachyon and shark.cache=memory, have the same ser/de overhead. Shark loads data from outside of the process in Tachyon mode, with the following benefits:

  - In-memory data sharing across multiple Shark instances (i.e. stronger isolation)
  - Instant recovery of in-memory tables
  - Reduced heap size => faster GC in Shark
  - If the table is larger than the memory size, only the hot columns will be cached in memory

from http://tachyon-project.org/master/Running-Shark-on-Tachyon.html and https://github.com/amplab/shark/wiki/Running-Shark-with-Tachyon

Haoyuan

On Tue, Jul 8, 2014 at 9:58 AM, Aaron Davidson ilike...@gmail.com wrote:

Shark's in-memory format is already serialized (it's compressed and column-based).

On Tue, Jul 8, 2014 at 9:50 AM, Mridul Muralidharan mri...@gmail.com wrote:

You are ignoring serde costs :-)

- Mridul

On Tue, Jul 8, 2014 at 8:48 PM, Aaron Davidson ilike...@gmail.com wrote:

Tachyon should only be marginally less performant than memory_only, because we mmap the data from Tachyon's ramdisk. We do not have to, say, transfer the data over a pipe from Tachyon; we can directly read from the buffers in the same way that Shark reads from its in-memory columnar format.

On Tue, Jul 8, 2014 at 1:18 AM, qingyang li liqingyang1...@gmail.com wrote:

Hi, when I create a table, I can specify the cache strategy using shark.cache. I think shark.cache=memory_only means the data are managed by Spark and live in the same JVM as the executor, while shark.cache=tachyon means the data are managed by Tachyon, which is off-heap, and the data are not in the same JVM as the executor. So Spark will load data from Tachyon for each SQL query. Is Tachyon therefore less efficient than the memory_only cache strategy? If yes, can we let Spark load all the data from Tachyon once for all SQL queries, if I want to use the Tachyon cache strategy since Tachyon is more HA than memory_only?

--
Haoyuan Li
AMPLab, EECS, UC Berkeley
http://www.cs.berkeley.edu/~haoyuan/
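For reference, the two cache modes compared in this thread are selected per table through TBLPROPERTIES. The sketch below only builds the DDL strings; the table names are made up, and how the statements are submitted (Shark CLI or a SharkContext) is left out.

    // Hypothetical table names; only the "shark.cache" property value differs.
    // In-heap: cached inside Shark's executor JVMs (compressed, columnar).
    val memoryTableDdl =
      """CREATE TABLE events_mem TBLPROPERTIES ("shark.cache" = "memory")
        |AS SELECT * FROM events""".stripMargin

    // Off-heap: cached in Tachyon; Shark mmaps the ramdisk files at query time.
    val tachyonTableDdl =
      """CREATE TABLE events_tachyon TBLPROPERTIES ("shark.cache" = "tachyon")
        |AS SELECT * FROM events""".stripMargin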
Re: EC2 clusters ready in launch time + 30 seconds
It should be possible to improve cluster launch time if we are careful about what commands we run during setup. One way to do this would be to walk down the list of things we do for cluster initialization and see if there is anything we can do to make things faster. Unfortunately this might be pretty time consuming, but I don't know of a better strategy. The place to start would be the setup.sh file at https://github.com/mesos/spark-ec2/blob/v3/setup.sh

Here are some things that take a lot of time and could be improved:

1. Creating swap partitions on all machines. We could check if there is a way to get EC2 to always mount a swap partition.
2. Copying / syncing things across slaves. The copy-dir script is called too many times right now, and each time it pauses for a few milliseconds between slaves [1]. This could be improved by removing unnecessary copies.
3. We could make less frequently used modules like Tachyon and persistent HDFS not part of the default setup.

[1] https://github.com/mesos/spark-ec2/blob/v3/copy-dir.sh#L42

Thanks
Shivaram

On Sat, Jul 12, 2014 at 7:02 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote:

On Thu, Jul 10, 2014 at 8:10 PM, Nate D'Amico n...@reactor8.com wrote:

Starting to work through some automation/config stuff for the Spark stack on EC2 with a project. I will be focusing the work through the Apache Bigtop effort to start, and can then share with the Spark community directly as things progress, if people are interested.

Let us know how that goes. I'm definitely interested in hearing more.

Nick
Re: on shark, is tachyon less efficient than memory_only cache strategy ?
Shark. Thanks for replying. Let me clarify my question again.

I create a table using:

  create table xxx1 tblproperties(shark.cache=tachyon) as select * from xxx2

When executing some SQL (for example, select * from xxx1) using Shark, Shark will read the data into Shark's memory from Tachyon's memory. I think that if Shark always loads the data from Tachyon for each SQL statement we execute, it is less efficient. Could we use some cache policy (such as CacheAllPolicy, FIFOCachePolicy or LRUCachePolicy) to cache the data and avoid reading it from Tachyon for every SQL query?

2014-07-14 2:47 GMT+08:00 Haoyuan Li haoyuan...@gmail.com:

Qingyang,

Are you asking about Spark or Shark? (The first email was about Shark, the last email was about Spark.)

Best,

Haoyuan