Looked at Chronicle Map a little; it seems we could have a Chronicle Map backed state store, which would allow sharing the state across all nodes / all processes on one machine.
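For a rough idea of what that sharing model looks like, here is a minimal sketch against the Chronicle Map 2.x-era builder API (the file path, key/value types, and sizes are made-up examples, and builder method names differ across releases):

    import net.openhft.chronicle.map.ChronicleMap;
    import net.openhft.chronicle.map.ChronicleMapBuilder;

    import java.io.File;
    import java.io.IOException;

    public class SharedStateSketch {
        public static void main(String[] args) throws IOException {
            // Backing the map with a file gives a memory-mapped, off-heap
            // structure that any JVM on the same machine can open, so N
            // containers share one copy of the data instead of N copies.
            File backing = new File("/tmp/samza-shared-state.dat");
            try (ChronicleMap<String, byte[]> shared = ChronicleMapBuilder
                    .of(String.class, byte[].class)
                    .entries(1_000_000) // expected entry count; sizes the file
                    .createPersistedTo(backing)) {
                shared.put("model-segment-1", new byte[] {1, 2, 3});
                System.out.println(shared.get("model-segment-1").length);
            }
        }
    }

Every process that calls createPersistedTo() on the same file maps the same off-heap region, which is what makes single-copy-per-machine sharing possible.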
Fang, Yan
[email protected]
+1 (206) 849-4108

On Thu, Oct 30, 2014 at 10:05 AM, Roger Hoover <[email protected]> wrote:

> One other possibility is to use shared memory with something like Chronicle Map <http://openhft.net/products/chronicle-map/>.
>
> On Wed, Oct 29, 2014 at 12:53 PM, Jordan Lewis <[email protected]> wrote:
>
> > On Tue, Oct 28, 2014 at 5:39 PM, Chris Riccomini <[email protected]> wrote:
> >
> > > The problem is that coordinator.commit doesn't take parameters. It just tells Samza to commit the offset that *it* knows you've processed up to. The way Samza knows which offsets you've processed up to is implicit: when StreamTask.process returns, Samza assumes that your task has processed the message, and the offset is therefore safe to commit.
> >
> > Oh, I see! That makes sense. I didn't realize that the coordinator only lets you request a commit in that way.
> >
> > > > This is a big deal to us mostly because of the large object memory sharing I was talking about before, but also probably because JVMs have non-trivial overhead in memory and CPU.
> > >
> > > Ah! I think I understand now. The problem is that you want a high level of parallelism, but every time you add it with a container, you pay for it in memory by having another copy of this large object.
> >
> > Yep - exactly.
> >
> > > Yea, unfortunately, right now the best you can do is to run a thread pool inside the container.
> >
> > Okay. Are there any plans in the works to expose a thread-based parallelism model? In other words, keep the same mental model of one TaskInstance per partition, but have the RunLoop distribute work to the TaskInstances in a container in a concurrent manner instead of a serial one. I would be very interested in such a project.
> >
> > - Jordan
> >
> > > Cheers,
> > > Chris
> > >
> > > On 10/28/14 2:29 PM, "Jordan Lewis" <[email protected]> wrote:
> > >
> > > > On Tue, Oct 28, 2014 at 5:17 PM, Chris Riccomini <[email protected]> wrote:
> > > >
> > > > > Hey Jordan,
> > > > >
> > > > > > Couldn't you instead concurrently commit offsets for each owned partition by taking the minimum offset of the threads working on that partition, minus one? That way, in the worst case, you'd only screw up by forgetting to commit some just-finished work until the next call to window().
> > > > >
> > > > > Yes, you could, but this would require changes to Samza itself. The window() approach can be done today with no changes to Samza.
> > > >
> > > > I must be missing something - since in your suggested implementation the Task itself manages the thread pool, what's stopping window() from doing what I suggested without changing Samza? Oh, I guess the problem is that Samza makes one Task instance per partition regardless of your parallelism settings? So the thread pool you suggest is actually parallelism within a single partition?
> > > >
> > > > > One other random aside on the threading situation is that, if you care about message ordering, you'll need to make sure that messages that are handed off to threads are done so based on their key or the partition they came from. Otherwise, t2 could get m1, and t1 could get m2, and t1 might finish processing first, which would lead to out-of-order processing (multi-subscriber partitions within a single job).
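To make Chris's ordering caveat concrete, here is a hedged sketch in plain java.util.concurrent (nothing Samza-specific): pin every partition to one single-threaded executor so two messages from the same partition can never race each other.

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class PartitionedHandoff {
        // One single-threaded executor per lane; a given partition always
        // maps to the same lane, so its messages run in arrival order.
        private final ExecutorService[] lanes;

        public PartitionedHandoff(int threads) {
            lanes = new ExecutorService[threads];
            for (int i = 0; i < threads; i++) {
                lanes[i] = Executors.newSingleThreadExecutor();
            }
        }

        public void submit(int partition, Runnable work) {
            // m1 and m2 from one partition land on the same thread, so the
            // out-of-order interleaving described above cannot happen.
            lanes[Math.abs(partition % lanes.length)].submit(work);
        }
    }

Keying by message key instead of partition works the same way: hash the key to pick the lane.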
> > > > Right - that makes sense.
> > > >
> > > > > > However, we recently switched to having each machine have as many Kafka-managed consumer threads as cores, and did away with the separate thread pool.
> > > > >
> > > > > Unless I'm misunderstanding, this is exactly what Samza does, doesn't it? Each SamzaContainer is single threaded, so running N of them on a machine, where N is the number of cores, results in the exact same model (since each SamzaContainer has its own consumer threads).
> > > >
> > > > The only difference is that Samza has one JVM per core, each with a single (or perhaps more than one, but at least blocking on each other?) consumer thread, whereas what we've been working with is one thread per core. This is a big deal to us mostly because of the large object memory sharing I was talking about before, but also probably because JVMs have non-trivial overhead in memory and CPU.
> > > >
> > > > > > Since Samza was built with single-threaded containers in mind, it seems to me that it might be tricky to get Samza to tell YARN that it wants n compute units for a single container. Is there a way to accomplish this?
> > > > >
> > > > > This trickiness is why we are encouraging the one core per container model. You can get around this by using the yarn.container.cpu.cores setting, though. Setting this to a higher number will tell YARN that more cores are being used.
> > > >
> > > > Got it.
> > > >
> > > > Thanks,
> > > > Jordan
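For reference, the knob Chris mentions above goes in the job's .properties config; a sketch with illustrative numbers (memory is requested through a separate key):

    # Ask YARN for 4 virtual cores per container instead of the default 1,
    # e.g. when a StreamTask runs its own thread pool.
    yarn.container.cpu.cores=4
    # Container memory is sized independently of cores.
    yarn.container.memory.mb=4096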
> > > > > On 10/28/14 12:16 PM, "Jordan Lewis" <[email protected]> wrote:
> > > > >
> > > > > > Hey Chris,
> > > > > >
> > > > > > Thanks for the detailed response.
> > > > > >
> > > > > > Your proposed solution for adding parallelism makes sense, but I don't yet understand the importance of the blocking step in window(). Couldn't you instead concurrently commit offsets for each owned partition by taking the minimum offset of the threads working on that partition, minus one? That way, in the worst case, you'd only screw up by forgetting to commit some just-finished work until the next call to window().
> > > > > >
> > > > > > We've had some experience with this strategy before, actually. We used to have each machine use a single Kafka worker thread that read from all of the partitions that it owned, and send the messages it consumed to a worker pool (sized proportionally to the number of cores on the machine) for processing. As you mention, it's tricky to do the offset management right in this way. However, we recently switched to having each machine have as many Kafka-managed consumer threads as cores, and did away with the separate thread pool. We like this approach a lot - it's simple, easy to manage, and doesn't expose us to data loss.
> > > > > >
> > > > > > Have you considered adding this kind of partition/task based parallelism to Samza? It seems to me that this isn't so hard to understand, and seems like it might produce less overhead. However, I can also see the appeal of having the simple one-thread-per-container model.
> > > > > >
> > > > > > Let's pretend for a moment that cross-task memory sharing was implemented, and that we also chose the dangerous road of adding multithreading to our task implementations. Since Samza was built with single-threaded containers in mind, it seems to me that it might be tricky to get Samza to tell YARN that it wants n compute units for a single container. Is there a way to accomplish this?
> > > > > >
> > > > > > Thanks,
> > > > > > Jordan Lewis
> > > > > >
> > > > > > On Mon, Oct 27, 2014 at 5:51 PM, Chris Riccomini <[email protected]> wrote:
> > > > > >
> > > > > > > Hey Jordan,
> > > > > > >
> > > > > > > Your question touches on a couple of things:
> > > > > > >
> > > > > > > 1. Shared objects between Samza tasks within one container.
> > > > > > > 2. Multi-threaded SamzaContainers.
> > > > > > >
> > > > > > > For (1), there is some discussion on shared state here:
> > > > > > >
> > > > > > > https://issues.apache.org/jira/browse/SAMZA-402
> > > > > > >
> > > > > > > The outcome of this ticket was that it's something we want, but aren't implementing right now. The idea is to have a state store that's shared amongst all tasks in a container. The store would be immutable, and would be restored on startup via a stream that had all required data.
> > > > > > >
> > > > > > > An alternative to this is to just have a static variable that all tasks use. This will allow all tasks within one container to use the object. We've done this before, and it works reasonably well for immutable objects, which you have.
> > > > > > >
> > > > > > > For (2), we've actively tried to avoid adding threading to the SamzaContainer. Having a single-threaded container has worked out pretty well for us, and greatly simplifies the mental model that people need to have to use Samza. Our advice to people who ask about adding parallelism is to tell them to add more containers.
> > > > > > >
> > > > > > > That said, it is possible to run threads inside a StreamTask if you really want to increase your parallelism. Again, I would advise against this. If not implemented properly, doing so can lead to data loss. The proper way to implement threading inside a StreamTask is to have a thread pool execute, and give threads messages as process() is called. You must then disable offset checkpointing by setting task.commit.ms to -1. Lastly, your task must implement WindowableTask. In the window method, it must block on all threads that are currently processing a message. When all threads have finished processing, it's then safe to checkpoint offsets, and the window method must call coordinator.commit().
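Putting Chris's recipe together, a hedged sketch against the 0.7-era task interfaces might look like the following. The pool size, the shutdown-and-recreate way of draining in-flight work, and the handle() helper are illustrative choices, not the task Chris references, and coordinator.commit() takes a RequestScope argument in later Samza releases. It assumes task.commit.ms=-1 and a task.window.ms interval are set in the job config.

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    import org.apache.samza.system.IncomingMessageEnvelope;
    import org.apache.samza.task.MessageCollector;
    import org.apache.samza.task.StreamTask;
    import org.apache.samza.task.TaskCoordinator;
    import org.apache.samza.task.WindowableTask;

    public class ThreadedStreamTask implements StreamTask, WindowableTask {
        private static final int POOL_SIZE = 8; // illustrative
        private ExecutorService pool = Executors.newFixedThreadPool(POOL_SIZE);

        @Override
        public void process(IncomingMessageEnvelope envelope,
                            MessageCollector collector,
                            TaskCoordinator coordinator) {
            // Hand the message to the pool and return immediately. Samza
            // must not auto-commit the offset here, hence task.commit.ms=-1.
            pool.submit(() -> handle(envelope, collector));
        }

        @Override
        public void window(MessageCollector collector, TaskCoordinator coordinator)
                throws InterruptedException {
            // Block until every in-flight message has been processed...
            pool.shutdown();
            while (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                // keep waiting; a real task would bound or log this
            }
            pool = Executors.newFixedThreadPool(POOL_SIZE); // for the next batch
            // ...then it is safe to checkpoint offsets.
            coordinator.commit();
        }

        private void handle(IncomingMessageEnvelope envelope, MessageCollector collector) {
            // Actual per-message work goes here.
        }
    }

This works because the container's run loop is single threaded: process() and window() are never invoked concurrently, so no new messages arrive while window() is draining the pool.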
> > > > > > > We've written a task that does this as well, and it works, but you have to know what you're doing to get it right.
> > > > > > >
> > > > > > > So, I think the two state options are:
> > > > > > >
> > > > > > > 1. Wait for global state to be implemented (or implement it yourself :)). This could take a while.
> > > > > > > 2. Use static objects to share state among StreamTasks in a given SamzaContainer.
> > > > > > >
> > > > > > > And for parallelism:
> > > > > > >
> > > > > > > 1. Increase the partition/container count for your job.
> > > > > > > 2. Add threads to your StreamTasks.
> > > > > > >
> > > > > > > Cheers,
> > > > > > > Chris
> > > > > > >
> > > > > > > On 10/27/14 12:52 PM, "Jordan Lewis" <[email protected]> wrote:
> > > > > > >
> > > > > > > > Hi,
> > > > > > > >
> > > > > > > > My team is interested in trying out Samza to augment or replace our hand-rolled Kafka-based stream processing system. I have a question about sharing memory across task instances.
> > > > > > > >
> > > > > > > > Currently, our main stream processing application has some large, immutable objects that need to be loaded into JVM heap memory in order to process messages on any partition of certain topics. We use thread-based parallelism in our system, so that the Kafka consumer threads on each machine listening to these topics can use the same instance of these large heap objects. This is very convenient, as these objects are so large that storing multiple copies of them would be quite wasteful.
> > > > > > > >
> > > > > > > > To use Samza, it seems as though each JVM would have to store copies of these objects separately, even if we were to use LevelDB's off-heap storage - each JVM would eventually have to inflate the off-heap memory into regular Java objects to be usable. One solution to this problem could be using something like Google's FlatBuffers [0] for these large objects - so that we could use accessors on large, off-heap ByteBuffers without having to actually deserialize them. However, we think that doing this for all of the relevant data we have would be a lot of work.
> > > > > > > >
> > > > > > > > Have you guys considered implementing a thread-based parallelism model for Samza, whether as a replacement for or alongside the current JVM-based parallelism approach? What obstacles are there to making this happen, assuming you decided not to do it? This approach would be invaluable for our use case, since we rely so heavily (perhaps unfortunately so) on these shared heap data structures.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Jordan Lewis
> > > > > > > >
> > > > > > > > [0]: http://google.github.io/flatbuffers/
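For completeness, Chris's state option 2 (a static shared by all task instances in a container) might look roughly like the sketch below, using the 0.7-era InitableTask interface. ReferenceData and the my.job.reference.path config key are hypothetical stand-ins for the large immutable objects Jordan describes.

    import org.apache.samza.config.Config;
    import org.apache.samza.task.InitableTask;
    import org.apache.samza.task.TaskContext;

    public class SharedObjectTask implements InitableTask {
        /** Hypothetical stand-in for the large immutable reference object. */
        static final class ReferenceData {
            static ReferenceData loadFrom(String path) {
                // Expensive load from disk or a bootstrap stream (placeholder).
                return new ReferenceData();
            }
        }

        // One copy per JVM (i.e. per container), shared by every task
        // instance running inside it.
        private static volatile ReferenceData shared;

        @Override
        public void init(Config config, TaskContext context) throws Exception {
            // Double-checked locking: only the first task instance in the
            // container pays the load cost; the rest reuse the same object.
            if (shared == null) {
                synchronized (SharedObjectTask.class) {
                    if (shared == null) {
                        shared = ReferenceData.loadFrom(config.get("my.job.reference.path"));
                    }
                }
            }
        }
    }

Because the object is immutable once published through the volatile field, all task instances can read it without further synchronization.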
