Looks like Gmail mangled the code example; it was supposed to look like this:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:4242");
StreamingConfig config = new StreamingConfig(props);
config.subscribe("test-topic-1", "test-topic-2");
config.processor(ExampleStreamProcessor.class);
config.serialization(new StringSerializer(), new StringDeserializer());
KafkaStreaming container = new KafkaStreaming(config);
container.run();

-Jay

On Tue, Jun 30, 2015 at 11:32 PM, Jay Kreps <j...@confluent.io> wrote:

> Hey guys,
>
> This came out of some conversations Chris and I were having around whether it would make sense to use Samza as a kind of data ingestion framework for Kafka (which ultimately led to KIP-26, "copycat"). This combined with complaints around config and YARN, and with the discussion around how best to do a standalone mode.
>
> So the thought experiment was: given that Samza was basically already totally Kafka-specific, what if you just embraced that and turned it into something less like a heavyweight framework and more like a third Kafka client--a kind of "producing consumer" with state management facilities. Basically a library. Instead of a complex stream processing framework this would actually be a very simple thing, not much more complicated to use or operate than a Kafka consumer. As Chris said, when we thought about it, a lot of what Samza (and the other stream processing systems) were doing seemed like kind of a hangover from MapReduce.
>
> Of course you need to ingest/output data to and from the stream processing. But when we actually looked into how that would work, Samza isn't really an ideal data ingestion framework for a bunch of reasons. To really do that right you need a pretty different internal data model and set of APIs. So what if you split them and had an API for Kafka ingress/egress (copycat, AKA KIP-26) and a separate API for Kafka transformation (Samza)?
>
> This would also allow really embracing the same terminology and conventions.
> One complaint about the current state is that the two systems kind of feel bolted on. Terminology like "stream" vs. "topic" and different config and monitoring systems mean you kind of have to learn Kafka's way, then learn Samza's slightly different way, then kind of understand how they map to each other, which, having walked a few people through this, is surprisingly tricky for folks to get.
>
> Since I have been spending a lot of time on airplanes I hacked up an earnest but still somewhat incomplete prototype of what this would look like. This is just unceremoniously dumped into Kafka, as it required a few changes to the new consumer. Here is the code:
>
> https://github.com/jkreps/kafka/tree/streams/clients/src/main/java/org/apache/kafka/clients/streaming
>
> For the purpose of the prototype I just liberally renamed everything to try to align it with Kafka, with no regard for compatibility.
>
> To use this would be something like this:
>
> Properties props = new Properties();
> props.put("bootstrap.servers", "localhost:4242");
> StreamingConfig config = new StreamingConfig(props);
> config.subscribe("test-topic-1", "test-topic-2");
> config.processor(ExampleStreamProcessor.class);
> config.serialization(new StringSerializer(), new StringDeserializer());
> KafkaStreaming container = new KafkaStreaming(config);
> container.run();
>
> KafkaStreaming is basically the SamzaContainer; StreamProcessor is basically StreamTask.
>
> So rather than putting all the class names in a file and then having the job assembled by reflection, you just instantiate the container programmatically. Work is balanced over however many instances of this are alive at any time (i.e. if an instance dies, new tasks are added to the existing containers without shutting them down).
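[For readers following along: ExampleStreamProcessor isn't shown in the snippet above. As a purely illustrative sketch, here is the kind of per-record callback such a processor might implement. The interface and method names below are guesses for illustration, not the prototype's actual API -- see the linked code for that. The sketch is self-contained, with a toy in-memory driver standing in for the container.]

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class StreamProcessorSketch {

    // Hypothetical stand-in for the container's output handle.
    interface Collector {
        void send(String topic, String key, String value);
    }

    // Hypothetical callback interface in the spirit of Samza's StreamTask,
    // renamed to align with Kafka. Called once per consumed record.
    interface StreamProcessor {
        void process(String topic, String key, String value, Collector collector);
    }

    // A toy processor: upper-cases each value and forwards it to an
    // output topic derived from the input topic name.
    static class ExampleStreamProcessor implements StreamProcessor {
        public void process(String topic, String key, String value, Collector collector) {
            collector.send(topic + "-out", key, value.toUpperCase());
        }
    }

    // Drive the processor over an in-memory "topic" to show the flow;
    // in the real prototype the container would do this from Kafka.
    static List<String> run(List<String[]> records) {
        List<String> out = new ArrayList<>();
        Collector collector = (topic, key, value) -> out.add(topic + ":" + key + "=" + value);
        StreamProcessor p = new ExampleStreamProcessor();
        for (String[] r : records) {
            p.process(r[0], r[1], r[2], collector);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String[]> records = new ArrayList<>();
        records.add(new String[] { "test-topic-1", "k1", "hello" });
        records.add(new String[] { "test-topic-2", "k2", "world" });
        // Prints [test-topic-1-out:k1=HELLO, test-topic-2-out:k2=WORLD]
        System.out.println(run(records));
    }
}
```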
> We would provide some glue for running this stuff in YARN via Slider, Mesos via Marathon, and AWS using some of their tools, but from the point of view of these frameworks these stream processing jobs are just stateless services that can come and go and expand and contract at will. There is no more custom scheduler.
>
> Here are some relevant details:
>
> 1. It is only ~1300 lines of code; it would get larger if we productionized it, but not vastly larger. We really do get a ton of leverage out of Kafka.
> 2. Partition management is fully delegated to the new consumer. This is nice since now any partition management strategy available to the Kafka consumer is also available to Samza (and vice versa), with the exact same configs.
> 3. It supports state as well as state reuse.
>
> Anyhow, take a look; hopefully it is thought provoking.
>
> -Jay
>
> On Tue, Jun 30, 2015 at 6:55 PM, Chris Riccomini <criccom...@apache.org> wrote:
>
>> Hey all,
>>
>> I have had some discussions with Samza engineers at LinkedIn and Confluent, and we came up with a few observations about Samza's design that I want to call out, along with some changes I'd like to propose:
>>
>> * Samza is dependent upon a dynamic deployment system.
>> * Samza is too pluggable.
>> * Samza's SystemConsumer/SystemProducer and Kafka's consumer APIs are trying to solve a lot of the same problems.
>>
>> All three of these issues are related, but I'll address them in order.
>>
>> Deployment
>>
>> Samza strongly depends on the use of a dynamic deployment scheduler such as YARN, Mesos, etc. When we initially built Samza, we bet that there would be one or two winners in this area, that we could support them, and that the rest would go away. In reality, there are many variations.
>> Furthermore, many people still prefer to just start their processors like normal Java processes, and use traditional deployment tools such as Fabric, Chef, Ansible, etc. Forcing a deployment system on users makes the Samza start-up process really painful for first-time users.
>>
>> Dynamic deployment as a requirement was also a bit of a misfire because of a fundamental misunderstanding about the difference between batch jobs and stream processing jobs. Early on, we made a conscious effort to favor the Hadoop (Map/Reduce) way of doing things, since it worked and was well understood. One thing that we missed was that batch jobs have a definite beginning and end, and stream processing jobs don't (usually). This leads to a much simpler scheduling problem for stream processors: you basically just need to find a place to start the processor, and start it. The way we run grids at LinkedIn, there's no concept of a cluster being "full"; we always add more machines. The problem with coupling Samza with a scheduler is that Samza (as a framework) now has to handle deployment. This pulls in a bunch of things such as configuration distribution (config stream), shell scripts (bin/run-job.sh, JobRunner), packaging (all the .tgz stuff), etc.
>>
>> Another reason for requiring dynamic deployment was to support data locality. If you want locality, you need to put your processors close to the data they're processing. Upon further investigation, though, this feature is not that beneficial. There is some good discussion about its problems on SAMZA-335. Again, we took the Map/Reduce path, but there are some fundamental differences between HDFS and Kafka: HDFS has blocks, while Kafka has partitions. This leads to less optimization potential with stream processors on top of Kafka.
>>
>> This feature is also used as a crutch. Samza doesn't have any built-in fault-tolerance logic.
>> Instead, it depends on the dynamic deployment scheduling system to handle restarts when a processor dies. This has made it very difficult to write a standalone Samza container (SAMZA-516).
>>
>> Pluggability
>>
>> In some cases pluggability is good, but I think that we've gone too far with it. Currently, Samza has:
>>
>> * Pluggable config.
>> * Pluggable metrics.
>> * Pluggable deployment systems.
>> * Pluggable streaming systems (SystemConsumer, SystemProducer, etc.).
>> * Pluggable serdes.
>> * Pluggable storage engines.
>> * Pluggable strategies for just about every component (MessageChooser, SystemStreamPartitionGrouper, ConfigRewriter, etc.).
>>
>> There are probably more that I've forgotten, as well. Some of these are useful, but some have proven not to be. This all comes at a cost: complexity. This complexity is making it harder for our users to pick up and use Samza out of the box. It also makes it difficult for Samza developers to reason about the characteristics of the container (since the characteristics change depending on which plugins are used).
>>
>> The issues with pluggability are most visible in the System APIs. What Samza really requires to be functional is Kafka as its transport layer, but we've conflated two unrelated use cases into one API:
>>
>> 1. Get data into/out of Kafka.
>> 2. Process the data in Kafka.
>>
>> The current System API supports both of these use cases. The problem is, we actually want different features for each use case. By papering over these two use cases and providing a single API, we've introduced a ton of leaky abstractions.
>>
>> For example, what we'd really like in (2) is to have monotonically increasing longs for offsets (like Kafka). This would be at odds with (1), though, since different systems have different SCNs/offsets/UUIDs/vectors. There was discussion both on the mailing list and the SQL JIRAs about the need for this.
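[To make the offset point concrete: here is a small illustrative sketch, not Samza's actual interfaces, of why the system-agnostic abstraction leaks. With Kafka-style long offsets, questions like "how far behind am I?" are simple arithmetic; with an opaque system-agnostic offset token, no such operation is defined.]

```java
class OffsetSketch {

    // System-agnostic style: the offset is an opaque token (SCN, UUID,
    // vector, ...). You can store it and hand it back for replay, but you
    // cannot compare or subtract two of them in a system-independent way.
    interface OpaqueCheckpoint {
        String offset(); // may even be null, as with WikipediaSystemConsumer
    }

    // Kafka style: offsets are monotonically increasing longs per
    // partition, so consumer lag is just subtraction.
    static long lag(long logEndOffset, long committedOffset) {
        return logEndOffset - committedOffset;
    }

    public static void main(String[] args) {
        OpaqueCheckpoint noOffsets = () -> null; // a system with no replayable position
        System.out.println("opaque offset: " + noOffsets.offset());
        // Prints 300
        System.out.println("kafka lag: " + lag(1500L, 1200L));
    }
}
```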
>> The same thing holds true for replayability. Kafka allows us to rewind when we have a failure. Many other systems don't. In some cases, systems return null for their offsets (e.g. WikipediaSystemConsumer) because they have no offsets.
>>
>> Partitioning is another example. Kafka supports partitioning, but many systems don't. We model this by having a single partition for those systems. Still other systems model partitioning differently (e.g. Kinesis).
>>
>> The SystemAdmin interface is also a mess. Creating streams in a system-agnostic way is almost impossible, as is modeling metadata for the system (replication factor, partitions, location, etc.). The list goes on.
>>
>> Duplicate work
>>
>> At the time that we began writing Samza, Kafka's consumer and producer APIs had a relatively weak feature set. On the consumer side, you had two options: use the high-level consumer, or the simple consumer. The problem with the high-level consumer was that it controlled your offsets, partition assignments, and the order in which you received messages. The problem with the simple consumer is that it's not simple; it's basic. You end up having to handle a lot of really low-level stuff that you shouldn't. We spent a lot of time making Samza's KafkaSystemConsumer very robust. It also allows us to support some cool features:
>>
>> * Per-partition message ordering and prioritization.
>> * Tight control over partition assignment, to support joins, global state (if we want to implement it :)), etc.
>> * Tight control over offset checkpointing.
>>
>> What we didn't realize at the time is that these features should actually be in Kafka. A lot of Kafka consumers (not just Samza stream processors) end up wanting to do things like joins and partition assignment. The Kafka community has come to the same conclusion. They're adding a ton of upgrades into their new Kafka consumer implementation.
>> To a large extent, it duplicates work we've already done in Samza.
>>
>> On top of this, Kafka ended up taking a very similar approach to Samza's KafkaCheckpointManager implementation for handling offset checkpointing. Like Samza, Kafka's new offset management feature stores offset checkpoints in a topic, and allows you to fetch them from the broker.
>>
>> A lot of this seems like a waste, since we could have shared the work if it had been done in Kafka from the get-go.
>>
>> Vision
>>
>> All of this leads me to a rather radical proposal. Samza is relatively stable at this point; I'd venture to say that we're near a 1.0 release. I'd like to propose that we take what we've learned, and begin thinking about Samza beyond 1.0. What would we change if we were starting from scratch? My proposal is to:
>>
>> 1. Make Samza standalone the *only* way to run Samza processors, and eliminate all direct dependencies on YARN, Mesos, etc.
>> 2. Make a definitive call to support only Kafka as the stream processing layer.
>> 3. Eliminate Samza's metrics, logging, serialization, and config systems, and simply use Kafka's instead.
>>
>> This would fix all of the issues that I outlined above. It should also shrink the Samza code base pretty dramatically. Supporting only a standalone container will allow Samza to be executed on YARN (using Slider), Mesos (using Marathon/Aurora), or most other in-house deployment systems. This should make life a lot easier for new users. Imagine having the hello-samza tutorial without YARN. The drop in mailing list traffic will be pretty dramatic.
>>
>> Coupling with Kafka seems long overdue to me. The reality is, everyone that I'm aware of is using Samza with Kafka. We basically require it already in order for most features to work. Those that are using other systems are generally using them for ingest into Kafka (1), and then doing the processing on top.
>> There is already discussion in Kafka (https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=58851767) about making ingest into Kafka extremely easy.
>>
>> Once we make the call to couple with Kafka, we can leverage a ton of their ecosystem. We no longer have to maintain our own config, metrics, etc. We can all share the same libraries, and make them better. This will also allow us to share the consumer/producer APIs, and will let us leverage their offset management and partition management, rather than having our own. All of the coordinator stream code would go away, as would most of the YARN AppMaster code. We'd probably have to push some partition management features into the Kafka broker, but they're already moving in that direction with the new consumer API. The features we have for partition assignment aren't unique to Samza, and seem like they should be in Kafka anyway. There will always be some niche usages that require extra care, and hence full control over partition assignments, much like the Kafka low-level consumer API; these would continue to be supported.
>>
>> These items will be good for the Samza community. They'll make Samza easier to use, and make it easier for developers to add new features.
>>
>> Obviously this is a fairly large (and somewhat backwards-incompatible) change. If we choose to go this route, it's important that we openly communicate how we're going to provide a migration path from the existing APIs to the new ones (if we make incompatible changes). I think at a minimum, we'd probably need to provide a wrapper to allow existing StreamTask implementations to continue running on the new container. It's also important that we openly communicate about timing and the stages of the migration.
>>
>> If you made it this far, I'm sure you have opinions. :) Please send your thoughts and feedback.
>>
>> Cheers,
>> Chris