On Wednesday, November 20, 2019, Eric Azama <eazama...@gmail.com> wrote:

> Calls to KafkaConsumer#poll() are completely independent of commits. As
> such they will always return the next set of records, even if the previous
> set have not been committed. This is how the consumer acts, regardless of
> the Exactly Once semantics.
>
> In order for the Consumer to reset to the currently committed offsets, you
> need to either initiate a Consumer Group Rebalance, or use a combination of
> the KafkaConsumer#committed() and KafkaConsumer#seek() methods.
>
> On Wed, Nov 20, 2019 at 6:16 AM Edward Capriolo <edlinuxg...@gmail.com>
> wrote:
>
> > Ok. I'm at a point where I believe the exactly once is in question.
> >
> > Topic input 10 partitions topic output 10 partitions.
> >
> > Producer writes messages 1 to 100 to topic input.
> >
> > CTP process calls poll. It receives 100 messages, 10 in each partition.
> >
> > Process is simple mirroring take from input write to output.
> >
> > 10 producers with 10 transactional ids are created. During processing, 1
> > of the 10 producers throws a Kafka exception. 90 out of 100 writes are
> > committed transactionally; 10 are not.
> >
> > If poll is called again 10 messages do not appear in the next poll. Are
> > they lost?
> >
> >
> >
> > On Saturday, November 9, 2019, Edward Capriolo <edlinuxg...@gmail.com>
> > wrote:
> >
> > >
> > > On Thu, Oct 31, 2019 at 6:11 AM Matthias J. Sax <matth...@confluent.io>
> > > wrote:
> > >
> > >> Quite a project to test transactions...
> > >>
> > >> The current system test suite is part of the code base:
> > >> https://github.com/apache/kafka/tree/trunk/tests/kafkatest/tests
> > >>
> > >> There are of course also some unit/integration tests for transactions.
> > >>
> > >> There is also a blog post that describes at a high level what testing
> > >> was done when EOS was introduced:
> > >> https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/
> > >>
> > >> And yes, transactions are built on some assumptions, and if you configure
> > >> your system incorrectly or violate those assumptions, it may break. We
> > >> also fixed some bugs since the first release. And there might be more
> > >> bugs --- software is always buggy. However, for practical purposes,
> > >> transactions should work.
> > >>
> > >> We would of course love if you could share your test results! If you
> > >> discover a bug, please report it, so we can fix it.
> > >>
> > >>
> > >> -Matthias
> > >>
> > >> On 10/28/19 10:06 AM, Edward Capriolo wrote:
> > >> > On Sunday, October 27, 2019, Boyang Chen <reluctanthero...@gmail.com>
> > >> > wrote:
> > >> >
> > >> >> Hey Edward,
> > >> >>
> > >> >> just to summarize and make sure I understood your question, you want to
> > >> >> implement some Chaos testing to validate the Kafka EOS model, but are not
> > >> >> sure how to start, or are curious whether there is already work in the
> > >> >> community doing that?
> > >> >>
> > >> >> For the correctness of Kafka EOS, we have tons of unit tests and system
> > >> >> tests to prove its functionality. They can be found inside the repo. You
> > >> >> could check them out and see if we still have gaps (which I believe we
> > >> >> definitely have).
> > >> >>
> > >> >> Boyang
> > >> >>
> > >> >> On Fri, Oct 25, 2019 at 7:25 PM Edward Capriolo <edlinuxg...@gmail.com>
> > >> >> wrote:
> > >> >>
> > >> >>> Hello all,
> > >> >>>
> > >> >>> I used to work in adtech. Adtech was great. CPM for ads is $1-$5 per
> > >> >>> thousand ad impressions. If numbers are 5% off you can blame javascript
> > >> >>> click trackers.
> > >> >>>
> > >> >>> Now, I work in a non-adtech industry and they are really, really serious
> > >> >>> about exactly once.
> > >> >>>
> > >> >>> So there is this blog:
> > >> >>>
> > >> >>> https://www.confluent.io/blog/transactions-apache-kafka/
> > >> >>>
> > >> >>> Great little snippet of code. I think I can copy it and implement it
> > >> >>> correctly.
> > >> >>> You know, but if you read that section about the zombie fencing, you learn
> > >> >>> that you either need to manually assign partitions or use the rebalance
> > >> >>> listener and have N producers. Hunting around github is not super helpful;
> > >> >>> some code snippets are less complete than even the snippet in the blog.
> > >> >>>
> > >> >>> I looked at what spring-kafka does. It does get the zombie fencing correct
> > >> >>> with respect to fencing id; other bits of the code seem plausible.
> > >> >>>
> > >> >>> Notice I said "plausible", because I do not count a few end to end tests
> > >> >>> running in a single VM as solid enough evidence that this works in the
> > >> >>> face of failures.
> > >> >>>
> > >> >>> I have been contemplating how one stress tests this exactly once concept,
> > >> >>> with something Jepsen like or something brute force that I can run for 5
> > >> >>> hours in a row.
> > >> >>>
> > >> >>> If I faithfully implemented the code in the transactional read-write loop
> > >> >>> and I feed it into my jepsen like black box tester it should:
> > >> >>>
> > >> >>> Create a topic with 10 partitions, start launching read-write transaction
> > >> >>> code, start feeding input data (maybe strings like 1-1000), now start
> > >> >>> randomly killing vms with kill -9 and graceful exits, maybe even killing
> > >> >>> kafka, and make sure 1-1000 pop out on the other end.
> > >> >>>
> > >> >>> I thought of some other "crazy" ideas. One such idea:
> > >> >>>
> > >> >>> If I make a transactional "echo", read x; write x back to the same topic.
> > >> >>> Run N instances of that and kill them randomly. If I am losing messages
> > >> >>> (and not duplicating messages) then the topic would eventually have no
> > >> >>> data.
> > >> >>>
> > >> >>> Or should I make a program with some math formula like receive x write xx.
> > >> >>> If duplication is happening I would start seeing multiple xx's.
> > >> >>> Or send 1,000,000,000 messages through and have a consumer log them to a
> > >> >>> file. Then use an etl tool to validate messages come out on the other side.
> > >> >>>
> > >> >>> Or should I use a nosql store with increments and count up and ensure no
> > >> >>> key has been incremented twice.
> > >> >>>
> > >> >>> note: I realize I can just use kafka streams or storm, which has its own
> > >> >>> systems to guarantee "at most once" but looking for a way to prove what
> > >> >>> can be done with pure kafka. (and not just prove it for adtech work (good
> > >> >>> enough, 5% here or there) )
> > >> >>>
> > >> >>> I imagine someone somewhere must be doing this. How, where, tips? Is it
> > >> >>> part of some kafka release stress test? I'm down to write it if it does
> > >> >>> not exist.
> > >> >>>
> > >> >>> Thanks,
> > >> >>> Edward
> > >> >>>
> > >> >>
> > >> >
> > >> > Boyang,
> > >> >
> > >> > just to summarize and make sure I understood your question, you want to
> > >> > implement some Chaos testing to validate Kafka EOS model, but not sure how
> > >> > to start or curious about whether there are already works in the community
> > >> > doing that?
> > >> >
> > >> > Yes.
> > >> >
> > >> > I am not an expert in this field, but I know that distributed systems can
> > >> > mask failures. For example if you have atomic increment you might unit test
> > >> > it and it works fine, but if you ran it for 40 days it might double count 1
> > >> > time.
> > >> >
> > >> > For the correctness of Kafka EOS, we have tons of unit tests and system
> > >> > tests to prove its functionality. They could be found inside the repo.
> > >> >
> > >> > I've been a developer for a while so the phrase "there are tests" never
> > >> > tells me everything. Tests reveal the presence of bugs, not the absence.
> > >> >
> > >> > Can you please point me at the tests? My curiosity is if there is a
> > >> > systematic in-depth strategy here and how much rigor there is.
> > >> >
> > >> > In my environment I need to quantify and use rigor to prove out these
> > >> > things. Things that you might take for granted. For example, I have to
> > >> > prove that zookeeper works as expected when we lose a datacenter. Most
> > >> > people 'in the know' take it for granted that kafka and zk do what is
> > >> > advertised when configured properly. I have to test that out and document
> > >> > my findings.
> > >> >
> > >> > For kafka transactions, the user space code needs to be written properly
> > >> > and configured properly along with the server being set up properly. It is
> > >> > not enough for me to check out kafka, run 'sbt test' and declare victory
> > >> > after the unit tests pass.
> > >> >
> > >> > What I am effectively looking for is the anti jepsen blog that says... We
> > >> > threw the kitchen sink at this and these transactions are bullet proof.
> > >> > Here is our methodology, here are some charts, here is xyz. Here is how we
> > >> > run it every minor release.
> > >> >
> > >> > I'm not trying to be a pita, educate me on how bullet proof this is and how
> > >> > I can reproduce the results.
> > >> >
> > >> >
> > >> >
> > >> >
> > >> >
> > >> >
> > >> >
> > >>
> > >>
> > >
> > > All,
> > >
> > > After a few weeks of hacking at this I have found a few interesting
> > > things. First things first, introducing Kafos
> > >
> > > https://github.com/edwardcapriolo/kafos
> > >
> > > Kafos (think Kafka + Chaos). Kafos is a project that attempts to prove
> > > out that Kafka transactions do exactly-once by launching multiple servers
> > > and consumers/producers, killing them off and counting how many messages
> > > land on the other end.
> > >
> > > Note: The project is a little raw at this point (it has hard-coded paths to
> > > my home folders and some long Thread.sleep() calls in there; I will clean it
> > > up as I go), but I have some early findings.
> > >
> > > From the blog here, I want to point out the following flaws:
> > >
> > > https://www.confluent.io/blog/transactions-apache-kafka/
> > >
> > > KafkaProducer producer = createKafkaProducer(
> > >    "bootstrap.servers", "localhost:9092",
> > >    "transactional.id", "my-transactional-id");
> > >
> > > 1) a fixed transactional.id effectively means the workers fence each other,
> > > and only the most recently initialized one survives. While being
> > > exactly-once, there is exactly one worker and no parallelism.
> > >
> > > Spring has two schemes that seem to implement random information in the
> > > fencing ids, which I think defeats the purpose. But I need to dive in on
> > > that more.
> > > https://docs.spring.io/spring-kafka/reference/html/
> > >
> > > Transactions are enabled by providing the DefaultKafkaProducerFactory
> > > with a transactionIdPrefix. In that case, instead of managing a single
> > > shared Producer, the factory maintains a cache of transactional
> > > producers. When the user calls close() on a producer, it is returned to
> > > the cache for reuse instead of actually being closed. The transactional.id
> > > property of each producer is transactionIdPrefix + n, where n starts with
> > > 0 and is incremented for each new producer, unless the transaction is
> > > started by a listener container with a record-based listener. In that case,
> > > the transactional.id is <transactionIdPrefix>.<group.id>.<topic>.<partition>.
> > > This is to properly support fencing zombies, as described here
> > > <https://www.confluent.io/blog/transactions-apache-kafka/>.
> > > This new behavior was added in versions 1.3.7, 2.0.6, 2.1.10, and 2.2.0. If
> > > you wish to revert to the previous behavior, you can set the
> > > producerPerConsumerPartition property on the DefaultKafkaProducerFactory
> > > to false.
> > >
> > > While transactions are supported with batch listeners, by default, zombie
> > > fencing is not supported because a batch may contain records from multiple
> > > topics or partitions. However, starting with version 2.3.2, zombie fencing
> > > is supported if you set the container property subBatchPerPartition to
> > > true. In that case, the batch listener is invoked once per partition
> > > received from the last poll, as if each poll only returned records for a
> > > single partition.
> > >
> > > How to fix this? Here be the dragons:
> > >
> > > "Practically, one would either have to store the mapping between input
> > > partitions and transactional.ids in an external store, or have some static
> > > encoding of it. Kafka Streams opts for the latter approach to solve this
> > > problem."
> > >
> > > Well then practically, the rest of this code is not useful.
> > >
> > > while (true) {
> > >    ConsumerRecords records = consumer.poll(Long.MAX_VALUE);
> > >    producer.beginTransaction();
> > >    for (ConsumerRecord record : records)
> > >       producer.send(producerRecord("outputTopic", record));
> > >    producer.sendOffsetsToTransaction(currentOffsets(consumer), group);
> > >    producer.commitTransaction();
> > > }
> > >
> > > When you call consumer.poll() (unless you manually assign to a single
> > > partition) you are going to receive records from one or more partitions. As
> > > consumers join and leave the group, partition assignments are going to move
> > > between workers.
> > >
> > > There are two ways I can find to do this:
> > > 1) use a ConsumerRebalanceListener and a concurrent map of producers
> > > https://github.com/edwardcapriolo/kafos/blob/68782a5420840377dcb3ef94fb65132028804340/pure8/src/main/java/pure8/multipartitionrpw/MultiPartitionRPWBuffered.java#L123
> > >
> > > 2) as you are polling create/reuse producers as needed.
> > >
> > > With scalable correctly fenced producers the loop looks more like this:
> > >
> > > ConsumerRecords<K,V> records = null;
> > > try {
> > >    records = consumer.poll(2000);
> > > } catch (KafkaException e){
> > >    return;
> > > }
> > >
> > > for(ConsumerRecord<K,V> record: records) {
> > >    int partition = record.partition();
> > >    BufferedTrackingProducer<K,V> producer = producers.get(partition);
> > >    List<ProducerRecord<K, V>> results = processor.process(record);
> > >    producer.send(results);
> > >
> > > }
> > > for (Entry<Integer, BufferedTrackingProducer<K, V>> entry : producers.entrySet()) {
> > >    entry.getValue().commitAndClear(records, groupId);
> > > }
> > >
> > > Implementation here:
> > > https://github.com/edwardcapriolo/kafos/blob/68782a5420840377dcb3ef94fb65132028804340/pure8/src/main/java/pure8/multipartitionrpw/MultiPartitionRPWBuffered.java#L1
> > >
> > > BTW maybe I am wrong about this and there is a better approach. With
> > > multiple producer threads for a single consumer.poll, figuring out exactly
> > > how to layer the try/catch and exception handling becomes a bit harder than
> > > the example in the blog.
> > >
> > > Anyway,  here is what a Kafos chaos "test" looks like.
> > >
> > > https://github.com/edwardcapriolo/kafos/blob/68782a5420840377dcb3ef94fb65132028804340/pure8/src/main/java/pure8/multipartitionrpw/MutliRPWWithConsumerRestartingTest.java#L1
> > >
> > > It is a bit silly that we are forking off processes like this, but I did
> > > not want processes running in the same VM. I am not ready to run
> > > testcontainers/docker at this point. But hey, we launch kafka, we launch zk,
> > > we start workers, we kill them off, we count things. Results look like
> > > this.
> > >
> > >
> > > Verification stats
> > > ------------------------------------------------------------------
> > > notFound: 0 foundCorrect: 4000 foundDuplications: 0
> > > ------------------------------------------------------------------
> > > not found list[]
> > > serverid: 0 out line Stream closed
> > >
> > > I am going to do more work killing off servers and making more test
> > > scenarios, but so far off to a good start. When you get all the code right
> > > (by cobbling together all the sources out there with half the code) you get
> > > things that seem to do exactly once. Help wanted in academic review of my
> > > assertions and code, or PRs.
> > >
> > > Thanks
> > > Edward
> > >
> >
> >
> > --
> > Sorry this was sent from mobile. Will do less grammar and spell check than
> > usual.
> >
>

Right. That looks like what you need to do.

In my scenario above you have n transactional producers. You loop through
them calling commit. If any of them fails, do not call poll(): close all the
producers, close the consumer, and restart. Effectively you create a
rebalance.
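
A minimal sketch of that commit-or-restart step. TxProducer and
CommitOrRestart are hypothetical names made up for illustration; TxProducer
stands in for whatever wraps each transactional producer and is not a Kafka
class.

```java
import java.util.List;

/** Hypothetical wrapper around one transactional producer; not a Kafka class. */
interface TxProducer {
    void commit() throws Exception; // commits this producer's open transaction
    void close();
}

final class CommitOrRestart {
    /**
     * Tries to commit every producer in order. On the first failure it closes
     * all producers and returns false, so the caller can close the consumer
     * and start over instead of calling poll() again.
     */
    static boolean commitAll(List<? extends TxProducer> producers) {
        for (TxProducer p : producers) {
            try {
                p.commit();
            } catch (Exception e) {
                // One commit failed: tear everything down, do not keep polling.
                for (TxProducer q : producers) {
                    q.close();
                }
                return false;
            }
        }
        return true;
    }
}
```

If commitAll returns false the caller would close the consumer and rebuild
everything. Producers that committed before the failure stay committed, so,
assuming each producer sent the offsets for its own partition inside its
transaction, only the failed partitions should come back after the restart.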

Can you pseudocode your method of using a combination of
KafkaConsumer#committed() and KafkaConsumer#seek()?

I like this approach because avoiding the rebalance might be a good idea.
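
For reference, a rough sketch of the idea as a toy in-memory model.
ToyConsumer and all of its methods are made up for illustration and are not
the Kafka client. With the real client the corresponding calls would
presumably be KafkaConsumer#assignment(), KafkaConsumer#committed() and
KafkaConsumer#seek(), and a partition with no committed offset yet would need
separate handling.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy stand-in for a consumer: tracks a position and a committed offset per partition. */
class ToyConsumer {
    private final Map<Integer, List<String>> log;                        // partition -> records
    private final Map<Integer, Long> position = new HashMap<>();         // next offset poll() returns
    private final Map<Integer, Long> committedOffsets = new HashMap<>(); // last committed offset

    ToyConsumer(Map<Integer, List<String>> log) {
        this.log = log;
        for (Integer p : log.keySet()) {
            position.put(p, 0L);
            committedOffsets.put(p, 0L);
        }
    }

    /** Returns everything from the current position on and advances it, ignoring commits. */
    Map<Integer, List<String>> poll() {
        Map<Integer, List<String>> out = new HashMap<>();
        for (Map.Entry<Integer, List<String>> e : log.entrySet()) {
            int p = e.getKey();
            List<String> all = e.getValue();
            int from = position.get(p).intValue();
            out.put(p, new ArrayList<>(all.subList(from, all.size())));
            position.put(p, (long) all.size());
        }
        return out;
    }

    void commit(int partition, long nextOffset) { committedOffsets.put(partition, nextOffset); }

    long committed(int partition) { return committedOffsets.get(partition); }

    void seek(int partition, long offset) { position.put(partition, offset); }

    /** The committed() + seek() combination: rewind every partition to its committed offset. */
    void rewindToCommitted() {
        for (Integer p : log.keySet()) {
            seek(p, committed(p));
        }
    }
}
```

In this model, a partition whose transaction failed keeps its old committed
offset, so after rewindToCommitted() the next poll() hands back exactly those
records again, while partitions that committed are not replayed.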


-- 
Sorry this was sent from mobile. Will do less grammar and spell check than
usual.
