{quote}
There is a good chance that the different execution frameworks can be
abstracted out
{quote}
Actually, I think that even if we define an abstract layer, the Samza
process won't need to be aware of the existence of that layer. The
Samza container running as a process can be totally unaware of the
execution framework it uses. The job submission/configuration/launching
tools can be completely isolated from the Samza container as a process,
ideally.
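
To make that separation concrete, here is a rough sketch of what I have in
mind. It is only an illustration under my own assumptions: the names
ExecutionFramework, ContainerProcess and LocalFramework are made up for this
mail and do not exist in Samza or in the Mesos patch, and the toy framework
runs everything in-process. The point is just that the container class never
references the framework that launches it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class AbstractLayerSketch {

    /** Hypothetical abstraction over YARN/Mesos/AWS: only the three needs listed in the thread. */
    interface ExecutionFramework {
        List<String> acquireResources(int count);          // 1. resources to run containers
        void launch(String resource, Runnable container);  // 2. launch a container on a resource
        int restartFailed();                                // 3. restart failed containers, return how many
    }

    /** Stand-in for a Samza container: it sees only its config, never the framework. */
    static class ContainerProcess implements Runnable {
        private final Map<String, String> config;
        int runs = 0;

        ContainerProcess(Map<String, String> config) {
            this.config = config;
        }

        @Override
        public void run() {
            runs++;
            // Simulate a crash on the first run so the restart path is exercised.
            if ("true".equals(config.get("fail.once")) && runs == 1) {
                throw new IllegalStateException("simulated container failure");
            }
        }
    }

    /** Toy in-process framework, used only to exercise the interface. */
    static class LocalFramework implements ExecutionFramework {
        private final Map<String, Runnable> assigned = new HashMap<>();
        private final List<String> failed = new ArrayList<>();

        @Override
        public List<String> acquireResources(int count) {
            List<String> hosts = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                hosts.add("host-" + i);
            }
            return hosts;
        }

        @Override
        public void launch(String resource, Runnable container) {
            assigned.put(resource, container);
            runOn(resource);
        }

        @Override
        public int restartFailed() {
            List<String> toRetry = new ArrayList<>(failed);
            failed.clear();
            for (String resource : toRetry) {
                runOn(resource);
            }
            return toRetry.size();
        }

        // Run the container assigned to a resource and remember it if it fails.
        private void runOn(String resource) {
            try {
                assigned.get(resource).run();
            } catch (RuntimeException e) {
                failed.add(resource);
            }
        }
    }

    public static void main(String[] args) {
        ExecutionFramework framework = new LocalFramework();
        Map<String, String> config = new HashMap<>();
        config.put("fail.once", "true");
        for (String host : framework.acquireResources(2)) {
            framework.launch(host, new ContainerProcess(config));
        }
        System.out.println("restarted " + framework.restartFailed() + " containers");
    }
}
```

Swapping LocalFramework for a YARN- or Mesos-backed implementation would, in
this sketch, leave ContainerProcess untouched, which is the isolation I am
arguing for.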

On Mon, Jul 6, 2015 at 3:13 PM, Yi Pan <nickpa...@gmail.com> wrote:

> @Jay, you got my point.
>
> {quote}
> I think the question is whether for the "as a
> service" you are proposing actually trying to build some layer over
> YARN/Mesos/AWS that abstracts these away?
> {quote}
> I am not very strong on this, but I do see that as an option. The reason I
> had is: if all we need from YARN/Mesos/AWS is simply:
> 1. a set of resources to run Samza containers
> 2. launch Samza containers on the chosen set of resources
> 3. make the container processes fault tolerant (i.e. monitor and restart
> the failed processes)
> There is a good chance that the different execution frameworks can be
> abstracted out since all cluster management systems would need to provide the
> above set of functionalities. I would need to spend more time on the Mesos
> patch for Samza to come up w/ a more concrete idea on the abstract layer. If
> it turns out that the abstraction is not possible, Samza as a service may
> have to be implemented in many versions on top of Samza as a process.
>
> On Mon, Jul 6, 2015 at 12:02 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
>
>> Hey Yi/Martin,
>>
>> With respect to state reuse: Yeah that is a better explanation of what I
>> was trying to say. That prototype actually includes state reuse (each task
>> checkpoints its offset for the various partitions during the commit and
>> uses that to determine if the state is valid and can be reused on
>> restart).
>> So it is just a matter of a framework being able to express a preference
>> for the recently used hosts--those that can will get the optimization and
>> those that can't won't.
>>
>> Yi, for the three layers I think we agree, but I may be unclear on what
>> you
>> are actually proposing. I think the question is whether for the "as a
>> service" you are proposing actually trying to build some layer over
>> YARN/Mesos/AWS that abstracts these away? Of course, there is nothing that
>> requires such an abstraction to run stream processing as a service on top
>> of (say) YARN. Can you be more specific about what would be in-scope for
>> that?
>>
>> I am generally skeptical of a layer that abstracts away these frameworks
>> for two reasons. First I think they are quite different and each is
>> advancing fairly rapidly so abstracting over them may be hard. Second I
>> think the layer of abstraction doesn't really help the user. If you think
>> about it you tend to just adopt one of the frameworks (say Mesos/Marathon)
>> and you learn the tooling associated with that and use it for all your
>> different systems. Building an abstraction over that doesn't really give
>> you any flexibility: you can't swap in another framework because all your
>> other stuff is tied to (say) Mesos/Marathon, and the layer of indirection
>> obscures the interface the user is already familiar with from other
>> systems.
>> But I think I may actually be misunderstanding your proposal...
>>
>> -Jay
>>
>> On Mon, Jul 6, 2015 at 11:30 AM, Yi Pan <nickpa...@gmail.com> wrote:
>>
>> > Hi, Martin,
>> >
>> > Great to hear your voice! I will just try to focus on your questions
>> > regarding the "w/o YARN" part.
>> >
>> > {quote}
>> > For example, would host affinity (SAMZA-617) still be possible?
>> > {quote}
>> > It would be possible if we separate the job execution/process launching
>> > from the partition assignment among all Samza containers. Host-affinity
>> > could follow the model below:
>> > a. We still keep a container-host mapping as in SAMZA-617
>> > b. It would be a best-effort approach that tries to get the same set of
>> > hosts from the job execution framework (e.g. as we did w/ YARN in
>> > SamzaAppMaster code. If Slider/Marathon comes up w/ this feature, we can
>> > migrate to that as well).
>> > c. When the container starts, it discovers the local state and tries to
>> > retain the same partitions via the partition management APIs (i.e.
>> > explicitly ask for a specific set of partitions to be assigned to the
>> > container, instead of defaulting to whatever the broker decides to assign
>> > to the container)
>> >
>> > {quote}
>> > a question about the proposed API for instantiating a job in code
>> (rather
>> > than a properties file): when submitting a job to a cluster, is the idea
>> > that the instantiation code runs on a client somewhere, which then pokes
>> > the necessary endpoints on YARN/Mesos/AWS/etc? Or does that code run on
>> > each container that is part of the job (in which case, how does the job
>> > submission to the cluster work)?
>> > {quote}
>> > I want to chime in here to propose the three layered model:
>> > 1. Samza as a library. In this case, the client application should
>> actually
>> > implement the interface to submit its own job to a cluster, if the
>> client
>> > application chooses to run in a cluster. Samza as a library won't be
>> poking
>> > any endpoints on YARN/Mesos/AWS
>> > 2. Samza as a process. In this case, a properties file could be desired
>> > when starting the Samza process. Here again, Samza as a process should
>> not
>> > need to interact w/ any endpoints on YARN/Mesos/AWS.
>> > 3. Samza as a service. In this case, we definitely need some job
>> > configuration and a Samza implementation of interface to submit jobs and
>> > their configuration to a cluster (i.e. YARN/Mesos/AWS, mostly the same
>> as
>> > we have today, except that we should just ask for resources to run Samza
>> > containers and leave the partition management aside)
>> >
>> > Thanks!
>> >
>> > On Mon, Jul 6, 2015 at 3:37 AM, Martin Kleppmann <mar...@kleppmann.com>
>> > wrote:
>> >
>> > > Hi all,
>> > >
>> > > Lots of good thoughts here.
>> > >
>> > > I agree with the general philosophy of tying Samza more firmly to
>> Kafka.
>> > > After I spent a while looking at integrating other message brokers
>> (e.g.
>> > > Kinesis) with SystemConsumer, I came to the conclusion that
>> > SystemConsumer
>> > > tacitly assumes a model so much like Kafka's that pretty much nobody
>> but
>> > > Kafka actually implements it. (Databus is perhaps an exception, but it
>> > > isn't widely used outside of LinkedIn.) Thus, making Samza fully
>> > dependent
>> > > on Kafka acknowledges that the system-independence was never as real
>> as
>> > we
>> > > perhaps made it out to be. The gains of code reuse are real.
>> > >
>> > > The idea of decoupling Samza from YARN has also always been appealing
>> to
>> > > me, for various reasons already mentioned in this thread. Although
>> making
>> > > Samza jobs deployable on anything (YARN/Mesos/AWS/etc) seems
>> laudable, I
>> > am
>> > > a little concerned that it will restrict us to a lowest common
>> > denominator.
>> > > For example, would host affinity (SAMZA-617) still be possible? For
>> jobs
>> > > with large amounts of state, I think SAMZA-617 would be a big boon,
>> since
>> > > restoring state off the changelog on every single restart is painful,
>> due
>> > > to long recovery times. It would be a shame if the decoupling from
>> YARN
>> > > made host affinity impossible.
>> > >
>> > > Jay, a question about the proposed API for instantiating a job in code
>> > > (rather than a properties file): when submitting a job to a cluster,
>> is
>> > the
>> > > idea that the instantiation code runs on a client somewhere, which
>> then
>> > > pokes the necessary endpoints on YARN/Mesos/AWS/etc? Or does that code
>> > run
>> > > on each container that is part of the job (in which case, how does the
>> > job
>> > > submission to the cluster work)?
>> > >
>> > > I agree with Garry that it doesn't feel right to make a 1.0 release
>> with
>> > a
>> > > plan for it to be immediately obsolete. So if this is going to
>> happen, I
>> > > think it would be more honest to stick with 0.* version numbers until
>> the
>> > > library-ified Samza has been implemented, is stable and widely used.
>> > >
>> > > Should the new Samza be a subproject of Kafka? There is precedent for
>> > > tight coupling between different Apache projects (e.g. Curator and
>> > > Zookeeper, or Slider and YARN), so I think remaining separate would be
>> > ok.
>> > > Even if Samza is fully dependent on Kafka, there is enough substance
>> in
>> > > Samza that it warrants being a separate project. An argument in
>> favour of
>> > > merging would be if we think Kafka has a much stronger "brand
>> presence"
>> > > than Samza; I'm ambivalent on that one. If the Kafka project is
>> willing
>> > to
>> > > endorse Samza as the "official" way of doing stateful stream
>> > > transformations, that would probably have much the same effect as
>> > > re-branding Samza as "Kafka Stream Processors" or suchlike. Close
>> > > collaboration between the two projects will be needed in any case.
>> > >
>> > > From a project management perspective, I guess the "new Samza" would
>> have
>> > > to be developed on a branch alongside ongoing maintenance of the
>> current
>> > > line of development? I think it would be important to continue
>> supporting
>> > > existing users, and provide a graceful migration path to the new
>> version.
>> > > Leaving the current versions unsupported and forcing people to rewrite
>> > > their jobs would send a bad signal.
>> > >
>> > > Best,
>> > > Martin
>> > >
>> > > On 2 Jul 2015, at 16:59, Jay Kreps <j...@confluent.io> wrote:
>> > >
>> > > > Hey Garry,
>> > > >
>> > > > Yeah that's super frustrating. I'd be happy to chat more about this
>> if
>> > > > you'd be interested. I think Chris and I started with the idea of
>> "what
>> > > > would it take to make Samza a kick-ass ingestion tool" but
>> ultimately
>> > we
>> > > > kind of came around to the idea that ingestion and transformation
>> had
>> > > > pretty different needs and coupling the two made things hard.
>> > > >
>> > > > For what it's worth I think copycat (KIP-26) actually will do what
>> you
>> > > are
>> > > > looking for.
>> > > >
>> > > > With regard to your point about slider, I don't necessarily
>> disagree.
>> > > But I
>> > > > think getting good YARN support is quite doable and I think we can
>> make
>> > > > that work well. I think the issue this proposal solves is that
>> > > technically
>> > > > it is pretty hard to support multiple cluster management systems the
>> > way
>> > > > things are now, you need to write an "app master" or "framework" for
>> > each
>> > > > and they are all a little different so testing is really hard. In
>> the
>> > > > absence of this we have been stuck with just YARN which has
>> fantastic
>> > > > penetration in the Hadoopy part of the org, but zero penetration
>> > > elsewhere.
>> > > > Given the huge amount of work being put in to slider, marathon, aws
>> > > > tooling, not to mention the umpteen related packaging technologies
>> > people
>> > > > want to use (Docker, Kubernetes, various cloud-specific deploy
>> tools,
>> > > etc)
>> > > > I really think it is important to get this right.
>> > > >
>> > > > -Jay
>> > > >
>> > > > On Thu, Jul 2, 2015 at 4:17 AM, Garry Turkington <
>> > > > g.turking...@improvedigital.com> wrote:
>> > > >
>> > > >> Hi all,
>> > > >>
>> > > >> I think the question below re does Samza become a sub-project of
>> Kafka
>> > > >> highlights the broader point around migration. Chris mentions
>> Samza's
>> > > >> maturity is heading towards a v1 release but I'm not sure it feels
>> > > right to
>> > > >> launch a v1 then immediately plan to deprecate most of it.
>> > > >>
>> > > >> From a selfish perspective I have some guys who have started
>> working
>> > > with
>> > > >> Samza and building some new consumers/producers was next up. Sounds
>> > like
>> > > >> that is absolutely not the direction to go. I need to look into the
>> > KIP
>> > > in
>> > > >> more detail but for me the attractiveness of adding new Samza
>> > > >> consumer/producers -- even if yes all they were doing was really
>> > getting
>> > > >> data into and out of Kafka --  was to avoid  having to worry about
>> the
>> > > >> lifecycle management of external clients. If there is a generic
>> Kafka
>> > > >> ingress/egress layer that I can plug a new connector into and have
>> a
>> > > lot of
>> > > >> the heavy lifting re scale and reliability done for me then it
>> gives
>> > me
>> > > all
>> > > >> the pushing new consumers/producers would. If not then it
>> complicates
>> > my
>> > > >> operational deployments.
>> > > >>
>> > > >> Which is similar to my other question with the proposal -- if we
>> > build a
>> > > >> fully available/stand-alone Samza plus the requisite shims to
>> > integrate
>> > > >> with Slider etc I suspect the former may be a lot more work than we
>> > > think.
>> > > >> We may make it much easier for a newcomer to get something running
>> but
>> > > >> having them step up and get a reliable production deployment may
>> still
>> > > >> dominate mailing list  traffic, if for different reasons than
>> today.
>> > > >>
>> > > >> Don't get me wrong -- I'm comfortable with making the Samza
>> dependency
>> > > on
>> > > >> Kafka much more explicit and I absolutely see the benefits  in the
>> > > >> reduction of duplication and clashing terminologies/abstractions
>> that
>> > > >> Chris/Jay describe. Samza as a library would likely be a very nice
>> > tool
>> > > to
>> > > >> add to the Kafka ecosystem. I just have the concerns above re the
>> > > >> operational side.
>> > > >>
>> > > >> Garry
>> > > >>
>> > > >> -----Original Message-----
>> > > >> From: Gianmarco De Francisci Morales [mailto:g...@apache.org]
>> > > >> Sent: 02 July 2015 12:56
>> > > >> To: dev@samza.apache.org
>> > > >> Subject: Re: Thoughts and obesrvations on Samza
>> > > >>
>> > > >> Very interesting thoughts.
>> > > >> From outside, I have always perceived Samza as a computing layer
>> over
>> > > >> Kafka.
>> > > >>
>> > > >> The question, maybe a bit provocative, is "should Samza be a
>> > sub-project
>> > > >> of Kafka then?"
>> > > >> Or does it make sense to keep it as a separate project with a
>> separate
>> > > >> governance?
>> > > >>
>> > > >> Cheers,
>> > > >>
>> > > >> --
>> > > >> Gianmarco
>> > > >>
>> > > >> On 2 July 2015 at 08:59, Yan Fang <yanfang...@gmail.com> wrote:
>> > > >>
>> > > >>> Overall, I agree with coupling to Kafka more tightly, because Samza
>> > > >>> is de facto based on Kafka and should leverage what Kafka has. At
>> > > >>> the same time, Kafka does not need to reinvent what Samza already
>> > > >>> has. I also like the idea of separating the ingestion and
>> > > >>> transformation.
>> > > >>>
>> > > >>> But it is a little difficult for me to imagine what Samza will look
>> > > >>> like. And I feel Chris and Jay differ a little in terms of what
>> > > >>> Samza should look like.
>> > > >>>
>> > > >>> *** Will it look like what Jay's code shows (a client of Kafka)?
>> > > >>> And does the user's application code call this client?
>> > > >>>
>> > > >>> 1. If we make Samza a library of Kafka (like what the code shows),
>> > > >>> how do we implement auto-balance and fault-tolerance? Are they
>> > > >>> taken care of by the Kafka broker or by another mechanism, such as
>> > > >>> a "Samza worker" (just making up the name)?
>> > > >>>
>> > > >>> 2. What about other features, such as auto-scaling, shared state,
>> > > >>> monitoring?
>> > > >>>
>> > > >>>
>> > > >>> *** If we have Samza standalone, (is this what Chris suggests?)
>> > > >>>
>> > > >>> 1. we still need to ingest data from Kafka and produce to it. Then
>> > > >>> it becomes the same as what Samza looks like now, except it does
>> > > >>> not rely on Yarn anymore.
>> > > >>>
>> > > >>> 2. if it is standalone, how can it leverage Kafka's metrics, logs,
>> > > >>> etc? Use Kafka code as the dependency?
>> > > >>>
>> > > >>>
>> > > >>> Thanks,
>> > > >>>
>> > > >>> Fang, Yan
>> > > >>> yanfang...@gmail.com
>> > > >>>
>> > > >>> On Wed, Jul 1, 2015 at 5:46 PM, Guozhang Wang <wangg...@gmail.com
>> >
>> > > >> wrote:
>> > > >>>
>> > > >>>> Read through the code example and it looks good to me. A few
>> > > >>>> thoughts regarding deployment:
>> > > >>>>
>> > > >>>> Today Samza deploys as an executable runnable like:
>> > > >>>>
>> > > >>>> deploy/samza/bin/run-job.sh --config-factory=... --config-path=file://...
>> > > >>>>
>> > > >>>> And this proposal advocates deploying Samza more as embedded
>> > > >>>> libraries in user application code (ignoring the terminology since
>> > > >>>> it is not the same as the prototype code):
>> > > >>>>
>> > > >>>> StreamTask task = new MyStreamTask(configs);
>> > > >>>> Thread thread = new Thread(task);
>> > > >>>> thread.start();
>> > > >>>>
>> > > >>>> I think both of these deployment modes are important for different
>> > > >>>> types of users. That said, I think making Samza purely standalone
>> > > >>>> is still sufficient for either runnable or library modes.
>> > > >>>>
>> > > >>>> Guozhang
>> > > >>>>
>> > > >>>> On Tue, Jun 30, 2015 at 11:33 PM, Jay Kreps <j...@confluent.io>
>> > wrote:
>> > > >>>>
>> > > >>>>> Looks like gmail mangled the code example, it was supposed to
>> look
>> > > >>>>> like
>> > > >>>>> this:
>> > > >>>>>
>> > > >>>>> Properties props = new Properties();
>> > > >>>>> props.put("bootstrap.servers", "localhost:4242");
>> > > >>>>> StreamingConfig config = new StreamingConfig(props);
>> > > >>>>> config.subscribe("test-topic-1", "test-topic-2");
>> > > >>>>> config.processor(ExampleStreamProcessor.class);
>> > > >>>>> config.serialization(new StringSerializer(), new StringDeserializer());
>> > > >>>>> KafkaStreaming container = new KafkaStreaming(config);
>> > > >>>>> container.run();
>> > > >>>>>
>> > > >>>>> -Jay
>> > > >>>>>
>> > > >>>>> On Tue, Jun 30, 2015 at 11:32 PM, Jay Kreps <j...@confluent.io>
>> > > >> wrote:
>> > > >>>>>
>> > > >>>>>> Hey guys,
>> > > >>>>>>
>> > > >>>>>> This came out of some conversations Chris and I were having
>> > > >>>>>> around whether it would make sense to use Samza as a kind of data
>> > > >>>>>> ingestion framework for Kafka (which ultimately led to KIP-26
>> > > >>>>>> "copycat"). This kind of combined with complaints around config
>> > > >>>>>> and YARN and the discussion around how to best do a standalone
>> > > >>>>>> mode.
>> > > >>>>>>
>> > > >>>>>> So the thought experiment was, given that Samza was basically
>> > > >>>>>> already totally Kafka specific, what if you just embraced that
>> > > >>>>>> and turned it
>> > > >>>> into
>> > > >>>>>> something less like a heavyweight framework and more like a
>> > > >>>>>> third
>> > > >>> Kafka
>> > > >>>>>> client--a kind of "producing consumer" with state management
>> > > >>>> facilities.
>> > > >>>>>> Basically a library. Instead of a complex stream processing
>> > > >>>>>> framework this would actually be a very simple thing, not much
>> > > >>>>>> more complicated to use or operate than a Kafka consumer. As Chris
>> > > >>>>>> said, when we thought about it, a lot of what Samza (and the other
>> > > >>>>>> stream processing systems) were doing seemed like kind of a
>> > > >>>>>> hangover from MapReduce.
>> > > >>>>>>
>> > > >>>>>> Of course you need to ingest/output data to and from the stream
>> > > >>>>>> processing. But when we actually looked into how that would
>> > > >>>>>> work,
>> > > >>> Samza
>> > > >>>>>> isn't really an ideal data ingestion framework for a bunch of
>> > > >>> reasons.
>> > > >>>> To
>> > > >>>>>> really do that right you need a pretty different internal data
>> > > >>>>>> model
>> > > >>>> and
>> > > >>>>>> set of apis. So what if you split them and had an api for Kafka
>> > > >>>>>> ingress/egress (copycat AKA KIP-26) and a separate api for
>> Kafka
>> > > >>>>>> transformation (Samza).
>> > > >>>>>>
>> > > >>>>>> This would also allow really embracing the same terminology and
>> > > >>>>>> conventions. One complaint about the current state is that the
>> > > >>>>>> two
>> > > >>>>> systems
>> > > >>>>>> kind of feel bolted on. Terminology like "stream" vs "topic"
>> and
>> > > >>>>> different
>> > > >>>>>> config and monitoring systems means you kind of have to learn
>> > > >>>>>> Kafka's
>> > > >>>>> way,
>> > > >>>>>> then learn Samza's slightly different way, then kind of
>> > > >>>>>> understand
>> > > >>> how
>> > > >>>>> they
>> > > >>>>>> map to each other, which having walked a few people through
>> this
>> > > >>>>>> is surprisingly tricky for folks to get.
>> > > >>>>>>
>> > > >>>>>> Since I have been spending a lot of time on airplanes I hacked
>> > > >>>>>> up an earnest but still somewhat incomplete prototype of what
>> > > >>>>>> this would look like. This is just unceremoniously dumped into
>> > > >>>>>> Kafka as it required a few changes to the new consumer. Here is
>> > > >>>>>> the code:
>> > > >>>>>>
>> > > >>>>>> https://github.com/jkreps/kafka/tree/streams/clients/src/main/java/org/apache/kafka/clients/streaming
>> > > >>>>>>
>> > > >>>>>> For the purpose of the prototype I just liberally renamed
>> > > >>>>>> everything
>> > > >>> to
>> > > >>>>>> try to align it with Kafka with no regard for compatibility.
>> > > >>>>>>
>> > > >>>>>> To use this would be something like this:
>> > > >>>>>> Properties props = new Properties();
>> > > >>>>>> props.put("bootstrap.servers", "localhost:4242");
>> > > >>>>>> StreamingConfig config = new
>> > > >>> StreamingConfig(props);
>> > > >>>>> config.subscribe("test-topic-1",
>> > > >>>>>> "test-topic-2");
>> config.processor(ExampleStreamProcessor.class);
>> > > >>>>> config.serialization(new
>> > > >>>>>> StringSerializer(), new StringDeserializer()); KafkaStreaming
>> > > >>>> container =
>> > > >>>>>> new KafkaStreaming(config); container.run();
>> > > >>>>>>
>> > > >>>>>> KafkaStreaming is basically the SamzaContainer; StreamProcessor
>> > > >>>>>> is basically StreamTask.
>> > > >>>>>>
>> > > >>>>>> So rather than putting all the class names in a file and then
>> > > >>>>>> having
>> > > >>>> the
>> > > >>>>>> job assembled by reflection, you just instantiate the container
>> > > >>>>>> programmatically. Work is balanced over however many instances
>> > > >>>>>> of
>> > > >>> this
>> > > >>>>> are
>> > > >>>>>> alive at any time (i.e. if an instance dies, new tasks are
>> added
>> > > >>>>>> to
>> > > >>> the
>> > > >>>>>> existing containers without shutting them down).
>> > > >>>>>>
>> > > >>>>>> We would provide some glue for running this stuff in YARN via
>> > > >>>>>> Slider, Mesos via Marathon, and AWS using some of their tools
>> > > >>>>>> but from the
>> > > >>>> point
>> > > >>>>> of
>> > > >>>>>> view of these frameworks these stream processing jobs are just
>> > > >>>> stateless
>> > > >>>>>> services that can come and go and expand and contract at will.
>> > > >>>>>> There
>> > > >>> is
>> > > >>>>> no
>> > > >>>>>> more custom scheduler.
>> > > >>>>>>
>> > > >>>>>> Here are some relevant details:
>> > > >>>>>>
>> > > >>>>>>   1. It is only ~1300 lines of code, it would get larger if we
>> > > >>>>>>   productionized but not vastly larger. We really do get a ton
>> > > >>>>>> of
>> > > >>>>> leverage
>> > > >>>>>>   out of Kafka.
>> > > >>>>>>   2. Partition management is fully delegated to the new
>> consumer.
>> > > >>> This
>> > > >>>>>>   is nice since now any partition management strategy available
>> > > >>>>>> to
>> > > >>>> Kafka
>> > > >>>>>>   consumer is also available to Samza (and vice versa) and with
>> > > >>>>>> the
>> > > >>>>> exact
>> > > >>>>>>   same configs.
>> > > >>>>>>   3. It supports state as well as state reuse
>> > > >>>>>>
>> > > >>>>>> Anyhow take a look, hopefully it is thought provoking.
>> > > >>>>>>
>> > > >>>>>> -Jay
>> > > >>>>>>
>> > > >>>>>>
>> > > >>>>>>
>> > > >>>>>> On Tue, Jun 30, 2015 at 6:55 PM, Chris Riccomini <
>> > > >>>> criccom...@apache.org>
>> > > >>>>>> wrote:
>> > > >>>>>>
>> > > >>>>>>> Hey all,
>> > > >>>>>>>
>> > > >>>>>>> I have had some discussions with Samza engineers at LinkedIn
>> > > >>>>>>> and
>> > > >>>>> Confluent
>> > > >>>>>>> and we came up with a few observations and would like to
>> > > >>>>>>> propose
>> > > >>> some
>> > > >>>>>>> changes.
>> > > >>>>>>>
>> > > >>>>>>> We've observed some things that I want to call out about
>> > > >>>>>>> Samza's
>> > > >>>> design,
>> > > >>>>>>> and I'd like to propose some changes.
>> > > >>>>>>>
>> > > >>>>>>> * Samza is dependent upon a dynamic deployment system.
>> > > >>>>>>> * Samza is too pluggable.
>> > > >>>>>>> * Samza's SystemConsumer/SystemProducer and Kafka's consumer
>> > > >>>>>>> APIs
>> > > >>> are
>> > > >>>>>>> trying to solve a lot of the same problems.
>> > > >>>>>>>
>> > > >>>>>>> All three of these issues are related, but I'll address them
>> in
>> > > >>> order.
>> > > >>>>>>>
>> > > >>>>>>> Deployment
>> > > >>>>>>>
>> > > >>>>>>> Samza strongly depends on the use of a dynamic deployment
>> > > >>>>>>> scheduler
>> > > >>>> such
>> > > >>>>>>> as
>> > > >>>>>>> YARN, Mesos, etc. When we initially built Samza, we bet that
>> > > >>>>>>> there
>> > > >>>> would
>> > > >>>>>>> be
>> > > >>>>>>> one or two winners in this area, and we could support them,
>> and
>> > > >>>>>>> the
>> > > >>>> rest
>> > > >>>>>>> would go away. In reality, there are many variations.
>> > > >>>>>>> Furthermore,
>> > > >>>> many
>> > > >>>>>>> people still prefer to just start their processors like normal
>> > > >>>>>>> Java processes, and use traditional deployment scripts such as
>> > > >>>>>>> Fabric,
>> > > >>>> Chef,
>> > > >>>>>>> Ansible, etc. Forcing a deployment system on users makes the
>> > > >>>>>>> Samza start-up process really painful for first time users.
>> > > >>>>>>>
>> > > >>>>>>> Dynamic deployment as a requirement was also a bit of a
>> > > >>>>>>> mis-fire
>> > > >>>> because
>> > > >>>>>>> of
>> > > >>>>>>> a fundamental misunderstanding between the nature of batch
>> jobs
>> > > >>>>>>> and
>> > > >>>>> stream
>> > > >>>>>>> processing jobs. Early on, we made a conscious effort to favor
>> > > >>>>>>> the
>> > > >>>> Hadoop
>> > > >>>>>>> (Map/Reduce) way of doing things, since it worked and was well
>> > > >>>>> understood.
>> > > >>>>>>> One thing that we missed was that batch jobs have a definite
>> > > >>>> beginning,
>> > > >>>>>>> and
>> > > >>>>>>> end, and stream processing jobs don't (usually). This leads to
>> > > >>>>>>> a
>> > > >>> much
>> > > >>>>>>> simpler scheduling problem for stream processors. You
>> basically
>> > > >>>>>>> just
>> > > >>>>> need
>> > > >>>>>>> to find a place to start the processor, and start it. The way
>> > > >>>>>>> we run grids, at LinkedIn, there's no concept of a cluster
>> > > >>>>>>> being "full". We always
>> > > >>>> add
>> > > >>>>>>> more machines. The problem with coupling Samza with a
>> scheduler
>> > > >>>>>>> is
>> > > >>>> that
>> > > >>>>>>> Samza (as a framework) now has to handle deployment. This
>> pulls
>> > > >>>>>>> in a
>> > > >>>>> bunch
>> > > >>>>>>> of things such as configuration distribution (config stream),
>> > > >>>>>>> shell
>> > > >>>>> scripts
>> > > >>>>>>> (bin/run-job.sh, JobRunner), packaging (all the .tgz stuff),
>> etc.
>> > > >>>>>>>
>> > > >>>>>>> Another reason for requiring dynamic deployment was to support
>> > > >>>>>>> data locality. If you want to have locality, you need to put
>> > > >>>>>>> your
>> > > >>>> processors
>> > > >>>>>>> close to the data they're processing. Upon further
>> > > >>>>>>> investigation,
>> > > >>>>> though,
>> > > >>>>>>> this feature is not that beneficial. There is some good
>> > > >>>>>>> discussion
>> > > >>>> about
>> > > >>>>>>> some problems with it on SAMZA-335. Again, we took the
>> > > >>>>>>> Map/Reduce
>> > > >>>> path,
>> > > >>>>>>> but
>> > > >>>>>>> there are some fundamental differences between HDFS and Kafka.
>> > > >>>>>>> HDFS
>> > > >>>> has
>> > > >>>>>>> blocks, while Kafka has partitions. This leads to less
>> > > >>>>>>> optimization potential with stream processors on top of Kafka.
>> > > >>>>>>>
>> > > >>>>>>> This feature is also used as a crutch. Samza doesn't have any
>> > > >>>>>>> built
>> > > >>> in
>> > > >>>>>>> fault-tolerance logic. Instead, it depends on the dynamic
>> > > >>>>>>> deployment scheduling system to handle restarts when a
>> > > >>>>>>> processor dies. This has
>> > > >>>>> made
>> > > >>>>>>> it very difficult to write a standalone Samza container
>> > > >> (SAMZA-516).
>> > > >>>>>>>
>> > > >>>>>>> Pluggability
>> > > >>>>>>>
>> > > >>>>>>> In some cases pluggability is good, but I think that we've
>> gone
>> > > >>>>>>> too
>> > > >>>> far
>> > > >>>>>>> with it. Currently, Samza has:
>> > > >>>>>>>
>> > > >>>>>>> * Pluggable config.
>> > > >>>>>>> * Pluggable metrics.
>> > > >>>>>>> * Pluggable deployment systems.
>> > > >>>>>>> * Pluggable streaming systems (SystemConsumer, SystemProducer,
>> > > >> etc).
>> > > >>>>>>> * Pluggable serdes.
>> > > >>>>>>> * Pluggable storage engines.
>> > > >>>>>>> * Pluggable strategies for just about every component
>> > > >>> (MessageChooser,
>> > > >>>>>>> SystemStreamPartitionGrouper, ConfigRewriter, etc).
>> > > >>>>>>>
>> > > >>>>>>> There's probably more that I've forgotten, as well. Some of
>> > > >>>>>>> these
>> > > >>> are
>> > > >>>>>>> useful, but some have proven not to be. This all comes at a
>> cost:
>> > > >>>>>>> complexity. This complexity is making it harder for our users
>> > > >>>>>>> to
>> > > >>> pick
>> > > >>>> up
>> > > >>>>>>> and use Samza out of the box. It also makes it difficult for
>> > > >>>>>>> Samza developers to reason about the characteristics of
>> > > >>>>>>> the container (since the characteristics change depending on
>> > > >>>>>>> which plugins are used).
>> > > >>>>>>>
>> > > >>>>>>> The issues with pluggability are most visible in the System
>> APIs.
>> > > >>> What
>> > > >>>>>>> Samza really requires to be functional is Kafka as its
>> > > >>>>>>> transport
>> > > >>>> layer.
>> > > >>>>>>> But
>> > > >>>>>>> we've conflated two unrelated use cases into one API:
>> > > >>>>>>>
>> > > >>>>>>> 1. Get data into/out of Kafka.
>> > > >>>>>>> 2. Process the data in Kafka.
>> > > >>>>>>>
>> > > >>>>>>> The current System API supports both of these use cases. The
>> > > >>>>>>> problem
>> > > >>>> is,
>> > > >>>>>>> we
>> > > >>>>>>> actually want different features for each use case. By
>> papering
>> > > >>>>>>> over
>> > > >>>>> these
>> > > >>>>>>> two use cases, and providing a single API, we've introduced a
>> > > >>>>>>> ton of
>> > > >>>>> leaky
>> > > >>>>>>> abstractions.
>> > > >>>>>>>
>> > > >>>>>>> For example, what we'd really like in (2) is to have
>> > > >>>>>>> monotonically increasing longs for offsets (like Kafka). This
>> > > >>>>>>> would be at odds
>> > > >>> with
>> > > >>>>> (1),
>> > > >>>>>>> though, since different systems have different
>> > > >>>>> SCNs/Offsets/UUIDs/vectors.
>> > > >>>>>>> There was discussion both on the mailing list and the SQL
>> JIRAs
>> > > >>> about
>> > > >>>>> the
>> > > >>>>>>> need for this.
>> > > >>>>>>>
>> > > >>>>>>> The same thing holds true for replayability. Kafka allows us
>> > > >>>>>>> to rewind when we have a failure. Many other systems don't.
>> > > >>>>>>> In some cases, systems return null for their offsets (e.g.
>> > > >>>>>>> WikipediaSystemConsumer) because they have no offsets.
>> > > >>>>>>>
>> > > >>>>>>> Partitioning is another example. Kafka supports partitioning,
>> > > >>>>>>> but many systems don't. We model this by having a single
>> > > >>>>>>> partition for those systems. Still, other systems model
>> > > >>>>>>> partitioning differently (e.g. Kinesis).
>> > > >>>>>>>
>> > > >>>>>>> The SystemAdmin interface is also a mess. Creating streams in
>> > > >>>>>>> a system-agnostic way is almost impossible, as is modeling
>> > > >>>>>>> metadata for the system (replication factor, partitions,
>> > > >>>>>>> location, etc.). The list goes on.
>> > > >>>>>>>
>> > > >>>>>>> Duplicate work
>> > > >>>>>>>
>> > > >>>>>>> At the time that we began writing Samza, Kafka's consumer and
>> > > >>>>>>> producer APIs had a relatively weak feature set. On the
>> > > >>>>>>> consumer side, you had two options: use the high-level
>> > > >>>>>>> consumer, or the simple consumer. The problem with the
>> > > >>>>>>> high-level consumer was that it controlled your offsets,
>> > > >>>>>>> partition assignments, and the order in which you received
>> > > >>>>>>> messages. The problem with the simple consumer is that it's
>> > > >>>>>>> not simple. It's basic. You end up having to handle a lot of
>> > > >>>>>>> really low-level stuff that you shouldn't. We spent a lot of
>> > > >>>>>>> time making Samza's KafkaSystemConsumer very robust. It also
>> > > >>>>>>> allows us to support some cool features:
>> > > >>>>>>>
>> > > >>>>>>> * Per-partition message ordering and prioritization.
>> > > >>>>>>> * Tight control over partition assignment to support joins,
>> > > >>>>>>> global state (if we want to implement it :)), etc.
>> > > >>>>>>> * Tight control over offset checkpointing.
>> > > >>>>>>>
>> > > >>>>>>> What we didn't realize at the time is that these features
>> > > >>>>>>> should actually be in Kafka. A lot of Kafka consumers (not
>> > > >>>>>>> just Samza stream processors) end up wanting to do things
>> > > >>>>>>> like joins and partition assignment. The Kafka community has
>> > > >>>>>>> come to the same conclusion. They're adding a ton of upgrades
>> > > >>>>>>> into their new Kafka consumer implementation. To a large
>> > > >>>>>>> extent, it duplicates work we've already done in Samza.
>> > > >>>>>>>
>> > > >>>>>>> On top of this, Kafka ended up taking a very similar approach
>> > > >>>>>>> to Samza's KafkaCheckpointManager implementation for handling
>> > > >>>>>>> offset checkpointing. Like Samza, Kafka's new offset
>> > > >>>>>>> management feature stores offset checkpoints in a topic, and
>> > > >>>>>>> allows you to fetch them from the broker.
>> > > >>>>>>>
>> > > >>>>>>> A lot of this seems like a waste, since we could have shared
>> > > >>>>>>> the work if it had been done in Kafka from the get-go.
>> > > >>>>>>>
>> > > >>>>>>> Vision
>> > > >>>>>>>
>> > > >>>>>>> All of this leads me to a rather radical proposal. Samza is
>> > > >>>>>>> relatively stable at this point. I'd venture to say that
>> > > >>>>>>> we're near a 1.0 release. I'd like to propose that we take
>> > > >>>>>>> what we've learned, and begin thinking about Samza beyond
>> > > >>>>>>> 1.0. What would we change if we were starting from scratch?
>> > > >>>>>>> My proposal is to:
>> > > >>>>>>>
>> > > >>>>>>> 1. Make Samza standalone the *only* way to run Samza
>> > > >>>>>>> processors, and eliminate all direct dependencies on YARN,
>> > > >>>>>>> Mesos, etc.
>> > > >>>>>>> 2. Make a definitive call to support only Kafka as the stream
>> > > >>>>>>> processing layer.
>> > > >>>>>>> 3. Eliminate Samza's metrics, logging, serialization, and
>> > > >>>>>>> config systems, and simply use Kafka's instead.
>> > > >>>>>>>
>> > > >>>>>>> This would fix all of the issues that I outlined above. It
>> > > >>>>>>> should also shrink the Samza code base pretty dramatically.
>> > > >>>>>>> Supporting only a standalone container will allow Samza to be
>> > > >>>>>>> executed on YARN (using Slider), Mesos (using
>> > > >>>>>>> Marathon/Aurora), or most other in-house deployment systems.
>> > > >>>>>>> This should make life a lot easier for new users. Imagine
>> > > >>>>>>> having the hello-samza tutorial without YARN. The drop in
>> > > >>>>>>> mailing list traffic will be pretty dramatic.
>> > > >>>>>>>
>> > > >>>>>>> Coupling with Kafka seems long overdue to me. The reality is,
>> > > >>>>>>> everyone that I'm aware of is using Samza with Kafka. We
>> > > >>>>>>> basically require it already in order for most features to
>> > > >>>>>>> work. Those that are using other systems are generally using
>> > > >>>>>>> it for ingest into Kafka (1), and then they do the processing
>> > > >>>>>>> on top. There is already discussion (
>> > > >>>>>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=58851767
>> > > >>>>>>> ) in Kafka to make ingesting into Kafka extremely easy.
>> > > >>>>>>>
>> > > >>>>>>> Once we make the call to couple with Kafka, we can leverage a
>> > > >>>>>>> ton of their ecosystem. We no longer have to maintain our own
>> > > >>>>>>> config, metrics, etc. We can all share the same libraries,
>> > > >>>>>>> and make them better. This will also allow us to share the
>> > > >>>>>>> consumer/producer APIs, and will let us leverage their offset
>> > > >>>>>>> management and partition management, rather than having our
>> > > >>>>>>> own. All of the coordinator stream code would go away, as
>> > > >>>>>>> would most of the YARN AppMaster code. We'd probably have to
>> > > >>>>>>> push some partition management features into the Kafka
>> > > >>>>>>> broker, but they're already moving in that direction with the
>> > > >>>>>>> new consumer API. The features we have for partition
>> > > >>>>>>> assignment aren't unique to Samza, and seem like they should
>> > > >>>>>>> be in Kafka anyway. There will always be some niche usages
>> > > >>>>>>> which will require extra care, and hence full control over
>> > > >>>>>>> partition assignments, much like the Kafka low-level consumer
>> > > >>>>>>> API. These would continue to be supported.
>> > > >>>>>>>
>> > > >>>>>>> These items will be good for the Samza community. They'll make
>> > > >>>>>>> Samza easier to use, and make it easier for developers to add
>> > > >>>>>>> new features.
>> > > >>>>>>>
>> > > >>>>>>> Obviously this is a fairly large (and somewhat backwards
>> > > >>>>>>> incompatible) change. If we choose to go this route, it's
>> > > >>>>>>> important that we openly communicate how we're going to
>> > > >>>>>>> provide a migration path from the existing APIs to the new
>> > > >>>>>>> ones (if we make incompatible changes). I think at a minimum,
>> > > >>>>>>> we'd probably need to provide a wrapper to allow existing
>> > > >>>>>>> StreamTask implementations to continue running on the new
>> > > >>>>>>> container. It's also important that we openly communicate
>> > > >>>>>>> about timing, and stages of the migration.
>> > > >>>>>>>
>> > > >>>>>>> If you made it this far, I'm sure you have opinions. :)
>> > > >>>>>>> Please send your thoughts and feedback.
>> > > >>>>>>>
>> > > >>>>>>> Cheers,
>> > > >>>>>>> Chris
>> > > >>>>>>>
>> > > >>>>>>
>> > > >>>>>>
>> > > >>>>>
>> > > >>>>
>> > > >>>>
>> > > >>>>
>> > > >>>> --
>> > > >>>> -- Guozhang
>> > > >>>>
>> > > >>>
>> > > >>
>> > >
>> > >
>> >
>>
>
>
