On Wednesday, November 20, 2019, Matthias J. Sax <matth...@confluent.io>
wrote:

> I am not sure what Spring does, but with Kafka Streams, writing the
> output and committing the offsets would be part of the same transaction.
>
> It seems Spring is doing something else, and thus it seems it does not
> use the EOS API correctly.
>
> If you use transactions to copy data from an input to an output topic,
> committing offsets must be done on the producer as part of the
> transaction; the consumer would not commit offsets.
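>
> A minimal sketch of that shape (my own illustration; groupId and
> currentOffsets() are assumed helpers, the latter building the usual
> Map<TopicPartition, OffsetAndMetadata> from the last poll):
>
>   ConsumerRecords<String, String> records = consumer.poll(1000);
>   producer.beginTransaction();
>   for (ConsumerRecord<String, String> r : records)
>     producer.send(new ProducerRecord<>("output", r.key(), r.value()));
>   // offsets are committed by the producer, inside the same transaction;
>   // the consumer (enable.auto.commit=false) never commits anything itself
>   producer.sendOffsetsToTransaction(currentOffsets(consumer), groupId);
>   producer.commitTransaction();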
>
> To me, it seems that Spring is committing offsets using the consumer,
> independent of whether the transaction was successful or not. This would
> be an incorrect usage of the API.
>
>
> -Matthias
>
> On 11/20/19 6:16 AM, Edward Capriolo wrote:
> > Ok. I'm at a point where I believe the exactly-once guarantee is in question.
> >
> > Topic input has 10 partitions; topic output has 10 partitions.
> >
> > Producer writes messages 1 to 100 to topic input.
> >
> > The CTP process calls poll. It receives 100 messages, 10 in each partition.
> >
> > The process is simple mirroring: take from input, write to output.
> >
> > 10 producers with 10 transactional ids are created. During processing,
> > 1 of the 10 producers throws a Kafka exception. 90 out of 100 writes
> > are committed transactionally; 10 are not.
> >
> > If poll is called again, those 10 messages do not appear in the next
> > poll. Are they lost?
> >
> >
> >
> > On Saturday, November 9, 2019, Edward Capriolo <edlinuxg...@gmail.com>
> > wrote:
> >
> >>
> >> On Thu, Oct 31, 2019 at 6:11 AM Matthias J. Sax <matth...@confluent.io>
> >> wrote:
> >>
> >>> Quite a project to test transactions...
> >>>
> >>> The current system test suite is part of the code base:
> >>> https://github.com/apache/kafka/tree/trunk/tests/kafkatest/tests
> >>>
> >>> There are of course also some unit/integration tests for transactions.
> >>>
> >>> There is also a blog post that describes at a high level what testing
> >>> was done when EOS was introduced:
> >>> https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/
> >>>
> >>> And yes, transactions are built on some assumptions, and if you
> >>> configure your system incorrectly or violate those assumptions, they
> >>> may break. We also fixed some bugs since the first release. And there
> >>> might be more bugs --- software is always buggy. However, for practical
> >>> purposes, transactions should work.
> >>>
> >>> We would of course love it if you could share your test results! If you
> >>> discover a bug, please report it, so we can fix it.
> >>>
> >>>
> >>> -Matthias
> >>>
> >>> On 10/28/19 10:06 AM, Edward Capriolo wrote:
> >>>> On Sunday, October 27, 2019, Boyang Chen <reluctanthero...@gmail.com>
> >>> wrote:
> >>>>
> >>>>> Hey Edward,
> >>>>>
> >>>>> Just to summarize and make sure I understood your question: you want
> >>>>> to implement some Chaos testing to validate the Kafka EOS model, but
> >>>>> are not sure how to start, or are curious about whether there is
> >>>>> already work in the community doing that?
> >>>>>
> >>>>> For the correctness of Kafka EOS, we have tons of unit tests and
> >>>>> system tests to prove its functionality. They can be found inside
> >>>>> the repo. You could check them out and see if we still have gaps
> >>>>> (which I believe we definitely have).
> >>>>>
> >>>>> Boyang
> >>>>>
> >>>>> On Fri, Oct 25, 2019 at 7:25 PM Edward Capriolo <edlinuxg...@gmail.com>
> >>>>> wrote:
> >>>>>
> >>>>>> Hello all,
> >>>>>>
> >>>>>> I used to work in adtech. Adtech was great. CPM for ads is $1-$5 per
> >>>>>> thousand ad impressions. If numbers are 5% off, you can blame
> >>>>>> JavaScript click trackers.
> >>>>>>
> >>>>>> Now, I work in a non-adtech industry, and they are really, really
> >>>>>> serious about exactly once.
> >>>>>>
> >>>>>> So there is this blog:
> >>>>>>
> >>>>>> https://www.confluent.io/blog/transactions-apache-kafka/
> >>>>>>
> >>>>>> Great little snippet of code. I think I can copy it and implement it
> >>>>>> correctly. But if you read the section about zombie fencing, you
> >>>>>> learn that you either need to manually assign partitions or use the
> >>>>>> rebalance listener and have N producers. Hunting around GitHub is not
> >>>>>> super helpful; some code snippets are less complete than even the
> >>>>>> snippet in the blog.
> >>>>>>
> >>>>>> I looked at what spring-kafka does. It does get the zombie fencing
> >>>>>> correct with respect to the fencing id; other bits of the code seem
> >>>>>> plausible.
> >>>>>>
> >>>>>> Notice I said "plausible", because I do not count a few end-to-end
> >>>>>> tests running in a single VM as solid enough evidence that this
> >>>>>> works in the face of failures.
> >>>>>>
> >>>>>> I have been contemplating how one stress tests this exactly-once
> >>>>>> concept, with something Jepsen-like or something brute force that I
> >>>>>> can run for 5 hours in a row.
> >>>>>>
> >>>>>> If I faithfully implemented the code in the transactional read-write
> >>>>>> loop and I feed it into my Jepsen-like black box tester, it should:
> >>>>>>
> >>>>>> Create a topic with 10 partitions, start launching read-write
> >>>>>> transaction code, start feeding input data (maybe strings like
> >>>>>> 1-1000), now start randomly killing VMs with kill -9 and graceful
> >>>>>> exits, maybe even killing Kafka, and make sure 1-1000 pop out on the
> >>>>>> other end.
> >>>>>>
> >>>>>> I thought of some other "crazy" ideas. One such idea:
> >>>>>>
> >>>>>> If I make a transactional "echo": read x; write x back to the same
> >>>>>> topic. Run N instances of that and kill them randomly. If I am losing
> >>>>>> messages (and not duplicating messages) then the topic would
> >>>>>> eventually have no data.
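> >>>>>>
> >>>>>> A minimal sketch of that echo (my own illustration; the topic and
> >>>>>> group names are made up, and currentOffsets() is an assumed helper
> >>>>>> mapping each partition from the last poll to its next offset):
> >>>>>>
> >>>>>> producer.initTransactions();
> >>>>>> while (true) {
> >>>>>>   ConsumerRecords<String, String> records = consumer.poll(1000);
> >>>>>>   if (records.isEmpty()) continue;
> >>>>>>   producer.beginTransaction();
> >>>>>>   for (ConsumerRecord<String, String> r : records)
> >>>>>>     producer.send(new ProducerRecord<>("echo", r.key(), r.value()));
> >>>>>>   producer.sendOffsetsToTransaction(currentOffsets(consumer), "echo-group");
> >>>>>>   producer.commitTransaction();
> >>>>>> }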
> >>>>>>
> >>>>>> Or should I make a program with some math formula, like receive x,
> >>>>>> write xx. If duplication is happening I would start seeing multiple
> >>>>>> xx's.
> >>>>>>
> >>>>>> Or send 1,000,000,000 messages through and have the consumer log
> >>>>>> them to a file, then use an ETL tool to validate that the messages
> >>>>>> come out on the other side.
> >>>>>>
> >>>>>> Or should I use a NoSQL store with increments, count up, and ensure
> >>>>>> no key has been incremented twice?
> >>>>>>
> >>>>>> Note: I realize I can just use Kafka Streams or Storm, which have
> >>>>>> their own systems to guarantee "at most once", but I am looking for a
> >>>>>> way to prove what can be done with pure Kafka (and not just prove it
> >>>>>> for adtech work, where being 5% off here or there is good enough).
> >>>>>>
> >>>>>> I imagine someone somewhere must be doing this. How? Where? Any
> >>>>>> tips? Is it part of some Kafka release stress test? I'm down to
> >>>>>> write it if it does not exist.
> >>>>>>
> >>>>>> Thanks,
> >>>>>> Edward
> >>>>>>
> >>>>>
> >>>>
> >>>> Boyang,
> >>>>
> >>>> Just to summarize and make sure I understood your question: you want
> >>>> to implement some Chaos testing to validate the Kafka EOS model, but
> >>>> are not sure how to start, or are curious about whether there is
> >>>> already work in the community doing that?
> >>>>
> >>>> Yes.
> >>>>
> >>>> I am not an expert in this field, but I know that distributed systems
> >>>> can mask failures. For example, if you have an atomic increment you
> >>>> might unit test it and it works fine, but if you ran it for 40 days it
> >>>> might double count 1 time.
> >>>>
> >>>> For the correctness of Kafka EOS, we have tons of unit tests and
> >>>> system tests to prove its functionality. They can be found inside the
> >>>> repo.
> >>>>
> >>>> I've been a developer for a while, so the phrase "there are tests"
> >>>> never tells me everything. Tests reveal the presence of bugs, not the
> >>>> absence.
> >>>>
> >>>> Can you please point me at the tests? My curiosity is whether there is
> >>>> a systematic, in-depth strategy here and how much rigor there is.
> >>>>
> >>>> In my environment I need to quantify and use rigor to prove out these
> >>>> things, things that you might take for granted. For example, I have to
> >>>> prove that ZooKeeper works as expected when we lose a datacenter. Most
> >>>> people 'in the know' take it for granted that Kafka and ZK do what is
> >>>> advertised when configured properly. I have to test that out and
> >>>> document my findings.
> >>>>
> >>>> For Kafka transactions, the user-space code needs to be written
> >>>> properly and configured properly, along with the server being set up
> >>>> properly. It is not enough for me to check out Kafka, run 'sbt test',
> >>>> and declare victory after the unit tests pass.
> >>>>
> >>>> What I am effectively looking for is the anti-Jepsen blog that says:
> >>>> we threw the kitchen sink at this and these transactions are bullet
> >>>> proof. Here is our methodology, here are some charts, here is xyz.
> >>>> Here is how we run it for every minor release.
> >>>>
> >>>> I'm not trying to be a PITA; educate me on how bullet proof this is
> >>>> and how I can reproduce the results.
> >>>>
> >>>
> >>>
> >>
> >> All,
> >>
> >> After a few weeks of hacking at this I have found a few interesting
> >> things. First things first, introducing Kafos:
> >>
> >> https://github.com/edwardcapriolo/kafos
> >>
> >> Kafos (think Kafka + Chaos) is a project that attempts to prove that
> >> Kafka transactions do exactly-once, by launching multiple servers and
> >> consumers/producers, killing them off, and counting how many messages
> >> land on the other end.
> >>
> >> Note: the project is a little raw at this point (it has paths hard-coded
> >> to my home folders and some long Thread.sleep() calls in there; I will
> >> clean it up as I go), but I have some early findings.
> >>
> >> From the blog here, I want to point out the following flaws:
> >>
> >> https://www.confluent.io/blog/transactions-apache-kafka/
> >>
> >> KafkaProducer producer = createKafkaProducer(
> >>   "bootstrap.servers", "localhost:9092",
> >>   "transactional.id", "my-transactional-id");
> >>
> >> 1) A fixed transactional.id effectively means the first worker fences
> >> all the other ones. While this is exactly-once, there is exactly one
> >> worker and no parallelism.
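> >>
> >> A minimal sketch of that failure mode (my own illustration;
> >> newProducer(id) is an assumed helper that builds a producer with the
> >> given transactional.id):
> >>
> >> KafkaProducer<String, String> a = newProducer("my-transactional-id");
> >> a.initTransactions();
> >> // a second worker starts with the same transactional.id...
> >> KafkaProducer<String, String> b = newProducer("my-transactional-id");
> >> b.initTransactions(); // bumps the epoch; 'a' is now a zombie
> >> // from here on, 'a' gets ProducerFencedException when it tries to
> >> // send or commit, so only one worker ever makes progress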
> >>
> >> Spring has two schemes that seem to put varying information in the
> >> fencing ids, which I think defeats the purpose. But I need to dive in
> >> on that more.
> >> https://docs.spring.io/spring-kafka/reference/html/
> >>
> >> Transactions are enabled by providing the DefaultKafkaProducerFactory
> >> with a transactionIdPrefix. In that case, instead of managing a single
> >> shared Producer, the factory maintains a cache of transactional
> >> producers. When the user calls close() on a producer, it is returned to
> >> the cache for reuse instead of actually being closed. The
> >> transactional.id property of each producer is transactionIdPrefix + n,
> >> where n starts with 0 and is incremented for each new producer, unless
> >> the transaction is started by a listener container with a record-based
> >> listener. In that case, the transactional.id is
> >> <transactionIdPrefix>.<group.id>.<topic>.<partition>. This is to
> >> properly support fencing zombies, as described here
> >> <https://www.confluent.io/blog/transactions-apache-kafka/>. This new
> >> behavior was added in versions 1.3.7, 2.0.6, 2.1.10, and 2.2.0. If you
> >> wish to revert to the previous behavior, you can set the
> >> producerPerConsumerPartition property on the DefaultKafkaProducerFactory
> >> to false.
> >>
> >> While transactions are supported with batch listeners, by default zombie
> >> fencing is not supported because a batch may contain records from
> >> multiple topics or partitions. However, starting with version 2.3.2,
> >> zombie fencing is supported if you set the container property
> >> subBatchPerPartition to true. In that case, the batch listener is
> >> invoked once per partition received from the last poll, as if each poll
> >> only returned records for a single partition.
> >>
> >> How to fix this? Here be dragons:
> >>
> >> "Practically, one would either have to store the mapping between input
> >> partitions and transactional.ids in an external store, or have some
> >> static encoding of it. Kafka Streams opts for the latter approach to
> >> solve this problem."
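> >>
> >> A minimal sketch of such a static encoding (my own; the prefix is made
> >> up, and groupId, inputTopic, and partition are assumed to be in scope):
> >>
> >> Properties props = new Properties();
> >> props.put("bootstrap.servers", "localhost:9092");
> >> // deterministic id per input partition: any zombie working on the same
> >> // partition shares the id and gets fenced by initTransactions()
> >> props.put("transactional.id",
> >>     "my-app-" + groupId + "-" + inputTopic + "-" + partition);
> >> KafkaProducer<String, String> producer = new KafkaProducer<>(props,
> >>     new StringSerializer(), new StringSerializer());
> >> producer.initTransactions();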
> >>
> >> Well then, practically, the rest of the blog's example code is not
> >> useful as written.
> >>
> >> while (true) {
> >>   ConsumerRecords records = consumer.poll(Long.MAX_VALUE);
> >>   producer.beginTransaction();
> >>   for (ConsumerRecord record : records)
> >>     producer.send(producerRecord("outputTopic", record));
> >>   producer.sendOffsetsToTransaction(currentOffsets(consumer), group);
> >>   producer.commitTransaction();
> >> }
> >>
> >> When you call consumer.poll() (unless you manually assign a single
> >> partition) you are going to receive records from one or more partitions.
> >> As consumers join and leave the group, partitions are going to move
> >> between workers.
> >>
> >> There are two ways I can find to do this:
> >> 1) Use a ConsumerRebalanceListener and a concurrent map of producers
> >> (sketched below):
> >> https://github.com/edwardcapriolo/kafos/blob/68782a5420840377dcb3ef94fb65132028804340/pure8/src/main/java/pure8/multipartitionrpw/MultiPartitionRPWBuffered.java#L123
> >>
> >> 2) As you are polling, create/reuse producers as needed.
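> >>
> >> A minimal sketch of option 1 (my own; createProducer(partition) is an
> >> assumed helper that builds a producer with a per-partition
> >> transactional.id, and producers is a ConcurrentMap<Integer, KafkaProducer<K, V>>):
> >>
> >> consumer.subscribe(topics, new ConsumerRebalanceListener() {
> >>   public void onPartitionsAssigned(Collection<TopicPartition> assigned) {
> >>     for (TopicPartition tp : assigned)
> >>       producers.computeIfAbsent(tp.partition(), p -> createProducer(p));
> >>   }
> >>   public void onPartitionsRevoked(Collection<TopicPartition> revoked) {
> >>     for (TopicPartition tp : revoked) {
> >>       KafkaProducer<K, V> old = producers.remove(tp.partition());
> >>       if (old != null) old.close(); // let the new owner fence us cleanly
> >>     }
> >>   }
> >> });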
> >>
> >> With scalable, correctly fenced producers, the loop looks more like this:
> >>
> >> ConsumerRecords<K, V> records = null;
> >> try {
> >>   records = consumer.poll(2000);
> >> } catch (KafkaException e) {
> >>   return;
> >> }
> >>
> >> // route each record to the producer that owns its input partition
> >> for (ConsumerRecord<K, V> record : records) {
> >>   int partition = record.partition();
> >>   BufferedTrackingProducer<K, V> producer = producers.get(partition);
> >>   List<ProducerRecord<K, V>> results = processor.process(record);
> >>   producer.send(results);
> >> }
> >>
> >> // commit each producer's transaction, with its share of the offsets
> >> for (Entry<Integer, BufferedTrackingProducer<K, V>> entry : producers.entrySet()) {
> >>   entry.getValue().commitAndClear(records, groupId);
> >> }
> >>
> >> Implementation here:
> >> https://github.com/edwardcapriolo/kafos/blob/68782a5420840377dcb3ef94fb65132028804340/pure8/src/main/java/pure8/multipartitionrpw/MultiPartitionRPWBuffered.java#L1
> >>
> >> BTW, maybe I am wrong about this and there is a better approach. With
> >> multiple producer threads for a single consumer.poll, figuring out
> >> exactly how to layer the try/catch and exception handling becomes a bit
> >> harder than in the example in the blog.
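> >>
> >> For a single producer, the layering I believe the KafkaProducer javadoc
> >> intends looks roughly like this (a sketch; the send step is elided):
> >>
> >> try {
> >>   producer.beginTransaction();
> >>   // ... send records and offsets ...
> >>   producer.commitTransaction();
> >> } catch (ProducerFencedException | OutOfOrderSequenceException
> >>     | AuthorizationException e) {
> >>   producer.close(); // fatal: another instance owns our transactional.id
> >> } catch (KafkaException e) {
> >>   producer.abortTransaction(); // recoverable: roll back and re-poll
> >> }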
> >>
> >> Anyway, here is what a Kafos chaos "test" looks like:
> >>
> >> https://github.com/edwardcapriolo/kafos/blob/68782a5420840377dcb3ef94fb65132028804340/pure8/src/main/java/pure8/multipartitionrpw/MutliRPWWithConsumerRestartingTest.java#L1
> >>
> >> It is a bit silly that we are forking off processes like this, but I did
> >> not want processes running in the same VM, and I am not ready to run
> >> testcontainers/docker at this point. But hey: we launch Kafka, we launch
> >> ZK, we start workers, we kill them off, we count things. Results look
> >> like this:
> >>
> >>
> >> Verification stats
> >> ------------------------------------------------------------------
> >> notFound: 0 foundCorrect: 4000 foundDuplications: 0
> >> ------------------------------------------------------------------
> >> not found list[]
> >> serverid: 0 out line Stream closed
> >>
> >> I am going to do more work killing off servers and making more test
> >> scenarios, but so far I am off to a good start. When you get all the
> >> code right (by cobbling together all the sources out there, each with
> >> half the code), you get something that seems to do exactly-once. Help
> >> wanted in academic review of my assertions and code, or PRs.
> >>
> >> Thanks
> >> Edward
> >>
> >
> >
>
>
Kafka will throw a warning at you if you try to mix consumer commits with
transaction commits.


-- 
Sorry this was sent from mobile. Will do less grammar and spell check than
usual.
