Very interesting thoughts. From the outside, I have always perceived Samza as a computing layer over Kafka.
The question, maybe a bit provocative, is "should Samza be a sub-project of Kafka then?" Or does it make sense to keep it as a separate project with separate governance?

Cheers,

--
Gianmarco

On 2 July 2015 at 08:59, Yan Fang <yanfang...@gmail.com> wrote:

> Overall, I agree with coupling to Kafka more tightly, because Samza is de facto based on Kafka and should leverage what Kafka has. At the same time, Kafka does not need to reinvent what Samza already has. I also like the idea of separating ingestion and transformation.
>
> But it is a little difficult for me to imagine what Samza will look like. And I feel Chris and Jay differ a little in terms of what Samza should look like.
>
> *** Will it look like what Jay's code shows (a client of Kafka), with the user's application code calling this client?
>
> 1. If we make Samza a library of Kafka (like what the code shows), how do we implement auto-balance and fault-tolerance? Are they taken care of by the Kafka broker or by some other mechanism, such as a "Samza worker" (just making up the name)?
>
> 2. What about other features, such as auto-scaling, shared state, monitoring?
>
>
> *** If we have Samza standalone (is this what Chris suggests?):
>
> 1. We still need to ingest data from Kafka and produce to it. Then it becomes the same as what Samza looks like now, except it does not rely on YARN anymore.
>
> 2. If it is standalone, how can it leverage Kafka's metrics, logs, etc.? Use Kafka code as a dependency?
>
>
> Thanks,
>
> Fang, Yan
> yanfang...@gmail.com
>
> On Wed, Jul 1, 2015 at 5:46 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Read through the code example and it looks good to me. A few thoughts regarding deployment:
> >
> > Today Samza deploys as an executable runnable, like:
> >
> > deploy/samza/bin/run-job.sh --config-factory=... --config-path=file://...
> >
> > And this proposal advocates deploying Samza more as an embedded library in user application code (ignoring the terminology, since it is not the same as the prototype code):
> >
> > StreamTask task = new MyStreamTask(configs);
> > Thread thread = new Thread(task);
> > thread.start();
> >
> > I think both of these deployment modes are important for different types of users. That said, I think making Samza purely standalone is still sufficient for either runnable or library modes.
> >
> > Guozhang
> >
> > On Tue, Jun 30, 2015 at 11:33 PM, Jay Kreps <j...@confluent.io> wrote:
> >
> > > Looks like gmail mangled the code example, it was supposed to look like this:
> > >
> > > Properties props = new Properties();
> > > props.put("bootstrap.servers", "localhost:4242");
> > > StreamingConfig config = new StreamingConfig(props);
> > > config.subscribe("test-topic-1", "test-topic-2");
> > > config.processor(ExampleStreamProcessor.class);
> > > config.serialization(new StringSerializer(), new StringDeserializer());
> > > KafkaStreaming container = new KafkaStreaming(config);
> > > container.run();
> > >
> > > -Jay
> > >
> > > On Tue, Jun 30, 2015 at 11:32 PM, Jay Kreps <j...@confluent.io> wrote:
> > >
> > > > Hey guys,
> > > >
> > > > This came out of some conversations Chris and I were having around whether it would make sense to use Samza as a kind of data ingestion framework for Kafka (which ultimately led to KIP-26 "copycat"). This kind of combined with complaints around config and YARN and the discussion around how to best do a standalone mode.
> > > >
> > > > So the thought experiment was, given that Samza was basically already totally Kafka specific, what if you just embraced that and turned it into something less like a heavyweight framework and more like a third Kafka client--a kind of "producing consumer" with state management facilities.
> > > > Basically a library. Instead of a complex stream processing framework this would actually be a very simple thing, not much more complicated to use or operate than a Kafka consumer. As Chris said, when we thought about it, a lot of what Samza (and the other stream processing systems) was doing seemed like kind of a hangover from MapReduce.
> > > >
> > > > Of course you need to ingest/output data to and from the stream processing. But when we actually looked into how that would work, Samza isn't really an ideal data ingestion framework for a bunch of reasons. To really do that right you need a pretty different internal data model and set of APIs. So what if you split them and had an API for Kafka ingress/egress (copycat, AKA KIP-26) and a separate API for Kafka transformation (Samza)?
> > > >
> > > > This would also allow really embracing the same terminology and conventions. One complaint about the current state is that the two systems kind of feel bolted on. Terminology like "stream" vs "topic" and different config and monitoring systems mean you kind of have to learn Kafka's way, then learn Samza's slightly different way, then kind of understand how they map to each other, which, having walked a few people through this, is surprisingly tricky for folks to get.
> > > >
> > > > Since I have been spending a lot of time on airplanes I hacked up an earnest but still somewhat incomplete prototype of what this would look like. This is just unceremoniously dumped into Kafka as it required a few changes to the new consumer.
> > > > Here is the code:
> > > >
> > > > https://github.com/jkreps/kafka/tree/streams/clients/src/main/java/org/apache/kafka/clients/streaming
> > > >
> > > > For the purpose of the prototype I just liberally renamed everything to try to align it with Kafka with no regard for compatibility.
> > > >
> > > > To use this would be something like this:
> > > > Properties props = new Properties(); props.put("bootstrap.servers", "localhost:4242"); StreamingConfig config = new StreamingConfig(props); config.subscribe("test-topic-1", "test-topic-2"); config.processor(ExampleStreamProcessor.class); config.serialization(new StringSerializer(), new StringDeserializer()); KafkaStreaming container = new KafkaStreaming(config); container.run();
> > > >
> > > > KafkaStreaming is basically the SamzaContainer; StreamProcessor is basically StreamTask.
> > > >
> > > > So rather than putting all the class names in a file and then having the job assembled by reflection, you just instantiate the container programmatically. Work is balanced over however many instances of this are alive at any time (i.e. if an instance dies, new tasks are added to the existing containers without shutting them down).
> > > >
> > > > We would provide some glue for running this stuff in YARN via Slider, Mesos via Marathon, and AWS using some of their tools, but from the point of view of these frameworks these stream processing jobs are just stateless services that can come and go and expand and contract at will. There is no more custom scheduler.
> > > >
> > > > Here are some relevant details:
> > > >
> > > >    1. It is only ~1300 lines of code; it would get larger if we productionized but not vastly larger. We really do get a ton of leverage out of Kafka.
> > > >    2.
> > > >       Partition management is fully delegated to the new consumer. This is nice since now any partition management strategy available to the Kafka consumer is also available to Samza (and vice versa), with the exact same configs.
> > > >    3. It supports state as well as state reuse.
> > > >
> > > > Anyhow take a look, hopefully it is thought provoking.
> > > >
> > > > -Jay
> > > >
> > > >
> > > >
> > > > On Tue, Jun 30, 2015 at 6:55 PM, Chris Riccomini <criccom...@apache.org> wrote:
> > > >
> > > >> Hey all,
> > > >>
> > > >> I have had some discussions with Samza engineers at LinkedIn and Confluent and we came up with a few observations and would like to propose some changes.
> > > >>
> > > >> We've observed some things that I want to call out about Samza's design, and I'd like to propose some changes.
> > > >>
> > > >> * Samza is dependent upon a dynamic deployment system.
> > > >> * Samza is too pluggable.
> > > >> * Samza's SystemConsumer/SystemProducer and Kafka's consumer APIs are trying to solve a lot of the same problems.
> > > >>
> > > >> All three of these issues are related, but I'll address them in order.
> > > >>
> > > >> Deployment
> > > >>
> > > >> Samza strongly depends on the use of a dynamic deployment scheduler such as YARN, Mesos, etc. When we initially built Samza, we bet that there would be one or two winners in this area, and we could support them, and the rest would go away. In reality, there are many variations. Furthermore, many people still prefer to just start their processors like normal Java processes, and use traditional deployment scripts such as Fabric, Chef, Ansible, etc. Forcing a deployment system on users makes the Samza start-up process really painful for first time users.
> > > >>
> > > >> Dynamic deployment as a requirement was also a bit of a misfire because of a fundamental misunderstanding about the difference in nature between batch jobs and stream processing jobs. Early on, we made a conscious effort to favor the Hadoop (Map/Reduce) way of doing things, since it worked and was well understood. One thing that we missed was that batch jobs have a definite beginning and end, and stream processing jobs don't (usually). This leads to a much simpler scheduling problem for stream processors. You basically just need to find a place to start the processor, and start it. The way we run grids at LinkedIn, there's no concept of a cluster being "full". We always add more machines. The problem with coupling Samza with a scheduler is that Samza (as a framework) now has to handle deployment. This pulls in a bunch of things such as configuration distribution (config stream), shell scripts (bin/run-job.sh, JobRunner), packaging (all the .tgz stuff), etc.
> > > >>
> > > >> Another reason for requiring dynamic deployment was to support data locality. If you want to have locality, you need to put your processors close to the data they're processing. Upon further investigation, though, this feature is not that beneficial. There is some good discussion about some problems with it on SAMZA-335. Again, we took the Map/Reduce path, but there are some fundamental differences between HDFS and Kafka. HDFS has blocks, while Kafka has partitions. This leads to less optimization potential with stream processors on top of Kafka.
> > > >>
> > > >> This feature is also used as a crutch. Samza doesn't have any built-in fault-tolerance logic.
> > > >> Instead, it depends on the dynamic deployment scheduling system to handle restarts when a processor dies. This has made it very difficult to write a standalone Samza container (SAMZA-516).
> > > >>
> > > >> Pluggability
> > > >>
> > > >> In some cases pluggability is good, but I think that we've gone too far with it. Currently, Samza has:
> > > >>
> > > >> * Pluggable config.
> > > >> * Pluggable metrics.
> > > >> * Pluggable deployment systems.
> > > >> * Pluggable streaming systems (SystemConsumer, SystemProducer, etc).
> > > >> * Pluggable serdes.
> > > >> * Pluggable storage engines.
> > > >> * Pluggable strategies for just about every component (MessageChooser, SystemStreamPartitionGrouper, ConfigRewriter, etc).
> > > >>
> > > >> There's probably more that I've forgotten, as well. Some of these are useful, but some have proven not to be. This all comes at a cost: complexity. This complexity is making it harder for our users to pick up and use Samza out of the box. It also makes it difficult for Samza developers to reason about what the characteristics of the container are (since the characteristics change depending on which plugins are used).
> > > >>
> > > >> The issues with pluggability are most visible in the System APIs. What Samza really requires to be functional is Kafka as its transport layer. But we've conflated two unrelated use cases into one API:
> > > >>
> > > >> 1. Get data into/out of Kafka.
> > > >> 2. Process the data in Kafka.
> > > >>
> > > >> The current System API supports both of these use cases. The problem is, we actually want different features for each use case. By papering over these two use cases, and providing a single API, we've introduced a ton of leaky abstractions.
> > > >>
> > > >> For example, what we'd really like in (2) is to have monotonically increasing longs for offsets (like Kafka). This would be at odds with (1), though, since different systems have different SCNs/Offsets/UUIDs/vectors. There was discussion both on the mailing list and the SQL JIRAs about the need for this.
> > > >>
> > > >> The same thing holds true for replayability. Kafka allows us to rewind when we have a failure. Many other systems don't. In some cases, systems return null for their offsets (e.g. WikipediaSystemConsumer) because they have no offsets.
> > > >>
> > > >> Partitioning is another example. Kafka supports partitioning, but many systems don't. We model this by having a single partition for those systems. Still, other systems model partitioning differently (e.g. Kinesis).
> > > >>
> > > >> The SystemAdmin interface is also a mess. Creating streams in a system-agnostic way is almost impossible. As is modeling metadata for the system (replication factor, partitions, location, etc). The list goes on.
> > > >>
> > > >> Duplicate work
> > > >>
> > > >> At the time that we began writing Samza, Kafka's consumer and producer APIs had a relatively weak feature set. On the consumer-side, you had two options: use the high level consumer, or the simple consumer. The problem with the high-level consumer was that it controlled your offsets, partition assignments, and the order in which you received messages. The problem with the simple consumer is that it's not simple. It's basic. You end up having to handle a lot of really low-level stuff that you shouldn't. We spent a lot of time to make Samza's KafkaSystemConsumer very robust.
> > > >> It also allows us to support some cool features:
> > > >>
> > > >> * Per-partition message ordering and prioritization.
> > > >> * Tight control over partition assignment to support joins, global state (if we want to implement it :)), etc.
> > > >> * Tight control over offset checkpointing.
> > > >>
> > > >> What we didn't realize at the time is that these features should actually be in Kafka. A lot of Kafka consumers (not just Samza stream processors) end up wanting to do things like joins and partition assignment. The Kafka community has come to the same conclusion. They're adding a ton of upgrades into their new Kafka consumer implementation. To a large extent, it's duplicate work to what we've already done in Samza.
> > > >>
> > > >> On top of this, Kafka ended up taking a very similar approach to Samza's KafkaCheckpointManager implementation for handling offset checkpointing. Like Samza, Kafka's new offset management feature stores offset checkpoints in a topic, and allows you to fetch them from the broker.
> > > >>
> > > >> A lot of this seems like a waste, since we could have shared the work if it had been done in Kafka from the get-go.
> > > >>
> > > >> Vision
> > > >>
> > > >> All of this leads me to a rather radical proposal. Samza is relatively stable at this point. I'd venture to say that we're near a 1.0 release. I'd like to propose that we take what we've learned, and begin thinking about Samza beyond 1.0. What would we change if we were starting from scratch? My proposal is to:
> > > >>
> > > >> 1. Make Samza standalone the *only* way to run Samza processors, and eliminate all direct dependencies on YARN, Mesos, etc.
> > > >> 2.
> > > >>    Make a definitive call to support only Kafka as the stream processing layer.
> > > >> 3. Eliminate Samza's metrics, logging, serialization, and config systems, and simply use Kafka's instead.
> > > >>
> > > >> This would fix all of the issues that I outlined above. It should also shrink the Samza code base pretty dramatically. Supporting only a standalone container will allow Samza to be executed on YARN (using Slider), Mesos (using Marathon/Aurora), or most other in-house deployment systems. This should make life a lot easier for new users. Imagine having the hello-samza tutorial without YARN. The drop in mailing list traffic will be pretty dramatic.
> > > >>
> > > >> Coupling with Kafka seems long overdue to me. The reality is, everyone that I'm aware of is using Samza with Kafka. We basically require it already in order for most features to work. Those that are using other systems are generally using it for ingest into Kafka (1), and then they do the processing on top. There is already discussion (https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=58851767) in Kafka to make ingesting into Kafka extremely easy.
> > > >>
> > > >> Once we make the call to couple with Kafka, we can leverage a ton of their ecosystem. We no longer have to maintain our own config, metrics, etc. We can all share the same libraries, and make them better. This will also allow us to share the consumer/producer APIs, and will let us leverage their offset management and partition management, rather than having our own. All of the coordinator stream code would go away, as would most of the YARN AppMaster code.
> > > >> We'd probably have to push some partition management features into the Kafka broker, but they're already moving in that direction with the new consumer API. The features we have for partition assignment aren't unique to Samza, and seem like they should be in Kafka anyway. There will always be some niche usages which will require extra care and hence full control over partition assignments, much like the Kafka low level consumer API. These would continue to be supported.
> > > >>
> > > >> These items will be good for the Samza community. They'll make Samza easier to use, and make it easier for developers to add new features.
> > > >>
> > > >> Obviously this is a fairly large (and somewhat backwards incompatible) change. If we choose to go this route, it's important that we openly communicate how we're going to provide a migration path from the existing APIs to the new ones (if we make incompatible changes). I think at a minimum, we'd probably need to provide a wrapper to allow existing StreamTask implementations to continue running on the new container. It's also important that we openly communicate about timing, and stages of the migration.
> > > >>
> > > >> If you made it this far, I'm sure you have opinions. :) Please send your thoughts and feedback.
> > > >>
> > > >> Cheers,
> > > >> Chris
> > > >>
> >
> > --
> > -- Guozhang
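
On the wrapper Chris mentions near the end of his mail (letting existing StreamTask implementations keep running on the new container), here is a minimal sketch of how that could look, assuming heavily simplified stand-ins for both interfaces. None of the types below are the real Samza or prototype APIs: `MessageCollector`, `StreamTask`, `Emitter`, `StreamProcessor`, and `StreamTaskAdapter` are hypothetical shapes reduced to plain strings, so this only illustrates the adapter idea and is not a suggestion for the actual signatures.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: none of these types are the real Samza or Kafka APIs.
class LegacyTaskAdapterSketch {

    /** Stand-in for the collector an existing task writes its output to. */
    interface MessageCollector {
        void send(String stream, String message);
    }

    /** Stand-in for an existing StreamTask (the real interface is richer than this). */
    interface StreamTask {
        void process(String stream, String message, MessageCollector collector);
    }

    /** Stand-in for whatever the new container would offer for emitting records. */
    interface Emitter {
        void emit(String topic, String value);
    }

    /** Stand-in for the processor contract of the new container. */
    interface StreamProcessor {
        void process(String topic, String value, Emitter out);
    }

    /** The wrapper: exposes an unchanged StreamTask through the new processor contract. */
    static final class StreamTaskAdapter implements StreamProcessor {
        private final StreamTask task;

        StreamTaskAdapter(StreamTask task) {
            this.task = task;
        }

        @Override
        public void process(String topic, String value, Emitter out) {
            // Map "topic" back to "stream" and route the task's sends to the new emitter.
            task.process(topic, value, out::emit);
        }
    }

    /** Runs one record through a wrapped task and returns what it emitted as "topic:value". */
    static List<String> runOnce(StreamTask task, String topic, String value) {
        List<String> emitted = new ArrayList<>();
        new StreamTaskAdapter(task).process(topic, value, (t, v) -> emitted.add(t + ":" + v));
        return emitted;
    }

    public static void main(String[] args) {
        // An "existing" task, written against the old interface only.
        StreamTask upperCase = (stream, message, collector) ->
                collector.send(stream + "-out", message.toUpperCase());
        // Prints [test-topic-1-out:HELLO]
        System.out.println(runOnce(upperCase, "test-topic-1", "hello"));
    }
}
```

The point of the design is that the old task code stays untouched: only the adapter knows about both vocabularies ("stream" vs "topic", collector vs emitter), which is also where any semantic mismatches between the two containers would have to be handled.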