I realized that I'm not clear on the terminology. Is a job a packaged collection of tasks? If so, then the config called "job.name" is a little confusing and perhaps should be task.name?
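For context, here are the two settings in question, written in the flat key-value style described downthread. This is only a sketch; the class and topic names are made up:

    import java.util.LinkedHashMap;
    import java.util.Map;

    // "job.name" labels the whole deployable job, while the code that runs
    // inside it is named separately by "task.class" -- hence the confusion
    // above. Class and topic names here are illustrative only.
    public class JobVsTaskConfig {
        public static void main(String[] args) {
            Map<String, String> config = new LinkedHashMap<>();
            config.put("job.name", "wikipedia-parser");                      // the packaged unit
            config.put("task.class", "samza.examples.WikipediaParserTask"); // the code it runs
            config.put("task.inputs", "kafka.wikipedia-raw");               // wiring lives here too
            config.forEach((k, v) -> System.out.println(k + "=" + v));
        }
    }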
On Tue, Sep 9, 2014 at 3:43 PM, Roger Hoover <[email protected]> wrote:

Hi Chris,

I think the config is not too bad as it is. Hey, it's not XML!!!

Maybe there are ways to make it better, though. These are some things that come to mind for me, but I haven't really thought them through:

- What about a way to specify a DAG for the job? From the developer's point of view, she mostly cares about the data flow. Maybe there could be a pluggable naming scheme for the topics in between jobs, so that you don't have to name them explicitly. You'd want a nice way to specify this. YAML? Using job-name:

      wikipedia-feed
        - wikipedia-parser
        - wikipedia-stats

  Ideally, that would be enough to wire everything together (a toy sketch of this idea follows below).
- Support a programmatic, code-level API for building, validating, and deploying jobs? Hopefully, this would make it possible to build higher-level frameworks on top that could dynamically generate jobs. I don't know if I'd ever want to do this, but if the API is there, you never know what will spring up.
- Support for validation during build and during runtime initialization, to catch errors early.
- Can sensible defaults make the config less verbose?
- What about on/off switches for things like metrics and checkpointing? If you don't specify otherwise, you get the default metrics package and Kafka checkpointing.
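To make the DAG/builder idea concrete, here is a purely hypothetical sketch. None of these classes exist in Samza, and every name is invented; the point is just that wiring expressed in code can generate the in-between topic names and be validated before anything is deployed:

    import java.util.ArrayList;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    // Hypothetical job-graph builder -- not Samza API; all names invented.
    // Consecutive stages are wired through generated topic names, so nobody
    // has to type (or mistype) a topic name twice.
    public class JobGraphSketch {
        private final String name;
        private final List<String> stages = new ArrayList<>();

        private JobGraphSketch(String name) { this.name = name; }

        static JobGraphSketch named(String name) { return new JobGraphSketch(name); }

        JobGraphSketch then(String jobName) { stages.add(jobName); return this; }

        // Derive the topic between each pair of consecutive stages.
        Map<String, String> wiring() {
            Map<String, String> topics = new LinkedHashMap<>();
            for (int i = 0; i < stages.size() - 1; i++) {
                topics.put(stages.get(i) + " -> " + stages.get(i + 1),
                           name + "." + stages.get(i) + ".out");
            }
            return topics;
        }

        public static void main(String[] args) {
            JobGraphSketch.named("wikipedia-feed")
                .then("wikipedia-parser")
                .then("wikipedia-stats")
                .wiring()
                .forEach((edge, topic) -> System.out.println(edge + "  via topic  " + topic));
        }
    }

Build-time validation would then amount to walking this graph and rejecting dangling stages before anything is submitted to the cluster.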
On Mon, Sep 8, 2014 at 3:36 PM, Chris Riccomini <[email protected]> wrote:

Hey Roger,

This summary sounds pretty reasonable to me, on the Samza side, anyway. I can't comment on the others, as I don't have a great understanding of their internals.

I agree with you about the knock on Samza's config system. It's something I think we'll need to resolve, but we don't have a concrete proposal on how, yet. Given that you've had a look at these three systems, do you have any feedback on what you'd like to see? The topic-name issue is a relatively specific problem, but maybe you have thoughts on a higher-level design that you'd like to have?

Right now, we have:

1. We mix wiring and config in config files. Wiring in configs leads to runtime errors. We have no code-level "builder" for assembling a job, and no DI framework.
2. Everything is a k-v string pair. Nothing is hierarchical.
3. Config is exposed to developers via a single Config object, which is nothing more than a Map<String, String> with some helper methods.

Do you have any thoughts on what you liked from the other frameworks, and what you'd like to see Samza do in this area?

Cheers,
Chris

On 9/8/14 2:23 PM, "Roger Hoover" <[email protected]> wrote:

Hi Chris,

We've been looking at Samza for a number of use cases, ranging from system monitoring to user activity analysis for a pretty large retail website. For system monitoring, for example, we want to push system performance events into Kafka, then have Samza jobs enrich these events with additional context, such as information about the deployment topology and/or the user who triggered them. Then we want to feed the enriched streams into various data stores, including Elasticsearch and Hadoop, for analysis.

I've been diving into Samza, Spark Streaming, and Storm. So far, I'm impressed with the simplicity of Samza. I feel like it's the easiest to reason about and can scale up to larger teams where not everyone is an expert on the framework. I may be wrong about some of my conclusions, so I'm interested if anyone else wants to share their experience.

Samza
- The simplest to reason about from a developer's perspective. Tasks are single-threaded and the API is very simple to understand (see the sketch after this list). The local state management API simplifies the developer's job a lot and should give excellent performance. Scalability is well defined (parallelism is determined by the number of Kafka partitions + the number of tasks). Topologies are explicit: you create tasks for each step and wire them together.
- The simplest to operate, because each task is independent of the others. Kafka protects against backpressure and provides fine-grained fault tolerance. It seems unlikely that a poorly written job/task will hurt more than itself. Other than Kafka, there are minimal network dependencies, and there are relatively few moving parts.
- The framework is pretty lightweight, relying on Kafka and YARN to do most of the work. Fantastic documentation!
- The primary downside that I see is that there is a lot of wiring involved, and it may be easy to mess it up. For example, topic names need to be specified both in the code and in the config, and they need to match across tasks/jobs. You probably won't know that you've screwed up until you find a task is not receiving any messages. You also have to change both the code and the config if you change the name of a topic. In larger organizations, these activities might be done by people on different teams.
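As a reference point for how simple the task API is, here is a minimal sketch against the 0.7-era StreamTask interface. The topic names are made up; note that the output topic is named in code while the input topics live in task.inputs in the config, which is exactly the wiring split described above:

    import org.apache.samza.system.IncomingMessageEnvelope;
    import org.apache.samza.system.OutgoingMessageEnvelope;
    import org.apache.samza.system.SystemStream;
    import org.apache.samza.task.MessageCollector;
    import org.apache.samza.task.StreamTask;
    import org.apache.samza.task.TaskCoordinator;

    // Minimal single-threaded task: one process() call per message, no locking.
    // Topic names are illustrative. The input topic comes from task.inputs in
    // the config, but the output topic is named here in code.
    public class EnrichmentTask implements StreamTask {
        private static final SystemStream OUTPUT = new SystemStream("kafka", "events-enriched");

        @Override
        public void process(IncomingMessageEnvelope envelope,
                            MessageCollector collector,
                            TaskCoordinator coordinator) {
            String event = (String) envelope.getMessage();
            // Enrichment logic would go here; the sketch just passes the message through.
            collector.send(new OutgoingMessageEnvelope(OUTPUT, event));
        }
    }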
Storm
- Trident will do some auto-wiring of topologies, but it's hard to reason about the generated topologies, especially their transactional behavior.
- Managing non-local state can be tricky to get right.
- There are lots of moving parts that have to be tuned (LMAX Disruptor queues, 0mq queues).
- It ships closures around the cluster, which makes it hard to work with things like JRuby.
- The framework provides things like cluster management/scheduling that can be done elsewhere (YARN or Mesos).

Spark Streaming
- Quite a few things are implicit. You write jobs as if they were going to execute in a single process; these get broken into stages and scheduled on the cluster. You need a pretty good understanding of this process to write, debug, and tune these jobs. I think it's fairly easy to accidentally include a reference to a large Java object in worker code and have it shipped across the network unintentionally.
- All the code/objects that run on workers must be serializable. I'm guessing this can be quite annoying. Samza ships the jar to the worker, so I think it should be able to run anything (JRuby, for example).
- The driver process is a single point of failure, and the only place where some job state is saved.
- It seems operationally risky. As far as I can tell, a slow consumer can tie up the entire cluster. This is because there is no backpressure that stops a driver from producing more and more DStreams, even if consumers cannot keep up. The tuning guide suggests that every time you make a change to a job, you need to tune your settings again (https://spark.apache.org/docs/latest/streaming-programming-guide.html#performance-tuning).

Cheers,

Roger

On Fri, Sep 5, 2014 at 4:14 PM, Chris Riccomini <[email protected]> wrote:

Hey Roger,

> After thinking more about it, I don't think we can get to the
> deterministic behavior we talked about until Kafka supports idempotent
> producers.

Yes, I agree. To get fully deterministic behavior, you do need idempotent producers + deterministic message ordering (or Kafka transactionality) at the Kafka level. We plan on relying on Kafka transactionality when the patch is committed. Without this, it's possible to output two different "correct" answers (for my definition of correct), or two of the same "correct" answers (for your definition of correct).

> I created a JIRA ticket in case it helps spur action or keep the
> conversation from getting lost.

Awesome, thanks!

Out of curiosity, what are you using Samza for? You seem to have quite a deep understanding of it. :)

Cheers,
Chris

On 9/4/14 10:06 AM, "Roger Hoover" <[email protected]> wrote:

Chris, Chinmay,

After thinking more about it, I don't think we can get to the deterministic behavior we talked about until Kafka supports idempotent producers. The reason is that duplicate messages mean that we can't rely on strong ordering. If a duplicate of update U1 can show up at any time, then we can never rely on ordered updates, because we might see U1, then U2, then U1 again (see the sketch after this message). I guess we could try to handle this at the application layer, but not at the Samza layer yet, I think.

Nonetheless, some of the changes we discussed may help get closer and still make sense to implement. I created a JIRA ticket in case it helps spur action or keep the conversation from getting lost. If you don't find the ticket useful at this time, feel free to close it.

https://issues.apache.org/jira/browse/SAMZA-405

Cheers,

Roger
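To make the U1 -> U2 -> U1 point concrete, here is a small self-contained sketch (plain Java, not Samza code) of a last-write-wins table fed a redelivered duplicate:

    import java.util.HashMap;
    import java.util.Map;

    // Plain-Java illustration of why at-least-once delivery breaks
    // last-write-wins state: a redelivered duplicate of an old update (U1)
    // arrives after a newer one (U2) and silently clobbers it.
    public class DuplicateOverwrite {
        public static void main(String[] args) {
            Map<String, String> emailByUser = new HashMap<>();

            apply(emailByUser, "U1", "user-1", "old-email");
            apply(emailByUser, "U2", "user-1", "new-email");
            apply(emailByUser, "U1", "user-1", "old-email"); // duplicate redelivery of U1

            // Prints "old-email": the duplicate overwrote the newer update.
            System.out.println(emailByUser.get("user-1"));
        }

        // Applies an update with no dedup. Tracking the highest update id seen
        // per key (possible only with idempotent/ordered delivery) would fix it.
        static void apply(Map<String, String> table, String updateId, String key, String value) {
            table.put(key, value);
            System.out.println("applied " + updateId + ": " + key + " -> " + value);
        }
    }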
On Wed, Sep 3, 2014 at 11:26 AM, Chinmay Soman <[email protected]> wrote:

> bootstrapping is ever necessary on recovery or rewind. Seems like it's
> only needed for cold start.

I think you're right. Either way, it looks like there should be additional support for this.

________________________________________
From: Roger Hoover [[email protected]]
Sent: Wednesday, September 03, 2014 9:43 AM
To: [email protected]
Subject: Re: Trying to achieve deterministic behavior on recovery/rewind

Chinmay,

Thank you for the feedback. Responses inline.

On Tue, Sep 2, 2014 at 2:09 PM, Chinmay Soman <[email protected]> wrote:

> That's interesting!
>
> 1) Deterministic reading from a bootstrap stream:
> We could define a changelog for the local state (which in turn is
> populated using a bootstrap stream). If the job fails at this point,
> ideally, it should be restored using the changelog stream (instead of
> bootstrapping again) in order for the job to be deterministic (as you
> suggest). Thus there could be a check which either selects the bootstrap
> mode or the changelog restore mode (depending on whether a changelog
> exists). I'm not sure if this check exists today (I would guess no).

Yes, I was wondering about this for the event-table join (spelled out in my original message below, and sketched after this exchange). If you're only storing the join table in the local store, then the changelog stream and the bootstrap stream are duplicates of each other. One of them is unnecessary unless you add additional state to the local store. In any case, I'm wondering if bootstrapping is ever necessary on recovery or rewind. It seems like it's only needed for cold start.

> 2) Deterministic changelog:
> You're right - there could be a (smallish) window where we re-process
> some of the input records on a container restart. This can happen since
> the changelog can be a little ahead of the last checkpoint for a given
> input stream. However, I would argue the changelog is still deterministic
> in this case. Since currently Samza only guarantees at-least-once
> semantics, this seems to be OK.

Hmmm... it seems like this will violate the correctness definition that Chris outlined, where there may be two or more "correct" choices but the system would guarantee that only one will be produced. But now that you mention it, I don't know if that can ever be guaranteed with an at-least-once system. If a task can always see duplicates, it may process the first with its local state in state S1, then modify its state to S2, and then process the duplicate.

> 3) Deterministic MessageChooser:
> The in-order problem could be avoided if we restore the state from its
> changelog - which was originally populated by a 'bootstrap stream'. The
> task can then just pick up from where it left off (making the system
> deterministic). Having said that, there might be value in writing an
> 'EarliestFirstChooser'.

Agreed.

> Again, this is just my perception (which could be wrong - I'm still
> learning).
> C
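For reference, a minimal sketch of that event-table join against the 0.7-era task API, keeping the user_info table in the local key-value store. Topic and store names are made up, serde config is elided, and the config (not shown) would roughly wire both streams via task.inputs plus a stores.user-info.* section for the local store and its changelog:

    import java.util.Map;

    import org.apache.samza.config.Config;
    import org.apache.samza.storage.kv.KeyValueStore;
    import org.apache.samza.system.IncomingMessageEnvelope;
    import org.apache.samza.system.OutgoingMessageEnvelope;
    import org.apache.samza.system.SystemStream;
    import org.apache.samza.task.InitableTask;
    import org.apache.samza.task.MessageCollector;
    import org.apache.samza.task.StreamTask;
    import org.apache.samza.task.TaskContext;
    import org.apache.samza.task.TaskCoordinator;

    // Sketch of the orders/user_info join: user_info updates are written to
    // the local KV store, and orders are enriched from it. Messages are
    // treated as already-deserialized maps to keep the example short.
    public class OrderEnricherTask implements StreamTask, InitableTask {
        private static final SystemStream OUTPUT = new SystemStream("kafka", "orders-enriched");

        private KeyValueStore<String, Map<String, Object>> userInfo;

        @Override
        @SuppressWarnings("unchecked")
        public void init(Config config, TaskContext context) {
            // "user-info" must match the store name configured under stores.user-info.*
            userInfo = (KeyValueStore<String, Map<String, Object>>) context.getStore("user-info");
        }

        @Override
        @SuppressWarnings("unchecked")
        public void process(IncomingMessageEnvelope envelope,
                            MessageCollector collector,
                            TaskCoordinator coordinator) {
            Map<String, Object> msg = (Map<String, Object>) envelope.getMessage();
            String userId = (String) msg.get("user_id");

            if ("user-info".equals(envelope.getSystemStreamPartition().getStream())) {
                userInfo.put(userId, msg);               // table side: remember the latest user record
            } else {
                Map<String, Object> user = userInfo.get(userId);
                if (user != null) {
                    msg.put("email", user.get("email")); // stream side: enrich the order
                }
                collector.send(new OutgoingMessageEnvelope(OUTPUT, msg));
            }
        }
    }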
________________________________________
From: Roger Hoover [[email protected]]
Sent: Tuesday, September 02, 2014 8:52 AM
To: [email protected]
Subject: Trying to achieve deterministic behavior on recovery/rewind

Hi Samza devs,

I think this project has the best documentation I've ever seen! Amazing job. It's extremely well written, and Hello Samza is a really great example that I was able to run + modify without issue. It was a joy reading the docs and playing around with the example. Kudos!

After thoroughly reading all the docs, I still have a few questions and would appreciate any feedback.

I was thinking about how to support deterministic behavior on recovery or rewind. Maybe it can't always be 100% deterministic, but I think we can get close. Have other people thought about this? Is it desirable?

For example, let's say we're joining two streams: orders and user_info. As orders come in, we use the user_id field of the order to look up additional information about the user and enrich the stream. Say we're keeping all the user_info state in the local KV store.

t1: User updates her email to "[email protected]"
t2: User buys a pair of jeans (product_id == 99)
t3: User updates her email to "[email protected]"

In the case of normal operation (no delays in the user_info stream), the enriched record will be:

{product_id: 99, email: "[email protected]", ...}

But say that our job fails before it can checkpoint and is configured to bootstrap from user_info. When it gets restarted and bootstraps from the user_info stream, it will end up with the email set to "[email protected]" in the local KV store. Then it will reprocess the order event and produce the "wrong" output:

{product_id: 99, email: "[email protected]", ...}

I haven't verified that, but the documentation says "a bootstrap stream waits for the consumer to explicitly confirm that the stream has been fully consumed." Shouldn't it wait until it has consumed up to the checkpoint offset for the bootstrap stream instead (when there is a saved checkpoint offset)?

Likewise for local state replicated in the changelog. During the checkpoint process, Samza could include its producer offset in the checkpoint data, so that during recovery the local state will be restored to a state that corresponds with its offsets for the input streams. Everything would be coherent, rather than having the input streams restored to the checkpoint and local state restored to the most recent value. I'm assuming that changelog commits for local state and checkpoints are not done together in an atomic transaction, so they may not always match.

The other missing piece is a nearly deterministic MessageChooser. During recovery + rewind, all the messages in both streams are already present in Kafka, and we want a way to replay them in the same order as if they were played in real time. The only way to approximate this behavior that I can see is to use Kafka broker timestamps for each message. Is it possible to write an "EarliestFirstChooser" that always chooses the oldest message available, according to the timestamp at which it was received by the Kafka broker?

I don't know if Kafka stores a timestamp with each message, but I'm assuming it does because it supports an API on the simple consumer called getOffsetsBefore() that would seem to map from timestamps to offsets.
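A standalone sketch of what such a chooser could look like. Samza's MessageChooser contract is that the framework offers buffered messages via update() and asks which to process next via choose(); the sketch mirrors that contract without implementing the real interface, and it assumes the timestamp travels inside the message, since (as noted above) it isn't clear Kafka exposes a per-message broker timestamp:

    import java.util.Comparator;
    import java.util.PriorityQueue;

    // Sketch of the "EarliestFirstChooser" idea, mirroring the update()/choose()
    // contract of Samza's MessageChooser without implementing the real interface.
    public class EarliestFirstChooser {
        // Stand-in for IncomingMessageEnvelope, carrying an assumed timestamp.
        static class Envelope {
            final String stream;
            final long timestamp;
            final Object message;
            Envelope(String stream, long timestamp, Object message) {
                this.stream = stream; this.timestamp = timestamp; this.message = message;
            }
        }

        private final PriorityQueue<Envelope> buffered =
            new PriorityQueue<>(Comparator.comparingLong((Envelope e) -> e.timestamp));

        public void update(Envelope envelope) { buffered.add(envelope); }

        // Oldest available message first; null if nothing is buffered.
        public Envelope choose() { return buffered.poll(); }

        public static void main(String[] args) {
            EarliestFirstChooser chooser = new EarliestFirstChooser();
            chooser.update(new Envelope("orders", 200L, "order: product_id=99"));
            chooser.update(new Envelope("user-info", 100L, "user_info: email update"));
            System.out.println(chooser.choose().message); // user_info first (t=100)
            System.out.println(chooser.choose().message); // then the order (t=200)
        }
    }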
Finally, a nit pick: I'm using Samza 0.7.0, but the metrics data reports the version as {"samza-version": "0.0.1"}. Is this intentional?

If it makes sense, I can put in some JIRA tickets for this stuff...

Cheers,

Roger
