{quote} There is a good chance that the different execution frameworks can be abstract out {quote} Actually, I think that even if we define an abstraction layer, the Samza process won't need to be aware of the existence of that layer. The Samza container running as a process can be totally unaware of the execution framework it runs on. The job submission/configuration/launching tools can be completely isolated from the Samza container as a process, ideally.
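[To make that separation concrete, here is a minimal sketch; every name is hypothetical and invented for illustration, and the "cluster manager" is simulated with local threads. The point is only that the container code on the right side never references the framework:]

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch (all names invented): the launcher side knows about
// the cluster manager; the container is a plain Runnable with no reference
// to any execution framework.
public class LauncherSketch {

    // What the three requirements boil down to: get resources, launch
    // containers on them, restart on failure. A YARN/Mesos/AWS binding
    // would implement this interface and live entirely in the tooling.
    interface ExecutionFramework {
        void launch(int numContainers, Runnable containerMain);
    }

    // Trivial local implementation: "resources" are just threads.
    static class LocalFramework implements ExecutionFramework {
        final List<Thread> running = new ArrayList<>();
        public void launch(int numContainers, Runnable containerMain) {
            for (int i = 0; i < numContainers; i++) {
                Thread t = new Thread(containerMain, "container-" + i);
                running.add(t);
                t.start();
            }
        }
    }

    // The "Samza container as a process": its code never mentions the framework.
    static int launchedCount(int n) throws InterruptedException {
        List<String> started = Collections.synchronizedList(new ArrayList<>());
        LocalFramework fw = new LocalFramework();
        fw.launch(n, () -> started.add(Thread.currentThread().getName()));
        for (Thread t : fw.running) t.join();
        return started.size();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(launchedCount(3)); // prints 3
    }
}
```

[Swapping LocalFramework for a YARN or Mesos binding would change only the tooling side; the container Runnable stays identical.]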
On Mon, Jul 6, 2015 at 3:13 PM, Yi Pan <nickpa...@gmail.com> wrote: > @Jay, you got my point. > > {quote} > I think the question is whether for the "as a > service" you are proposing actually trying to build some layer over > YARN/Mesos/AWS that abstracts these away? > {quote} > I am not very strong on this, but I do see that as an option. The reason I > had is: if all we need from YARN/Mesos/AWS is simply: > 1. a set of resources to run Samza containers > 2. launch Samza containers on the chosen set of resources > 3. make the container processes fault tolerant (i.e. monitor and restart > the failed processes) > There is a good chance that the different execution frameworks can be > abstract out since all cluster management systems would need to provide the > above set of functionalities. I would need to spend more time on the Mesos > patch for Samza to come up w/ a more concrete idea on the abstract layer. If > it turns out that the abstraction is not possible, Samza as a service may > have to be implemented in many versions on top of Samza as a process. > > On Mon, Jul 6, 2015 at 12:02 PM, Jay Kreps <jay.kr...@gmail.com> wrote: > >> Hey Yi/Martin, >> >> With respect to state reuse: Yeah that is a better explanation of what I >> was trying to say. That prototype actually includes state reuse (each task >> checkpoints its offset for the various partitions during the commit and >> uses that to determine if the state is valid and can be reused on >> restart). >> So it is just a matter of a framework being able to express a preference >> for the recently used hosts--those that can will get the optimization and >> those that can't won't. >> >> Yi, for the three layers I think we agree, but I may be unclear on what >> you >> are actually proposing. I think the question is whether for the "as a >> service" you are proposing actually trying to build some layer over >> YARN/Mesos/AWS that abstracts these away? 
Of course, there is nothing that >> requires such an abstraction to run stream processing as a service on top >> of (say) YARN. Can you be more specific about what would be in-scope for >> that? >> >> I am generally skeptical of a layer that abstracts away these frameworks >> for two reasons. First I think they are quite different and each is >> advancing fairly rapidly so abstracting over them may be hard. Second I >> think the layer of abstraction doesn't really help the user. If you think >> about it you tend to just adopt one of the frameworks (say Mesos/Marathon) >> and you learn the tooling associated with that and use it for all your >> different systems. Building an abstraction over that doesn't really give >> you any flexibility: you can't swap in another framework because all your >> other stuff is tied to (say) Mesos/Marathon, and the layer of indirection >> obscures the interface the user is already familiar with from other >> systems. >> But I think I may actually be misunderstanding your proposal... >> >> -Jay >> >> On Mon, Jul 6, 2015 at 11:30 AM, Yi Pan <nickpa...@gmail.com> wrote: >> >> > Hi, Martin, >> > >> > Great to hear your voice! I will just try to focus on your questions >> > regarding the "w/o YARN" part. >> > >> > {quote} >> > For example, would host affinity (SAMZA-617) still be possible? >> > {quote} >> > It would be possible if we separate the job execution/process launching >> > from the partition assignment among all Samza containers. Host-affinity >> > could follow the model below: >> > a. We still keep a container-host mapping as in SAMZA-617 >> > b. It would be a best-effort approach to try to get the same set of hosts >> from >> > the job execution framework (e.g. as we did w/ YARN in SamzaAppMaster code. >> > If Slider/Marathon comes up w/ this feature, we can migrate to that as >> > well). >> > c. 
When the container starts, it discovers the local state and tries to >> > retain the same partitions via the partition management APIs (i.e. >> > explicitly ask for a specific set of partitions to be assigned to the >> > container, instead of defaulting to whatever the broker decides to assign to >> the >> > container) >> > >> > {quote} >> > a question about the proposed API for instantiating a job in code >> (rather >> > than a properties file): when submitting a job to a cluster, is the idea >> > that the instantiation code runs on a client somewhere, which then pokes >> > the necessary endpoints on YARN/Mesos/AWS/etc? Or does that code run on >> > each container that is part of the job (in which case, how does the job >> > submission to the cluster work)? >> > {quote} >> > I want to chime in here to propose the three-layered model: >> > 1. Samza as a library. In this case, the client application should >> actually >> > implement the interface to submit its own job to a cluster, if the >> client >> > application chooses to run in a cluster. Samza as a library won't be >> poking >> > any endpoints on YARN/Mesos/AWS >> > 2. Samza as a process. In this case, a properties file could be desired >> > when starting the Samza process. Here again, Samza as a process should >> not >> > need to interact w/ any endpoints on YARN/Mesos/AWS >> > 3. Samza as a service. In this case, we definitely need some job >> > configuration and a Samza implementation of the interface to submit jobs and >> > their configuration to a cluster (i.e. YARN/Mesos/AWS, mostly the same >> as >> > we have today, except that we should just ask for resources to run Samza >> > containers and leave the partition management aside) >> > >> > Thanks! >> > >> > On Mon, Jul 6, 2015 at 3:37 AM, Martin Kleppmann <mar...@kleppmann.com> >> > wrote: >> > >> > > Hi all, >> > > >> > > Lots of good thoughts here. >> > > >> > > I agree with the general philosophy of tying Samza more firmly to >> Kafka. 
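[Yi's host-affinity steps (a-c) above could look roughly like the following sketch. The partition-management API named here is hypothetical (no such Samza API existed at the time), and "local state discovery" is simulated with an in-memory map:]

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Sketch of step (c): on startup the container looks at which partitions it
// already has state for on local disk and explicitly requests those from the
// partition manager, instead of accepting an arbitrary assignment. All names
// are invented for illustration.
public class HostAffinitySketch {

    // Stand-in for discovering local state: partitions with a state store
    // already present on this host.
    static Set<Integer> discoverLocalPartitions(Map<Integer, String> localStateDirs) {
        return new TreeSet<>(localStateDirs.keySet());
    }

    // Stand-in for the explicit assignment request: keep the partitions we
    // already have state for, then fill up to our share with anything else.
    static Set<Integer> requestAssignment(Set<Integer> local, Set<Integer> allPartitions, int maxOwned) {
        Set<Integer> owned = new TreeSet<>();
        for (int p : local)
            if (allPartitions.contains(p) && owned.size() < maxOwned) owned.add(p);
        for (int p : allPartitions)
            if (owned.size() < maxOwned) owned.add(p);
        return owned;
    }

    public static void main(String[] args) {
        Map<Integer, String> stateDirs = new HashMap<>();
        stateDirs.put(2, "/tmp/state/p2");
        stateDirs.put(5, "/tmp/state/p5");
        Set<Integer> all = new TreeSet<>(Arrays.asList(0, 1, 2, 3, 4, 5));
        // Partitions 2 and 5 are retained; one more is taken to fill the quota.
        System.out.println(requestAssignment(discoverLocalPartitions(stateDirs), all, 3));
    }
}
```

[This is the best-effort part of (b): if the framework lands the container on a different host, `discoverLocalPartitions` simply returns an empty set and the container restores from the changelog as today.]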
>> > > After I spent a while looking at integrating other message brokers >> (e.g. >> > > Kinesis) with SystemConsumer, I came to the conclusion that >> > SystemConsumer >> > > tacitly assumes a model so much like Kafka's that pretty much nobody >> but >> > > Kafka actually implements it. (Databus is perhaps an exception, but it >> > > isn't widely used outside of LinkedIn.) Thus, making Samza fully >> > dependent >> > > on Kafka acknowledges that the system-independence was never as real >> as >> > we >> > > perhaps made it out to be. The gains of code reuse are real. >> > > >> > > The idea of decoupling Samza from YARN has also always been appealing >> to >> > > me, for various reasons already mentioned in this thread. Although >> making >> > > Samza jobs deployable on anything (YARN/Mesos/AWS/etc) seems >> laudable, I >> > am >> > > a little concerned that it will restrict us to a lowest common >> > denominator. >> > > For example, would host affinity (SAMZA-617) still be possible? For >> jobs >> > > with large amounts of state, I think SAMZA-617 would be a big boon, >> since >> > > restoring state off the changelog on every single restart is painful, >> due >> > > to long recovery times. It would be a shame if the decoupling from >> YARN >> > > made host affinity impossible. >> > > >> > > Jay, a question about the proposed API for instantiating a job in code >> > > (rather than a properties file): when submitting a job to a cluster, >> is >> > the >> > > idea that the instantiation code runs on a client somewhere, which >> then >> > > pokes the necessary endpoints on YARN/Mesos/AWS/etc? Or does that code >> > run >> > > on each container that is part of the job (in which case, how does the >> > job >> > > submission to the cluster work)? >> > > >> > > I agree with Garry that it doesn't feel right to make a 1.0 release >> with >> > a >> > > plan for it to be immediately obsolete. 
So if this is going to >> happen, I >> > > think it would be more honest to stick with 0.* version numbers until >> the >> > > library-ified Samza has been implemented, is stable and widely used. >> > > >> > > Should the new Samza be a subproject of Kafka? There is precedent for >> > > tight coupling between different Apache projects (e.g. Curator and >> > > Zookeeper, or Slider and YARN), so I think remaining separate would be >> > ok. >> > > Even if Samza is fully dependent on Kafka, there is enough substance >> in >> > > Samza that it warrants being a separate project. An argument in >> favour of >> > > merging would be if we think Kafka has a much stronger "brand >> presence" >> > > than Samza; I'm ambivalent on that one. If the Kafka project is >> willing >> > to >> > > endorse Samza as the "official" way of doing stateful stream >> > > transformations, that would probably have much the same effect as >> > > re-branding Samza as "Kafka Stream Processors" or suchlike. Close >> > > collaboration between the two projects will be needed in any case. >> > > >> > > From a project management perspective, I guess the "new Samza" would >> have >> > > to be developed on a branch alongside ongoing maintenance of the >> current >> > > line of development? I think it would be important to continue >> supporting >> > > existing users, and provide a graceful migration path to the new >> version. >> > > Leaving the current versions unsupported and forcing people to rewrite >> > > their jobs would send a bad signal. >> > > >> > > Best, >> > > Martin >> > > >> > > On 2 Jul 2015, at 16:59, Jay Kreps <j...@confluent.io> wrote: >> > > >> > > > Hey Garry, >> > > > >> > > > Yeah that's super frustrating. I'd be happy to chat more about this >> if >> > > > you'd be interested. 
I think Chris and I started with the idea of >> "what >> > > > would it take to make Samza a kick-ass ingestion tool" but >> ultimately >> > we >> > > > kind of came around to the idea that ingestion and transformation >> had >> > > > pretty different needs and coupling the two made things hard. >> > > > >> > > > For what it's worth I think copycat (KIP-26) actually will do what >> you >> > > are >> > > > looking for. >> > > > >> > > > With regard to your point about slider, I don't necessarily >> disagree. >> > > But I >> > > > think getting good YARN support is quite doable and I think we can >> make >> > > > that work well. I think the issue this proposal solves is that >> > > technically >> > > > it is pretty hard to support multiple cluster management systems the >> > way >> > > > things are now, you need to write an "app master" or "framework" for >> > each >> > > > and they are all a little different so testing is really hard. In >> the >> > > > absence of this we have been stuck with just YARN which has >> fantastic >> > > > penetration in the Hadoopy part of the org, but zero penetration >> > > elsewhere. >> > > > Given the huge amount of work being put in to slider, marathon, aws >> > > > tooling, not to mention the umpteen related packaging technologies >> > people >> > > > want to use (Docker, Kubernetes, various cloud-specific deploy >> tools, >> > > etc) >> > > > I really think it is important to get this right. >> > > > >> > > > -Jay >> > > > >> > > > On Thu, Jul 2, 2015 at 4:17 AM, Garry Turkington < >> > > > g.turking...@improvedigital.com> wrote: >> > > > >> > > >> Hi all, >> > > >> >> > > >> I think the question below re does Samza become a sub-project of >> Kafka >> > > >> highlights the broader point around migration. Chris mentions >> Samza's >> > > >> maturity is heading towards a v1 release but I'm not sure it feels >> > > right to >> > > >> launch a v1 then immediately plan to deprecate most of it. 
>> > > >> >> > > >> From a selfish perspective I have some guys who have started >> working >> > > with >> > > >> Samza and building some new consumers/producers was next up. Sounds >> like >> > > >> that is absolutely not the direction to go. I need to look into the >> KIP >> > > in >> > > >> more detail but for me the attractiveness of adding new Samza >> > > >> consumer/producers -- even if yes all they were doing was really >> > getting >> > > >> data into and out of Kafka -- was to avoid having to worry about >> the >> > > >> lifecycle management of external clients. If there is a generic >> Kafka >> > > >> ingress/egress layer that I can plug a new connector into and have >> a >> > > lot of >> > > >> the heavy lifting re scale and reliability done for me then it >> gives >> me >> > > all >> > > >> the benefits that pushing new consumers/producers would. If not then it >> complicates >> my >> > > >> operational deployments. >> > > >> >> > > >> Which is similar to my other question with the proposal -- if we >> > build a >> > > >> fully available/stand-alone Samza plus the requisite shims to >> > integrate >> > > >> with Slider etc I suspect the former may be a lot more work than we >> > > think. >> > > >> We may make it much easier for a newcomer to get something running >> but >> > > >> having them step up and get a reliable production deployment may >> still >> > > >> dominate mailing list traffic, if for different reasons than >> today. >> > > >> >> > > >> Don't get me wrong -- I'm comfortable with making the Samza >> dependency >> > > on >> > > >> Kafka much more explicit and I absolutely see the benefits in the >> > > >> reduction of duplication and clashing terminologies/abstractions >> that >> > > >> Chris/Jay describe. Samza as a library would likely be a very nice >> > tool >> > > to >> > > >> add to the Kafka ecosystem. I just have the concerns above re the >> > > >> operational side. 
>> > > >> >> > > >> Garry >> > > >> >> > > >> -----Original Message----- >> > > >> From: Gianmarco De Francisci Morales [mailto:g...@apache.org] >> > > >> Sent: 02 July 2015 12:56 >> > > >> To: dev@samza.apache.org >> > > >> Subject: Re: Thoughts and obesrvations on Samza >> > > >> >> > > >> Very interesting thoughts. >> > > >> From outside, I have always perceived Samza as a computing layer >> over >> > > >> Kafka. >> > > >> >> > > >> The question, maybe a bit provocative, is "should Samza be a >> > sub-project >> > > >> of Kafka then?" >> > > >> Or does it make sense to keep it as a separate project with a >> separate >> > > >> governance? >> > > >> >> > > >> Cheers, >> > > >> >> > > >> -- >> > > >> Gianmarco >> > > >> >> > > >> On 2 July 2015 at 08:59, Yan Fang <yanfang...@gmail.com> wrote: >> > > >> >> > > >>> Overall, I agree to couple with Kafka more tightly. Because Samza >> de >> > > >>> facto is based on Kafka, and it should leverage what Kafka has. At >> > the >> > > >>> same time, Kafka does not need to reinvent what Samza already >> has. I >> > > >>> also like the idea of separating the ingestion and transformation. >> > > >>> >> > > >>> But it is a little difficult for me to imagine what the Samza will >> look >> > > >> like. >> > > >>> And I feel Chris and Jay have a little difference in terms of what >> > > >>> Samza should look like. >> > > >>> >> > > >>> *** Will it look like what Jay's code shows (A client of Kafka)? >> And >> > > >>> user's application code calls this client? >> > > >>> >> > > >>> 1. If we make Samza be a library of Kafka (like what the code >> shows), >> > > >>> how do we implement auto-balance and fault-tolerance? Are they >> taken >> > > >>> care of by the Kafka broker or other mechanism, such as "Samza >> worker" >> > > >>> (just make up the name)? >> > > >>> >> > > >>> 2. What about other features, such as auto-scaling, shared state, >> > > >>> monitoring? 
>> > > >>> >> > > >>> >> > > >>> *** If we have Samza standalone, (is this what Chris suggests?) >> > > >>> >> > > >>> 1. we still need to ingest data from Kafka and produce to it. >> Then it >> > > >>> becomes the same as what Samza looks like now, except it does not >> > rely >> > > >>> on YARN anymore. >> > > >>> >> > > >>> 2. if it is standalone, how can it leverage Kafka's metrics, logs, >> > > >>> etc? Use Kafka code as the dependency? >> > > >>> >> > > >>> >> > > >>> Thanks, >> > > >>> >> > > >>> Fang, Yan >> > > >>> yanfang...@gmail.com >> > > >>> >> > > >>> On Wed, Jul 1, 2015 at 5:46 PM, Guozhang Wang <wangg...@gmail.com >> > >> > > >> wrote: >> > > >>> >> > > >>>> Read through the code example and it looks good to me. A few >> > > >>>> thoughts regarding deployment: >> > > >>>> >> > > >>>> Today Samza deploys as an executable runnable like: >> > > >>>> >> > > >>>> deploy/samza/bin/run-job.sh --config-factory=... >> > > >> --config-path=file://... >> > > >>>> >> > > >>>> And this proposal advocates for deploying Samza more as embedded >> > > >>>> libraries in user application code (ignoring the terminology >> since >> > > >>>> it is not the >> > > >>> same >> > > >>>> as the prototype code): >> > > >>>> >> > > >>>> StreamTask task = new MyStreamTask(configs); Thread thread = new >> > > >>>> Thread(task); thread.start(); >> > > >>>> >> > > >>>> I think both of these deployment modes are important for >> different >> > > >>>> types >> > > >>> of >> > > >>>> users. That said, I think making Samza purely standalone is still >> > > >>>> sufficient for either runnable or library modes. 
>> > > >>>> >> > > >>>> Guozhang >> > > >>>> >> > > >>>> On Tue, Jun 30, 2015 at 11:33 PM, Jay Kreps <j...@confluent.io> >> > wrote: >> > > >>>> >> > > >>>>> Looks like gmail mangled the code example, it was supposed to >> look >> > > >>>>> like >> > > >>>>> this: >> > > >>>>> >> > > >>>>> Properties props = new Properties(); >> > > >>>>> props.put("bootstrap.servers", "localhost:4242"); >> StreamingConfig >> > > >>>>> config = new StreamingConfig(props); >> > > >>>>> config.subscribe("test-topic-1", "test-topic-2"); >> > > >>>>> config.processor(ExampleStreamProcessor.class); >> > > >>>>> config.serialization(new StringSerializer(), new >> > > >>>>> StringDeserializer()); KafkaStreaming container = new >> > > >>>>> KafkaStreaming(config); container.run(); >> > > >>>>> >> > > >>>>> -Jay >> > > >>>>> >> > > >>>>> On Tue, Jun 30, 2015 at 11:32 PM, Jay Kreps <j...@confluent.io> >> > > >> wrote: >> > > >>>>> >> > > >>>>>> Hey guys, >> > > >>>>>> >> > > >>>>>> This came out of some conversations Chris and I were having >> > > >>>>>> around >> > > >>>>> whether >> > > >>>>>> it would make sense to use Samza as a kind of data ingestion >> > > >>> framework >> > > >>>>> for >> > > >>>>>> Kafka (which ultimately led to KIP-26 "copycat"). This kind of >> > > >>>> combined >> > > >>>>>> with complaints around config and YARN and the discussion >> around >> > > >>>>>> how >> > > >>> to >> > > >>>>>> best do a standalone mode. >> > > >>>>>> >> > > >>>>>> So the thought experiment was, given that Samza was basically >> > > >>>>>> already totally Kafka specific, what if you just embraced that >> > > >>>>>> and turned it >> > > >>>> into >> > > >>>>>> something less like a heavyweight framework and more like a >> > > >>>>>> third >> > > >>> Kafka >> > > >>>>>> client--a kind of "producing consumer" with state management >> > > >>>> facilities. >> > > >>>>>> Basically a library. 
Instead of a complex stream processing >> > > >>>>>> framework >> > > >>>>> this >> > > >>>>>> would actually be a very simple thing, not much more >> complicated >> > > >>>>>> to >> > > >>> use >> > > >>>>> or >> > > >>>>>> operate than a Kafka consumer. As Chris said, when we thought about >> it, >> > > >>>>>> a >> > > >>> lot >> > > >>>> of >> > > >>>>>> what Samza (and the other stream processing systems) were doing >> > > >>> seemed >> > > >>>>> like >> > > >>>>>> kind of a hangover from MapReduce. >> > > >>>>>> >> > > >>>>>> Of course you need to ingest/output data to and from the stream >> > > >>>>>> processing. But when we actually looked into how that would >> > > >>>>>> work, >> > > >>> Samza >> > > >>>>>> isn't really an ideal data ingestion framework for a bunch of >> > > >>> reasons. >> > > >>>> To >> > > >>>>>> really do that right you need a pretty different internal data >> > > >>>>>> model >> > > >>>> and >> > > >>>>>> set of apis. So what if you split them and had an api for Kafka >> > > >>>>>> ingress/egress (copycat AKA KIP-26) and a separate api for >> Kafka >> > > >>>>>> transformation (Samza)? >> > > >>>>>> >> > > >>>>>> This would also allow really embracing the same terminology and >> > > >>>>>> conventions. One complaint about the current state is that the >> > > >>>>>> two >> > > >>>>> systems >> > > >>>>>> kind of feel bolted on. Terminology like "stream" vs "topic" >> and >> > > >>>>> different >> > > >>>>>> config and monitoring systems means you kind of have to learn >> > > >>>>>> Kafka's >> > > >>>>> way, >> > > >>>>>> then learn Samza's slightly different way, then kind of >> > > >>>>>> understand >> > > >>> how >> > > >>>>> they >> > > >>>>>> map to each other, which having walked a few people through >> this >> > > >>>>>> is surprisingly tricky for folks to get. 
>> > > >>>>>> >> > > >>>>>> Since I have been spending a lot of time on airplanes I hacked >> > > >>>>>> up an earnest but still somewhat incomplete prototype of what >> > > >>>>>> this would >> > > >>> look >> > > >>>>>> like. This is just unceremoniously dumped into Kafka as it >> > > >>>>>> required a >> > > >>>> few >> > > >>>>>> changes to the new consumer. Here is the code: >> > > >>>>>> >> > > >>>>>> https://github.com/jkreps/kafka/tree/streams/clients/src/main/java/org/apache/kafka/clients/streaming >> > > >>>>>> >> > > >>>>>> For the purpose of the prototype I just liberally renamed >> > > >>>>>> everything >> > > >>> to >> > > >>>>>> try to align it with Kafka with no regard for compatibility. >> > > >>>>>> >> > > >>>>>> To use this would be something like this: >> > > >>>>>> Properties props = new Properties(); >> > > >>>>>> props.put("bootstrap.servers", "localhost:4242"); >> > > >>>>>> StreamingConfig config = new >> > > >>> StreamingConfig(props); >> > > >>>>> config.subscribe("test-topic-1", >> > > >>>>>> "test-topic-2"); >> config.processor(ExampleStreamProcessor.class); >> > > >>>>> config.serialization(new >> > > >>>>>> StringSerializer(), new StringDeserializer()); KafkaStreaming >> > > >>>> container = >> > > >>>>>> new KafkaStreaming(config); container.run(); >> > > >>>>>> >> > > >>>>>> KafkaStreaming is basically the SamzaContainer; StreamProcessor >> > > >>>>>> is basically StreamTask. >> > > >>>>>> >> > > >>>>>> So rather than putting all the class names in a file and then >> > > >>>>>> having >> > > >>>> the >> > > >>>>>> job assembled by reflection, you just instantiate the container >> > > >>>>>> programmatically. Work is balanced over however many instances >> > > >>>>>> of >> > > >>> this >> > > >>>>> are >> > > >>>>>> alive at any time (i.e. if an instance dies, new tasks are >> added >> > > >>>>>> to >> > > >>> the >> > > >>>>>> existing containers without shutting them down). 
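[For readability, the usage snippet quoted above (flattened by the mail archive) reconstructs to roughly the shape below. StreamingConfig, KafkaStreaming, StreamProcessor, ExampleStreamProcessor and the serializer classes are from Jay's prototype branch; the stubs here are minimal stand-ins so the sketch compiles, not the real implementations:]

```java
import java.util.Properties;

// Reconstruction of the quoted usage snippet. Everything below the imports
// is a stand-in stub, NOT the real prototype code; only the shape of the
// calls matches the snippet from the thread.
public class StreamingSketch {

    static class StringSerializer {}
    static class StringDeserializer {}

    interface StreamProcessor { void process(String key, String value); }

    static class ExampleStreamProcessor implements StreamProcessor {
        public void process(String key, String value) {
            System.out.println(key + " -> " + value);
        }
    }

    static class StreamingConfig {
        final Properties props;
        String[] topics = new String[0];
        Class<?> processor;
        StreamingConfig(Properties props) { this.props = props; }
        void subscribe(String... topics) { this.topics = topics; }
        void processor(Class<?> c) { this.processor = c; }
        void serialization(Object serializer, Object deserializer) { /* held by the real prototype */ }
    }

    // Stand-in for the container: the prototype's KafkaStreaming would poll
    // Kafka and dispatch records; this stub only reports its configuration.
    static class KafkaStreaming {
        final StreamingConfig config;
        KafkaStreaming(StreamingConfig config) { this.config = config; }
        String describe() { return config.processor.getSimpleName() + " on " + String.join(",", config.topics); }
        void run() { System.out.println("would run: " + describe()); }
    }

    // The quoted snippet, line for line, against the stubs above.
    static String describeJob() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:4242");
        StreamingConfig config = new StreamingConfig(props);
        config.subscribe("test-topic-1", "test-topic-2");
        config.processor(ExampleStreamProcessor.class);
        config.serialization(new StringSerializer(), new StringDeserializer());
        KafkaStreaming container = new KafkaStreaming(config);
        container.run();
        return container.describe();
    }

    public static void main(String[] args) {
        describeJob();
    }
}
```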
>> > > >>>>>> >> > > >>>>>> We would provide some glue for running this stuff in YARN via >> > > >>>>>> Slider, Mesos via Marathon, and AWS using some of their tools >> > > >>>>>> but from the >> > > >>>> point >> > > >>>>> of >> > > >>>>>> view of these frameworks these stream processing jobs are just >> > > >>>> stateless >> > > >>>>>> services that can come and go and expand and contract at will. >> > > >>>>>> There >> > > >>> is >> > > >>>>> no >> > > >>>>>> more custom scheduler. >> > > >>>>>> >> > > >>>>>> Here are some relevant details: >> > > >>>>>> >> > > >>>>>> 1. It is only ~1300 lines of code, it would get larger if we >> > > >>>>>> productionized but not vastly larger. We really do get a ton >> > > >>>>>> of >> > > >>>>> leverage >> > > >>>>>> out of Kafka. >> > > >>>>>> 2. Partition management is fully delegated to the new >> consumer. >> > > >>> This >> > > >>>>>> is nice since now any partition management strategy available >> > > >>>>>> to >> > > >>>> Kafka >> > > >>>>>> consumer is also available to Samza (and vice versa) and with >> > > >>>>>> the >> > > >>>>> exact >> > > >>>>>> same configs. >> > > >>>>>> 3. It supports state as well as state reuse >> > > >>>>>> >> > > >>>>>> Anyhow take a look, hopefully it is thought provoking. >> > > >>>>>> >> > > >>>>>> -Jay >> > > >>>>>> >> > > >>>>>> >> > > >>>>>> >> > > >>>>>> On Tue, Jun 30, 2015 at 6:55 PM, Chris Riccomini < >> > > >>>> criccom...@apache.org> >> > > >>>>>> wrote: >> > > >>>>>> >> > > >>>>>>> Hey all, >> > > >>>>>>> >> > > >>>>>>> I have had some discussions with Samza engineers at LinkedIn >> > > >>>>>>> and >> > > >>>>> Confluent >> > > >>>>>>> and we came up with a few observations and would like to >> > > >>>>>>> propose >> > > >>> some >> > > >>>>>>> changes. >> > > >>>>>>> >> > > >>>>>>> We've observed some things that I want to call out about >> > > >>>>>>> Samza's >> > > >>>> design, >> > > >>>>>>> and I'd like to propose some changes. 
>> > > >>>>>>> >> > > >>>>>>> * Samza is dependent upon a dynamic deployment system. >> > > >>>>>>> * Samza is too pluggable. >> > > >>>>>>> * Samza's SystemConsumer/SystemProducer and Kafka's consumer >> > > >>>>>>> APIs >> > > >>> are >> > > >>>>>>> trying to solve a lot of the same problems. >> > > >>>>>>> >> > > >>>>>>> All three of these issues are related, but I'll address them >> in >> > > >>> order. >> > > >>>>>>> >> > > >>>>>>> Deployment >> > > >>>>>>> >> > > >>>>>>> Samza strongly depends on the use of a dynamic deployment >> > > >>>>>>> scheduler >> > > >>>> such >> > > >>>>>>> as >> > > >>>>>>> YARN, Mesos, etc. When we initially built Samza, we bet that >> > > >>>>>>> there >> > > >>>> would >> > > >>>>>>> be >> > > >>>>>>> one or two winners in this area, and we could support them, >> and >> > > >>>>>>> the >> > > >>>> rest >> > > >>>>>>> would go away. In reality, there are many variations. >> > > >>>>>>> Furthermore, >> > > >>>> many >> > > >>>>>>> people still prefer to just start their processors like normal >> > > >>>>>>> Java processes, and use traditional deployment scripts such as >> > > >>>>>>> Fabric, >> > > >>>> Chef, >> > > >>>>>>> Ansible, etc. Forcing a deployment system on users makes the >> > > >>>>>>> Samza start-up process really painful for first-time users. >> > > >>>>>>> >> > > >>>>>>> Dynamic deployment as a requirement was also a bit of a >> > > >>>>>>> mis-fire >> > > >>>> because >> > > >>>>>>> of >> > > >>>>>>> a fundamental misunderstanding of the difference between batch >> jobs >> > > >>>>>>> and >> > > >>>>> stream >> > > >>>>>>> processing jobs. Early on, we made a conscious effort to favor >> > > >>>>>>> the >> > > >>>> Hadoop >> > > >>>>>>> (Map/Reduce) way of doing things, since it worked and was well >> > > >>>>> understood. >> > > >>>>>>> One thing that we missed was that batch jobs have a definite >> > > >>>> beginning >> > > >>>>>>> and >> > > >>>>>>> end, and stream processing jobs don't (usually). 
This leads to >> > > >>>>>>> a >> > > >>> much >> > > >>>>>>> simpler scheduling problem for stream processors. You >> basically >> > > >>>>>>> just >> > > >>>>> need >> > > >>>>>>> to find a place to start the processor, and start it. The way >> > > >>>>>>> we run grids at LinkedIn, there's no concept of a cluster >> > > >>>>>>> being "full". We always >> > > >>>> add >> > > >>>>>>> more machines. The problem with coupling Samza with a >> scheduler >> > > >>>>>>> is >> > > >>>> that >> > > >>>>>>> Samza (as a framework) now has to handle deployment. This >> pulls >> > > >>>>>>> in a >> > > >>>>> bunch >> > > >>>>>>> of things such as configuration distribution (config stream), >> > > >>>>>>> shell >> > > >>>>> scripts >> > > >>>>>>> (bin/run-job.sh, JobRunner), packaging (all the .tgz stuff), >> etc. >> > > >>>>>>> >> > > >>>>>>> Another reason for requiring dynamic deployment was to support >> > > >>>>>>> data locality. If you want to have locality, you need to put >> > > >>>>>>> your >> > > >>>> processors >> > > >>>>>>> close to the data they're processing. Upon further >> > > >>>>>>> investigation, >> > > >>>>> though, >> > > >>>>>>> this feature is not that beneficial. There is some good >> > > >>>>>>> discussion >> > > >>>> about >> > > >>>>>>> some problems with it on SAMZA-335. Again, we took the >> > > >>>>>>> Map/Reduce >> > > >>>> path, >> > > >>>>>>> but >> > > >>>>>>> there are some fundamental differences between HDFS and Kafka. >> > > >>>>>>> HDFS >> > > >>>> has >> > > >>>>>>> blocks, while Kafka has partitions. This leads to less >> > > >>>>>>> optimization potential with stream processors on top of Kafka. >> > > >>>>>>> >> > > >>>>>>> This feature is also used as a crutch. Samza doesn't have any >> > > >>>>>>> built-in >> > > >>>>>>> fault-tolerance logic. Instead, it depends on the dynamic >> > > >>>>>>> deployment scheduling system to handle restarts when a >> > > >>>>>>> processor dies. 
This has >> > > >>>>> made >> > > >>>>>>> it very difficult to write a standalone Samza container >> > > >> (SAMZA-516). >> > > >>>>>>> >> > > >>>>>>> Pluggability >> > > >>>>>>> >> > > >>>>>>> In some cases pluggability is good, but I think that we've >> gone >> > > >>>>>>> too >> > > >>>> far >> > > >>>>>>> with it. Currently, Samza has: >> > > >>>>>>> >> > > >>>>>>> * Pluggable config. >> > > >>>>>>> * Pluggable metrics. >> > > >>>>>>> * Pluggable deployment systems. >> > > >>>>>>> * Pluggable streaming systems (SystemConsumer, SystemProducer, >> > > >> etc). >> > > >>>>>>> * Pluggable serdes. >> > > >>>>>>> * Pluggable storage engines. >> > > >>>>>>> * Pluggable strategies for just about every component >> > > >>> (MessageChooser, >> > > >>>>>>> SystemStreamPartitionGrouper, ConfigRewriter, etc). >> > > >>>>>>> >> > > >>>>>>> There's probably more that I've forgotten, as well. Some of >> > > >>>>>>> these >> > > >>> are >> > > >>>>>>> useful, but some have proven not to be. This all comes at a >> cost: >> > > >>>>>>> complexity. This complexity is making it harder for our users >> > > >>>>>>> to >> > > >>> pick >> > > >>>> up >> > > >>>>>>> and use Samza out of the box. It also makes it difficult for >> > > >>>>>>> Samza developers to reason about what the characteristics of >> > > >>>>>>> the container are (since the characteristics change depending on >> > > >>>>>>> which plugins are in use). >> > > >>>>>>> >> > > >>>>>>> The issues with pluggability are most visible in the System >> APIs. >> > > >>> What >> > > >>>>>>> Samza really requires to be functional is Kafka as its >> > > >>>>>>> transport >> > > >>>> layer. >> > > >>>>>>> But >> > > >>>>>>> we've conflated two unrelated use cases into one API: >> > > >>>>>>> >> > > >>>>>>> 1. Get data into/out of Kafka. >> > > >>>>>>> 2. Process the data in Kafka. >> > > >>>>>>> >> > > >>>>>>> The current System API supports both of these use cases. 
The >> > > >>>>>>> problem >> > > >>>> is, >> > > >>>>>>> we >> > > >>>>>>> actually want different features for each use case. By >> papering >> > > >>>>>>> over >> > > >>>>> these >> > > >>>>>>> two use cases, and providing a single API, we've introduced a >> > > >>>>>>> ton of >> > > >>>>> leaky >> > > >>>>>>> abstractions. >> > > >>>>>>> >> > > >>>>>>> For example, what we'd really like in (2) is to have >> > > >>>>>>> monotonically increasing longs for offsets (like Kafka). This >> > > >>>>>>> would be at odds >> > > >>> with >> > > >>>>> (1), >> > > >>>>>>> though, since different systems have different >> > > >>>>> SCNs/Offsets/UUIDs/vectors. >> > > >>>>>>> There was discussion both on the mailing list and the SQL >> JIRAs >> > > >>> about >> > > >>>>> the >> > > >>>>>>> need for this. >> > > >>>>>>> >> > > >>>>>>> The same thing holds true for replayability. Kafka allows us >> to >> > > >>> rewind >> > > >>>>>>> when >> > > >>>>>>> we have a failure. Many other systems don't. In some cases, >> > > >>>>>>> systems >> > > >>>>> return >> > > >>>>>>> null for their offsets (e.g. WikipediaSystemConsumer) because >> > > >>>>>>> they >> > > >>>> have >> > > >>>>> no >> > > >>>>>>> offsets. >> > > >>>>>>> >> > > >>>>>>> Partitioning is another example. Kafka supports partitioning, >> > > >>>>>>> but >> > > >>> many >> > > >>>>>>> systems don't. We model this by having a single partition for >> > > >>>>>>> those systems. Still, other systems model partitioning >> > > >> differently (e.g. >> > > >>>>>>> Kinesis). >> > > >>>>>>> >> > > >>>>>>> The SystemAdmin interface is also a mess. Creating streams in >> a >> > > >>>>>>> system-agnostic way is almost impossible. As is modeling >> > > >>>>>>> metadata >> > > >>> for >> > > >>>>> the >> > > >>>>>>> system (replication factor, partitions, location, etc). The >> > > >>>>>>> list >> > > >>> goes >> > > >>>>> on. 
Duplicate work

At the time that we began writing Samza, Kafka's consumer and producer APIs had a relatively weak feature set. On the consumer side, you had two options: use the high-level consumer, or the simple consumer. The problem with the high-level consumer was that it controlled your offsets, partition assignments, and the order in which you received messages. The problem with the simple consumer is that it's not simple. It's basic. You end up having to handle a lot of really low-level stuff that you shouldn't. We spent a lot of time to make Samza's KafkaSystemConsumer very robust. It also allows us to support some cool features:

* Per-partition message ordering and prioritization.
* Tight control over partition assignment to support joins, global state (if we want to implement it :)), etc.
* Tight control over offset checkpointing.

What we didn't realize at the time is that these features should actually be in Kafka. A lot of Kafka consumers (not just Samza stream processors) end up wanting to do things like joins and partition assignment. The Kafka community has come to the same conclusion. They're adding a ton of upgrades into their new Kafka consumer implementation. To a large extent, it's duplicate work to what we've already done in Samza.
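For reference, the new consumer API (as it eventually shipped in the Kafka 0.9+ `KafkaConsumer` client) exposes exactly the kind of control listed above: manual partition assignment without group management, and explicit seeking to externally managed offsets. A rough sketch, where the broker address, topic name, and checkpointed offset are all assumed values:

```java
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class AssignedConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // assumed broker address
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Tight control over partition assignment: no group management involved.
            TopicPartition p0 = new TopicPartition("page-views", 0);  // assumed topic
            consumer.assign(Arrays.asList(p0));

            // Tight control over offsets: resume from an externally checkpointed position.
            consumer.seek(p0, 42L);  // hypothetical checkpointed offset

            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> r : records) {
                System.out.printf("partition=%d offset=%d value=%s%n",
                        r.partition(), r.offset(), r.value());
            }
        }
    }
}
```

This is the overlap with Samza's KafkaSystemConsumer: once the broker-side client offers assignment and seeking, the custom consumer machinery is largely duplicated effort.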
On top of this, Kafka ended up taking a very similar approach to Samza's KafkaCheckpointManager implementation for handling offset checkpointing. Like Samza, Kafka's new offset management feature stores offset checkpoints in a topic, and allows you to fetch them from the broker.

A lot of this seems like a waste, since we could have shared the work if it had been done in Kafka from the get-go.

Vision

All of this leads me to a rather radical proposal. Samza is relatively stable at this point. I'd venture to say that we're near a 1.0 release. I'd like to propose that we take what we've learned, and begin thinking about Samza beyond 1.0. What would we change if we were starting from scratch? My proposal is to:

1. Make Samza standalone the *only* way to run Samza processors, and eliminate all direct dependencies on YARN, Mesos, etc.
2. Make a definitive call to support only Kafka as the stream processing layer.
3. Eliminate Samza's metrics, logging, serialization, and config systems, and simply use Kafka's instead.

This would fix all of the issues that I outlined above. It should also shrink the Samza code base pretty dramatically.
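The checkpoint-topic approach that both projects converged on can be sketched in miniature. This is hypothetical code, not Samza's KafkaCheckpointManager or Kafka's broker-side implementation: offsets are written as keyed records, and log compaction keeps only the latest value per (group, topic, partition) key, which is what a consumer reads back on restart.

```java
import java.util.HashMap;
import java.util.Map;

// Miniature model of offset checkpointing in a compacted topic.
// Hypothetical sketch; it only captures the last-write-wins semantics
// that log compaction provides for keyed checkpoint records.
class CheckpointTopic {
    // Key: "group/topic/partition"; compaction keeps the newest value per key.
    private final Map<String, Long> compacted = new HashMap<>();

    void commit(String group, String topic, int partition, long offset) {
        compacted.put(group + "/" + topic + "/" + partition, offset);
    }

    // On restart, a consumer fetches the last committed offset for its partition.
    Long fetch(String group, String topic, int partition) {
        return compacted.get(group + "/" + topic + "/" + partition);
    }
}
```

Since both Samza and Kafka independently built essentially this, doing it once in Kafka and fetching checkpoints from the broker removes a whole subsystem from Samza.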
Supporting only a standalone container will allow Samza to be executed on YARN (using Slider), Mesos (using Marathon/Aurora), or most other in-house deployment systems. This should make life a lot easier for new users. Imagine having the hello-samza tutorial without YARN. The drop in mailing list traffic will be pretty dramatic.

Coupling with Kafka seems long overdue to me. The reality is, everyone that I'm aware of is using Samza with Kafka. We basically require it already in order for most features to work. Those that are using other systems are generally using them for ingest into Kafka (1), and then they do the processing on top. There is already discussion (https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=58851767) in Kafka to make ingesting into Kafka extremely easy.

Once we make the call to couple with Kafka, we can leverage a ton of their ecosystem. We no longer have to maintain our own config, metrics, etc. We can all share the same libraries, and make them better. This will also allow us to share the consumer/producer APIs, and will let us leverage their offset management and partition management, rather than having our own. All of the coordinator stream code would go away, as would most of the YARN AppMaster code.
We'd probably have to push some partition management features into the Kafka broker, but they're already moving in that direction with the new consumer API. The features we have for partition assignment aren't unique to Samza, and seem like they should be in Kafka anyway. There will always be some niche usages which will require extra care, and hence full control over partition assignments, much like the Kafka low-level consumer API. These would continue to be supported.

These items will be good for the Samza community. They'll make Samza easier to use, and make it easier for developers to add new features.

Obviously this is a fairly large (and somewhat backwards-incompatible) change. If we choose to go this route, it's important that we openly communicate how we're going to provide a migration path from the existing APIs to the new ones (if we make incompatible changes). I think at a minimum, we'd probably need to provide a wrapper to allow existing StreamTask implementations to continue running on the new container. It's also important that we openly communicate about timing, and stages of the migration.

If you made it this far, I'm sure you have opinions. :) Please send your thoughts and feedback.
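A migration wrapper of the kind suggested above might look roughly like this sketch. The types here are simplified stand-ins, not Samza's real API (the actual StreamTask signature is process(IncomingMessageEnvelope, MessageCollector, TaskCoordinator)): the new container drives a plain record loop, and an adapter re-wraps each record so that unmodified legacy task code keeps running.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for Samza's task API; hypothetical, for illustration only.
interface LegacyStreamTask {
    void process(String key, String message, List<String> output);
}

// Hypothetical adapter: the new Kafka-only container hands raw records to
// the wrapper, which drives the unmodified legacy task.
class LegacyTaskWrapper {
    private final LegacyStreamTask task;
    private final List<String> collected = new ArrayList<>();

    LegacyTaskWrapper(LegacyStreamTask task) { this.task = task; }

    void onRecord(String key, String value) {
        task.process(key, value, collected);  // legacy task code runs unchanged
    }

    List<String> collectedOutput() { return collected; }
}
```

A task written against the old interface, e.g. `(k, v, out) -> out.add(k + ":" + v)`, would then run inside the new container without modification, which is the minimum migration story the proposal calls for.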
Cheers,
Chris