On Wednesday, November 20, 2019, Matthias J. Sax <matth...@confluent.io> wrote:
> I am not sure what Spring does, but using Kafka Streams, writing the output and committing offsets would be part of the same transaction.
>
> It seems Spring is doing something else, and thus it seems it does not use the EOS API correctly.
>
> If you use transactions to copy data from an input to an output topic, committing offsets must be done on the producer as part of the transaction; the consumer would not commit offsets.
>
> To me, it seems that Spring is committing offsets using the consumer, independent of whether the transaction was successful or not. That would be an incorrect usage of the API.
>
> -Matthias
>
> On 11/20/19 6:16 AM, Edward Capriolo wrote:
> > Ok. I'm at a point where I believe the exactly-once is in question.
> >
> > Topic input has 10 partitions; topic output has 10 partitions.
> >
> > A producer writes messages 1 to 100 to topic input.
> >
> > The CTP process calls poll. It receives 100 messages, 10 in each partition.
> >
> > The process is simple mirroring: take from input, write to output.
> >
> > 10 producers with 10 transactional ids are created. During processing, 1 of the 10 producers throws a Kafka exception. 90 out of 100 writes are committed transactionally; 10 are not.
> >
> > If poll is called again, those 10 messages do not appear in the next poll. Are they lost?
> >
> > On Saturday, November 9, 2019, Edward Capriolo <edlinuxg...@gmail.com> wrote:
> >
> >> On Thu, Oct 31, 2019 at 6:11 AM Matthias J. Sax <matth...@confluent.io> wrote:
> >>
> >>> Quite a project to test transactions...
> >>>
> >>> The current system test suite is part of the code base:
> >>> https://github.com/apache/kafka/tree/trunk/tests/kafkatest/tests
> >>>
> >>> There are of course also some unit/integration tests for transactions.
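The exactly-once question at the top of this thread (whether records from an aborted transaction are lost) can be sketched with a toy model, assuming offsets are committed only as part of the transaction, as Matthias describes. This is plain Java standing in for a consume-transform-produce loop, not the Kafka client API:

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of a consume-transform-produce loop: the offset advance and the
// output writes succeed or fail together, like a Kafka transaction.
class AbortedTxnModel {
    final List<String> input;                       // input partition
    final List<String> output = new ArrayList<>();  // output partition
    int committedOffset = 0;                        // last committed consumer offset

    AbortedTxnModel(List<String> input) { this.input = input; }

    // Process up to `count` records; if `fail` is true, the "transaction"
    // aborts: neither the output writes nor the offset commit take effect.
    void runTransaction(int count, boolean fail) {
        List<String> staged = new ArrayList<>();
        int offset = committedOffset;
        for (int i = 0; i < count && offset < input.size(); i++, offset++) {
            staged.add(input.get(offset));
        }
        if (fail) return;         // abort: drop staged writes, keep old offset
        output.addAll(staged);    // commit: writes become visible...
        committedOffset = offset; // ...and the offset advances atomically
    }

    public static void main(String[] args) {
        AbortedTxnModel m = new AbortedTxnModel(List.of("m1", "m2", "m3", "m4"));
        m.runTransaction(2, true);             // aborted: nothing moves
        System.out.println(m.committedOffset); // 0
        m.runTransaction(4, false);            // retry: all 4 delivered once
        System.out.println(m.output);          // [m1, m2, m3, m4]
    }
}
```

Under these assumptions the 10 records are not lost: because the failed producer never committed its offsets, a consumer restarting from the last committed position re-reads them, and read_committed consumers never see the aborted writes.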
> >>> There is also a blog post that describes at a high level what testing was done when EOS was introduced:
> >>> https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/
> >>>
> >>> And yes, transactions are built on some assumptions, and if you configure your system incorrectly or violate those assumptions, it may break. We have also fixed some bugs since the first release. And there might be more bugs --- software is always buggy. However, for practical purposes, transactions should work.
> >>>
> >>> We would of course love it if you could share your test results! If you discover a bug, please report it, so we can fix it.
> >>>
> >>> -Matthias
> >>>
> >>> On 10/28/19 10:06 AM, Edward Capriolo wrote:
> >>>> On Sunday, October 27, 2019, Boyang Chen <reluctanthero...@gmail.com> wrote:
> >>>>
> >>>>> Hey Edward,
> >>>>>
> >>>>> Just to summarize and make sure I understood your question: you want to implement some chaos testing to validate the Kafka EOS model, but are not sure how to start, or are curious about whether there is already work in the community doing that?
> >>>>>
> >>>>> For the correctness of Kafka EOS, we have tons of unit tests and system tests to prove its functionality. They could be found inside the repo. You could check them out and see if we still have gaps (which I believe we definitely have).
> >>>>>
> >>>>> Boyang
> >>>>>
> >>>>> On Fri, Oct 25, 2019 at 7:25 PM Edward Capriolo <edlinuxg...@gmail.com> wrote:
> >>>>>
> >>>>>> Hello all,
> >>>>>>
> >>>>>> I used to work in adtech. Adtech was great. CPM for ads is $1-$5 per thousand ad impressions. If numbers are 5% off, you can blame JavaScript click trackers.
> >>>>>>
> >>>>>> Now I work in a non-adtech industry, and they are really, really serious about exactly-once.
> >>>>>> So there is this blog:
> >>>>>>
> >>>>>> https://www.confluent.io/blog/transactions-apache-kafka/
> >>>>>>
> >>>>>> Great little snippet of code. I think I can copy it and implement it correctly. But if you read the section about zombie fencing, you learn that you either need to manually assign partitions or use the rebalance listener and have N producers. Hunting around GitHub is not super helpful; some code snippets are less complete than even the snippet in the blog.
> >>>>>>
> >>>>>> I looked at what spring-kafka does. It does get the zombie fencing correct with respect to the fencing id; other bits of the code seem plausible.
> >>>>>>
> >>>>>> Notice I said "plausible", because I do not count a few end-to-end tests running on a single VM as solid enough evidence that this works in the face of failures.
> >>>>>>
> >>>>>> I have been contemplating how one stress-tests this exactly-once concept, with something Jepsen-like, or something brute force that I can run for 5 hours in a row.
> >>>>>>
> >>>>>> If I faithfully implemented the code in the transactional read-write loop and fed it into my Jepsen-like black-box tester, it should: create a topic with 10 partitions, start launching read-write transaction code, start feeding input data (maybe strings like 1-1000), then start randomly killing VMs with kill -9 and graceful exits, maybe even killing Kafka, and make sure 1-1000 pop out on the other end.
> >>>>>>
> >>>>>> I thought of some other "crazy" ideas. One such idea:
> >>>>>>
> >>>>>> If I make a transactional "echo": read x, write x back to the same topic. Run N instances of that and kill them randomly.
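The echo idea just described can be sketched as a toy simulation (plain Java, no Kafka; a deque stands in for the topic, and a fixed loss schedule stands in for crash-induced message loss):

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model of the "transactional echo" loss detector: each step reads one
// message and writes it back, except when a simulated failure drops it.
class EchoDrainModel {
    // Returns how many messages remain after `steps` echo iterations,
    // dropping a message every `lossEvery` steps (0 = never lose one).
    static int remaining(int initial, int steps, int lossEvery) {
        Deque<Integer> topic = new ArrayDeque<>();
        for (int i = 0; i < initial; i++) topic.add(i);
        for (int step = 1; step <= steps && !topic.isEmpty(); step++) {
            int msg = topic.poll();                          // read x
            boolean lost = lossEvery > 0 && step % lossEvery == 0;
            if (!lost) topic.add(msg);                       // write x back
        }
        return topic.size();
    }

    public static void main(String[] args) {
        System.out.println(remaining(100, 1_000_000, 0));    // exactly-once: 100
        System.out.println(remaining(100, 1_000_000, 1000)); // lossy: 0
    }
}
```

Exactly-once keeps the count constant forever, while any nonzero loss rate eventually drains the topic, which is what makes the echo a usable long-running detector.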
> >>>>>> If I am losing messages (and not duplicating messages), then the topic would eventually have no data.
> >>>>>>
> >>>>>> Or should I make a program with some math formula, like receive x, write xx? If duplication is happening, I would start seeing multiple xx's.
> >>>>>>
> >>>>>> Or send 1,000,000,000 messages through, have a consumer log them to a file, and then use an ETL tool to validate that the messages come out on the other side.
> >>>>>>
> >>>>>> Or should I use a NoSQL store with increments, count up, and ensure no key has been incremented twice?
> >>>>>>
> >>>>>> Note: I realize I can just use Kafka Streams or Storm, which have their own systems to guarantee "at most once", but I am looking for a way to prove what can be done with pure Kafka (and not just prove it for adtech work (good enough, 5% here or there)).
> >>>>>>
> >>>>>> I imagine someone somewhere must be doing this. How? Where? Tips? Is it part of some Kafka release stress test? I'm down to write it if it does not exist.
> >>>>>>
> >>>>>> Thanks,
> >>>>>> Edward
> >>>>>
> >>>>
> >>>> Boyang,
> >>>>
> >>>> "Just to summarize and make sure I understood your question: you want to implement some chaos testing to validate the Kafka EOS model, but are not sure how to start, or are curious about whether there is already work in the community doing that?"
> >>>>
> >>>> Yes.
> >>>>
> >>>> I am not an expert in this field, but I know that distributed systems can mask failures. For example, if you have an atomic increment, you might unit test it and it works fine, but if you ran it for 40 days it might double-count one time.
> >>>>
> >>>> "For the correctness of Kafka EOS, we have tons of unit tests and system tests to prove its functionality."
> >>>> "They could be found inside the repo."
> >>>>
> >>>> I've been a developer for a while, so the phrase "there are tests" never tells me everything. Tests reveal the presence of bugs, not the absence.
> >>>>
> >>>> Can you please point me at the tests? My curiosity is whether there is a systematic, in-depth strategy here and how much rigor there is.
> >>>>
> >>>> In my environment I need to quantify and use rigor to prove out these things, things that you might take for granted. For example, I have to prove that ZooKeeper works as expected when we lose a datacenter. Most people 'in the know' take it for granted that Kafka and ZK do what is advertised when configured properly. I have to test that out and document my findings.
> >>>>
> >>>> For Kafka transactions, the user-space code needs to be written properly and configured properly, along with the server being set up properly. It is not enough for me to check out Kafka, run 'sbt test', and declare victory after the unit tests pass.
> >>>>
> >>>> What I am effectively looking for is the anti-Jepsen blog that says: we threw the kitchen sink at this and these transactions are bulletproof; here is our methodology, here are some charts, here is xyz; here is how we run it every minor release.
> >>>>
> >>>> I'm not trying to be a pita; educate me on how bulletproof this is and how I can reproduce the results.
> >>
> >> All,
> >>
> >> After a few weeks of hacking at this I have found a few interesting things. First things first, introducing Kafos:
> >>
> >> https://github.com/edwardcapriolo/kafos
> >>
> >> Kafos (think Kafka + Chaos).
> >> Kafos is a project that attempts to prove out that Kafka transactions do exactly-once, by launching multiple servers and consumers/producers, killing them off, and counting how many messages land on the other end.
> >>
> >> Note: the project is a little raw at this point (it has paths hard-coded to my home folders and some long Thread.sleep() calls in there; I will clean it up as I go), but I have some early findings.
> >>
> >> From the blog here, I want to point out the following flaws:
> >>
> >> https://www.confluent.io/blog/transactions-apache-kafka/
> >>
> >> KafkaProducer producer = createKafkaProducer(
> >>     "bootstrap.servers", "localhost:9092",
> >>     "transactional.id", "my-transactional-id");
> >>
> >> 1) A fixed transactional.id effectively means the first worker fences all the other ones. While being exactly-once, there is exactly one worker and no parallelism.
> >>
> >> Spring has two schemes that seem to put random information in the fencing ids, which I think defeats the purpose. But I need to dive in on that more.
> >> https://docs.spring.io/spring-kafka/reference/html/
> >>
> >> Transactions are enabled by providing the DefaultKafkaProducerFactory with a transactionIdPrefix. In that case, instead of managing a single shared Producer, the factory maintains a cache of transactional producers. When the user calls close() on a producer, it is returned to the cache for reuse instead of actually being closed. The transactional.id property of each producer is transactionIdPrefix + n, where n starts with 0 and is incremented for each new producer, unless the transaction is started by a listener container with a record-based listener. In that case, the transactional.id is <transactionIdPrefix>.<group.id>.<topic>.<partition>. This is to properly support fencing zombies, as described here <https://www.confluent.io/blog/transactions-apache-kafka/>.
> >> This new behavior was added in versions 1.3.7, 2.0.6, 2.1.10, and 2.2.0. If you wish to revert to the previous behavior, you can set the producerPerConsumerPartition property on the DefaultKafkaProducerFactory to false.
> >>
> >> While transactions are supported with batch listeners, by default, zombie fencing is not supported because a batch may contain records from multiple topics or partitions. However, starting with version 2.3.2, zombie fencing is supported if you set the container property subBatchPerPartition to true. In that case, the batch listener is invoked once per partition received from the last poll, as if each poll only returned records for a single partition.
> >>
> >> How to fix this? Here be the dragons:
> >>
> >> "Practically, one would either have to store the mapping between input partitions and transactional.ids in an external store, or have some static encoding of it. Kafka Streams opts for the latter approach to solve this problem."
> >>
> >> Well then, practically, the rest of this code is not useful:
> >>
> >> while (true) {
> >>   ConsumerRecords records = consumer.poll(Long.MAX_VALUE);
> >>   producer.beginTransaction();
> >>   for (ConsumerRecord record : records)
> >>     producer.send(producerRecord("outputTopic", record));
> >>   producer.sendOffsetsToTransaction(currentOffsets(consumer), group);
> >>   producer.commitTransaction();
> >> }
> >>
> >> When you call consumer.poll() (unless you manually assign a single partition), you are going to receive records from one or more partitions. As consumers join and leave the group, workers are going to move.
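The "static encoding" the blog alludes to can be sketched as pure string logic (the helper names here are my own, not the spring-kafka or Streams API): a per-partition transactional.id is a function of (group, topic, partition), so whichever worker owns a partition, before or after a crash, derives the same id and thereby fences its zombie predecessor, whereas a pooled prefix-plus-counter id depends on producer creation order and is not tied to partitions at all:

```java
// Sketch of the two transactional.id encodings discussed above
// (hypothetical helpers, not a real library API).
class TxnIds {
    // Static per-partition encoding: deterministic for a given partition,
    // so a restarted owner reuses the id and fences its zombie predecessor.
    static String perPartitionId(String prefix, String group, String topic, int partition) {
        return prefix + "." + group + "." + topic + "." + partition;
    }

    // Pooled encoding: id depends on creation order, not on partitions.
    static String pooledId(String prefix, int n) {
        return prefix + n;
    }

    public static void main(String[] args) {
        // A worker owning partition 3, before and after a crash,
        // derives the identical fencing id:
        System.out.println(perPartitionId("tx", "g1", "input", 3)); // tx.g1.input.3
        System.out.println(pooledId("tx", 0));                      // tx0
    }
}
```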
> >> There are two ways I can find to do this:
> >>
> >> 1) Use a ConsumerRebalanceListener and a concurrent map of producers:
> >> https://github.com/edwardcapriolo/kafos/blob/68782a5420840377dcb3ef94fb65132028804340/pure8/src/main/java/pure8/multipartitionrpw/MultiPartitionRPWBuffered.java#L123
> >>
> >> 2) As you are polling, create/reuse producers as needed.
> >>
> >> With scalable, correctly fenced producers, the loop looks more like this:
> >>
> >> ConsumerRecords<K,V> records = null;
> >> try {
> >>   records = consumer.poll(2000);
> >> } catch (KafkaException e) {
> >>   return;
> >> }
> >>
> >> for (ConsumerRecord<K,V> record : records) {
> >>   int partition = record.partition();
> >>   BufferedTrackingProducer<K,V> producer = producers.get(partition);
> >>   List<ProducerRecord<K,V>> results = processor.process(record);
> >>   producer.send(results);
> >> }
> >>
> >> for (Entry<Integer, BufferedTrackingProducer<K,V>> entry : producers.entrySet()) {
> >>   entry.getValue().commitAndClear(records, groupId);
> >> }
> >>
> >> Implementation here:
> >> https://github.com/edwardcapriolo/kafos/blob/68782a5420840377dcb3ef94fb65132028804340/pure8/src/main/java/pure8/multipartitionrpw/MultiPartitionRPWBuffered.java#L1
> >>
> >> BTW, maybe I am wrong about this and there is a better approach. With multiple producer threads for a single consumer.poll, figuring out exactly how to layer the try/catch and exception handling becomes a bit harder than the example in the blog.
> >>
> >> Anyway, here is what a Kafos chaos "test" looks like:
> >>
> >> https://github.com/edwardcapriolo/kafos/blob/68782a5420840377dcb3ef94fb65132028804340/pure8/src/main/java/pure8/multipartitionrpw/MutliRPWWithConsumerRestartingTest.java#L1
> >>
> >> It is a bit silly that we are forking off processes like this, but I did not want processes running in the same VM. I am not ready to run testcontainers/docker at this point.
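The routing step in the per-partition loop quoted above can be modeled with plain buffers (my own simplification; string lists stand in for BufferedTrackingProducer): records from one poll are grouped by input partition, so each partition's output can later be committed by its own correctly fenced transactional producer:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of per-partition producer routing: group each consumed record
// into a buffer keyed by its input partition.
class PartitionRouter {
    record Rec(int partition, String value) {}

    static Map<Integer, List<String>> route(List<Rec> poll) {
        Map<Integer, List<String>> producers = new HashMap<>();
        for (Rec rec : poll)
            producers.computeIfAbsent(rec.partition(), p -> new ArrayList<>())
                     .add(rec.value());
        return producers; // each buffer would be committed per partition
    }

    public static void main(String[] args) {
        Map<Integer, List<String>> out = route(List.of(
                new Rec(0, "a"), new Rec(1, "b"), new Rec(0, "c")));
        System.out.println(out.get(0)); // [a, c]
        System.out.println(out.get(1)); // [b]
    }
}
```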
> >> But hey: we launch Kafka, we launch ZK, we start workers, we kill them off, we count things. Results look like this:
> >>
> >> Verification stats
> >> ------------------------------------------------------------------
> >> notFound: 0 foundCorrect: 4000 foundDuplications: 0
> >> ------------------------------------------------------------------
> >> not found list[]
> >> serverid: 0 out line Stream closed
> >>
> >> I am going to do more work killing off servers and making more test scenarios, but so far I am off to a good start. When you get all the code right (by cobbling together all the sources out there, with half the code), you get things that seem to do exactly-once. Help wanted in academic review of my assertions and code, or PRs.
> >>
> >> Thanks,
> >> Edward

Kafka will throw you a warning if you try to mix consumer commits with transactional commits.

--
Sorry, this was sent from mobile. Will do less grammar and spell checking than usual.