Re: off-heap RDDs
Haoyuan,

Thanks, that sounds great, exactly what we are looking for. We might be interested in integrating Tachyon with CFS (Cassandra File System, the Cassandra-based implementation of HDFS).

-Evan

On Sat, Aug 31, 2013 at 3:33 PM, Haoyuan Li haoyuan...@gmail.com wrote:

Evan,

If I understand you correctly, you want to avoid network I/O as much as possible by caching the data on the node that already has it on disk. That is exactly what I meant by client caching: it does this automatically. For example, suppose you have a cluster of machines with nothing cached in memory yet, and then a Spark application runs on it. Spark asks Tachyon where data X is. Since nothing is in memory yet, Tachyon returns disk locations the first time. The Spark program will then try to take advantage of disk data locality and load the data X on HDFS node N into the off-heap memory of node N. From then on, when Spark asks Tachyon for the location of X, Tachyon will return node N. There is no network I/O involved in the whole process. Let me know if I misunderstood something.

Haoyuan

On Fri, Aug 30, 2013 at 10:00 AM, Evan Chan e...@ooyala.com wrote:

Hey guys,

I would also prefer to strengthen and get behind Tachyon rather than implement a separate solution (though I guess if it's not officially supported, then nobody will ask questions). My feeling is more that off-heap memory is difficult, so it's better to focus efforts on one project.

Haoyuan, Tachyon brings cached HDFS data to the local client. Have we thought about the opposite approach, which might be more efficient?

- Load the data on HDFS node N into the off-heap memory of node N
- In Spark, inform the framework (maybe via RDD partition/location info) that the data is located on node N
- Bring the computation to node N

This avoids network I/O and may be much more efficient for many types of applications. I know this would be a big win for us.

-Evan

On Wed, Aug 28, 2013 at 1:37 AM, Haoyuan Li haoyuan...@gmail.com wrote:

No problem. Like reading/writing data from/to an off-heap ByteBuffer, when a program reads/writes data from/to Tachyon, Spark/Shark needs to do ser/de. Efficient ser/de helps performance a lot, as people pointed out. One solution is for the application to do primitive operations directly on the ByteBuffer, which is how Shark handles it now. Most of the related code is located at https://github.com/amplab/shark/tree/master/src/main/scala/shark/memstore2 and https://github.com/amplab/shark/tree/master/src/tachyon_enabled/scala/shark/tachyon .

Haoyuan

On Wed, Aug 28, 2013 at 1:21 AM, Imran Rashid im...@therashids.com wrote:

Thanks Haoyuan. It seems like we should try out Tachyon; it sounds like it is what we are looking for.

On Wed, Aug 28, 2013 at 8:18 AM, Haoyuan Li haoyuan...@gmail.com wrote:

Response inline.

On Tue, Aug 27, 2013 at 1:37 AM, Imran Rashid im...@therashids.com wrote:

Thanks for all the great comments and discussion. Let me expand a bit on our use case, and then I'm gonna combine responses to various questions.

In general, when we use Spark, we have some really big RDDs that use up a lot of memory (10s of GB per node) and that are really our core data sets. We tend to start up a Spark application, immediately load all those data sets, and just leave them loaded for the lifetime of that process. We definitely create a lot of other RDDs along the way, and lots of intermediate objects that we'd like to go through normal garbage collection, but those all require much less memory, maybe 1/10th of the big RDDs that we just keep around.
I know this is a bit of a special case, but it seems like it probably isn't that different from a lot of use cases.

Reynold Xin wrote: "This is especially attractive if the application can read directly from a byte buffer without generic serialization (like Shark)."

Interesting -- can you explain how this works in Shark? Do you have some general way of storing data in byte buffers that avoids serialization? Or do you mean that if the user is effectively creating an RDD of ints, you create an RDD[ByteBuffer] and then read/write the ints into the byte buffer yourself? Sorry, I'm familiar with the basic idea of Shark but not the code at all -- even a pointer to the code would be helpful.

Haoyuan Li wrote: "One possible solution is that you can use Tachyon (https://github.com/amplab/tachyon)."

This is a good idea that I had probably overlooked. There are two potential issues that I can think of with this approach, though:

1) I was under the impression that Tachyon is still not really tested in production systems, and I need something a bit more mature.
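For readers who want to see what Evan's "inform the framework (maybe via RDD partition/location info)" step might look like, here is a minimal sketch against Spark's RDD API. The class and partition names are hypothetical and compute() is left as a placeholder; the real hook is getPreferredLocations, which the scheduler consults when placing tasks.

```scala
import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// Hypothetical partition descriptor: records which host holds the off-heap
// copy of this split. Class and field names are made up for illustration.
case class OffHeapPartition(index: Int, host: String) extends Partition

// Minimal sketch of "inform the framework via RDD partition/location info":
// each partition reports a preferred host, and the scheduler tries to run the
// task there, so no network I/O is needed to reach the node-local cached data.
class OffHeapBackedRDD(sc: SparkContext, hosts: Seq[String])
  extends RDD[Int](sc, Nil) {

  override protected def getPartitions: Array[Partition] =
    hosts.zipWithIndex
      .map { case (host, i) => OffHeapPartition(i, host) }
      .toArray[Partition]

  // Spark's scheduler consults this when assigning tasks to executors.
  override protected def getPreferredLocations(split: Partition): Seq[String] =
    Seq(split.asInstanceOf[OffHeapPartition].host)

  override def compute(split: Partition, context: TaskContext): Iterator[Int] = {
    // A real implementation would read from the node-local off-heap store here
    // (e.g. a direct ByteBuffer or a Tachyon client); left empty in this sketch.
    Iterator.empty
  }
}
```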
Re: off-heap RDDs
No problem. Like reading/writing data from/to an off-heap ByteBuffer, when a program reads/writes data from/to Tachyon, Spark/Shark needs to do ser/de. Efficient ser/de helps performance a lot, as people pointed out. One solution is for the application to do primitive operations directly on the ByteBuffer, which is how Shark handles it now. Most of the related code is located at https://github.com/amplab/shark/tree/master/src/main/scala/shark/memstore2 and https://github.com/amplab/shark/tree/master/src/tachyon_enabled/scala/shark/tachyon .

Haoyuan

On Wed, Aug 28, 2013 at 1:21 AM, Imran Rashid im...@therashids.com wrote:

Thanks Haoyuan. It seems like we should try out Tachyon; it sounds like it is what we are looking for.

On Wed, Aug 28, 2013 at 8:18 AM, Haoyuan Li haoyuan...@gmail.com wrote:

Response inline.

On Tue, Aug 27, 2013 at 1:37 AM, Imran Rashid im...@therashids.com wrote:

Thanks for all the great comments and discussion. Let me expand a bit on our use case, and then I'm gonna combine responses to various questions.

In general, when we use Spark, we have some really big RDDs that use up a lot of memory (10s of GB per node) and that are really our core data sets. We tend to start up a Spark application, immediately load all those data sets, and just leave them loaded for the lifetime of that process. We definitely create a lot of other RDDs along the way, and lots of intermediate objects that we'd like to go through normal garbage collection, but those all require much less memory, maybe 1/10th of the big RDDs that we just keep around. I know this is a bit of a special case, but it seems like it probably isn't that different from a lot of use cases.

Reynold Xin wrote: "This is especially attractive if the application can read directly from a byte buffer without generic serialization (like Shark)."

Interesting -- can you explain how this works in Shark? Do you have some general way of storing data in byte buffers that avoids serialization? Or do you mean that if the user is effectively creating an RDD of ints, you create an RDD[ByteBuffer] and then read/write the ints into the byte buffer yourself? Sorry, I'm familiar with the basic idea of Shark but not the code at all -- even a pointer to the code would be helpful.

Haoyuan Li wrote: "One possible solution is that you can use Tachyon (https://github.com/amplab/tachyon)."

This is a good idea that I had probably overlooked. There are two potential issues that I can think of with this approach, though:

1) I was under the impression that Tachyon is still not really tested in production systems, and I need something a bit more mature. Of course, my changes wouldn't be thoroughly tested either, but somehow I feel better about deploying my 5-line patch to a codebase I understand than adding another entire system. (This isn't a good reason to add this to Spark in general, though; it just might be a temporary patch we deploy locally.)

This is a legitimate concern. The good news is that several companies have been testing it for a while, and some are close to taking it to production. For example, as Yahoo mentioned in today's meetup, we are working to integrate Shark and Tachyon closely, and the results are very promising. It will be in production soon.

2) I may have misunderstood Tachyon, but it seems there is a big difference in data locality between these two approaches. On a large cluster, HDFS will spread the data all over the cluster, so any particular piece of on-disk data will only live on a few machines.
When you start a Spark application that only uses a small subset of the nodes, odds are the data you want is *not* on those nodes. So even if Tachyon caches data from HDFS into memory, it won't be on the same nodes as the Spark application. Which means that when the Spark application reads data from the RDD, even though the data is in memory on some node in the cluster, it will need to be read over the network by the actual Spark worker assigned to the application. Is my understanding correct? I haven't done any measurements of the difference in performance, but it seems this would be much slower.

This is a great question. Actually, from a data locality perspective, the two approaches have no difference. Tachyon does client-side caching, which means that if a client on a node reads data that is not on its local machine, the first read caches the data on that node. Therefore, all future accesses on that node read the data from its local memory. For example, suppose you have a cluster with 100 nodes, all running HDFS and Tachyon, and you launch a Spark job running on only 20 of those nodes. When it reads or caches the data for the first time, all of that data will be cached on those 20 nodes. In the future, when the Spark master tries to schedule tasks, it will query Tachyon about data locations and schedule the tasks on the nodes that already hold the data in memory.
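To make the ser/de point from earlier in this message concrete, here is a toy sketch of the "primitive operations directly on ByteBuffer" idea. It is not Shark's memstore2 code; the object and method names are invented. It just shows how a column of ints packed into a direct (off-heap) ByteBuffer can be scanned with primitive reads, with no generic deserialization on the read path.

```scala
import java.nio.ByteBuffer

// Toy sketch (illustrative only, not Shark's actual memstore2 code): pack a
// column of ints into an off-heap buffer and scan it with primitive accessors.
object IntColumnSketch {
  def build(values: Array[Int]): ByteBuffer = {
    val buf = ByteBuffer.allocateDirect(4 * values.length) // off-heap, invisible to GC
    values.foreach(v => buf.putInt(v))
    buf.flip() // switch from writing to reading
    buf
  }

  def sum(column: ByteBuffer): Long = {
    val reader = column.duplicate() // independent cursor over the same memory
    var total = 0L
    while (reader.hasRemaining) total += reader.getInt()
    total
  }

  def main(args: Array[String]): Unit = {
    val column = build(Array(1, 2, 3, 4))
    println(sum(column)) // prints 10
  }
}
```

A real columnar store would also handle more column types, nulls, and compression; the memstore2 link above is the place to look for how Shark actually does it.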
Re: off-heap RDDs
No, you don't necessarily need a separate storage level, but I don't think you can avoid the "when do I use on-heap RDDs vs. off-heap RDDs vs. RDDs in Tachyon vs. ...?" questions. If off-heap RDDs don't gain us a lot over Tachyon in a lot of use cases, then I'm not sure they are worth the extra complexity. If you can show me how to do them really simply, so that their appropriate use cases are obvious, then that changes the calculus.

On Sun, Aug 25, 2013 at 6:15 PM, Reynold Xin reyno...@gmail.com wrote:

Mark - you don't necessarily need to construct a separate storage level. One simple way to accomplish this is for the user application to pass Spark a DirectByteBuffer.

On Sun, Aug 25, 2013 at 6:06 PM, Mark Hamstra m...@clearstorydata.com wrote:

I'd need to see a clear and significant advantage to using off-heap RDDs directly within Spark vs. leveraging Tachyon. What worries me is the combinatoric explosion of different caching and persistence mechanisms. With too many of these, not only will users potentially be baffled (@user-list: "What are the performance trade-offs in using MEMORY_ONLY_SER_2 vs. MEMORY_ONLY vs. off-heap RDDs? Or should I store some of my RDDs in Tachyon? Which ones?", etc. ad infinitum), but we've got to make sure that all of the combinations work correctly. At some point we end up needing some sort of caching/persistence manager to automate some of the choices and wrangle the permutations. That's not to say that off-heap RDDs are a bad idea or are necessarily the combinatoric last straw, but I'm concerned about adding significant complexity for only marginal gains in limited cases over a more general solution via Tachyon. I'm willing to be shown that those concerns are misplaced.

On Sun, Aug 25, 2013 at 5:06 PM, Haoyuan Li haoyuan...@gmail.com wrote:

Hi Imran,

One possible solution is that you can use Tachyon (https://github.com/amplab/tachyon). When data is in Tachyon, Spark jobs will read it from off-heap memory. Internally, it uses direct byte buffers to store memory-serialized RDDs, as you mentioned. Also, different Spark jobs can share the same data in Tachyon's memory. Here is a presentation (slides: https://docs.google.com/viewer?url=http%3A%2F%2Ffiles.meetup.com%2F3138542%2FTachyon_2013-05-09_Spark_Meetup.pdf ) we did in May.

Haoyuan

On Sun, Aug 25, 2013 at 3:26 PM, Imran Rashid im...@therashids.com wrote:

Hi,

I was wondering if anyone has thought about putting the cached data in an RDD into off-heap memory, e.g. with direct byte buffers. For really long-lived RDDs that use a lot of memory, this seems like a huge improvement, since all of that memory is then totally ignored during GC. (And reading data from direct byte buffers is potentially faster as well, but that's just a nice bonus.) The easiest thing to do is to store memory-serialized RDDs in direct byte buffers, but I guess we could also store the serialized RDD on disk and use a memory-mapped file. Serializing into off-heap buffers is a really simple patch; I just changed a few lines (I haven't done any real tests with it yet, though). But I don't really have a ton of experience with off-heap memory, so I thought I would ask what others think of the idea, whether it makes sense, and whether there are any gotchas I should be aware of, etc.

thanks,
Imran
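As a rough, self-contained illustration of the idea Imran describes (caching memory-serialized data in a direct ByteBuffer so the garbage collector never scans it), here is a sketch. It is not the actual patch; the names are made up, and plain JDK serialization stands in for Spark's pluggable serializer.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.nio.ByteBuffer

// Sketch: keep a cached, memory-serialized partition in a direct ByteBuffer so
// the bytes live outside the GC-managed heap, and deserialize only on access.
object OffHeapCacheSketch {
  def cache(values: Seq[String]): ByteBuffer = {
    val bytesOut = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytesOut)
    out.writeObject(values)
    out.close()
    val bytes = bytesOut.toByteArray
    val buf = ByteBuffer.allocateDirect(bytes.length) // off-heap backing memory
    buf.put(bytes)
    buf.flip()
    buf // the temporary on-heap byte array can now be garbage collected
  }

  def read(buf: ByteBuffer): Seq[String] = {
    val copy = new Array[Byte](buf.remaining()) // copied back on-heap only when accessed
    buf.duplicate().get(copy)
    val in = new ObjectInputStream(new ByteArrayInputStream(copy))
    try in.readObject().asInstanceOf[Seq[String]] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val cached = cache(Seq("a", "b", "c"))
    println(read(cached)) // List(a, b, c)
  }
}
```

The trade-off, as discussed above in the thread, is that every access pays the ser/de cost unless the application can operate on the buffer directly.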