On Wednesday, November 20, 2019, Eric Azama <eazama...@gmail.com> wrote:
> Calls to KafkaConsumer#poll() are completely independent of commits. As such, they will always return the next set of records, even if the previous set has not been committed. This is how the consumer acts, regardless of the exactly-once semantics.
>
> In order for the Consumer to reset to the currently committed offsets, you need to either initiate a Consumer Group Rebalance, or use a combination of the KafkaConsumer#committed() and KafkaConsumer#seek() methods.
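For illustration, a minimal sketch of the committed()/seek() reset described above. The KafkaConsumer calls are real API; the helper method wrapped around them is just one way you might package it, not code from this thread:

import java.util.Set;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Rewind every currently assigned partition to its last committed offset,
// so the next poll() re-delivers anything that was read but never committed.
static <K, V> void rewindToCommitted(KafkaConsumer<K, V> consumer) {
    Set<TopicPartition> assignment = consumer.assignment();
    for (TopicPartition tp : assignment) {
        OffsetAndMetadata committed = consumer.committed(tp);
        if (committed != null) {
            consumer.seek(tp, committed.offset());
        }
        // If nothing was ever committed for tp, leave its position alone
        // and let auto.offset.reset apply as usual.
    }
}

Compared to forcing a rebalance, this keeps the current assignment and only moves the read position back.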
>
> On Wed, Nov 20, 2019 at 6:16 AM Edward Capriolo <edlinuxg...@gmail.com> wrote:
>
> > Ok. I'm at a point where I believe the exactly-once is in question.
> >
> > Topic "input" has 10 partitions; topic "output" has 10 partitions.
> >
> > A producer writes messages 1 to 100 to topic input.
> >
> > The consume-transform-produce (CTP) process calls poll. It receives 100 messages, 10 in each partition.
> >
> > The process is simple mirroring: take from input, write to output.
> >
> > 10 producers with 10 transactional ids are created. During processing, 1 of the 10 producers throws a KafkaException. 90 out of 100 writes are committed transactionally; 10 are not.
> >
> > If poll is called again, the 10 messages do not appear in the next poll. Are they lost?
> >
> > On Saturday, November 9, 2019, Edward Capriolo <edlinuxg...@gmail.com> wrote:
> >
> > > On Thu, Oct 31, 2019 at 6:11 AM Matthias J. Sax <matth...@confluent.io> wrote:
> > >
> > > > Quite a project to test transactions...
> > > >
> > > > The current system test suite is part of the code base:
> > > > https://github.com/apache/kafka/tree/trunk/tests/kafkatest/tests
> > > >
> > > > There are of course also some unit/integration tests for transactions.
> > > >
> > > > There is also a blog post that describes at a high level what testing was done when EOS was introduced:
> > > > https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/
> > > >
> > > > And yes, transactions are built on some assumptions, and if you configure your system incorrectly or violate those assumptions, they may break. We also fixed some bugs since the first release. And there might be more bugs; software is always buggy. However, for practical purposes, transactions should work.
> > > >
> > > > We would of course love it if you could share your test results! If you discover a bug, please report it so we can fix it.
> > > >
> > > > -Matthias
> > > >
> > > > On 10/28/19 10:06 AM, Edward Capriolo wrote:
> > > > > On Sunday, October 27, 2019, Boyang Chen <reluctanthero...@gmail.com> wrote:
> > > > > >
> > > > > > Hey Edward,
> > > > > >
> > > > > > Just to summarize and make sure I understood your question: you want to implement some chaos testing to validate the Kafka EOS model, but are not sure how to start, or are curious about whether there is already work in the community doing that?
> > > > > >
> > > > > > For the correctness of Kafka EOS, we have tons of unit tests and system tests to prove its functionality. They can be found inside the repo. You could check them out and see if we still have gaps (which I believe we definitely have).
> > > > > >
> > > > > > Boyang
> > > > > >
> > > > > > On Fri, Oct 25, 2019 at 7:25 PM Edward Capriolo <edlinuxg...@gmail.com> wrote:
> > > > > > >
> > > > > > > Hello all,
> > > > > > >
> > > > > > > I used to work in adtech. Adtech was great. CPM for ads is $1-$5 per thousand ad impressions. If the numbers are 5% off, you can blame JavaScript click trackers.
> > > > > > >
> > > > > > > Now I work in a non-adtech industry, and they are really, really serious about exactly-once.
> > > > > > >
> > > > > > > So there is this blog:
> > > > > > >
> > > > > > > https://www.confluent.io/blog/transactions-apache-kafka/
> > > > > > >
> > > > > > > Great little snippet of code. I think I can copy it and implement it correctly. But if you read the section about zombie fencing, you learn that you either need to manually assign partitions, or use the rebalance listener and have N producers. Hunting around GitHub is not super helpful; some code snippets are less complete than even the snippet in the blog.
> > > > > > >
> > > > > > > I looked at what spring-kafka does. It does get the zombie fencing correct with respect to the fencing id; other bits of the code seem plausible.
> > > > > > >
> > > > > > > Notice I said "plausible", because I do not count a few end-to-end tests running in a single VM as solid enough evidence that this works in the face of failures.
> > > > > > >
> > > > > > > I have been contemplating how one stress-tests this exactly-once concept, with something Jepsen-like, or something brute-force that I can run for 5 hours in a row.
> > > > > > >
> > > > > > > If I faithfully implemented the code in the transactional read-write loop and fed it into my Jepsen-like black-box tester, it should: create a topic with 10 partitions, start launching read-write transaction code, start feeding input data (maybe strings like 1-1000), then start randomly killing VMs with kill -9 and graceful exits, maybe even killing Kafka, and make sure 1-1000 pop out on the other end.
> > > > > > >
> > > > > > > I thought of some other "crazy" ideas. One such idea: if I make a transactional "echo" (read x; write x back to the same topic), run N instances of it, and kill them randomly, then if I am losing messages (and not duplicating them) the topic would eventually have no data.
> > > > > > >
> > > > > > > Or I could make a program with some math formula, like receive x, write xx. If duplication is happening, I would start seeing multiple xx's.
> > > > > > >
> > > > > > > Or send 1,000,000,000 messages through, have a consumer log them to a file, then use an ETL tool to validate that the messages come out on the other side.
> > > > > > >
> > > > > > > Or I could use a NoSQL store with increments, count up, and ensure no key has been incremented twice.
> > > > > > >
> > > > > > > Note: I realize I can just use Kafka Streams or Storm, which have their own systems to guarantee exactly-once, but I am looking for a way to prove what can be done with pure Kafka (and not just prove it for adtech work, where 5% here or there is good enough).
> > > > > > >
> > > > > > > I imagine someone somewhere must be doing this. How? Where? Any tips? Is it part of some Kafka release stress test? I'm down to write it if it does not exist.
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Edward
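The transactional "echo" above is easy to make concrete. A minimal sketch, assuming string messages, a topic named "echo", a consumer in group groupId, and a producer whose initTransactions() already ran at startup; it deliberately ignores the per-partition fencing problem that comes up later in the thread:

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;

// Transactional echo: read x, write x back to the same topic, and commit the
// consumed offsets in the same transaction. Run N of these and kill them at
// random; if messages are being lost the topic eventually drains, and if
// they are being duplicated it grows.
static void echoLoop(KafkaConsumer<String, String> consumer,
                     KafkaProducer<String, String> producer,
                     String groupId) {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
        if (records.isEmpty()) continue;
        producer.beginTransaction();
        try {
            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
            for (ConsumerRecord<String, String> r : records) {
                producer.send(new ProducerRecord<>("echo", r.key(), r.value()));
                // Overwriting per record leaves the last offset + 1 per partition.
                offsets.put(new TopicPartition(r.topic(), r.partition()),
                            new OffsetAndMetadata(r.offset() + 1));
            }
            producer.sendOffsetsToTransaction(offsets, groupId);
            producer.commitTransaction();
        } catch (KafkaException e) {
            producer.abortTransaction();
            // After an abort, rewind to the committed offsets before polling
            // again, per the committed()/seek() note earlier in the thread.
        }
    }
}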
> > >> >>> > > >> >>> Thanks, > > >> >>> Edward, > > >> >>> > > >> >>> Thanks, > > >> >>> Edward > > >> >>> > > >> >> > > >> > > > >> > Boyang, > > >> > > > >> > I just to summarize and make sure I understood your question, you > want > > >> to > > >> > implement some Chaos testing to validate Kafka EOS model, but not > sure > > >> how > > >> > to start or curious about whether there are already works in the > > >> community > > >> > doing that? > > >> > > > >> > Yes. > > >> > > > >> > I am not an expert in this field, but I know what distributed > systems > > >> can > > >> > mask failures. For example if you have atomic increment you might > unit > > >> test > > >> > it and it works fine, but if you ran it for 40 days it might double > > >> count 1 > > >> > time. > > >> > > > >> > of Kafka EOS, we have tons of unit tests and system > > >> > tests to prove its functionality. They could be found inside the > repo. > > >> > > > >> > I've been a developer for a while so the phrase "there are tests" > > never > > >> > tells me everything. Tests reveal the presence of bugs not the > > absence. > > >> > > > >> > Can you please point me at the tests? My curiosity is if there is a > > >> > systematic in-depth strategy here and how much rigor there is. > > >> > > > >> > In my environment I need to quantify and use rigor to prove out > these > > >> > things. Things that you might take for granted. For example, I have > to > > >> > prove that zookeeper works as expected when we lose a datacenter. > Most > > >> > people 'in the know' take it for granted that kafka and zk do what > is > > >> > advertised when configured properly. I have to test that out and > > >> document > > >> > my findings. > > >> > > > >> > For kafka transactions. The user space code needs to be written > > properly > > >> > and configured properly along with the server being setup properly. > It > > >> is > > >> > not enough for me to check out kafka run 'sbt test' and declare > > victory > > >> > after the unit tests pass. > > >> > > > >> > What I am effectively looking for is the anti jepsen blog that > > says...We > > >> > threw the kitchen sink at this and these transactions are bullet > > proof. > > >> > Here is our methodology, here is some charts, here is xyz. Here is > how > > >> we > > >> > run it every minor release > > >> > > > >> > I'm not trying to be a pita, educate me on how bullet proof this is > > and > > >> how > > >> > I can reproduce the results. > > >> > > > >> > > > >> > > > >> > > > >> > > > >> > > > >> > > > >> > > >> > > > > > > All, > > > > > > After a few weeks of hacking at this I have found a few interesting > > > things. First things first, introducing Kafos > > > > > > https://github.com/edwardcapriolo/kafos > > > > > > Kafos (think Kafka + Chaos) . Kafos is a project that attempts to prove > > > out Kafka transactions do exacty-once by launching multiple servers and > > > consume/producers, killing them off and counting how many messages land > > on > > > the other end. > > > > > > Note: The project is a little raw at this point, (it has hard code to > my > > > home folders and some long Thread.sleep() calls in there. I will clean > it > > > up as I go,) but I have some early findings. 
> > >
> > > From the blog here, I want to point out the following flaws:
> > >
> > > https://www.confluent.io/blog/transactions-apache-kafka/
> > >
> > > KafkaProducer producer = createKafkaProducer(
> > >     "bootstrap.servers", "localhost:9092",
> > >     "transactional.id", "my-transactional-id");
> > >
> > > 1) A fixed transactional.id effectively means the first worker fences all the other ones. While being exactly-once, there is exactly one worker and no parallelism.
> > >
> > > Spring has two schemes, one of which seems to put random information in the fencing ids, which I think defeats the purpose. But I need to dive in on that more:
> > > https://docs.spring.io/spring-kafka/reference/html/
> > >
> > > "Transactions are enabled by providing the DefaultKafkaProducerFactory with a transactionIdPrefix. In that case, instead of managing a single shared Producer, the factory maintains a cache of transactional producers. When the user calls close() on a producer, it is returned to the cache for reuse instead of actually being closed. The transactional.id property of each producer is transactionIdPrefix + n, where n starts with 0 and is incremented for each new producer, unless the transaction is started by a listener container with a record-based listener. In that case, the transactional.id is <transactionIdPrefix>.<group.id>.<topic>.<partition>. This is to properly support fencing zombies, as described here <https://www.confluent.io/blog/transactions-apache-kafka/>. This new behavior was added in versions 1.3.7, 2.0.6, 2.1.10, and 2.2.0. If you wish to revert to the previous behavior, you can set the producerPerConsumerPartition property on the DefaultKafkaProducerFactory to false.
> > >
> > > While transactions are supported with batch listeners, by default, zombie fencing is not supported because a batch may contain records from multiple topics or partitions. However, starting with version 2.3.2, zombie fencing is supported if you set the container property subBatchPerPartition to true. In that case, the batch listener is invoked once per partition received from the last poll, as if each poll only returned records for a single partition."
> > >
> > > How to fix this? Here be the dragons:
> > >
> > > "Practically, one would either have to store the mapping between input partitions and transactional.ids in an external store, or have some static encoding of it. Kafka Streams opts for the latter approach to solve this problem."
> > >
> > > Well then, practically, the rest of this code is not useful:
> > >
> > > while (true) {
> > >   ConsumerRecords records = consumer.poll(Long.MAX_VALUE);
> > >   producer.beginTransaction();
> > >   for (ConsumerRecord record : records)
> > >     producer.send(producerRecord("outputTopic", record));
> > >   producer.sendOffsetsToTransaction(currentOffsets(consumer), group);
> > >   producer.commitTransaction();
> > > }
> > >
> > > When you call consumer.poll() (unless you manually assign a single partition), you are going to receive records from one or more partitions. As consumers join and leave the group, partitions are going to move between workers.
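For reference, a minimal sketch of the "static encoding" the blog alludes to: derive the transactional.id from (group, topic, partition), so whichever worker owns a partition always uses the same id. The helper name and the serializer choices are illustrative, not code from the Kafos repo:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

// One producer per input partition. A restarted worker that picks up the
// same partition reuses the same transactional.id, and initTransactions()
// fences the zombie that previously owned it.
static KafkaProducer<String, String> makeProducerFor(String groupId, String topic, int partition) {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, groupId + "." + topic + "." + partition);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    KafkaProducer<String, String> producer = new KafkaProducer<>(props);
    producer.initTransactions(); // fences any zombie with the same transactional.id
    return producer;
}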
> > > There are two ways I can find to do this:
> > >
> > > 1) Use a ConsumerRebalanceListener and a concurrent map of producers:
> > > https://github.com/edwardcapriolo/kafos/blob/68782a5420840377dcb3ef94fb65132028804340/pure8/src/main/java/pure8/multipartitionrpw/MultiPartitionRPWBuffered.java#L123
> > >
> > > 2) As you are polling, create/reuse producers as needed.
> > >
> > > With scalable, correctly fenced producers, the loop looks more like this:
> > >
> > > ConsumerRecords<K, V> records = null;
> > > try {
> > >   records = consumer.poll(2000);
> > > } catch (KafkaException e) {
> > >   return;
> > > }
> > > for (ConsumerRecord<K, V> record : records) {
> > >   int partition = record.partition();
> > >   BufferedTrackingProducer<K, V> producer = producers.get(partition);
> > >   List<ProducerRecord<K, V>> results = processor.process(record);
> > >   producer.send(results);
> > > }
> > > for (Entry<Integer, BufferedTrackingProducer<K, V>> entry : producers.entrySet()) {
> > >   entry.getValue().commitAndClear(records, groupId);
> > > }
> > >
> > > Implementation here:
> > > https://github.com/edwardcapriolo/kafos/blob/68782a5420840377dcb3ef94fb65132028804340/pure8/src/main/java/pure8/multipartitionrpw/MultiPartitionRPWBuffered.java#L1
> > >
> > > BTW, maybe I am wrong about this and there is a better approach. With multiple producer threads for a single consumer.poll, figuring out exactly how to layer the try/catch and exception handling becomes a bit harder than the example in the blog.
> > >
> > > Anyway, here is what a Kafos chaos "test" looks like:
> > >
> > > https://github.com/edwardcapriolo/kafos/blob/68782a5420840377dcb3ef94fb65132028804340/pure8/src/main/java/pure8/multipartitionrpw/MutliRPWWithConsumerRestartingTest.java#L1
> > >
> > > It is a bit silly that we are forking off processes like this, but I did not want processes running in the same VM, and I am not ready to run testcontainers/docker at this point. But hey: we launch Kafka, we launch ZK, we start workers, we kill them off, we count things. Results look like this:
> > >
> > > Verification stats
> > > ------------------------------------------------------------------
> > > notFound: 0 foundCorrect: 4000 foundDuplications: 0
> > > ------------------------------------------------------------------
> > > not found list []
> > > serverid: 0 out line Stream closed
> > >
> > > I am going to do more work killing off servers and making more test scenarios, but so far it is off to a good start. When you get all the code right (by cobbling together all the sources out there, each with half the code), you get things that seem to do exactly-once. Help wanted in academic review of my assertions and code, or PRs.
> > >
> > > Thanks,
> > > Edward
> >
> > --
> > Sorry this was sent from mobile. Will do less grammar and spell check than usual.

Right. That looks like what you need to do. In my scenario above you have N transactional producers. You loop through them calling commit. If any of them fail, do not call poll(); close all the producers, close the consumer, and restart it. Effectively you create a rebalance.

Can you pseudocode your method of using a combination of KafkaConsumer#committed() and KafkaConsumer#seek()? I like that approach, because avoiding the rebalance might be a good idea.

--
Sorry this was sent from mobile. Will do less grammar and spell check than usual.
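A rough sketch of the loop-commit-then-restart recovery described in that last reply, assuming a map of per-partition producers; the method shape is an assumption, not code from the Kafos repo:

import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.KafkaException;

// On any transactional failure: abort and close every producer, then close
// the consumer. Leaving the group forces a rebalance, and the next
// incarnation resumes from the last committed offsets.
static <K, V> void failAndRestart(Map<Integer, KafkaProducer<K, V>> producers,
                                  KafkaConsumer<K, V> consumer) {
    for (KafkaProducer<K, V> p : producers.values()) {
        try {
            p.abortTransaction(); // may fail if no transaction is open or we were fenced
        } catch (KafkaException | IllegalStateException ignored) {
        }
        try {
            p.close();
        } catch (KafkaException ignored) {
        }
    }
    producers.clear();
    consumer.close();
    // ...then recreate the consumer and producers and subscribe again; the
    // rebalance plus the committed offsets give the effect of a rollback.
}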