Looks like gmail mangled the code example, it was supposed to look like
this:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:4242");
StreamingConfig config = new StreamingConfig(props);
config.subscribe("test-topic-1", "test-topic-2");
config.processor(ExampleStreamProcessor.class);
config.serialization(new StringSerializer(), new StringDeserializer());
KafkaStreaming container = new KafkaStreaming(config);
container.run();

-Jay

On Tue, Jun 30, 2015 at 11:32 PM, Jay Kreps <j...@confluent.io> wrote:

> Hey guys,
>
> This came out of some conversations Chris and I were having around whether
> it would make sense to use Samza as a kind of data ingestion framework for
> Kafka (which ultimately lead to KIP-26 "copycat"). This kind of combined
> with complaints around config and YARN and the discussion around how to
> best do a standalone mode.
>
> So the thought experiment was, given that Samza was basically already
> totally Kafka specific, what if you just embraced that and turned it into
> something less like a heavyweight framework and more like a third Kafka
> client--a kind of "producing consumer" with state management facilities.
> Basically a library. Instead of a complex stream processing framework this
> would actually be a very simple thing, not much more complicated to use or
> operate than a Kafka consumer. As Chris said we thought about it a lot of
> what Samza (and the other stream processing systems were doing) seemed like
> kind of a hangover from MapReduce.
>
> Of course you need to ingest/output data to and from the stream
> processing. But when we actually looked into how that would work, Samza
> isn't really an ideal data ingestion framework for a bunch of reasons. To
> really do that right you need a pretty different internal data model and
> set of apis. So what if you split them and had an api for Kafka
> ingress/egress (copycat AKA KIP-26) and a separate api for Kafka
> transformation (Samza).
>
> This would also allow really embracing the same terminology and
> conventions. One complaint about the current state is that the two systems
> kind of feel bolted on. Terminology like "stream" vs "topic" and different
> config and monitoring systems means you kind of have to learn Kafka's way,
> then learn Samza's slightly different way, then kind of understand how they
> map to each other, which having walked a few people through this is
> surprisingly tricky for folks to get.
>
> Since I have been spending a lot of time on airplanes I hacked up an
> ernest but still somewhat incomplete prototype of what this would look
> like. This is just unceremoniously dumped into Kafka as it required a few
> changes to the new consumer. Here is the code:
>
> https://github.com/jkreps/kafka/tree/streams/clients/src/main/java/org/apache/kafka/clients/streaming
>
> For the purpose of the prototype I just liberally renamed everything to
> try to align it with Kafka with no regard for compatibility.
>
> To use this would be something like this:
> Properties props = new Properties(); props.put("bootstrap.servers",
> "localhost:4242"); StreamingConfig config = new StreamingConfig(props); 
> config.subscribe("test-topic-1",
> "test-topic-2"); config.processor(ExampleStreamProcessor.class); 
> config.serialization(new
> StringSerializer(), new StringDeserializer()); KafkaStreaming container =
> new KafkaStreaming(config); container.run();
>
> KafkaStreaming is basically the SamzaContainer; StreamProcessor is
> basically StreamTask.
>
> So rather than putting all the class names in a file and then having the
> job assembled by reflection, you just instantiate the container
> programmatically. Work is balanced over however many instances of this are
> alive at any time (i.e. if an instance dies, new tasks are added to the
> existing containers without shutting them down).
>
> We would provide some glue for running this stuff in YARN via Slider,
> Mesos via Marathon, and AWS using some of their tools but from the point of
> view of these frameworks these stream processing jobs are just stateless
> services that can come and go and expand and contract at will. There is no
> more custom scheduler.
>
> Here are some relevant details:
>
>    1. It is only ~1300 lines of code, it would get larger if we
>    productionized but not vastly larger. We really do get a ton of leverage
>    out of Kafka.
>    2. Partition management is fully delegated to the new consumer. This
>    is nice since now any partition management strategy available to Kafka
>    consumer is also available to Samza (and vice versa) and with the exact
>    same configs.
>    3. It supports state as well as state reuse
>
> Anyhow take a look, hopefully it is thought provoking.
>
> -Jay
>
>
>
> On Tue, Jun 30, 2015 at 6:55 PM, Chris Riccomini <criccom...@apache.org>
> wrote:
>
>> Hey all,
>>
>> I have had some discussions with Samza engineers at LinkedIn and Confluent
>> and we came up with a few observations and would like to propose some
>> changes.
>>
>> We've observed some things that I want to call out about Samza's design,
>> and I'd like to propose some changes.
>>
>> * Samza is dependent upon a dynamic deployment system.
>> * Samza is too pluggable.
>> * Samza's SystemConsumer/SystemProducer and Kafka's consumer APIs are
>> trying to solve a lot of the same problems.
>>
>> All three of these issues are related, but I'll address them in order.
>>
>> Deployment
>>
>> Samza strongly depends on the use of a dynamic deployment scheduler such
>> as
>> YARN, Mesos, etc. When we initially built Samza, we bet that there would
>> be
>> one or two winners in this area, and we could support them, and the rest
>> would go away. In reality, there are many variations. Furthermore, many
>> people still prefer to just start their processors like normal Java
>> processes, and use traditional deployment scripts such as Fabric, Chef,
>> Ansible, etc. Forcing a deployment system on users makes the Samza
>> start-up
>> process really painful for first time users.
>>
>> Dynamic deployment as a requirement was also a bit of a mis-fire because
>> of
>> a fundamental misunderstanding between the nature of batch jobs and stream
>> processing jobs. Early on, we made conscious effort to favor the Hadoop
>> (Map/Reduce) way of doing things, since it worked and was well understood.
>> One thing that we missed was that batch jobs have a definite beginning,
>> and
>> end, and stream processing jobs don't (usually). This leads to a much
>> simpler scheduling problem for stream processors. You basically just need
>> to find a place to start the processor, and start it. The way we run
>> grids,
>> at LinkedIn, there's no concept of a cluster being "full". We always add
>> more machines. The problem with coupling Samza with a scheduler is that
>> Samza (as a framework) now has to handle deployment. This pulls in a bunch
>> of things such as configuration distribution (config stream), shell scrips
>> (bin/run-job.sh, JobRunner), packaging (all the .tgz stuff), etc.
>>
>> Another reason for requiring dynamic deployment was to support data
>> locality. If you want to have locality, you need to put your processors
>> close to the data they're processing. Upon further investigation, though,
>> this feature is not that beneficial. There is some good discussion about
>> some problems with it on SAMZA-335. Again, we took the Map/Reduce path,
>> but
>> there are some fundamental differences between HDFS and Kafka. HDFS has
>> blocks, while Kafka has partitions. This leads to less optimization
>> potential with stream processors on top of Kafka.
>>
>> This feature is also used as a crutch. Samza doesn't have any built in
>> fault-tolerance logic. Instead, it depends on the dynamic deployment
>> scheduling system to handle restarts when a processor dies. This has made
>> it very difficult to write a standalone Samza container (SAMZA-516).
>>
>> Pluggability
>>
>> In some cases pluggability is good, but I think that we've gone too far
>> with it. Currently, Samza has:
>>
>> * Pluggable config.
>> * Pluggable metrics.
>> * Pluggable deployment systems.
>> * Pluggable streaming systems (SystemConsumer, SystemProducer, etc).
>> * Pluggable serdes.
>> * Pluggable storage engines.
>> * Pluggable strategies for just about every component (MessageChooser,
>> SystemStreamPartitionGrouper, ConfigRewriter, etc).
>>
>> There's probably more that I've forgotten, as well. Some of these are
>> useful, but some have proven not to be. This all comes at a cost:
>> complexity. This complexity is making it harder for our users to pick up
>> and use Samza out of the box. It also makes it difficult for Samza
>> developers to reason about what the characteristics of the container
>> (since
>> the characteristics change depending on which plugins are use).
>>
>> The issues with pluggability are most visible in the System APIs. What
>> Samza really requires to be functional is Kafka as its transport layer.
>> But
>> we've conflated two unrelated use cases into one API:
>>
>> 1. Get data into/out of Kafka.
>> 2. Process the data in Kafka.
>>
>> The current System API supports both of these use cases. The problem is,
>> we
>> actually want different features for each use case. By papering over these
>> two use cases, and providing a single API, we've introduced a ton of leaky
>> abstractions.
>>
>> For example, what we'd really like in (2) is to have monotonically
>> increasing longs for offsets (like Kafka). This would be at odds with (1),
>> though, since different systems have different SCNs/Offsets/UUIDs/vectors.
>> There was discussion both on the mailing list and the SQL JIRAs about the
>> need for this.
>>
>> The same thing holds true for replayability. Kafka allows us to rewind
>> when
>> we have a failure. Many other systems don't. In some cases, systems return
>> null for their offsets (e.g. WikipediaSystemConsumer) because they have no
>> offsets.
>>
>> Partitioning is another example. Kafka supports partitioning, but many
>> systems don't. We model this by having a single partition for those
>> systems. Still, other systems model partitioning differently (e.g.
>> Kinesis).
>>
>> The SystemAdmin interface is also a mess. Creating streams in a
>> system-agnostic way is almost impossible. As is modeling metadata for the
>> system (replication factor, partitions, location, etc). The list goes on.
>>
>> Duplicate work
>>
>> At the time that we began writing Samza, Kafka's consumer and producer
>> APIs
>> had a relatively weak feature set. On the consumer-side, you had two
>> options: use the high level consumer, or the simple consumer. The problem
>> with the high-level consumer was that it controlled your offsets,
>> partition
>> assignments, and the order in which you received messages. The problem
>> with
>> the simple consumer is that it's not simple. It's basic. You end up having
>> to handle a lot of really low-level stuff that you shouldn't. We spent a
>> lot of time to make Samza's KafkaSystemConsumer very robust. It also
>> allows
>> us to support some cool features:
>>
>> * Per-partition message ordering and prioritization.
>> * Tight control over partition assignment to support joins, global state
>> (if we want to implement it :)), etc.
>> * Tight control over offset checkpointing.
>>
>> What we didn't realize at the time is that these features should actually
>> be in Kafka. A lot of Kafka consumers (not just Samza stream processors)
>> end up wanting to do things like joins and partition assignment. The Kafka
>> community has come to the same conclusion. They're adding a ton of
>> upgrades
>> into their new Kafka consumer implementation. To a large extent, it's
>> duplicate work to what we've already done in Samza.
>>
>> On top of this, Kafka ended up taking a very similar approach to Samza's
>> KafkaCheckpointManager implementation for handling offset checkpointing.
>> Like Samza, Kafka's new offset management feature stores offset
>> checkpoints
>> in a topic, and allows you to fetch them from the broker.
>>
>> A lot of this seems like a waste, since we could have shared the work if
>> it
>> had been done in Kafka from the get-go.
>>
>> Vision
>>
>> All of this leads me to a rather radical proposal. Samza is relatively
>> stable at this point. I'd venture to say that we're near a 1.0 release.
>> I'd
>> like to propose that we take what we've learned, and begin thinking about
>> Samza beyond 1.0. What would we change if we were starting from scratch?
>> My
>> proposal is to:
>>
>> 1. Make Samza standalone the *only* way to run Samza processors, and
>> eliminate all direct dependences on YARN, Mesos, etc.
>> 2. Make a definitive call to support only Kafka as the stream processing
>> layer.
>> 3. Eliminate Samza's metrics, logging, serialization, and config systems,
>> and simply use Kafka's instead.
>>
>> This would fix all of the issues that I outlined above. It should also
>> shrink the Samza code base pretty dramatically. Supporting only a
>> standalone container will allow Samza to be executed on YARN (using
>> Slider), Mesos (using Marathon/Aurora), or most other in-house deployment
>> systems. This should make life a lot easier for new users. Imagine having
>> the hello-samza tutorial without YARN. The drop in mailing list traffic
>> will be pretty dramatic.
>>
>> Coupling with Kafka seems long overdue to me. The reality is, everyone
>> that
>> I'm aware of is using Samza with Kafka. We basically require it already in
>> order for most features to work. Those that are using other systems are
>> generally using it for ingest into Kafka (1), and then they do the
>> processing on top. There is already discussion (
>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=58851767
>> )
>> in Kafka to make ingesting into Kafka extremely easy.
>>
>> Once we make the call to couple with Kafka, we can leverage a ton of their
>> ecosystem. We no longer have to maintain our own config, metrics, etc. We
>> can all share the same libraries, and make them better. This will also
>> allow us to share the consumer/producer APIs, and will let us leverage
>> their offset management and partition management, rather than having our
>> own. All of the coordinator stream code would go away, as would most of
>> the
>> YARN AppMaster code. We'd probably have to push some partition management
>> features into the Kafka broker, but they're already moving in that
>> direction with the new consumer API. The features we have for partition
>> assignment aren't unique to Samza, and seem like they should be in Kafka
>> anyway. There will always be some niche usages which will require extra
>> care and hence full control over partition assignments much like the Kafka
>> low level consumer api. These would continue to be supported.
>>
>> These items will be good for the Samza community. They'll make Samza
>> easier
>> to use, and make it easier for developers to add new features.
>>
>> Obviously this is a fairly large (and somewhat backwards incompatible
>> change). If we choose to go this route, it's important that we openly
>> communicate how we're going to provide a migration path from the existing
>> APIs to the new ones (if we make incompatible changes). I think at a
>> minimum, we'd probably need to provide a wrapper to allow existing
>> StreamTask implementations to continue running on the new container. It's
>> also important that we openly communicate about timing, and stages of the
>> migration.
>>
>> If you made it this far, I'm sure you have opinions. :) Please send your
>> thoughts and feedback.
>>
>> Cheers,
>> Chris
>>
>
>

Reply via email to