Thanks Chris/Jay for sending this out. 

I wanted to chime in to reflect some of the motivations from LinkedIn POV.

Late last year we started observing that there is a class of event driven 
applications at LinkedIn which can't leverage some of the cool work that is 
happening in Samza pertaining stream processing (windowing, joins, sql, and off 
course the local state abstraction etc.). The main reason was that these 
applications wanted to embed stream  processing along with their own service 
instead of running their stream processing logic in a shared Samza service.  
Running Samza on their own was not workable for them as operating YARN a big 
road block for them.   Currently such applications end up directly consuming 
using Kafka client API or Databus client API instead .  

The Samza standalone work (Samza-516) that Chris did was a result of this.   
This effort underlined the fact that if we had a deeper relationship with Kafka 
whereby we could leverage the partition distribution functionality in the kafka 
client, then we could avoid a lot of complexity in Samza for supporting stand 
alone mode.  

Jay's prototype helped establish the fact that if we leverage kafka in a deeper 
way the resulting code base for the Samza core would be vastly simplified. 
A deeper relationship where we can align efforts with Kafka is a win-win 
situation for both Kafka and Samza.

There are scenarios where the current state of partition distribution 
capabilities in Kafka are not yet sufficient for 100% of Samza jobs.  An 
example of such jobs are stateful samza jobs where the state is huge.  In 
essence with such jobs, the current behavior of reseeding of state from the 
Kafka changeLog during a simple upgrade is un-workable. This is the main reason 
we focussed on adding host affinity (SAMZA-617) .   The %age of such jobs is 
fairly low.  Hence for many Samza users out there this would not even be a 
problem.   In the new world, there will be a period of transition where Kafka 
doesn't have all the partition distribution capabilities that Samza needs. 
During this time we should be able to use the new core with the existing 
partition distribution and fault tolerance abilities that we get from the 
current Samza YARN-AppMaster.

One of the common questions I get is about the support for other existing 
system-consumers. At LinkedIn, a bigger %age of samza jobs consume events from 
non-kafka system-consumers (Databus).  What we observed was that practically 
all of such applications had a samza job which would consume from Databus and 
write to Kafka and have the rest of the downstream samza jobs consume just from 
Kafka. This was being done to improve testability, better ability to replay 
etc.  etc.  With the ongoing CopyCat discussion in Kafka, the hope is that we 
will get a much better story around ingesting data in Kafka from 100s of 
sources.

The other common question is that if we go down this path then what will happen 
to the existing Samza jobs.   Like Chris mentions, given that the new core is 
going to support all the stream processing functionality of the current core, 
it should (in theory) be simple to have the current SamzaContainer act like a 
wrapper on top of this new  core.  Current applications will hopefully not see 
a significant change.

Overall I am very excited about this proposal. Like all big changes it does 
feel uncomfortable in the beginning, but I feel that if we are able get through 
this then there is a better together story with Kafka waiting for us.

cheers
Kartik 
________________________________________
From: Jay Kreps [j...@confluent.io]
Sent: Tuesday, June 30, 2015 11:33 PM
To: dev@samza.apache.org
Subject: Re: Thoughts and obesrvations on Samza

Looks like gmail mangled the code example, it was supposed to look like
this:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:4242");
StreamingConfig config = new StreamingConfig(props);
config.subscribe("test-topic-1", "test-topic-2");
config.processor(ExampleStreamProcessor.class);
config.serialization(new StringSerializer(), new StringDeserializer());
KafkaStreaming container = new KafkaStreaming(config);
container.run();

-Jay

On Tue, Jun 30, 2015 at 11:32 PM, Jay Kreps <j...@confluent.io> wrote:

> Hey guys,
>
> This came out of some conversations Chris and I were having around whether
> it would make sense to use Samza as a kind of data ingestion framework for
> Kafka (which ultimately lead to KIP-26 "copycat"). This kind of combined
> with complaints around config and YARN and the discussion around how to
> best do a standalone mode.
>
> So the thought experiment was, given that Samza was basically already
> totally Kafka specific, what if you just embraced that and turned it into
> something less like a heavyweight framework and more like a third Kafka
> client--a kind of "producing consumer" with state management facilities.
> Basically a library. Instead of a complex stream processing framework this
> would actually be a very simple thing, not much more complicated to use or
> operate than a Kafka consumer. As Chris said we thought about it a lot of
> what Samza (and the other stream processing systems were doing) seemed like
> kind of a hangover from MapReduce.
>
> Of course you need to ingest/output data to and from the stream
> processing. But when we actually looked into how that would work, Samza
> isn't really an ideal data ingestion framework for a bunch of reasons. To
> really do that right you need a pretty different internal data model and
> set of apis. So what if you split them and had an api for Kafka
> ingress/egress (copycat AKA KIP-26) and a separate api for Kafka
> transformation (Samza).
>
> This would also allow really embracing the same terminology and
> conventions. One complaint about the current state is that the two systems
> kind of feel bolted on. Terminology like "stream" vs "topic" and different
> config and monitoring systems means you kind of have to learn Kafka's way,
> then learn Samza's slightly different way, then kind of understand how they
> map to each other, which having walked a few people through this is
> surprisingly tricky for folks to get.
>
> Since I have been spending a lot of time on airplanes I hacked up an
> ernest but still somewhat incomplete prototype of what this would look
> like. This is just unceremoniously dumped into Kafka as it required a few
> changes to the new consumer. Here is the code:
>
> https://github.com/jkreps/kafka/tree/streams/clients/src/main/java/org/apache/kafka/clients/streaming
>
> For the purpose of the prototype I just liberally renamed everything to
> try to align it with Kafka with no regard for compatibility.
>
> To use this would be something like this:
> Properties props = new Properties(); props.put("bootstrap.servers",
> "localhost:4242"); StreamingConfig config = new StreamingConfig(props); 
> config.subscribe("test-topic-1",
> "test-topic-2"); config.processor(ExampleStreamProcessor.class); 
> config.serialization(new
> StringSerializer(), new StringDeserializer()); KafkaStreaming container =
> new KafkaStreaming(config); container.run();
>
> KafkaStreaming is basically the SamzaContainer; StreamProcessor is
> basically StreamTask.
>
> So rather than putting all the class names in a file and then having the
> job assembled by reflection, you just instantiate the container
> programmatically. Work is balanced over however many instances of this are
> alive at any time (i.e. if an instance dies, new tasks are added to the
> existing containers without shutting them down).
>
> We would provide some glue for running this stuff in YARN via Slider,
> Mesos via Marathon, and AWS using some of their tools but from the point of
> view of these frameworks these stream processing jobs are just stateless
> services that can come and go and expand and contract at will. There is no
> more custom scheduler.
>
> Here are some relevant details:
>
>    1. It is only ~1300 lines of code, it would get larger if we
>    productionized but not vastly larger. We really do get a ton of leverage
>    out of Kafka.
>    2. Partition management is fully delegated to the new consumer. This
>    is nice since now any partition management strategy available to Kafka
>    consumer is also available to Samza (and vice versa) and with the exact
>    same configs.
>    3. It supports state as well as state reuse
>
> Anyhow take a look, hopefully it is thought provoking.
>
> -Jay
>
>
>
> On Tue, Jun 30, 2015 at 6:55 PM, Chris Riccomini <criccom...@apache.org>
> wrote:
>
>> Hey all,
>>
>> I have had some discussions with Samza engineers at LinkedIn and Confluent
>> and we came up with a few observations and would like to propose some
>> changes.
>>
>> We've observed some things that I want to call out about Samza's design,
>> and I'd like to propose some changes.
>>
>> * Samza is dependent upon a dynamic deployment system.
>> * Samza is too pluggable.
>> * Samza's SystemConsumer/SystemProducer and Kafka's consumer APIs are
>> trying to solve a lot of the same problems.
>>
>> All three of these issues are related, but I'll address them in order.
>>
>> Deployment
>>
>> Samza strongly depends on the use of a dynamic deployment scheduler such
>> as
>> YARN, Mesos, etc. When we initially built Samza, we bet that there would
>> be
>> one or two winners in this area, and we could support them, and the rest
>> would go away. In reality, there are many variations. Furthermore, many
>> people still prefer to just start their processors like normal Java
>> processes, and use traditional deployment scripts such as Fabric, Chef,
>> Ansible, etc. Forcing a deployment system on users makes the Samza
>> start-up
>> process really painful for first time users.
>>
>> Dynamic deployment as a requirement was also a bit of a mis-fire because
>> of
>> a fundamental misunderstanding between the nature of batch jobs and stream
>> processing jobs. Early on, we made conscious effort to favor the Hadoop
>> (Map/Reduce) way of doing things, since it worked and was well understood.
>> One thing that we missed was that batch jobs have a definite beginning,
>> and
>> end, and stream processing jobs don't (usually). This leads to a much
>> simpler scheduling problem for stream processors. You basically just need
>> to find a place to start the processor, and start it. The way we run
>> grids,
>> at LinkedIn, there's no concept of a cluster being "full". We always add
>> more machines. The problem with coupling Samza with a scheduler is that
>> Samza (as a framework) now has to handle deployment. This pulls in a bunch
>> of things such as configuration distribution (config stream), shell scrips
>> (bin/run-job.sh, JobRunner), packaging (all the .tgz stuff), etc.
>>
>> Another reason for requiring dynamic deployment was to support data
>> locality. If you want to have locality, you need to put your processors
>> close to the data they're processing. Upon further investigation, though,
>> this feature is not that beneficial. There is some good discussion about
>> some problems with it on SAMZA-335. Again, we took the Map/Reduce path,
>> but
>> there are some fundamental differences between HDFS and Kafka. HDFS has
>> blocks, while Kafka has partitions. This leads to less optimization
>> potential with stream processors on top of Kafka.
>>
>> This feature is also used as a crutch. Samza doesn't have any built in
>> fault-tolerance logic. Instead, it depends on the dynamic deployment
>> scheduling system to handle restarts when a processor dies. This has made
>> it very difficult to write a standalone Samza container (SAMZA-516).
>>
>> Pluggability
>>
>> In some cases pluggability is good, but I think that we've gone too far
>> with it. Currently, Samza has:
>>
>> * Pluggable config.
>> * Pluggable metrics.
>> * Pluggable deployment systems.
>> * Pluggable streaming systems (SystemConsumer, SystemProducer, etc).
>> * Pluggable serdes.
>> * Pluggable storage engines.
>> * Pluggable strategies for just about every component (MessageChooser,
>> SystemStreamPartitionGrouper, ConfigRewriter, etc).
>>
>> There's probably more that I've forgotten, as well. Some of these are
>> useful, but some have proven not to be. This all comes at a cost:
>> complexity. This complexity is making it harder for our users to pick up
>> and use Samza out of the box. It also makes it difficult for Samza
>> developers to reason about what the characteristics of the container
>> (since
>> the characteristics change depending on which plugins are use).
>>
>> The issues with pluggability are most visible in the System APIs. What
>> Samza really requires to be functional is Kafka as its transport layer.
>> But
>> we've conflated two unrelated use cases into one API:
>>
>> 1. Get data into/out of Kafka.
>> 2. Process the data in Kafka.
>>
>> The current System API supports both of these use cases. The problem is,
>> we
>> actually want different features for each use case. By papering over these
>> two use cases, and providing a single API, we've introduced a ton of leaky
>> abstractions.
>>
>> For example, what we'd really like in (2) is to have monotonically
>> increasing longs for offsets (like Kafka). This would be at odds with (1),
>> though, since different systems have different SCNs/Offsets/UUIDs/vectors.
>> There was discussion both on the mailing list and the SQL JIRAs about the
>> need for this.
>>
>> The same thing holds true for replayability. Kafka allows us to rewind
>> when
>> we have a failure. Many other systems don't. In some cases, systems return
>> null for their offsets (e.g. WikipediaSystemConsumer) because they have no
>> offsets.
>>
>> Partitioning is another example. Kafka supports partitioning, but many
>> systems don't. We model this by having a single partition for those
>> systems. Still, other systems model partitioning differently (e.g.
>> Kinesis).
>>
>> The SystemAdmin interface is also a mess. Creating streams in a
>> system-agnostic way is almost impossible. As is modeling metadata for the
>> system (replication factor, partitions, location, etc). The list goes on.
>>
>> Duplicate work
>>
>> At the time that we began writing Samza, Kafka's consumer and producer
>> APIs
>> had a relatively weak feature set. On the consumer-side, you had two
>> options: use the high level consumer, or the simple consumer. The problem
>> with the high-level consumer was that it controlled your offsets,
>> partition
>> assignments, and the order in which you received messages. The problem
>> with
>> the simple consumer is that it's not simple. It's basic. You end up having
>> to handle a lot of really low-level stuff that you shouldn't. We spent a
>> lot of time to make Samza's KafkaSystemConsumer very robust. It also
>> allows
>> us to support some cool features:
>>
>> * Per-partition message ordering and prioritization.
>> * Tight control over partition assignment to support joins, global state
>> (if we want to implement it :)), etc.
>> * Tight control over offset checkpointing.
>>
>> What we didn't realize at the time is that these features should actually
>> be in Kafka. A lot of Kafka consumers (not just Samza stream processors)
>> end up wanting to do things like joins and partition assignment. The Kafka
>> community has come to the same conclusion. They're adding a ton of
>> upgrades
>> into their new Kafka consumer implementation. To a large extent, it's
>> duplicate work to what we've already done in Samza.
>>
>> On top of this, Kafka ended up taking a very similar approach to Samza's
>> KafkaCheckpointManager implementation for handling offset checkpointing.
>> Like Samza, Kafka's new offset management feature stores offset
>> checkpoints
>> in a topic, and allows you to fetch them from the broker.
>>
>> A lot of this seems like a waste, since we could have shared the work if
>> it
>> had been done in Kafka from the get-go.
>>
>> Vision
>>
>> All of this leads me to a rather radical proposal. Samza is relatively
>> stable at this point. I'd venture to say that we're near a 1.0 release.
>> I'd
>> like to propose that we take what we've learned, and begin thinking about
>> Samza beyond 1.0. What would we change if we were starting from scratch?
>> My
>> proposal is to:
>>
>> 1. Make Samza standalone the *only* way to run Samza processors, and
>> eliminate all direct dependences on YARN, Mesos, etc.
>> 2. Make a definitive call to support only Kafka as the stream processing
>> layer.
>> 3. Eliminate Samza's metrics, logging, serialization, and config systems,
>> and simply use Kafka's instead.
>>
>> This would fix all of the issues that I outlined above. It should also
>> shrink the Samza code base pretty dramatically. Supporting only a
>> standalone container will allow Samza to be executed on YARN (using
>> Slider), Mesos (using Marathon/Aurora), or most other in-house deployment
>> systems. This should make life a lot easier for new users. Imagine having
>> the hello-samza tutorial without YARN. The drop in mailing list traffic
>> will be pretty dramatic.
>>
>> Coupling with Kafka seems long overdue to me. The reality is, everyone
>> that
>> I'm aware of is using Samza with Kafka. We basically require it already in
>> order for most features to work. Those that are using other systems are
>> generally using it for ingest into Kafka (1), and then they do the
>> processing on top. There is already discussion (
>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=58851767
>> )
>> in Kafka to make ingesting into Kafka extremely easy.
>>
>> Once we make the call to couple with Kafka, we can leverage a ton of their
>> ecosystem. We no longer have to maintain our own config, metrics, etc. We
>> can all share the same libraries, and make them better. This will also
>> allow us to share the consumer/producer APIs, and will let us leverage
>> their offset management and partition management, rather than having our
>> own. All of the coordinator stream code would go away, as would most of
>> the
>> YARN AppMaster code. We'd probably have to push some partition management
>> features into the Kafka broker, but they're already moving in that
>> direction with the new consumer API. The features we have for partition
>> assignment aren't unique to Samza, and seem like they should be in Kafka
>> anyway. There will always be some niche usages which will require extra
>> care and hence full control over partition assignments much like the Kafka
>> low level consumer api. These would continue to be supported.
>>
>> These items will be good for the Samza community. They'll make Samza
>> easier
>> to use, and make it easier for developers to add new features.
>>
>> Obviously this is a fairly large (and somewhat backwards incompatible
>> change). If we choose to go this route, it's important that we openly
>> communicate how we're going to provide a migration path from the existing
>> APIs to the new ones (if we make incompatible changes). I think at a
>> minimum, we'd probably need to provide a wrapper to allow existing
>> StreamTask implementations to continue running on the new container. It's
>> also important that we openly communicate about timing, and stages of the
>> migration.
>>
>> If you made it this far, I'm sure you have opinions. :) Please send your
>> thoughts and feedback.
>>
>> Cheers,
>> Chris
>>
>
>

Reply via email to