Thanks Robert and Kafka team for the detailed discussion! Unfortunately I have been tied up with some production release issues since late last week and haven't had a chance to weigh in, but I am very interested in the topic. I promise to respond to the questions and comments this week.
Jonathan

On Tue, Jun 10, 2014 at 12:38 AM, Robert Hodges <berkeleybob2...@gmail.com> wrote:

> Thanks Neha. I am looking at the API call you recommended.
>
> Cheers, Robert
>
>
> On Mon, Jun 9, 2014 at 12:42 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
>
> > Is there a convenient way to fetch the last message posted on a
> > particular topic across all partitions?
> >
> > Not really, unless the message itself has some sort of a timestamp. Even
> > then, the order that the broker applies to the log is only guaranteed per
> > partition per client. So it is tricky to know the last written message to
> > a topic. You can try to find the last message per partition (using the
> > getOffsetsBefore API).
> >
> > Thanks,
> > Neha
> >
> >
> > On Mon, Jun 9, 2014 at 8:55 AM, Robert Hodges <berkeleybob2...@gmail.com> wrote:
> >
> > > Hi Guozhang,
> > >
> > > Thanks for the response. Answers interpolated below.
> > >
> > > Cheers, Robert
> > >
> > > On Mon, Jun 9, 2014 at 8:15 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> > >
> > > > Robert,
> > > >
> > > > Thanks for the description. Just want to clarify some of the points
> > > > (assuming one transaction may include multiple messages below):
> > > >
> > > > 2) For the "one-to-one mapping" to work, can the consumer only read at
> > > > transaction boundaries, i.e., all or none of a single transaction's
> > > > messages are returned to the consumer at once; or is it sufficient to
> > > > let consumers just read committed messages? For the use case you
> > > > described it seems the second option is good enough.
> > >
> > > Consumers just read committed messages from Kafka itself. Application
> > > transactions could be layered on top using the message key, since such
> > > transactions might consist of multiple Kafka messages. It's up to the
> > > consumer to avoid committing a partial transaction.
> > > > 4) If an upstream data source / producer has failed and lost some
> > > > committed transactions, and then on restart regenerates them, since the
> > > > transaction has been previously committed the downstream consumer may
> > > > have already consumed their messages, and regenerating the transaction
> > > > will inevitably result in duplicates. Is that OK for your case?
> > >
> > > Assuming it is possible to regenerate upstream transactions, downstream
> > > consumers should do one of two things:
> > >
> > > a.) For non-idempotent consumers: Remember the last committed application
> > > transaction and ignore anything before that point.
> > > b.) For idempotent consumers: Just repeat them.
> > >
> > > The uglier problem is what to do when the logs diverge because the
> > > upstream server cannot regenerate data. In this case you start by hoping
> > > the consumer is something like Hadoop that easily tolerates
> > > inconsistencies in data. Things may go downhill quickly if the consumer
> > > is an RDBMS. :(
> > >
> > > Is there a convenient way to fetch the last message posted on a
> > > particular topic across all partitions? (My laptop currently is about
> > > 120 miles away so it's hard to look.) If so, it looks to me as if there
> > > is enough in the Kafka producer and consumer APIs to implement what I am
> > > describing without too many holes. I believe the trick is to design a
> > > message key that contains a monotonically increasing transaction ID with
> > > a fragment index to allow transactions to span Kafka messages but keep
> > > all of them (for example) in a single partition.
> > >
> > > If I have time next weekend I might try to create an example of this to
> > > see what problems pop up.
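[Editor's note: Robert's proposed key layout, a monotonically increasing transaction ID plus a fragment index, with all fragments of one transaction kept in a single partition, could be sketched roughly as follows. The field layout and helper names are illustrative assumptions, not an existing Kafka API.]

```python
# Sketch of the message-key scheme described above: a monotonically
# increasing transaction ID plus a fragment index, packed into a string
# key. The layout (zero-padded fields, 'L'/'C' last-fragment flag) is an
# illustrative assumption, not part of Kafka.

def encode_key(txn_id: int, frag_index: int, is_last: bool) -> str:
    """Pack transaction ID, fragment index, and last-fragment flag."""
    flag = "L" if is_last else "C"
    return f"{txn_id:016d}:{frag_index:06d}:{flag}"

def decode_key(key: str):
    """Inverse of encode_key."""
    txn, frag, flag = key.split(":")
    return int(txn), int(frag), flag == "L"

def skip_replayed(txn_id: int, last_committed_txn: int) -> bool:
    """Option (a) above for non-idempotent consumers: ignore any replayed
    transaction at or before the last committed application transaction."""
    return txn_id <= last_committed_txn
```

Because the transaction ID is zero-padded, keys also sort in transaction order, which makes it easy for a restarting consumer to find where it left off.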
> > > Cheers, Robert
> > >
> > > > Thanks,
> > > > Guozhang
> > > >
> > > >
> > > > On Sat, Jun 7, 2014 at 11:30 PM, Robert Hodges <berkeleybob2...@gmail.com> wrote:
> > > >
> > > > > Hi Jonathan and Jun,
> > > > >
> > > > > Transactional replication using Kafka between stores at either end
> > > > > is an interesting topic. I have some experience with this problem in
> > > > > database replication products.
> > > > >
> > > > > To understand how to implement it properly in Kafka it would help to
> > > > > define Jonathan's use case more formally. As I read the description
> > > > > there are three parts: a source DBMS, Kafka, and an analytics store.
> > > > > These can be arranged as follows:
> > > > >
> > > > > Producer Store -> Kafka -> Consumer Store
> > > > >
> > > > > e.g.:
> > > > >
> > > > > MySQL -> Kafka -> Spark over HDFS
> > > > >
> > > > > This is like the usual producer/consumer model except that the
> > > > > semantics are as follows. I added some details to the description to
> > > > > accommodate a number of practical problems that occur in replication
> > > > > topologies of this kind.
> > > > >
> > > > > 1.) The producer and consumer in the topology are stores with state
> > > > > and some notion of a transaction that changes the state of the store
> > > > > to which it is applied. Kafka is in the middle and also has
> > > > > transactions, namely to produce and consume messages.
> > > > >
> > > > > 2.) If a transaction executes on the producer store, you would like
> > > > > to execute a corresponding transaction on the consumer store. The
> > > > > transaction might not have the same effect downstream, but the point
> > > > > is that transactions are linked one-to-one between producer and
> > > > > consumer.
> > > > >
> > > > > 3.) All of the stores or Kafka can fail independently and at any time.
> > > > > It must be possible to recover and continue once a failed component
> > > > > restarts.
> > > > >
> > > > > 4.) It is possible to have failures where a store or Kafka itself
> > > > > loses committed state and reverts to an earlier state. This happens
> > > > > in MySQL, for example, when a host crashes before data are properly
> > > > > committed to InnoDB and/or the MySQL binlog. It can also happen if
> > > > > the upstream DBMS is restored from a backup or as a result of cluster
> > > > > failover with data loss. In this case you either want to regenerate
> > > > > lost transactions or (if it is hopeless) fail cleanly.
> > > > >
> > > > > 5.) Producer transactions might be larger than a single Kafka message
> > > > > (e.g. a KeyedMessage). They may not even fit into a single JVM's
> > > > > memory. This can occur, for example, if you do a bulk load or an
> > > > > administrative operation on a large table in the producer store. You
> > > > > might not have this problem now, but given your requirement to work
> > > > > with a range of stores it seems likely to occur sooner rather than
> > > > > later. Such transactions must be broken into a stream of smaller
> > > > > messages with a protocol to identify that they belong to a single
> > > > > transaction. If there are failures, such fragmented transactions must
> > > > > not result in partial transactions being applied to the consumer.
> > > > >
> > > > > 6.) All of the preceding requirements should be met with minimal
> > > > > impact on message throughput or transaction rates within stores at
> > > > > either end.
> > > > >
> > > > > Let me know if this is more than what you (Jonathan) intended.
> > > > > Usually if you really want #2, requirements #3-6 follow
> > > > > automatically. #5 is potentially a source of much pain if not
> > > > > addressed early on.
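[Editor's note: requirement #5, that fragmented transactions must never be applied partially, can be sketched with a small reassembly buffer on the consumer side. This is a hypothetical illustration under the fragment-index protocol described above, not Kafka code; the class and method names are invented.]

```python
# Reassemble transactions that were fragmented across several messages,
# releasing a transaction downstream only once its final fragment has
# arrived. Class and method names are hypothetical.

class TxnAssembler:
    def __init__(self):
        self.pending = {}  # txn_id -> list of (frag_index, payload)

    def add(self, txn_id, frag_index, payload, is_last):
        frags = self.pending.setdefault(txn_id, [])
        frags.append((frag_index, payload))
        if not is_last:
            return None                # incomplete: apply nothing yet
        frags.sort()                   # restore fragment order
        del self.pending[txn_id]       # no partial state survives
        return [p for _, p in frags]   # apply as one unit downstream
```

A consumer driving this loop would commit its Kafka offset only after the returned (complete) transaction has been applied to the consumer store.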
> > > > > Pending a response, I would just say solutions that require a
> > > > > transactional commit across two stores are difficult to write, often
> > > > > have performance downsides, and handle failures poorly because they
> > > > > cannot cover all the corner cases. The last point means they tend to
> > > > > drop data, generate unmatched transactions (orphans), or send things
> > > > > multiple times depending on the failure.
> > > > >
> > > > > It's generally better to design systems that use a sliding window
> > > > > protocol where a commit in the producer triggers a commit to Kafka,
> > > > > which in turn triggers a commit to the consumer. Assuming your
> > > > > requirements are as stated above, the question is how to design a
> > > > > transactional sliding window protocol that works on Kafka.
> > > > >
> > > > > Cheers, Robert Hodges
> > > > >
> > > > >
> > > > > On Thu, Jun 5, 2014 at 7:48 AM, Jun Rao <jun...@gmail.com> wrote:
> > > > >
> > > > > > It sounds like you want to write to a data store and a data pipe
> > > > > > atomically. Since both the data store and the data pipe that you
> > > > > > want to use are highly available, the only case that you want to
> > > > > > protect against is the client failing between the two writes. One
> > > > > > way to do that is to let the client publish to Kafka first with the
> > > > > > strongest ack. Then, run a few consumers to read data from Kafka
> > > > > > and then write the data to the data store. Any one of those
> > > > > > consumers can die and the work will be automatically picked up by
> > > > > > the remaining ones. You can use the partition id and the offset of
> > > > > > each message as its UUID if needed.
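[Editor's note: Jun's point that the partition id and offset of each message can serve as its UUID might look like this on the consumer side. The dictionary here is a stand-in for the real downstream data store, not an actual client API.]

```python
# Use (partition, offset) as an idempotency key when writing to the
# downstream store, so a replacement consumer that re-reads messages
# never applies the same one twice. The dict stands in for a real store.

def apply_once(store, partition, offset, record):
    """Apply `record` keyed by its Kafka coordinates; skip duplicates."""
    uuid = (partition, offset)
    if uuid in store:
        return False       # already applied by a previous consumer
    store[uuid] = record
    return True
```

This works because (partition, offset) uniquely identifies a message within a topic, so any consumer that takes over a partition derives the same key for the same message.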
> > > > > > Thanks,
> > > > > >
> > > > > > Jun
> > > > > >
> > > > > >
> > > > > > On Wed, Jun 4, 2014 at 10:56 AM, Jonathan Hodges <hodg...@gmail.com> wrote:
> > > > > >
> > > > > > > Sorry, didn't realize the mailing list wasn't copied...
> > > > > > >
> > > > > > >
> > > > > > > ---------- Forwarded message ----------
> > > > > > > From: Jonathan Hodges <hodg...@gmail.com>
> > > > > > > Date: Wed, Jun 4, 2014 at 10:56 AM
> > > > > > > Subject: Re: Hadoop Summit Meetups
> > > > > > > To: Neha Narkhede <neha.narkh...@gmail.com>
> > > > > > >
> > > > > > >
> > > > > > > We have a number of customer-facing online learning applications.
> > > > > > > These applications use heterogeneous technologies with different
> > > > > > > data models in underlying data stores such as RDBMS, Cassandra,
> > > > > > > MongoDB, etc. We would like to run offline analysis on the data
> > > > > > > contained in these learning applications with tools like Hadoop
> > > > > > > and Spark.
> > > > > > >
> > > > > > > One thought is to use Kafka as a way for these learning
> > > > > > > applications to emit data in near real-time for analytics. We
> > > > > > > developed a common model, represented as Avro records in HDFS,
> > > > > > > that spans these learning applications so that we can accept the
> > > > > > > same structured message from all of them. This allows for
> > > > > > > comparing apples to apples across these apps as opposed to messy
> > > > > > > transformations.
> > > > > > >
> > > > > > > So this all sounds good until you dig into the details. One
> > > > > > > pattern is for these applications to update state locally in
> > > > > > > their data stores first and then publish to Kafka. The problem
> > > > > > > with this is that these two operations aren't atomic, so the
> > > > > > > local persist can succeed and the publish to Kafka fail, leaving
> > > > > > > the application and HDFS out of sync. You can try to add some
> > > > > > > retry logic to the clients, but this quickly becomes very
> > > > > > > complicated and still doesn't solve the underlying problem.
> > > > > > >
> > > > > > > Another pattern is to publish to Kafka first with -1 and wait for
> > > > > > > the ack from the leader and replicas before persisting locally.
> > > > > > > This is probably better than the other pattern but does add some
> > > > > > > complexity to the client. The clients must now generate unique
> > > > > > > entity IDs/UUIDs for persistence when they typically rely on the
> > > > > > > data store to create these. Also, the publish to Kafka can
> > > > > > > succeed and the local persist can fail, leaving the stores out of
> > > > > > > sync. In this case the learning application needs to determine
> > > > > > > how to get itself back in sync. It can rely on getting the data
> > > > > > > back from Kafka, but it is possible the local store failure can't
> > > > > > > be fixed in a timely manner, e.g. hardware failure, constraint
> > > > > > > violation, etc. In that case the application needs to show an
> > > > > > > error to the user and will likely need to do something like send
> > > > > > > a delete message to Kafka to remove the earlier published
> > > > > > > message.
> > > > > > >
> > > > > > > A third, last-resort pattern might be to go the CDC route with
> > > > > > > something like Databus. This would require implementing
> > > > > > > additional fetchers and relays to support Cassandra and MongoDB.
> > > > > > > Also, the data will need to be transformed on the Hadoop/Spark
> > > > > > > side for virtually every learning application since they have
> > > > > > > different data models.
> > > > > > >
> > > > > > > I hope this gives enough detail to start discussing transactional
> > > > > > > messaging in Kafka. We are willing to help in this effort if it
> > > > > > > makes sense for our use cases.
> > > > > > >
> > > > > > > Thanks
> > > > > > > Jonathan
> > > > > > >
> > > > > > >
> > > > > > > On Wed, Jun 4, 2014 at 9:44 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
> > > > > > >
> > > > > > > > If you are comfortable, share it on the mailing list. If not,
> > > > > > > > I'm happy to have this discussion privately.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Neha
> > > > > > > > On Jun 4, 2014 9:42 AM, "Neha Narkhede" <neha.narkh...@gmail.com> wrote:
> > > > > > > >
> > > > > > > >> Glad it was useful. It will be great if you can share your
> > > > > > > >> requirements on atomicity. A couple of us are very interested
> > > > > > > >> in thinking about transactional messaging in Kafka.
> > > > > > > >>
> > > > > > > >> Thanks,
> > > > > > > >> Neha
> > > > > > > >> On Jun 4, 2014 6:57 AM, "Jonathan Hodges" <hodg...@gmail.com> wrote:
> > > > > > > >>
> > > > > > > >>> Hi Neha,
> > > > > > > >>>
> > > > > > > >>> Thanks so much to you and the Kafka team for putting together
> > > > > > > >>> the meetup. It was very nice and gave people from out of town
> > > > > > > >>> like us the ability to join in person.
> > > > > > > >>>
> > > > > > > >>> We are the guys from Pearson Education, and we talked a
> > > > > > > >>> little about supplying some details on some of our use cases
> > > > > > > >>> with respect to atomicity of source systems eventing data and
> > > > > > > >>> persisting locally. Should we just post to the list or is
> > > > > > > >>> there somewhere else we should send these details?
> > > > > > > >>>
> > > > > > > >>> Thanks again!
> > > > > > > >>> Jonathan
> > > > > > > >>>
> > > > > > > >>>
> > > > > > > >>> On Fri, Apr 11, 2014 at 9:31 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
> > > > > > > >>>
> > > > > > > >>> > Yes, that's a great idea. I can help organize the meetup at
> > > > > > > >>> > LinkedIn.
> > > > > > > >>> >
> > > > > > > >>> > Thanks,
> > > > > > > >>> > Neha
> > > > > > > >>> >
> > > > > > > >>> >
> > > > > > > >>> > On Fri, Apr 11, 2014 at 8:44 AM, Saurabh Agarwal (BLOOMBERG/ 731 LEXIN) <sagarwal...@bloomberg.net> wrote:
> > > > > > > >>> >
> > > > > > > >>> > > Great idea. I am interested in attending as well....
> > > > > > > >>> > >
> > > > > > > >>> > > ----- Original Message -----
> > > > > > > >>> > > From: users@kafka.apache.org
> > > > > > > >>> > > To: users@kafka.apache.org
> > > > > > > >>> > > At: Apr 11 2014 11:40:56
> > > > > > > >>> > >
> > > > > > > >>> > > With the Hadoop Summit in San Jose 6/3 - 6/5, I wondered
> > > > > > > >>> > > if any of the LinkedIn geniuses were thinking of putting
> > > > > > > >>> > > together a meetup on any of the associated technologies
> > > > > > > >>> > > like Kafka, Samza, Databus, etc. For us poor souls that
> > > > > > > >>> > > don't live on the West Coast it was a great experience
> > > > > > > >>> > > attending the Kafka meetup last year.
> > > > > > > >>> > >
> > > > > > > >>> > > Jonathan
> > > > >
> > > > --
> > > > -- Guozhang
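[Editor's note: the second pattern in Jonathan's Jun 4 message, publish to Kafka first with a client-generated ID instead of a store-assigned one, might be sketched as below. `publish` and `persist` are stand-in callables for the real Kafka producer and local store, and the compensating delete is the recovery step he describes.]

```python
import uuid

# Sketch of the publish-first pattern: the client generates its own
# entity ID, publishes to Kafka (with the strongest ack) before
# persisting locally, and sends a compensating delete if the local
# persist cannot be completed. publish/persist are stand-in callables.

def create_entity(publish, persist, payload):
    entity_id = str(uuid.uuid4())      # client-side ID, not store-assigned
    publish(entity_id, payload)        # Kafka first
    try:
        persist(entity_id, payload)    # then the local store
    except Exception:
        publish(entity_id, None)       # tombstone to undo the earlier publish
        raise
    return entity_id
```

Note the remaining gap the thread discusses: if the client dies between the publish and the persist, no compensating delete is ever sent, which is why the downstream consumers still need idempotent or dedup logic.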