On Tue, Oct 28, 2014 at 5:17 PM, Chris Riccomini <[email protected]> wrote:
> Hey Jordan,
>
> > Couldn't you instead concurrently commit offsets for each owned
> > partition by taking the minimum offset of the threads working on that
> > partition, minus one? That way, in the worst case, you'd only screw up
> > by forgetting to commit some just-finished work until the next call to
> > window().
>
> Yes, you could, but this would require changes to Samza itself. The
> window() method can be done today with no changes to Samza.

I must be missing something - since in your suggested implementation the
Task itself manages the thread pool, what's stopping window() from doing
what I suggested without changing Samza? Oh, I guess the problem is that
Samza makes one Task instance per partition regardless of your parallelism
settings? So the thread pool you suggest is actually parallelism within a
single partition?

> One other random aside on the threading situation is that, if you care
> about message ordering, you'll need to make sure that messages that are
> handed off to threads are done so based on their key or the partition
> they came from. Otherwise, t2 could get m1, and t1 could get m2, and t1
> might finish processing first, which would lead to out-of-order
> processing (multi-subscriber partitions within a single job).

Right - that makes sense.

> > However, we recently switched to having each machine have as many
> > Kafka-managed consumer threads as cores, and did away with the separate
> > thread pool.
>
> Unless I'm misunderstanding, this is exactly what Samza does, doesn't it?
> Each SamzaContainer is single-threaded, so running N of them on a
> machine, where N is the number of cores, results in the exact same model
> (since each SamzaContainer has its own consumer threads).

The only difference is that Samza has one JVM per core, each with a single
(or perhaps more than one, but at least blocking on each other?) consumer
thread, whereas what we've been working with is one thread per core. This
is a big deal to us mostly because of the large-object memory sharing I
was talking about before, but also probably because JVMs have non-trivial
overhead in memory and CPU.

> > Since Samza was built with single-threaded containers in mind, it seems
> > to me that it might be tricky to get Samza to tell YARN that it wants n
> > compute units for a single container. Is there a way to accomplish
> > this?
>
> This trickiness is why we are encouraging the one core per container
> model. You can get around this by using the yarn.container.cpu.cores
> setting, though. Setting this to a higher number will tell YARN that more
> cores are being used.

Got it.
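For concreteness, my understanding is that this would just be a one-line
entry in the job's properties file - something like the following (the
value here is made up for illustration):

    # Hypothetical snippet of a Samza job config: ask YARN to reserve
    # 4 cores for each container, instead of the default of 1.
    yarn.container.cpu.cores=4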
Thanks,
Jordan

On 10/28/14 12:16 PM, "Jordan Lewis" <[email protected]> wrote:

>Hey Chris,
>
>Thanks for the detailed response.
>
>Your proposed solution for adding parallelism makes sense, but I don't yet
>understand the importance of the blocking step in window(). Couldn't you
>instead concurrently commit offsets for each owned partition by taking the
>minimum offset of the threads working on that partition, minus one? That
>way, in the worst case, you'd only screw up by forgetting to commit some
>just-finished work until the next call to window().
>
>We've had some experience with this strategy before, actually. We used to
>have each machine use a single Kafka worker thread that read from all of
>the partitions that it owned, and send the messages it consumed to a
>worker pool (sized proportionally to the number of cores on the machine)
>for processing. As you mention, it's tricky to do the offset management
>right in this way. However, we recently switched to having each machine
>have as many Kafka-managed consumer threads as cores, and did away with
>the separate thread pool. We like this approach a lot - it's simple, easy
>to manage, and doesn't expose us to data loss. Have you considered adding
>this kind of partition/task-based parallelism to Samza? It seems to me
>that this isn't so hard to understand, and seems like it might produce
>less overhead. However, I can also see the appeal of having the simple
>one thread per container model.
>
>Let's pretend for a moment that cross-task memory sharing was implemented,
>and that we also chose the dangerous road of adding multithreading to our
>task implementations. Since Samza was built with single-threaded
>containers in mind, it seems to me that it might be tricky to get Samza
>to tell YARN that it wants n compute units for a single container. Is
>there a way to accomplish this?
>
>Thanks,
>Jordan Lewis
>
>On Mon, Oct 27, 2014 at 5:51 PM, Chris Riccomini <
>[email protected]> wrote:
>
>> Hey Jordan,
>>
>> Your question touches on a couple of things:
>>
>> 1. Shared objects between Samza tasks within one container.
>> 2. Multi-threaded SamzaContainers.
>>
>> For (1), there is some discussion on shared state here:
>>
>> https://issues.apache.org/jira/browse/SAMZA-402
>>
>> The outcome of this ticket was that it's something we want, but aren't
>> implementing right now. The idea is to have a state store that's shared
>> amongst all tasks in a container. The store would be immutable, and
>> would be restored on startup via a stream that had all required data.
>>
>> An alternative to this is to just have a static variable that all tasks
>> use. This will allow all tasks within one container to use the object.
>> We've done this before, and it works reasonably well for immutable
>> objects, which you have.
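[For concreteness, a rough sketch of the static-variable approach Chris
describes might look like the following. The ReferenceData class and
config key are hypothetical, and the init() hook assumes Samza's
InitableTask interface; exact signatures may differ between versions:]

    import org.apache.samza.config.Config;
    import org.apache.samza.task.InitableTask;
    import org.apache.samza.task.TaskContext;

    // Sketch: every task instance in the container shares one immutable
    // object through a static field that is loaded exactly once per JVM.
    // A real task would also implement StreamTask, etc.
    public class SharedStateTask implements InitableTask {
      // Shared, immutable reference data. Initialization is guarded so
      // the load happens once; reads afterwards are safe because the
      // object never mutates.
      private static volatile ReferenceData sharedData;

      @Override
      public void init(Config config, TaskContext context) throws Exception {
        synchronized (SharedStateTask.class) {
          if (sharedData == null) {
            // Load once per container (JVM), not once per task instance.
            sharedData = ReferenceData.load(config.get("my.reference.data.path"));
          }
        }
      }
    }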
>> For (2), we've actively tried to avoid adding threading to the
>> SamzaContainer. Having a single-threaded container has worked out
>> pretty well for us, and greatly simplifies the mental model that people
>> need to have to use Samza. Our advice to people who ask about adding
>> parallelism is to tell them to add more containers.
>>
>> That said, it is possible to run threads inside a StreamTask if you
>> really want to increase your parallelism. Again, I would advise against
>> this. If not implemented properly, doing so can lead to data loss. The
>> proper way to implement threading inside a StreamTask is to have a
>> thread pool executor, and give threads messages as process() is called.
>> You must then disable offset checkpointing by setting task.commit.ms to
>> -1. Lastly, your task must implement WindowableTask. In the window
>> method, it must block on all threads that are currently processing a
>> message. When all threads have finished processing, it's then safe to
>> checkpoint offsets, and the window method must call coordinator.commit().
>>
>> We've written a task that does this as well, and it works, but you have
>> to know what you're doing to get it right.
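[For concreteness, a minimal sketch of the pattern Chris describes. The
pool size and handle() helper are made up, this assumes task.commit.ms=-1
is set in the job config, and exact Samza signatures may differ between
versions - e.g. some take a RequestScope argument to commit():]

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    import org.apache.samza.system.IncomingMessageEnvelope;
    import org.apache.samza.task.MessageCollector;
    import org.apache.samza.task.StreamTask;
    import org.apache.samza.task.TaskCoordinator;
    import org.apache.samza.task.WindowableTask;

    // Sketch: hand each message to a pool in process(), then block on all
    // in-flight work in window() before committing, so no offset is
    // checkpointed until its message has actually been processed.
    public class ThreadedTask implements StreamTask, WindowableTask {
      private final ExecutorService pool = Executors.newFixedThreadPool(4);
      private final List<Future<?>> inFlight = new ArrayList<Future<?>>();

      @Override
      public void process(final IncomingMessageEnvelope envelope,
                          MessageCollector collector,
                          TaskCoordinator coordinator) {
        // process() runs on the container's single run loop, so the
        // inFlight list needs no extra synchronization here. Note Chris's
        // caveat above: if ordering matters, route messages to threads by
        // key or partition rather than to an arbitrary pool thread.
        inFlight.add(pool.submit(new Runnable() {
          public void run() {
            handle(envelope); // hypothetical per-message work
          }
        }));
      }

      @Override
      public void window(MessageCollector collector, TaskCoordinator coordinator)
          throws Exception {
        // Block until every message handed out so far is fully
        // processed...
        for (Future<?> f : inFlight) {
          f.get();
        }
        inFlight.clear();
        // ...and only then is it safe to checkpoint offsets.
        coordinator.commit();
      }

      private void handle(IncomingMessageEnvelope envelope) {
        // Actual message processing would go here.
      }
    }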
>> So, I think the two state options are:
>>
>> 1. Wait for global state to be implemented (or implement it yourself
>> :)). This could take a while.
>> 2. Use static objects to share state among StreamTasks in a given
>> SamzaContainer.
>>
>> And for parallelism:
>>
>> 1. Increase partition/container count for your job.
>> 2. Add threads to your StreamTasks.
>>
>> Cheers,
>> Chris
>>
>> On 10/27/14 12:52 PM, "Jordan Lewis" <[email protected]> wrote:
>>
>> >Hi,
>> >
>> >My team is interested in trying out Samza to augment or replace our
>> >hand-rolled Kafka-based stream processing system. I have a question
>> >about sharing memory across task instances.
>> >
>> >Currently, our main stream processing application has some large,
>> >immutable objects that need to be loaded into JVM heap memory in order
>> >to process messages on any partition of certain topics. We use
>> >thread-based parallelism in our system, so that the Kafka consumer
>> >threads on each machine listening to these topics can use the same
>> >instance of these large heap objects. This is very convenient, as
>> >these objects are so large that storing multiple copies of them would
>> >be quite wasteful.
>> >
>> >To use Samza, it seems as though each JVM would have to store copies
>> >of these objects separately, even if we were to use LevelDB's off-heap
>> >storage - each JVM would eventually have to inflate the off-heap
>> >memory into regular Java objects to be usable. One solution to this
>> >problem could be using something like Google's FlatBuffers [0] for
>> >these large objects - so that we could use accessors on large,
>> >off-heap ByteBuffers without having to actually deserialize them.
>> >However, we think that doing this for all of the relevant data we have
>> >would be a lot of work.
>> >
>> >Have you guys considered implementing a thread-based parallelism model
>> >for Samza, whether as a replacement or alongside the current JVM-based
>> >parallelism approach? What obstacles are there to making this happen,
>> >assuming that you decided not to do it? This approach would be
>> >invaluable for our use case, since we rely so heavily (perhaps
>> >unfortunately so) on these shared heap data structures.
>> >
>> >Thanks,
>> >Jordan Lewis
>> >
>> >[0]: http://google.github.io/flatbuffers/