@Jay, you got my point.

{quote}
I think the question is whether for the "as a
service" you are proposing actually trying to build some layer over
YARN/Mesos/AWS that abstracts these away?
{quote}
I don't feel strongly about this, but I do see it as an option. My
reasoning is: if all we need from YARN/Mesos/AWS is simply:
1. a set of resources to run Samza containers
2. launching Samza containers on the chosen set of resources
3. making the container processes fault tolerant (i.e. monitoring and
restarting failed processes)
then there is a good chance that the different execution frameworks can be
abstracted out, since all cluster management systems need to provide the
above set of functionality. I would need to spend more time on the Mesos
patch for Samza to come up w/ a more concrete idea of the abstraction layer.
If it turns out that the abstraction is not possible, Samza as a service may
have to be implemented in several versions on top of Samza as a process.
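
To make the three requirements above a bit more concrete, here is a rough
sketch of what such an abstraction might look like. All names in it
(ClusterBackend, InMemoryBackend, acquireResources, launch, reportFailure)
are made up for illustration; none of them exist in Samza, YARN, or Mesos,
and a real adapter would call the framework's own APIs instead of the toy
in-memory bookkeeping used here.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical interface: the three things asked of YARN/Mesos/AWS above.
interface ClusterBackend {
    // 1. Obtain a set of resources to run Samza containers on.
    List<String> acquireResources(int count);

    // 2. Launch a Samza container on a chosen resource.
    void launch(String containerId, String resource);

    // 3. Report a failed container process so that it gets restarted.
    void reportFailure(String containerId);
}

// Toy in-memory backend standing in for a real YARN/Mesos/AWS adapter.
class InMemoryBackend implements ClusterBackend {
    private final Map<String, String> placement = new LinkedHashMap<>();
    private final Map<String, Integer> restarts = new LinkedHashMap<>();
    private int nextHost = 0;

    @Override
    public List<String> acquireResources(int count) {
        List<String> hosts = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            hosts.add("host-" + nextHost++);
        }
        return hosts;
    }

    @Override
    public void launch(String containerId, String resource) {
        placement.put(containerId, resource);
    }

    @Override
    public void reportFailure(String containerId) {
        if (!placement.containsKey(containerId)) {
            throw new IllegalArgumentException("unknown container: " + containerId);
        }
        // Assumption: restart in place, on the resource already assigned.
        restarts.merge(containerId, 1, Integer::sum);
    }

    String hostOf(String containerId) {
        return placement.get(containerId);
    }

    int restartCount(String containerId) {
        return restarts.getOrDefault(containerId, 0);
    }
}
```

If something this small really is all Samza needs, each framework would only
need one such adapter. Whether the real frameworks fit behind an interface
this narrow is exactly the open question.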

On Mon, Jul 6, 2015 at 12:02 PM, Jay Kreps <jay.kr...@gmail.com> wrote:

> Hey Yi/Martin,
>
> With respect to state reuse: Yeah that is a better explanation of what I
> was trying to say. That prototype actually includes state reuse (each task
> checkpoints its offset for the various partitions during the commit and
> uses that to determine if the state is valid and can be reused on restart).
> So it is just a matter of a framework being able to express a preference
> for the recently used hosts--those that can will get the optimization and
> those that can't won't.
>
> Yi, for the three layers I think we agree, but I may be unclear on what you
> are actually proposing. I think the question is whether for the "as a
> service" you are proposing actually trying to build some layer over
> YARN/Mesos/AWS that abstracts these away? Of course, there is nothing that
> requires such an abstraction to run stream processing as a service on top
> of (say) YARN. Can you be more specific about what would be in-scope for
> that?
>
> I am generally skeptical of a layer that abstracts away these frameworks
> for two reasons. First I think they are quite different and each is
> advancing fairly rapidly so abstracting over them may be hard. Second I
> think the layer of abstraction doesn't really help the user. If you think
> about it you tend to just adopt one of the frameworks (say Mesos/Marathon)
> and you learn the tooling associated with that and use it for all your
> different systems. Building an abstraction over that doesn't really give
> you any flexibility: you can't swap in another framework because all your
> other stuff is tied to (say) Mesos/Marathon, and the layer of indirection
> obscures the interface the user is already familiar with from other systems.
> But I think I may actually be misunderstanding your proposal...
>
> -Jay
>
> On Mon, Jul 6, 2015 at 11:30 AM, Yi Pan <nickpa...@gmail.com> wrote:
>
> > Hi, Martin,
> >
> > Great to hear your voice! I will just try to focus on your questions
> > regarding the "w/o YARN" part.
> >
> > {quote}
> > For example, would host affinity (SAMZA-617) still be possible?
> > {quote}
> > It would be possible if we separate the job execution/process launching
> > from the partition assignment among all Samza containers. Host-affinity
> > could follow the model below:
> > a. We still keep a container-host mapping as in SAMZA-617
> > b. It would be a best-effort approach that tries to get the same set of
> > hosts from the job execution framework (e.g. as we did w/ YARN in the
> > SamzaAppMaster code. If Slider/Marathon comes up w/ this feature, we can
> > migrate to that as well).
> > c. When the container starts, it discovers the local state and tries to
> > retain the same partitions via the partition management APIs (i.e.
> > explicitly asks for a specific set of partitions to be assigned to the
> > container, instead of defaulting to whatever the broker decides to assign
> > to the container)
> >
> > {quote}
> > a question about the proposed API for instantiating a job in code (rather
> > than a properties file): when submitting a job to a cluster, is the idea
> > that the instantiation code runs on a client somewhere, which then pokes
> > the necessary endpoints on YARN/Mesos/AWS/etc? Or does that code run on
> > each container that is part of the job (in which case, how does the job
> > submission to the cluster work)?
> > {quote}
> > I want to chime in here to propose a three-layered model:
> > 1. Samza as a library. In this case, the client application should
> > actually implement the interface to submit its own job to a cluster, if
> > the client application chooses to run in a cluster. Samza as a library
> > won't be poking any endpoints on YARN/Mesos/AWS.
> > 2. Samza as a process. In this case, a properties file could be desired
> > when starting the Samza process. Here again, Samza as a process should
> > not need to interact w/ any endpoints on YARN/Mesos/AWS.
> > 3. Samza as a service. In this case, we definitely need some job
> > configuration and a Samza implementation of the interface to submit jobs
> > and their configuration to a cluster (i.e. YARN/Mesos/AWS, mostly the same
> > as we have today, except that we should just ask for resources to run
> > Samza containers and leave the partition management aside).
> >
> > Thanks!
> >
> > On Mon, Jul 6, 2015 at 3:37 AM, Martin Kleppmann <mar...@kleppmann.com>
> > wrote:
> >
> > > Hi all,
> > >
> > > Lots of good thoughts here.
> > >
> > > I agree with the general philosophy of tying Samza more firmly to Kafka.
> > > After I spent a while looking at integrating other message brokers (e.g.
> > > Kinesis) with SystemConsumer, I came to the conclusion that SystemConsumer
> > > tacitly assumes a model so much like Kafka's that pretty much nobody but
> > > Kafka actually implements it. (Databus is perhaps an exception, but it
> > > isn't widely used outside of LinkedIn.) Thus, making Samza fully dependent
> > > on Kafka acknowledges that the system-independence was never as real as we
> > > perhaps made it out to be. The gains of code reuse are real.
> > >
> > > The idea of decoupling Samza from YARN has also always been appealing to
> > > me, for various reasons already mentioned in this thread. Although making
> > > Samza jobs deployable on anything (YARN/Mesos/AWS/etc) seems laudable, I am
> > > a little concerned that it will restrict us to a lowest common denominator.
> > > For example, would host affinity (SAMZA-617) still be possible? For jobs
> > > with large amounts of state, I think SAMZA-617 would be a big boon, since
> > > restoring state off the changelog on every single restart is painful, due
> > > to long recovery times. It would be a shame if the decoupling from YARN
> > > made host affinity impossible.
> > >
> > > Jay, a question about the proposed API for instantiating a job in code
> > > (rather than a properties file): when submitting a job to a cluster, is the
> > > idea that the instantiation code runs on a client somewhere, which then
> > > pokes the necessary endpoints on YARN/Mesos/AWS/etc? Or does that code run
> > > on each container that is part of the job (in which case, how does the job
> > > submission to the cluster work)?
> > >
> > > I agree with Garry that it doesn't feel right to make a 1.0 release with a
> > > plan for it to be immediately obsolete. So if this is going to happen, I
> > > think it would be more honest to stick with 0.* version numbers until the
> > > library-ified Samza has been implemented, is stable and widely used.
> > >
> > > Should the new Samza be a subproject of Kafka? There is precedent for
> > > tight coupling between different Apache projects (e.g. Curator and
> > > Zookeeper, or Slider and YARN), so I think remaining separate would be ok.
> > > Even if Samza is fully dependent on Kafka, there is enough substance in
> > > Samza that it warrants being a separate project. An argument in favour of
> > > merging would be if we think Kafka has a much stronger "brand presence"
> > > than Samza; I'm ambivalent on that one. If the Kafka project is willing to
> > > endorse Samza as the "official" way of doing stateful stream
> > > transformations, that would probably have much the same effect as
> > > re-branding Samza as "Kafka Stream Processors" or suchlike. Close
> > > collaboration between the two projects will be needed in any case.
> > >
> > > From a project management perspective, I guess the "new Samza" would have
> > > to be developed on a branch alongside ongoing maintenance of the current
> > > line of development? I think it would be important to continue supporting
> > > existing users, and provide a graceful migration path to the new version.
> > > Leaving the current versions unsupported and forcing people to rewrite
> > > their jobs would send a bad signal.
> > >
> > > Best,
> > > Martin
> > >
> > > On 2 Jul 2015, at 16:59, Jay Kreps <j...@confluent.io> wrote:
> > >
> > > > Hey Garry,
> > > >
> > > > Yeah that's super frustrating. I'd be happy to chat more about this if
> > > > you'd be interested. I think Chris and I started with the idea of "what
> > > > would it take to make Samza a kick-ass ingestion tool" but ultimately we
> > > > kind of came around to the idea that ingestion and transformation had
> > > > pretty different needs and coupling the two made things hard.
> > > >
> > > > For what it's worth I think copycat (KIP-26) actually will do what you
> > > > are looking for.
> > > >
> > > > With regard to your point about slider, I don't necessarily disagree. But
> > > > I think getting good YARN support is quite doable and I think we can make
> > > > that work well. I think the issue this proposal solves is that technically
> > > > it is pretty hard to support multiple cluster management systems the way
> > > > things are now, you need to write an "app master" or "framework" for each
> > > > and they are all a little different so testing is really hard. In the
> > > > absence of this we have been stuck with just YARN which has fantastic
> > > > penetration in the Hadoopy part of the org, but zero penetration elsewhere.
> > > > Given the huge amount of work being put in to slider, marathon, aws
> > > > tooling, not to mention the umpteen related packaging technologies people
> > > > want to use (Docker, Kubernetes, various cloud-specific deploy tools, etc)
> > > > I really think it is important to get this right.
> > > >
> > > > -Jay
> > > >
> > > > On Thu, Jul 2, 2015 at 4:17 AM, Garry Turkington <
> > > > g.turking...@improvedigital.com> wrote:
> > > >
> > > >> Hi all,
> > > >>
> > > >> I think the question below re does Samza become a sub-project of
> Kafka
> > > >> highlights the broader point around migration. Chris mentions
> Samza's
> > > >> maturity is heading towards a v1 release but I'm not sure it feels
> > > right to
> > > >> launch a v1 then immediately plan to deprecate most of it.
> > > >>
> > > >> From a selfish perspective I have some guys who have started working
> > > with
> > > >> Samza and building some new consumers/producers was next up. Sounds
> > like
> > > >> that is absolutely not the direction to go. I need to look into the
> > KIP
> > > in
> > > >> more detail but for me the attractiveness of adding new Samza
> > > >> consumer/producers -- even if yes all they were doing was really
> > getting
> > > >> data into and out of Kafka --  was to avoid  having to worry about
> the
> > > >> lifecycle management of external clients. If there is a generic
> Kafka
> > > >> ingress/egress layer that I can plug a new connector into and have a
> > > lot of
> > > >> the heavy lifting re scale and reliability done for me then it gives
> > me
> > > all
> > > >> the pushing new consumers/producers would. If not then it
> complicates
> > my
> > > >> operational deployments.
> > > >>
> > > >> Which is similar to my other question with the proposal -- if we
> > build a
> > > >> fully available/stand-alone Samza plus the requisite shims to
> > integrate
> > > >> with Slider etc I suspect the former may be a lot more work than we
> > > think.
> > > >> We may make it much easier for a newcomer to get something running
> but
> > > >> having them step up and get a reliable production deployment may
> still
> > > >> dominate mailing list  traffic, if for different reasons than today.
> > > >>
> > > >> Don't get me wrong -- I'm comfortable with making the Samza
> dependency
> > > on
> > > >> Kafka much more explicit and I absolutely see the benefits  in the
> > > >> reduction of duplication and clashing terminologies/abstractions
> that
> > > >> Chris/Jay describe. Samza as a library would likely be a very nice
> > tool
> > > to
> > > >> add to the Kafka ecosystem. I just have the concerns above re the
> > > >> operational side.
> > > >>
> > > >> Garry
> > > >>
> > > >> -----Original Message-----
> > > >> From: Gianmarco De Francisci Morales [mailto:g...@apache.org]
> > > >> Sent: 02 July 2015 12:56
> > > >> To: dev@samza.apache.org
> > > >> Subject: Re: Thoughts and obesrvations on Samza
> > > >>
> > > >> Very interesting thoughts.
> > > >> From outside, I have always perceived Samza as a computing layer
> over
> > > >> Kafka.
> > > >>
> > > >> The question, maybe a bit provocative, is "should Samza be a
> > sub-project
> > > >> of Kafka then?"
> > > >> Or does it make sense to keep it as a separate project with a
> separate
> > > >> governance?
> > > >>
> > > >> Cheers,
> > > >>
> > > >> --
> > > >> Gianmarco
> > > >>
> > > >> On 2 July 2015 at 08:59, Yan Fang <yanfang...@gmail.com> wrote:
> > > >>
> > > > >>> Overall, I agree to couple with Kafka more tightly. Because Samza de
> > > > >>> facto is based on Kafka, and it should leverage what Kafka has. At the
> > > > >>> same time, Kafka does not need to reinvent what Samza already has. I
> > > > >>> also like the idea of separating the ingestion and transformation.
> > > >>>
> > > > >>> But it is a little difficult for me to imagine what Samza will look
> > > > >>> like. And I feel Chris and Jay differ a little in terms of what
> > > > >>> Samza should look like.
> > > > >>>
> > > > >>> *** Will it look like what Jay's code shows (a client of Kafka)? And
> > > > >>> user's application code calls this client?
> > > >>>
> > > > >>> 1. If we make Samza be a library of Kafka (like what the code shows),
> > > > >>> how do we implement auto-balance and fault-tolerance? Are they taken
> > > > >>> care of by the Kafka broker or another mechanism, such as a "Samza
> > > > >>> worker" (just making up the name)?
> > > >>>
> > > >>> 2. What about other features, such as auto-scaling, shared state,
> > > >>> monitoring?
> > > >>>
> > > >>>
> > > >>> *** If we have Samza standalone, (is this what Chris suggests?)
> > > >>>
> > > > >>> 1. we still need to ingest data from Kafka and produce to it. Then it
> > > > >>> becomes the same as what Samza looks like now, except it does not rely
> > > > >>> on Yarn anymore.
> > > >>>
> > > >>> 2. if it is standalone, how can it leverage Kafka's metrics, logs,
> > > >>> etc? Use Kafka code as the dependency?
> > > >>>
> > > >>>
> > > >>> Thanks,
> > > >>>
> > > >>> Fang, Yan
> > > >>> yanfang...@gmail.com
> > > >>>
> > > >>> On Wed, Jul 1, 2015 at 5:46 PM, Guozhang Wang <wangg...@gmail.com>
> > > >> wrote:
> > > >>>
> > > >>>> Read through the code example and it looks good to me. A few
> > > >>>> thoughts regarding deployment:
> > > >>>>
> > > >>>> Today Samza deploys as executable runnable like:
> > > >>>>
> > > > >>>> deploy/samza/bin/run-job.sh --config-factory=... --config-path=file://...
> > > >>>>
> > > > >>>> And this proposal advocates for deploying Samza more as embedded
> > > > >>>> libraries in user application code (ignoring the terminology since
> > > > >>>> it is not the same as the prototype code):
> > > >>>>
> > > > >>>> StreamTask task = new MyStreamTask(configs);
> > > > >>>> Thread thread = new Thread(task);
> > > > >>>> thread.start();
> > > >>>>
> > > > >>>> I think both of these deployment modes are important for different
> > > > >>>> types of users. That said, I think making Samza purely standalone is
> > > > >>>> still sufficient for either runnable or library modes.
> > > >>>>
> > > >>>> Guozhang
> > > >>>>
> > > >>>> On Tue, Jun 30, 2015 at 11:33 PM, Jay Kreps <j...@confluent.io>
> > wrote:
> > > >>>>
> > > > >>>>> Looks like gmail mangled the code example, it was supposed to look
> > > > >>>>> like this:
> > > > >>>>>
> > > > >>>>> Properties props = new Properties();
> > > > >>>>> props.put("bootstrap.servers", "localhost:4242");
> > > > >>>>> StreamingConfig config = new StreamingConfig(props);
> > > > >>>>> config.subscribe("test-topic-1", "test-topic-2");
> > > > >>>>> config.processor(ExampleStreamProcessor.class);
> > > > >>>>> config.serialization(new StringSerializer(), new StringDeserializer());
> > > > >>>>> KafkaStreaming container = new KafkaStreaming(config);
> > > > >>>>> container.run();
> > > >>>>>
> > > >>>>> -Jay
> > > >>>>>
> > > >>>>> On Tue, Jun 30, 2015 at 11:32 PM, Jay Kreps <j...@confluent.io>
> > > >> wrote:
> > > >>>>>
> > > >>>>>> Hey guys,
> > > >>>>>>
> > > > >>>>>> This came out of some conversations Chris and I were having around
> > > > >>>>>> whether it would make sense to use Samza as a kind of data ingestion
> > > > >>>>>> framework for Kafka (which ultimately led to KIP-26 "copycat"). This
> > > > >>>>>> kind of combined with complaints around config and YARN and the
> > > > >>>>>> discussion around how to best do a standalone mode.
> > > >>>>>>
> > > > >>>>>> So the thought experiment was, given that Samza was basically
> > > > >>>>>> already totally Kafka specific, what if you just embraced that and
> > > > >>>>>> turned it into something less like a heavyweight framework and more
> > > > >>>>>> like a third Kafka client--a kind of "producing consumer" with state
> > > > >>>>>> management facilities. Basically a library. Instead of a complex
> > > > >>>>>> stream processing framework this would actually be a very simple
> > > > >>>>>> thing, not much more complicated to use or operate than a Kafka
> > > > >>>>>> consumer. As Chris said, when we thought about it, a lot of what
> > > > >>>>>> Samza (and the other stream processing systems) were doing seemed
> > > > >>>>>> like kind of a hangover from MapReduce.
> > > >>>>>>
> > > > >>>>>> Of course you need to ingest/output data to and from the stream
> > > > >>>>>> processing. But when we actually looked into how that would work,
> > > > >>>>>> Samza isn't really an ideal data ingestion framework for a bunch of
> > > > >>>>>> reasons. To really do that right you need a pretty different internal
> > > > >>>>>> data model and set of apis. So what if you split them and had an api
> > > > >>>>>> for Kafka ingress/egress (copycat AKA KIP-26) and a separate api for
> > > > >>>>>> Kafka transformation (Samza).
> > > >>>>>>
> > > > >>>>>> This would also allow really embracing the same terminology and
> > > > >>>>>> conventions. One complaint about the current state is that the two
> > > > >>>>>> systems kind of feel bolted on. Terminology like "stream" vs "topic"
> > > > >>>>>> and different config and monitoring systems means you kind of have to
> > > > >>>>>> learn Kafka's way, then learn Samza's slightly different way, then
> > > > >>>>>> kind of understand how they map to each other, which having walked a
> > > > >>>>>> few people through this is surprisingly tricky for folks to get.
> > > >>>>>>
> > > > >>>>>> Since I have been spending a lot of time on airplanes I hacked up an
> > > > >>>>>> earnest but still somewhat incomplete prototype of what this would
> > > > >>>>>> look like. This is just unceremoniously dumped into Kafka as it
> > > > >>>>>> required a few changes to the new consumer. Here is the code:
> > > >>>>>>
> > > >>>>>>
> > > >>>>>
> > > >>>>
> > > >>>
> > https://github.com/jkreps/kafka/tree/streams/clients/src/main/java/org
> > > >>> /apache/kafka/clients/streaming
> > > >>>>>>
> > > > >>>>>> For the purpose of the prototype I just liberally renamed everything
> > > > >>>>>> to try to align it with Kafka with no regard for compatibility.
> > > >>>>>>
> > > >>>>>> To use this would be something like this:
> > > >>>>>> Properties props = new Properties();
> > > >>>>>> props.put("bootstrap.servers", "localhost:4242");
> > > >>>>>> StreamingConfig config = new
> > > >>> StreamingConfig(props);
> > > >>>>> config.subscribe("test-topic-1",
> > > >>>>>> "test-topic-2"); config.processor(ExampleStreamProcessor.class);
> > > >>>>> config.serialization(new
> > > >>>>>> StringSerializer(), new StringDeserializer()); KafkaStreaming
> > > >>>> container =
> > > >>>>>> new KafkaStreaming(config); container.run();
> > > >>>>>>
> > > >>>>>> KafkaStreaming is basically the SamzaContainer; StreamProcessor
> > > >>>>>> is basically StreamTask.
> > > >>>>>>
> > > > >>>>>> So rather than putting all the class names in a file and then having
> > > > >>>>>> the job assembled by reflection, you just instantiate the container
> > > > >>>>>> programmatically. Work is balanced over however many instances of
> > > > >>>>>> this are alive at any time (i.e. if an instance dies, new tasks are
> > > > >>>>>> added to the existing containers without shutting them down).
> > > >>>>>>
> > > > >>>>>> We would provide some glue for running this stuff in YARN via
> > > > >>>>>> Slider, Mesos via Marathon, and AWS using some of their tools but
> > > > >>>>>> from the point of view of these frameworks these stream processing
> > > > >>>>>> jobs are just stateless services that can come and go and expand and
> > > > >>>>>> contract at will. There is no more custom scheduler.
> > > >>>>>>
> > > >>>>>> Here are some relevant details:
> > > >>>>>>
> > > > >>>>>>   1. It is only ~1300 lines of code, it would get larger if we
> > > > >>>>>>   productionized but not vastly larger. We really do get a ton of
> > > > >>>>>>   leverage out of Kafka.
> > > > >>>>>>   2. Partition management is fully delegated to the new consumer.
> > > > >>>>>>   This is nice since now any partition management strategy available
> > > > >>>>>>   to Kafka consumer is also available to Samza (and vice versa) and
> > > > >>>>>>   with the exact same configs.
> > > > >>>>>>   3. It supports state as well as state reuse
> > > >>>>>>
> > > >>>>>> Anyhow take a look, hopefully it is thought provoking.
> > > >>>>>>
> > > >>>>>> -Jay
> > > >>>>>>
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> On Tue, Jun 30, 2015 at 6:55 PM, Chris Riccomini <
> > > >>>> criccom...@apache.org>
> > > >>>>>> wrote:
> > > >>>>>>
> > > >>>>>>> Hey all,
> > > >>>>>>>
> > > >>>>>>> I have had some discussions with Samza engineers at LinkedIn
> > > >>>>>>> and
> > > >>>>> Confluent
> > > >>>>>>> and we came up with a few observations and would like to
> > > >>>>>>> propose
> > > >>> some
> > > >>>>>>> changes.
> > > >>>>>>>
> > > >>>>>>> We've observed some things that I want to call out about
> > > >>>>>>> Samza's
> > > >>>> design,
> > > >>>>>>> and I'd like to propose some changes.
> > > >>>>>>>
> > > >>>>>>> * Samza is dependent upon a dynamic deployment system.
> > > >>>>>>> * Samza is too pluggable.
> > > >>>>>>> * Samza's SystemConsumer/SystemProducer and Kafka's consumer
> > > >>>>>>> APIs
> > > >>> are
> > > >>>>>>> trying to solve a lot of the same problems.
> > > >>>>>>>
> > > >>>>>>> All three of these issues are related, but I'll address them in
> > > >>> order.
> > > >>>>>>>
> > > >>>>>>> Deployment
> > > >>>>>>>
> > > >>>>>>> Samza strongly depends on the use of a dynamic deployment
> > > >>>>>>> scheduler
> > > >>>> such
> > > >>>>>>> as
> > > >>>>>>> YARN, Mesos, etc. When we initially built Samza, we bet that
> > > >>>>>>> there
> > > >>>> would
> > > >>>>>>> be
> > > >>>>>>> one or two winners in this area, and we could support them, and
> > > >>>>>>> the
> > > >>>> rest
> > > >>>>>>> would go away. In reality, there are many variations.
> > > >>>>>>> Furthermore,
> > > >>>> many
> > > >>>>>>> people still prefer to just start their processors like normal
> > > >>>>>>> Java processes, and use traditional deployment scripts such as
> > > >>>>>>> Fabric,
> > > >>>> Chef,
> > > >>>>>>> Ansible, etc. Forcing a deployment system on users makes the
> > > >>>>>>> Samza start-up process really painful for first time users.
> > > >>>>>>>
> > > > >>>>>>> Dynamic deployment as a requirement was also a bit of a mis-fire
> > > > >>>>>>> because of a fundamental misunderstanding between the nature of batch
> > > > >>>>>>> jobs and stream processing jobs. Early on, we made a conscious effort
> > > > >>>>>>> to favor the Hadoop (Map/Reduce) way of doing things, since it worked
> > > > >>>>>>> and was well understood. One thing that we missed was that batch jobs
> > > > >>>>>>> have a definite beginning and end, and stream processing jobs don't
> > > > >>>>>>> (usually). This leads to a much simpler scheduling problem for stream
> > > > >>>>>>> processors. You basically just need to find a place to start the
> > > > >>>>>>> processor, and start it. The way we run grids, at LinkedIn, there's
> > > > >>>>>>> no concept of a cluster being "full". We always add more machines.
> > > > >>>>>>> The problem with coupling Samza with a scheduler is that Samza (as a
> > > > >>>>>>> framework) now has to handle deployment. This pulls in a bunch of
> > > > >>>>>>> things such as configuration distribution (config stream), shell
> > > > >>>>>>> scripts (bin/run-job.sh, JobRunner), packaging (all the .tgz stuff),
> > > > >>>>>>> etc.
> > > >>>>>>>
> > > >>>>>>> Another reason for requiring dynamic deployment was to support
> > > >>>>>>> data locality. If you want to have locality, you need to put
> > > >>>>>>> your
> > > >>>> processors
> > > >>>>>>> close to the data they're processing. Upon further
> > > >>>>>>> investigation,
> > > >>>>> though,
> > > >>>>>>> this feature is not that beneficial. There is some good
> > > >>>>>>> discussion
> > > >>>> about
> > > >>>>>>> some problems with it on SAMZA-335. Again, we took the
> > > >>>>>>> Map/Reduce
> > > >>>> path,
> > > >>>>>>> but
> > > >>>>>>> there are some fundamental differences between HDFS and Kafka.
> > > >>>>>>> HDFS
> > > >>>> has
> > > >>>>>>> blocks, while Kafka has partitions. This leads to less
> > > >>>>>>> optimization potential with stream processors on top of Kafka.
> > > >>>>>>>
> > > >>>>>>> This feature is also used as a crutch. Samza doesn't have any
> > > >>>>>>> built
> > > >>> in
> > > >>>>>>> fault-tolerance logic. Instead, it depends on the dynamic
> > > >>>>>>> deployment scheduling system to handle restarts when a
> > > >>>>>>> processor dies. This has
> > > >>>>> made
> > > >>>>>>> it very difficult to write a standalone Samza container
> > > >> (SAMZA-516).
> > > >>>>>>>
> > > >>>>>>> Pluggability
> > > >>>>>>>
> > > >>>>>>> In some cases pluggability is good, but I think that we've gone
> > > >>>>>>> too
> > > >>>> far
> > > >>>>>>> with it. Currently, Samza has:
> > > >>>>>>>
> > > >>>>>>> * Pluggable config.
> > > >>>>>>> * Pluggable metrics.
> > > >>>>>>> * Pluggable deployment systems.
> > > >>>>>>> * Pluggable streaming systems (SystemConsumer, SystemProducer,
> > > >> etc).
> > > >>>>>>> * Pluggable serdes.
> > > >>>>>>> * Pluggable storage engines.
> > > >>>>>>> * Pluggable strategies for just about every component
> > > >>> (MessageChooser,
> > > >>>>>>> SystemStreamPartitionGrouper, ConfigRewriter, etc).
> > > >>>>>>>
> > > > >>>>>>> There's probably more that I've forgotten, as well. Some of these are
> > > > >>>>>>> useful, but some have proven not to be. This all comes at a cost:
> > > > >>>>>>> complexity. This complexity is making it harder for our users to pick
> > > > >>>>>>> up and use Samza out of the box. It also makes it difficult for Samza
> > > > >>>>>>> developers to reason about the characteristics of the container
> > > > >>>>>>> (since the characteristics change depending on which plugins are
> > > > >>>>>>> used).
> > > >>>>>>>
> > > >>>>>>> The issues with pluggability are most visible in the System
> APIs.
> > > >>> What
> > > >>>>>>> Samza really requires to be functional is Kafka as its
> > > >>>>>>> transport
> > > >>>> layer.
> > > >>>>>>> But
> > > >>>>>>> we've conflated two unrelated use cases into one API:
> > > >>>>>>>
> > > >>>>>>> 1. Get data into/out of Kafka.
> > > >>>>>>> 2. Process the data in Kafka.
> > > >>>>>>>
> > > >>>>>>> The current System API supports both of these use cases. The
> > > >>>>>>> problem
> > > >>>> is,
> > > >>>>>>> we
> > > >>>>>>> actually want different features for each use case. By papering
> > > >>>>>>> over
> > > >>>>> these
> > > >>>>>>> two use cases, and providing a single API, we've introduced a
> > > >>>>>>> ton of
> > > >>>>> leaky
> > > >>>>>>> abstractions.
> > > >>>>>>>
> > > >>>>>>> For example, what we'd really like in (2) is to have
> > > >>>>>>> monotonically increasing longs for offsets (like Kafka). This
> > > >>>>>>> would be at odds
> > > >>> with
> > > >>>>> (1),
> > > >>>>>>> though, since different systems have different
> > > >>>>> SCNs/Offsets/UUIDs/vectors.
> > > >>>>>>> There was discussion both on the mailing list and the SQL JIRAs
> > > >>> about
> > > >>>>> the
> > > >>>>>>> need for this.
> > > >>>>>>>
> > > >>>>>>> The same thing holds true for replayability. Kafka allows us to
> > > >>> rewind
> > > >>>>>>> when
> > > >>>>>>> we have a failure. Many other systems don't. In some cases,
> > > >>>>>>> systems
> > > >>>>> return
> > > >>>>>>> null for their offsets (e.g. WikipediaSystemConsumer) because
> > > >>>>>>> they
> > > >>>> have
> > > >>>>> no
> > > >>>>>>> offsets.
> > > >>>>>>>
> > > >>>>>>> Partitioning is another example. Kafka supports partitioning,
> > > >>>>>>> but
> > > >>> many
> > > >>>>>>> systems don't. We model this by having a single partition for
> > > >>>>>>> those systems. Still, other systems model partitioning
> > > >> differently (e.g.
> > > >>>>>>> Kinesis).
> > > >>>>>>>
> > > >>>>>>> The SystemAdmin interface is also a mess. Creating streams in a
> > > >>>>>>> system-agnostic way is almost impossible. As is modeling
> > > >>>>>>> metadata
> > > >>> for
> > > >>>>> the
> > > >>>>>>> system (replication factor, partitions, location, etc). The
> > > >>>>>>> list
> > > >>> goes
> > > >>>>> on.
> > > >>>>>>>
> > > >>>>>>> Duplicate work
> > > >>>>>>>
> > > >>>>>>> At the time that we began writing Samza, Kafka's consumer and
> > > >>> producer
> > > >>>>>>> APIs
> > > >>>>>>> had a relatively weak feature set. On the consumer side, you
> > > >>>>>>> had two options: use the high-level consumer, or the simple
> > > >>>>>>> consumer. The problem with the high-level consumer was that it
> > > >>>>>>> controlled your offsets, partition assignments, and the order
> > > >>>>>>> in which you received messages. The problem with the simple
> > > >>>>>>> consumer is that it's not simple. It's basic. You end up having
> > > >>>>>>> to handle a lot of really low-level stuff that you shouldn't.
> > > >>>>>>> We spent a lot of time making Samza's KafkaSystemConsumer very
> > > >>>>>>> robust. It also allows us to support some cool features:
> > > >>>>>>>
> > > >>>>>>> * Per-partition message ordering and prioritization.
> > > >>>>>>> * Tight control over partition assignment to support joins,
> > > >>>>>>> global
> > > >>>> state
> > > >>>>>>> (if we want to implement it :)), etc.
> > > >>>>>>> * Tight control over offset checkpointing.
> > > >>>>>>>
> > > >>>>>>> What we didn't realize at the time is that these features
> > > >>>>>>> should
> > > >>>>> actually
> > > >>>>>>> be in Kafka. A lot of Kafka consumers (not just Samza stream
> > > >>>> processors)
> > > >>>>>>> end up wanting to do things like joins and partition
> > > >>>>>>> assignment. The
> > > >>>>> Kafka
> > > >>>>>>> community has come to the same conclusion. They're adding a ton
> > > >>>>>>> of upgrades into their new Kafka consumer implementation. To a
> > > >>>>>>> large extent,
> > > >>> it's
> > > >>>>>>> duplicate work to what we've already done in Samza.
> > > >>>>>>>
> > > >>>>>>> On top of this, Kafka ended up taking a very similar approach
> > > >>>>>>> to
> > > >>>> Samza's
> > > >>>>>>> KafkaCheckpointManager implementation for handling offset
> > > >>>> checkpointing.
> > > >>>>>>> Like Samza, Kafka's new offset management feature stores offset
> > > >>>>>>> checkpoints in a topic, and allows you to fetch them from the
> > > >>>>>>> broker.
> > > >>>>>>>
> > > >>>>>>> A lot of this seems like a waste, since we could have shared
> > > >>>>>>> the
> > > >>> work
> > > >>>> if
> > > >>>>>>> it
> > > >>>>>>> had been done in Kafka from the get-go.
> > > >>>>>>>
> > > >>>>>>> Vision
> > > >>>>>>>
> > > >>>>>>> All of this leads me to a rather radical proposal. Samza is
> > > >>> relatively
> > > >>>>>>> stable at this point. I'd venture to say that we're near a 1.0
> > > >>>> release.
> > > >>>>>>> I'd
> > > >>>>>>> like to propose that we take what we've learned, and begin
> > > >>>>>>> thinking
> > > >>>>> about
> > > >>>>>>> Samza beyond 1.0. What would we change if we were starting from
> > > >>>> scratch?
> > > >>>>>>> My
> > > >>>>>>> proposal is to:
> > > >>>>>>>
> > > >>>>>>> 1. Make Samza standalone the *only* way to run Samza
> > > >>>>>>> processors, and eliminate all direct dependencies on YARN,
> > > >>>>>>> Mesos, etc.
> > > >>>>>>> 2. Make a definitive call to support only Kafka as the stream
> > > >>>> processing
> > > >>>>>>> layer.
> > > >>>>>>> 3. Eliminate Samza's metrics, logging, serialization, and
> > > >>>>>>> config
> > > >>>>> systems,
> > > >>>>>>> and simply use Kafka's instead.
> > > >>>>>>>
> > > >>>>>>> This would fix all of the issues that I outlined above. It
> > > >>>>>>> should
> > > >>> also
> > > >>>>>>> shrink the Samza code base pretty dramatically. Supporting only
> > > >>>>>>> a standalone container will allow Samza to be executed on YARN
> > > >>>>>>> (using Slider), Mesos (using Marathon/Aurora), or most other
> > > >>>>>>> in-house
> > > >>>>> deployment
> > > >>>>>>> systems. This should make life a lot easier for new users.
> > > >>>>>>> Imagine
> > > >>>>> having
> > > >>>>>>> the hello-samza tutorial without YARN. The drop in mailing list
> > > >>>> traffic
> > > >>>>>>> will be pretty dramatic.
> > > >>>>>>>
> > > >>>>>>> Coupling with Kafka seems long overdue to me. The reality is,
> > > >>> everyone
> > > >>>>>>> that
> > > >>>>>>> I'm aware of is using Samza with Kafka. We basically require it
> > > >>>> already
> > > >>>>> in
> > > >>>>>>> order for most features to work. Those that are using other
> > > >>>>>>> systems
> > > >>>> are
> > > >>>>>>> generally using it for ingest into Kafka (1), and then they do
> > > >>>>>>> the processing on top. There is already discussion
> > > >>>>>>> (https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=58851767)
> > > >>>>>>> in Kafka to make ingesting into Kafka extremely easy.
> > > >>>>>>>
> > > >>>>>>> Once we make the call to couple with Kafka, we can leverage a
> > > >>>>>>> ton of
> > > >>>>> their
> > > >>>>>>> ecosystem. We no longer have to maintain our own config,
> > > >>>>>>> metrics,
> > > >>> etc.
> > > >>>>> We
> > > >>>>>>> can all share the same libraries, and make them better. This
> > > >>>>>>> will
> > > >>> also
> > > >>>>>>> allow us to share the consumer/producer APIs, and will let us
> > > >>> leverage
> > > >>>>>>> their offset management and partition management, rather than
> > > >>>>>>> having
> > > >>>> our
> > > >>>>>>> own. All of the coordinator stream code would go away, as would
> > > >>>>>>> most
> > > >>>> of
> > > >>>>>>> the
> > > >>>>>>> YARN AppMaster code. We'd probably have to push some partition
> > > >>>>> management
> > > >>>>>>> features into the Kafka broker, but they're already moving in
> > > >>>>>>> that direction with the new consumer API. The features we have
> > > >>>>>>> for
> > > >>>> partition
> > > >>>>>>> assignment aren't unique to Samza, and seem like they should be
> > > >>>>>>> in
> > > >>>> Kafka
> > > >>>>>>> anyway. There will always be some niche usages which will
> > > >>>>>>> require
> > > >>>> extra
> > > >>>>>>> care and hence full control over partition assignments, much
> > > >>>>>>> like the Kafka low-level consumer API. These would continue to
> > > >>>>>>> be supported.
> > > >>>>>>>
> > > >>>>>>> These items will be good for the Samza community. They'll make
> > > >>>>>>> Samza easier to use, and make it easier for developers to add
> > > >>>>>>> new features.
> > > >>>>>>>
> > > >>>>>>> Obviously this is a fairly large (and somewhat
> > > >>>>>>> backwards-incompatible) change. If we choose to go this route,
> > > >>>>>>> it's important that we
> > > >>> openly
> > > >>>>>>> communicate how we're going to provide a migration path from
> > > >>>>>>> the
> > > >>>>> existing
> > > >>>>>>> APIs to the new ones (if we make incompatible changes). I think
> > > >>>>>>> at a minimum, we'd probably need to provide a wrapper to allow
> > > >>>>>>> existing StreamTask implementations to continue running on the
> > > >> new container.
> > > >>>>> It's
> > > >>>>>>> also important that we openly communicate about timing, and
> > > >>>>>>> stages
> > > >>> of
> > > >>>>> the
> > > >>>>>>> migration.
> > > >>>>>>>
> > > >>>>>>> If you made it this far, I'm sure you have opinions. :) Please
> > > >>>>>>> send
> > > >>>> your
> > > >>>>>>> thoughts and feedback.
> > > >>>>>>>
> > > >>>>>>> Cheers,
> > > >>>>>>> Chris
> > > >>>>>>>
> > > >>>>>>
> > > >>>>>>
> > > >>>>>
> > > >>>>
> > > >>>>
> > > >>>>
> > > >>>> --
> > > >>>> -- Guozhang
> > > >>>>
> > > >>>
> > > >>
> > >
> > >
> >
>
