Hi all,

I think the question below about whether Samza should become a sub-project of 
Kafka highlights the broader point around migration. Chris mentions that Samza's 
maturity is heading towards a v1 release, but I'm not sure it feels right to 
launch a v1 and then immediately plan to deprecate most of it.

From a selfish perspective, I have some guys who have started working with Samza, 
and building some new consumers/producers was next up. It sounds like that is 
absolutely not the direction to go. I need to look into the KIP in more detail, 
but for me the attraction of adding new Samza consumers/producers (even if, 
yes, all they were really doing was getting data into and out of Kafka) was 
to avoid having to worry about the lifecycle management of external clients. 
If there is a generic Kafka ingress/egress layer that I can plug a new 
connector into, with a lot of the heavy lifting around scale and reliability 
done for me, then it gives me everything that building new Samza 
consumers/producers would. If not, then it complicates my operational deployments.
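
To make that concrete, here is a rough sketch in Java of the split I have in 
mind. Everything in it is made up for illustration (SourceConnector, 
IngressRuntime, the retry policy, and the in-memory list standing in for a 
Kafka producer); it is not the KIP-26 API, which I still need to read properly. 
The connector author writes only poll(), and a shared runtime owns retries and 
position tracking:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ConnectorSketch {

    /** Hypothetical plug-in point: the only code a connector author writes. */
    public interface SourceConnector {
        /** Returns the records available after the given position. */
        List<String> poll(long position);
    }

    /** Hypothetical shared runtime: owns retries and position tracking. */
    public static final class IngressRuntime {
        private final SourceConnector connector;
        private final int maxRetries;
        private long position = 0;

        public IngressRuntime(SourceConnector connector, int maxRetries) {
            if (maxRetries < 0) {
                throw new IllegalArgumentException("maxRetries must be >= 0");
            }
            this.connector = connector;
            this.maxRetries = maxRetries;
        }

        /** Polls once, retrying failures; the sink stands in for a Kafka producer. */
        public void runOnce(List<String> sink) {
            RuntimeException last = null;
            for (int attempt = 0; attempt <= maxRetries; attempt++) {
                try {
                    List<String> batch = connector.poll(position);
                    sink.addAll(batch);
                    position += batch.size(); // advance only after a successful hand-off
                    return;
                } catch (RuntimeException e) {
                    last = e; // treat as transient and try again
                }
            }
            throw last; // retries exhausted: surface the failure to the operator
        }

        public long position() {
            return position;
        }
    }

    public static void main(String[] args) {
        List<String> data = Arrays.asList("a", "b", "c");
        int[] calls = {0};
        // A flaky connector: the first poll fails, later polls succeed.
        SourceConnector flaky = pos -> {
            if (calls[0]++ == 0) {
                throw new IllegalStateException("transient failure");
            }
            return data.subList((int) pos, data.size());
        };

        IngressRuntime runtime = new IngressRuntime(flaky, 3);
        List<String> sink = new ArrayList<>();
        runtime.runOnce(sink);
        System.out.println(sink + " position=" + runtime.position());
        // prints "[a, b, c] position=3"
    }
}
```

If the real layer ends up looking roughly like this, a new connector is a few 
lines of poll() logic and the operational behaviour (retries, position 
tracking) is the same for every connector, which is the part I care about.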

That is similar to my other question about the proposal: if we build a fully 
available, stand-alone Samza plus the requisite shims to integrate with Slider 
etc., I suspect the former may be a lot more work than we think. We may make it 
much easier for a newcomer to get something running, but helping them step up 
to a reliable production deployment may still dominate mailing list traffic, 
if for different reasons than today.

Don't get me wrong -- I'm comfortable with making the Samza dependency on Kafka 
much more explicit and I absolutely see the benefits  in the reduction of 
duplication and clashing terminologies/abstractions that Chris/Jay describe. 
Samza as a library would likely be a very nice tool to add to the Kafka 
ecosystem. I just have the concerns above re the operational side.

Garry

-----Original Message-----
From: Gianmarco De Francisci Morales [mailto:g...@apache.org] 
Sent: 02 July 2015 12:56
To: dev@samza.apache.org
Subject: Re: Thoughts and obesrvations on Samza

Very interesting thoughts.
From outside, I have always perceived Samza as a computing layer over Kafka.

The question, maybe a bit provocative, is "should Samza be a sub-project of 
Kafka then?"
Or does it make sense to keep it as a separate project with a separate 
governance?

Cheers,

--
Gianmarco

On 2 July 2015 at 08:59, Yan Fang <yanfang...@gmail.com> wrote:

> Overall, I agree to couple with Kafka more tightly. Because Samza de 
> facto is based on Kafka, and it should leverage what Kafka has. At the 
> same time, Kafka does not need to reinvent what Samza already has. I 
> also like the idea of separating the ingestion and transformation.
>
> But it is a little difficult for me to imagine what Samza will look like.
> And I feel Chris and Jay differ a little in terms of how 
> Samza should look.
>
> *** Will it look like what Jay's code shows (a client of Kafka)? And 
> will the user's application code call this client?
>
> 1. If we make Samza a library of Kafka (like what the code shows), 
> how do we implement auto-balance and fault-tolerance? Are they taken 
> care of by the Kafka broker or by some other mechanism, such as a "Samza 
> worker" (just making up the name)?
>
> 2. What about other features, such as auto-scaling, shared state, 
> monitoring?
>
>
> *** If we have Samza standalone, (is this what Chris suggests?)
>
> 1. we still need to ingest data from Kafka and produce to it. Then it 
> becomes the same as what Samza looks like now, except it does not rely 
> on Yarn anymore.
>
> 2. if it is standalone, how can it leverage Kafka's metrics, logs, 
> etc? Use Kafka code as the dependency?
>
>
> Thanks,
>
> Fang, Yan
> yanfang...@gmail.com
>
> On Wed, Jul 1, 2015 at 5:46 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Read through the code example and it looks good to me. A few 
> > thoughts regarding deployment:
> >
> > Today Samza deploys as an executable runnable, like:
> >
> > deploy/samza/bin/run-job.sh --config-factory=... --config-path=file://...
> >
> > And this proposal advocates deploying Samza more as an embedded 
> > library in user application code (ignoring the terminology, since 
> > it is not the same as the prototype code):
> >
> > StreamTask task = new MyStreamTask(configs);
> > Thread thread = new Thread(task);
> > thread.start();
> >
> > I think both of these deployment modes are important for different 
> > types of users. That said, I think making Samza purely standalone is still 
> > sufficient for either runnable or library modes.
> >
> > Guozhang
> >
> > On Tue, Jun 30, 2015 at 11:33 PM, Jay Kreps <j...@confluent.io> wrote:
> >
> > > Looks like gmail mangled the code example, it was supposed to look 
> > > like this:
> > >
> > > Properties props = new Properties();
> > > props.put("bootstrap.servers", "localhost:4242");
> > > StreamingConfig config = new StreamingConfig(props);
> > > config.subscribe("test-topic-1", "test-topic-2");
> > > config.processor(ExampleStreamProcessor.class);
> > > config.serialization(new StringSerializer(), new StringDeserializer());
> > > KafkaStreaming container = new KafkaStreaming(config);
> > > container.run();
> > >
> > > -Jay
> > >
> > > On Tue, Jun 30, 2015 at 11:32 PM, Jay Kreps <j...@confluent.io> wrote:
> > >
> > > > Hey guys,
> > > >
> > > > This came out of some conversations Chris and I were having 
> > > > around whether it would make sense to use Samza as a kind of data 
> > > > ingestion framework for Kafka (which ultimately led to KIP-26 
> > > > "copycat"). This kind of combined with complaints around config and 
> > > > YARN and the discussion around how to best do a standalone mode.
> > > >
> > > > So the thought experiment was, given that Samza was basically 
> > > > already totally Kafka specific, what if you just embraced that 
> > > > and turned it into something less like a heavyweight framework and 
> > > > more like a third Kafka client--a kind of "producing consumer" with 
> > > > state management facilities. Basically a library. Instead of a 
> > > > complex stream processing framework this would actually be a very 
> > > > simple thing, not much more complicated to use or operate than a 
> > > > Kafka consumer. As Chris said, when we thought about it, a lot of 
> > > > what Samza (and the other stream processing systems) were doing 
> > > > seemed like kind of a hangover from MapReduce.
> > > >
> > > > Of course you need to ingest/output data to and from the stream 
> > > > processing. But when we actually looked into how that would 
> > > > work, Samza isn't really an ideal data ingestion framework for a 
> > > > bunch of reasons. To really do that right you need a pretty 
> > > > different internal data model and set of apis. So what if you split 
> > > > them and had an api for Kafka ingress/egress (copycat AKA KIP-26) 
> > > > and a separate api for Kafka transformation (Samza).
> > > >
> > > > This would also allow really embracing the same terminology and 
> > > > conventions. One complaint about the current state is that the 
> > > > two systems kind of feel bolted on. Terminology like "stream" vs 
> > > > "topic" and different config and monitoring systems means you kind 
> > > > of have to learn Kafka's way, then learn Samza's slightly different 
> > > > way, then kind of understand how they map to each other, which 
> > > > having walked a few people through this is surprisingly tricky for 
> > > > folks to get.
> > > >
> > > > Since I have been spending a lot of time on airplanes I hacked 
> > > > up an earnest but still somewhat incomplete prototype of what 
> > > > this would look like. This is just unceremoniously dumped into 
> > > > Kafka as it required a few changes to the new consumer. Here is 
> > > > the code:
> > > >
> > > > https://github.com/jkreps/kafka/tree/streams/clients/src/main/java/org/apache/kafka/clients/streaming
> > > >
> > > > For the purpose of the prototype I just liberally renamed 
> > > > everything to try to align it with Kafka with no regard for 
> > > > compatibility.
> > > >
> > > > To use this would be something like this:
> > > > Properties props = new Properties(); 
> > > > props.put("bootstrap.servers", "localhost:4242"); 
> > > > StreamingConfig config = new
> StreamingConfig(props);
> > > config.subscribe("test-topic-1",
> > > > "test-topic-2"); config.processor(ExampleStreamProcessor.class);
> > > config.serialization(new
> > > > StringSerializer(), new StringDeserializer()); KafkaStreaming
> > container =
> > > > new KafkaStreaming(config); container.run();
> > > >
> > > > KafkaStreaming is basically the SamzaContainer; StreamProcessor 
> > > > is basically StreamTask.
> > > >
> > > > So rather than putting all the class names in a file and then 
> > > > having the job assembled by reflection, you just instantiate the 
> > > > container programmatically. Work is balanced over however many 
> > > > instances of this are alive at any time (i.e. if an instance dies, 
> > > > new tasks are added to the existing containers without shutting 
> > > > them down).
> > > >
> > > > We would provide some glue for running this stuff in YARN via 
> > > > Slider, Mesos via Marathon, and AWS using some of their tools 
> > > > but from the point of view of these frameworks these stream 
> > > > processing jobs are just stateless services that can come and go 
> > > > and expand and contract at will. There is no more custom scheduler.
> > > >
> > > > Here are some relevant details:
> > > >
> > > >    1. It is only ~1300 lines of code, it would get larger if we
> > > >    productionized but not vastly larger. We really do get a ton 
> > > >    of leverage out of Kafka.
> > > >    2. Partition management is fully delegated to the new consumer.
> > > >    This is nice since now any partition management strategy 
> > > >    available to Kafka consumer is also available to Samza (and vice 
> > > >    versa) and with the exact same configs.
> > > >    3. It supports state as well as state reuse
> > > >
> > > > Anyhow take a look, hopefully it is thought provoking.
> > > >
> > > > -Jay
> > > >
> > > >
> > > >
> > > > On Tue, Jun 30, 2015 at 6:55 PM, Chris Riccomini <criccom...@apache.org> wrote:
> > > >
> > > >> Hey all,
> > > >>
> > > > >> I have had some discussions with Samza engineers at LinkedIn 
> > > > >> and Confluent and we came up with a few observations and would 
> > > > >> like to propose some changes.
> > > > >>
> > > > >> We've observed some things that I want to call out about 
> > > > >> Samza's design, and I'd like to propose some changes.
> > > >>
> > > >> * Samza is dependent upon a dynamic deployment system.
> > > >> * Samza is too pluggable.
> > > > >> * Samza's SystemConsumer/SystemProducer and Kafka's consumer 
> > > > >> APIs are trying to solve a lot of the same problems.
> > > > >>
> > > > >> All three of these issues are related, but I'll address them in order.
> > > >>
> > > >> Deployment
> > > >>
> > > > >> Samza strongly depends on the use of a dynamic deployment 
> > > > >> scheduler such as YARN, Mesos, etc. When we initially built Samza, 
> > > > >> we bet that there would be one or two winners in this area, and we 
> > > > >> could support them, and the rest would go away. In reality, there 
> > > > >> are many variations. Furthermore, many people still prefer to just 
> > > > >> start their processors like normal Java processes, and use 
> > > > >> traditional deployment scripts such as Fabric, Chef, Ansible, etc. 
> > > > >> Forcing a deployment system on users makes the Samza start-up 
> > > > >> process really painful for first time users.
> > > >>
> > > > >> Dynamic deployment as a requirement was also a bit of a 
> > > > >> mis-fire because of a fundamental misunderstanding of the 
> > > > >> difference in nature between batch jobs and stream processing 
> > > > >> jobs. Early on, we made a conscious effort to favor the Hadoop 
> > > > >> (Map/Reduce) way of doing things, since it worked and was well 
> > > > >> understood. One thing that we missed was that batch jobs have a 
> > > > >> definite beginning and end, and stream processing jobs don't 
> > > > >> (usually). This leads to a much simpler scheduling problem for 
> > > > >> stream processors. You basically just need to find a place to 
> > > > >> start the processor, and start it. The way we run grids, at 
> > > > >> LinkedIn, there's no concept of a cluster being "full". We always 
> > > > >> add more machines. The problem with coupling Samza with a 
> > > > >> scheduler is that Samza (as a framework) now has to handle 
> > > > >> deployment. This pulls in a bunch of things such as configuration 
> > > > >> distribution (config stream), shell scripts (bin/run-job.sh, 
> > > > >> JobRunner), packaging (all the .tgz stuff), etc.
> > > >>
> > > > >> Another reason for requiring dynamic deployment was to support 
> > > > >> data locality. If you want to have locality, you need to put 
> > > > >> your processors close to the data they're processing. Upon further 
> > > > >> investigation, though, this feature is not that beneficial. There 
> > > > >> is some good discussion about some problems with it on SAMZA-335. 
> > > > >> Again, we took the Map/Reduce path, but there are some fundamental 
> > > > >> differences between HDFS and Kafka. HDFS has blocks, while Kafka 
> > > > >> has partitions. This leads to less optimization potential with 
> > > > >> stream processors on top of Kafka.
> > > >>
> > > > >> This feature is also used as a crutch. Samza doesn't have any 
> > > > >> built in fault-tolerance logic. Instead, it depends on the dynamic 
> > > > >> deployment scheduling system to handle restarts when a 
> > > > >> processor dies. This has made it very difficult to write a 
> > > > >> standalone Samza container (SAMZA-516).
> > > >>
> > > >> Pluggability
> > > >>
> > > > >> In some cases pluggability is good, but I think that we've gone 
> > > > >> too far with it. Currently, Samza has:
> > > >>
> > > >> * Pluggable config.
> > > >> * Pluggable metrics.
> > > >> * Pluggable deployment systems.
> > > >> * Pluggable streaming systems (SystemConsumer, SystemProducer, etc).
> > > >> * Pluggable serdes.
> > > >> * Pluggable storage engines.
> > > > >> * Pluggable strategies for just about every component
> > > > >> (MessageChooser, SystemStreamPartitionGrouper, ConfigRewriter, etc).
> > > > >>
> > > > >> There's probably more that I've forgotten, as well. Some of 
> > > > >> these are useful, but some have proven not to be. This all comes at 
> > > > >> a cost: complexity. This complexity is making it harder for our 
> > > > >> users to pick up and use Samza out of the box. It also makes it 
> > > > >> difficult for Samza developers to reason about the characteristics 
> > > > >> of the container (since the characteristics change depending on 
> > > > >> which plugins are in use).
> > > >>
> > > > >> The issues with pluggability are most visible in the System APIs.
> > > > >> What Samza really requires to be functional is Kafka as its 
> > > > >> transport layer. But we've conflated two unrelated use cases into 
> > > > >> one API:
> > > >>
> > > >> 1. Get data into/out of Kafka.
> > > >> 2. Process the data in Kafka.
> > > >>
> > > > >> The current System API supports both of these use cases. The 
> > > > >> problem is, we actually want different features for each use case. 
> > > > >> By papering over these two use cases, and providing a single API, 
> > > > >> we've introduced a ton of leaky abstractions.
> > > >>
> > > > >> For example, what we'd really like in (2) is to have 
> > > > >> monotonically increasing longs for offsets (like Kafka). This 
> > > > >> would be at odds with (1), though, since different systems have 
> > > > >> different SCNs/Offsets/UUIDs/vectors. There was discussion both on 
> > > > >> the mailing list and the SQL JIRAs about the need for this.
> > > >>
> > > > >> The same thing holds true for replayability. Kafka allows us to
> > > > >> rewind when we have a failure. Many other systems don't. In some 
> > > > >> cases, systems return null for their offsets (e.g. 
> > > > >> WikipediaSystemConsumer) because they have no offsets.
> > > >>
> > > > >> Partitioning is another example. Kafka supports partitioning, 
> > > > >> but many systems don't. We model this by having a single partition 
> > > > >> for those systems. Still, other systems model partitioning 
> > > > >> differently (e.g. Kinesis).
> > > >>
> > > > >> The SystemAdmin interface is also a mess. Creating streams in a 
> > > > >> system-agnostic way is almost impossible. As is modeling 
> > > > >> metadata for the system (replication factor, partitions, location, 
> > > > >> etc). The list goes on.
> > > >>
> > > >> Duplicate work
> > > >>
> > > > >> At the time that we began writing Samza, Kafka's consumer and
> > > > >> producer APIs had a relatively weak feature set. On the 
> > > > >> consumer-side, you had two options: use the high level consumer, or 
> > > > >> the simple consumer. The problem with the high-level consumer was 
> > > > >> that it controlled your offsets, partition assignments, and the 
> > > > >> order in which you received messages. The problem with the simple 
> > > > >> consumer is that it's not simple. It's basic. You end up having 
> > > > >> to handle a lot of really low-level stuff that you shouldn't. 
> > > > >> We spent a lot of time to make Samza's KafkaSystemConsumer very 
> > > > >> robust. It also allows us to support some cool features:
> > > >>
> > > >> * Per-partition message ordering and prioritization.
> > > > >> * Tight control over partition assignment to support joins, 
> > > > >> global state (if we want to implement it :)), etc.
> > > >> * Tight control over offset checkpointing.
> > > >>
> > > > >> What we didn't realize at the time is that these features 
> > > > >> should actually be in Kafka. A lot of Kafka consumers (not just 
> > > > >> Samza stream processors) end up wanting to do things like joins and 
> > > > >> partition assignment. The Kafka community has come to the same 
> > > > >> conclusion. They're adding a ton of upgrades into their new Kafka 
> > > > >> consumer implementation. To a large extent, it's duplicate work to 
> > > > >> what we've already done in Samza.
> > > >>
> > > > >> On top of this, Kafka ended up taking a very similar approach 
> > > > >> to Samza's KafkaCheckpointManager implementation for handling 
> > > > >> offset checkpointing. Like Samza, Kafka's new offset management 
> > > > >> feature stores offset checkpoints in a topic, and allows you to 
> > > > >> fetch them from the broker.
> > > >>
> > > > >> A lot of this seems like a waste, since we could have shared 
> > > > >> the work if it had been done in Kafka from the get-go.
> > > >>
> > > >> Vision
> > > >>
> > > > >> All of this leads me to a rather radical proposal. Samza is
> > > > >> relatively stable at this point. I'd venture to say that we're 
> > > > >> near a 1.0 release. I'd like to propose that we take what we've 
> > > > >> learned, and begin thinking about Samza beyond 1.0. What would we 
> > > > >> change if we were starting from scratch? My proposal is to:
> > > > >>
> > > > >> 1. Make Samza standalone the *only* way to run Samza 
> > > > >> processors, and eliminate all direct dependencies on YARN, Mesos, etc.
> > > > >> 2. Make a definitive call to support only Kafka as the stream
> > > > >> processing layer.
> > > > >> 3. Eliminate Samza's metrics, logging, serialization, and 
> > > > >> config systems, and simply use Kafka's instead.
> > > >>
> > > > >> This would fix all of the issues that I outlined above. It 
> > > > >> should also shrink the Samza code base pretty dramatically. 
> > > > >> Supporting only a standalone container will allow Samza to be 
> > > > >> executed on YARN (using Slider), Mesos (using Marathon/Aurora), or 
> > > > >> most other in-house deployment systems. This should make life a 
> > > > >> lot easier for new users. Imagine having the hello-samza tutorial 
> > > > >> without YARN. The drop in mailing list traffic will be pretty 
> > > > >> dramatic.
> > > >>
> > > > >> Coupling with Kafka seems long overdue to me. The reality is,
> > > > >> everyone that I'm aware of is using Samza with Kafka. We basically 
> > > > >> require it already in order for most features to work. Those that 
> > > > >> are using other systems are generally using it for ingest into 
> > > > >> Kafka (1), and then they do the processing on top. There is already 
> > > > >> discussion (
> > > > >> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=58851767
> > > > >> ) in Kafka to make ingesting into Kafka extremely easy.
> > > >>
> > > > >> Once we make the call to couple with Kafka, we can leverage a 
> > > > >> ton of their ecosystem. We no longer have to maintain our own 
> > > > >> config, metrics, etc. We can all share the same libraries, and make 
> > > > >> them better. This will also allow us to share the consumer/producer 
> > > > >> APIs, and will let us leverage their offset management and 
> > > > >> partition management, rather than having our own. All of the 
> > > > >> coordinator stream code would go away, as would most of the YARN 
> > > > >> AppMaster code. We'd probably have to push some partition 
> > > > >> management features into the Kafka broker, but they're already 
> > > > >> moving in that direction with the new consumer API. The features we 
> > > > >> have for partition assignment aren't unique to Samza, and seem like 
> > > > >> they should be in Kafka anyway. There will always be some niche 
> > > > >> usages which will require extra care and hence full control over 
> > > > >> partition assignments much like the Kafka low level consumer api. 
> > > > >> These would continue to be supported.
> > > >>
> > > >> These items will be good for the Samza community. They'll make 
> > > >> Samza easier to use, and make it easier for developers to add 
> > > >> new features.
> > > >>
> > > > >> Obviously this is a fairly large (and somewhat backwards
> > > > >> incompatible) change. If we choose to go this route, it's important 
> > > > >> that we openly communicate how we're going to provide a migration 
> > > > >> path from the existing APIs to the new ones (if we make 
> > > > >> incompatible changes). I think at a minimum, we'd probably need to 
> > > > >> provide a wrapper to allow existing StreamTask implementations to 
> > > > >> continue running on the new container. It's also important that we 
> > > > >> openly communicate about timing, and stages of the migration.
> > > >>
> > > > >> If you made it this far, I'm sure you have opinions. :) Please 
> > > > >> send your thoughts and feedback.
> > > >>
> > > >> Cheers,
> > > >> Chris
> > > >>
> > > >
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>
