Chris,

Thanks for the quick and thoughtful response. I'm responding a little late
because I sometimes have to spend the day behind a very restrictive proxy and
can't get to my email. See inline.
On Tue, Sep 2, 2014 at 11:00 AM, Chris Riccomini <[email protected]> wrote:

> Hey Roger,
>
> Thanks for the feedback on the docs. Based on your questions, I can tell
> that you have been thorough in your reading! I'm going to try and answer
> your questions as best I can, but the truth is, we haven't (yet) thought a
> lot about how to implement time ordering in Samza, so think critically
> about what I say.
>
> As I understand your first question about determinism, you are defining
> determinism as, "The output will always be exactly the same, even in the
> case of failure." This is certainly the most desirable deterministic
> behavior one could hope for.

Yes! This is precisely the definition I was looking for. Although, since it's
an "at least once" system, we should say that the content of the output
messages will always be the same in the presence of failures, but the number
of duplicates produced is unpredictable.

> The way we've been defining determinism in Samza is that there are many
> potential "correct" outputs at any given point in time. In our definition,
> it is possible to send one correct output during processing, then fail,
> re-process, and pick a second (but different) correct output. The key is
> not to materialize two "correct outputs", since this would result in
> duplication: downstream consumers should only ever see one of the correct
> answers.
>
> In the example you provide, we would say that there are two correct
> answers:
>
> {product_id: 99, email: "[email protected]", ...}
>
> Or:
>
> {product_id: 99, email: "[email protected]", ...}
>
> It would not be correct to see both in the output stream, though. The
> reason we think that it is "correct" to send [email protected] as the final
> output is because it is possible that this could have been the output
> before the failure occurred. The reason that I say this is that the timing
> between the two streams (orders and user_info) is undefined. You've
> defined that the consumer read the messages in this order:
>
> t1: User updates her email to "[email protected]"
> t2: User buys a pair of jeans (product_id == 99)
> t3: User updates her email to "[email protected]"
>
> But it's just as possible that the consumer could have read the messages
> in this order:
>
> t1: User updates her email to "[email protected]"
> t2: User updates her email to "[email protected]"
> t3: User buys a pair of jeans (product_id == 99)

This makes sense, but I believe that Samza cannot guarantee this today
because state restoration is not completely consistent yet, nor do we yet
have the "perfect rewind" that you described in your follow-up email. I love
that idea. More on it later.

> Samza (or Kafka) could introduce some concept of time to fix this. You
> could come up with a deterministic MessageChooser that picks messages
> based on a timestamp *in* the message. The two options on who assigns this
> timestamp (that I can think of) are the producer or the broker. The broker
> seems more desirable since it will apply a totally ordered timestamp
> across all messages in a single partition, while having the producer
> assign the timestamp can cause messages to be written to Kafka out of
> order (since each producer will have a different timestamp).
>
> Let's assume that the broker assigns it on a per-partition basis. Let's
> also assume that in your example, the user_info partition is on a
> different broker from the orders partition, and that each stream is just
> one partition. In this case, broker1 assigns time t1 and t3 to the
> user_info stream messages, and broker2 assigns time t2 to the orders
> stream message. The problem is that the consumer could have read the
> messages for t1 and t3, but not yet for t2, since they're from different
> brokers. In such a case, what is the desirable behavior for the
> MessageChooser? As I see it, there are two choices: block forever until a
> message from the orders stream comes in, or time-bound the decision, and
> move forward without a message from orders after some time bound. If you
> take the former approach, you can introduce a lot of latency on the
> user_info stream, but I think you're right: you'll always process messages
> in exactly the same order. I think the same holds true for the change log.

The Kafka broker adding its own timestamps might be a convenient way to try
to approximate order across partitions. I don't know if things get weird
with cross-data-center replication (mirror-maker). People certainly would
need to realize that the broker timestamp does not mean anything at the
application level; it should only be used for approximate ordering across
partitions. But it could be a useful MessageChooser that Samza could provide
out of the box. I'm assuming it's more useful in most cases than either the
round-robin or strict-priority strategies. However, if we had your "perfect
rewind" functionality, it would be a nice feature but less urgent.
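To make "EarliestFirstChooser" concrete, here's roughly what I'm imagining.
This is just a sketch in Java; I'm going from memory of the MessageChooser
interface, and extractTimestamp() is made up, since Kafka doesn't attach a
broker timestamp to messages today:

import java.util.HashMap;
import java.util.Map;

import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.system.chooser.MessageChooser;

// Sketch only: always pick the pending envelope with the earliest
// broker-assigned timestamp.
public class EarliestFirstChooser implements MessageChooser {
  // The container hands us at most one pending envelope per partition.
  private final Map<SystemStreamPartition, IncomingMessageEnvelope> pending =
      new HashMap<SystemStreamPartition, IncomingMessageEnvelope>();

  public void update(IncomingMessageEnvelope envelope) {
    pending.put(envelope.getSystemStreamPartition(), envelope);
  }

  public IncomingMessageEnvelope choose() {
    SystemStreamPartition earliest = null;
    long earliestTimestamp = Long.MAX_VALUE;
    for (Map.Entry<SystemStreamPartition, IncomingMessageEnvelope> entry : pending.entrySet()) {
      long timestamp = extractTimestamp(entry.getValue());
      if (timestamp < earliestTimestamp) {
        earliestTimestamp = timestamp;
        earliest = entry.getKey();
      }
    }
    // Returning null means "nothing to process right now".
    return earliest == null ? null : pending.remove(earliest);
  }

  // Lifecycle methods, if the interface requires them.
  public void start() {}
  public void stop() {}
  public void register(SystemStreamPartition ssp, String lastReadOffset) {}

  // Hypothetical: depends entirely on how a broker timestamp would be exposed.
  private long extractTimestamp(IncomingMessageEnvelope envelope) {
    throw new UnsupportedOperationException("no broker timestamp exists yet");
  }
}

To be truly deterministic it would also have to refuse to choose until it
has a pending envelope from every registered partition (or until a time
bound expires), which is exactly the latency trade-off you described. I
believe it would be plugged in through a MessageChooserFactory via the
task.chooser.class config, but I haven't tried that.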
> Seems to me like this is worth opening a JIRA to discuss.
>
> >>> I'm assuming that change log commits for local state and checkpoint
> >>> are done together in an atomic transaction so that they may not always
> >>> match.
>
> This will be true in the future. Kafka currently does not support atomic
> transactions, but there is a JIRA/design/doc/patch up. Once this feature
> is committed, we will use it as you describe.

Oh, wow, I didn't know that. I assume you're talking about this:
https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+in+Kafka ?

I still think there's an easier/simpler way to do it that doesn't require
transactions. I *think* that all you need to do is add one more piece of
information to the checkpoint state. In addition to storing the offsets of
the input streams, also save the producer offset for the changelog stream.
Then during recovery, only read the changelog state up to the saved offset.
That will get the local store consistent with the input offsets. The only
potential issue here is that there may be additional updates in the
changelog that are no longer valid. Those entries should be reverted (marked
as tombstones). I'm not sure if there's an easy way to do that with Kafka
though. On second thought, this might not be easier than transactions...

It looks like the new Kafka producer API makes the offsets available:
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java#L44
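Roughly, the bookkeeping I have in mind is something like this (a sketch
only, not real Samza internals; I'm going from the current javadoc of the
new producer, so the exact signatures may differ, and blocking on every send
is just to keep it short):

import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// Track the last changelog offset we produced so it can be written into the
// checkpoint alongside the input-stream offsets.
public class ChangelogOffsetTracker {
  private final KafkaProducer<byte[], byte[]> producer;
  private long lastChangelogOffset = -1L;

  public ChangelogOffsetTracker(KafkaProducer<byte[], byte[]> producer) {
    this.producer = producer;
  }

  // Called for every local-store write that gets mirrored to the changelog.
  public void logStoreWrite(String changelogTopic, byte[] key, byte[] value) throws Exception {
    Future<RecordMetadata> ack =
        producer.send(new ProducerRecord<byte[], byte[]>(changelogTopic, key, value));
    lastChangelogOffset = ack.get().offset(); // offset the broker assigned
  }

  // Stored next to the input-stream offsets at checkpoint time.
  public long offsetForCheckpoint() {
    return lastChangelogOffset;
  }

  // During restore, only apply changelog entries up to the checkpointed offset.
  public static boolean shouldApplyDuringRestore(long entryOffset, long checkpointedOffset) {
    return entryOffset <= checkpointedOffset;
  }
}

(The leftover changelog entries past that offset are still the unsolved part,
as I said above.)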
> >>> Is it possible to write an "EarliestFirstChooser" that always chooses
> >>> the oldest message available according to the timestamp it was received
> >>> by the Kafka broker?
>
> Yes, this should be possible.
>
> >>> I don't know if Kafka stores a timestamp with each message but I'm
> >>> assuming it does because it supports an API on the simple consumer
> >>> called getOffsetsBefore() that would seem to map from timestamps to
> >>> offsets.
>
> It doesn't keep track of anything like wall-clock time on a per-message
> basis. Each message does have an "offset", which is a per-partition
> logical measure of how far into a stream the message is. It starts at 0,
> and increments once for each message written. I *think* that this is of
> little use for ordering, since you could have the offset for the orders
> partition be 1000000, while the offset for user_info could be 130.
>
> >>> I'm using Samza 0.7.0 but the metrics data has the version as
> >>> {"samza-version": "0.0.1"}. Is this intentional?
>
> I think this is a result of the Samza jars not having the proper
> META/version info in it. As I recall, the MetricsSnapshotReporter looks
> for the version info somewhere in the META-INF folder inside the Samza
> jar. If it doesn't find it, it defaults to 0.0.1. That happens here:
>
> https://github.com/apache/incubator-samza/blob/master/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala

OK. Thanks!

> Cheers,
> Chris
>
> On 9/2/14 8:52 AM, "Roger Hoover" <[email protected]> wrote:
>
> >Hi Samza devs,
> >
> >I think this project has the best documentation I've ever seen! Amazing
> >job. It's extremely well written, and Hello Samza is a really great
> >example that I was able to run + modify without issue. It was a joy
> >reading the docs and playing around with the example. Kudos!
> >
> >After thoroughly reading all the docs, I still have a few questions and
> >would appreciate any feedback.
> >
> >I was thinking about how to support deterministic behavior on recovery or
> >rewind. Maybe it can't always be 100% deterministic but I think we can
> >get close. Have other people thought about this? Is it desirable?
> >
> >For example, let's say we're joining two streams: orders and user_info.
> >As orders come in, we use the user_id field of the order to look up
> >additional information about the user and enrich the stream. Say we're
> >keeping all the user_info state in the local KV store.
> >
> >t1: User updates her email to "[email protected]"
> >t2: User buys a pair of jeans (product_id == 99)
> >t3: User updates her email to "[email protected]"
> >
> >In the case of normal operation (no delays in the user_info stream), the
> >enriched record will be:
> >
> >{product_id: 99, email: "[email protected]", ...}
> >
> >But say that our job fails before it can checkpoint and is configured to
> >bootstrap from user_info. When it gets restarted and bootstraps from the
> >user_info stream, it will end up with the email set to "[email protected]"
> >in the local KV store. Then it will reprocess the order event and produce
> >the "wrong" output:
> >
> >{product_id: 99, email: "[email protected]", ...}
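[Side note, to make this example concrete: the kind of task I have in mind
is roughly the sketch below. It's simplified; the "user-info" store name,
the "enriched-orders" output stream, and the messages-as-Maps are all
placeholders.]

import java.util.Map;

import org.apache.samza.config.Config;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.InitableTask;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;

// Simplified enrichment task: user_info updates go into the local KV store,
// and each order is joined against the latest profile seen so far.
public class OrderEnricherTask implements StreamTask, InitableTask {
  private static final SystemStream ENRICHED_ORDERS =
      new SystemStream("kafka", "enriched-orders");

  private KeyValueStore<String, Map<String, Object>> userInfo;

  @SuppressWarnings("unchecked")
  public void init(Config config, TaskContext context) {
    userInfo = (KeyValueStore<String, Map<String, Object>>) context.getStore("user-info");
  }

  @SuppressWarnings("unchecked")
  public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
      TaskCoordinator coordinator) {
    Map<String, Object> message = (Map<String, Object>) envelope.getMessage();
    String stream = envelope.getSystemStreamPartition().getStream();

    if ("user_info".equals(stream)) {
      // Remember the latest profile for this user.
      userInfo.put((String) message.get("user_id"), message);
    } else if ("orders".equals(stream)) {
      // Enrich the order with whatever profile we have right now.
      Map<String, Object> profile = userInfo.get((String) message.get("user_id"));
      if (profile != null) {
        message.put("email", profile.get("email"));
      }
      collector.send(new OutgoingMessageEnvelope(ENRICHED_ORDERS, message));
    }
  }
}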
> >I haven't verified that, but the documentation says "a bootstrap stream
> >waits for the consumer to explicitly confirm that the stream has been
> >fully consumed." Shouldn't it wait until it has consumed up to the
> >checkpoint offset for the bootstrap stream instead (when there is a saved
> >checkpoint offset)?
> >
> >Likewise, for local state replicated in the change log. During the
> >checkpoint process, Samza could include its producer offset in the
> >checkpoint data so that during recovery, the local state will be restored
> >to a state that corresponds with its offsets for the input streams.
> >Everything would be coherent rather than having the input streams
> >restored to the checkpoint and local state restored to the most recent
> >value. I'm assuming that change log commits for local state and
> >checkpoint are done together in an atomic transaction so that they may
> >not always match.
> >
> >The other missing piece is a nearly deterministic MessageChooser. During
> >recovery + rewind, all the messages in both streams are already present
> >in Kafka, and we want a way to replay them in the same order as if they
> >were played in real time. The only way to approximate this behavior that
> >I can see is to use Kafka broker timestamps for each message. Is it
> >possible to write an "EarliestFirstChooser" that always chooses the
> >oldest message available according to the timestamp at which it was
> >received by the Kafka broker?
> >
> >I don't know if Kafka stores a timestamp with each message but I'm
> >assuming it does because it supports an API on the simple consumer called
> >getOffsetsBefore() that would seem to map from timestamps to offsets.
> >
> >Finally, a nit pick. I'm using Samza 0.7.0 but the metrics data has the
> >version as {"samza-version": "0.0.1"}. Is this intentional?
> >
> >If it makes sense, I can put in some JIRA tickets for this stuff...
> >
> >Cheers,
> >
> >Roger
