Re: Apache zookeeper going down every 168 hours

2024-05-04 Thread Haruki Okada
I mean applying the PR linked to the ticket
(https://github.com/apache/kafka/pull/6517) locally, or simply cloning the
branch of this PR.
Note that the patch has not been merged upstream yet, so try it at your own
risk.


Thanks,

On Sat, May 4, 2024 at 15:52 Yogeshkumar Annadurai wrote:

> Hello,
>
> Thanks for your prompt response.
>
> How do I apply the patch? Could you please provide more details?
>
> Regards
> Yogeshkumar A
>
> On Sat, 4 May 2024 at 9:33 AM, Haruki Okada  wrote:
>
>


-- 

Okada Haruki
ocadar...@gmail.com



Re: Apache zookeeper going down every 168 hours

2024-05-03 Thread Haruki Okada
Thanks for sharing logs.

Kafka has a mechanism that marks a log dir as "failed" when an IOException
happens during I/O operations, and the broker shuts down once all log dirs
have been marked as failed. (Kafka allows setting multiple log dirs for JBOD.)
From server.log, we can see that the Kafka broker shut down because of this.

`[2024-04-28 11:50:46,466] ERROR Shutdown broker because all log dirs in
C:\kafka\data\logs have failed (kafka.log.LogManager)`

And the IOException that caused this situation seems to carry the following
message:
`The process cannot access the file because it is being used by another
process.`
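
To make the mechanism above concrete, here is a minimal toy model in Java.
The class and method names are hypothetical and this is not Kafka's actual
LogManager code; it only mirrors the rule described: an IOException takes
one log dir offline, and the broker stops once no log dir is left online.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical toy model of log-dir failure handling (not Kafka's LogManager API).
class LogDirHealth {
    // log dir path -> true while the dir is still online
    private final Map<String, Boolean> online = new LinkedHashMap<>();

    LogDirHealth(String... logDirs) {
        for (String dir : logDirs) {
            online.put(dir, true);
        }
    }

    // Called when an I/O operation on a file under `dir` throws an IOException:
    // the whole dir is marked as failed (offline).
    void onIoException(String dir) {
        online.computeIfPresent(dir, (d, wasOnline) -> false);
    }

    // The broker shuts down once every configured log dir has been marked as failed.
    boolean shouldShutdownBroker() {
        return !online.containsValue(true);
    }
}
```

With a single entry in log.dirs (as with C:\kafka\data\logs here), one
IOException is enough to stop the broker, whereas with several JBOD dirs the
broker keeps running until the last one fails.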

I can find some similar issues on Windows in the Kafka JIRA (e.g.
https://issues.apache.org/jira/browse/KAFKA-8172).
I have never run Kafka on Windows, so I'm not sure whether it works, but
you may try the patch on this ticket.

By the way, running Kafka on Windows might be challenging (especially in a
production environment), so I recommend trying it on Linux (or at least on
WSL).


Thanks,

On Sat, May 4, 2024 at 10:20 Yogeshkumar Annadurai wrote:

> Hello,
>
> We see a timeout error in server.log.
> Log files and properties files are attached for your reference.
>
> regards
> Yogeshkumar A
>
> On Sat, May 4, 2024 at 5:27 AM Haruki Okada  wrote:
>
>


Re: Apache zookeeper going down every 168 hours

2024-05-03 Thread Haruki Okada
Hi.

log.retention shouldn't be related to this phenomenon.
It sounds like we need to understand the situation more precisely to answer.

> apache zookeeper connection is going down automatically

How did you confirm this? From the ZooKeeper log?

Also, did you see any logs on the Kafka side (stdout, server.log, etc.)?


Thanks,

On Sat, May 4, 2024 at 6:48 Yogeshkumar Annadurai wrote:

> Hello,
>
> We are using Apache Kafka in a development environment, where the Apache
> ZooKeeper connection is going down automatically every 168 hours. We
> observed that log.retention.hours is set to 168 hours (7 days).
>
> I would like to understand the configuration for this kind of scenario
> (automatic Kafka server down: it says the broker connection cannot be
> established).
>
>
> Regards
> Yogeshkumar A
>



Re: Kafka Producer avoid sending more records when met exception while also keeping performance

2024-03-14 Thread Haruki Okada
Hi.

> By setting max.in.flight.requests.per.connection to 1, I'm concerned that
> this could become a performance bottleneck

As Greg pointed out, this is a trade-off between the ordering guarantee and
throughput.
So you should first measure the throughput with
max.in.flight.requests.per.connection=1 while tuning the batching configs
(batch.size, linger.ms) in your environment, and see whether it actually
becomes the bottleneck.
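
As a starting point for such a measurement, the producer settings could look
like the sketch below. It uses only java.util.Properties with the standard
producer config keys; the bootstrap address and the batch.size / linger.ms
values are placeholders to tune, not recommendations. The resulting
Properties can be passed to a KafkaProducer constructor together with the
key/value serializers.

```java
import java.util.Properties;

// Producer settings for strict ordering with a single in-flight request per connection.
// Values for batch.size and linger.ms are placeholders to benchmark, not tuned recommendations.
class OrderedProducerConfig {
    static Properties orderedWithBatching(String bootstrapServers, int batchSizeBytes, int lingerMs) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("max.in.flight.requests.per.connection", "1"); // one request in flight => ordering
        props.put("enable.idempotence", "true");
        props.put("acks", "all"); // idempotence requires acks=all
        props.put("batch.size", Integer.toString(batchSizeBytes)); // larger batches => fewer requests
        props.put("linger.ms", Integer.toString(lingerMs)); // wait briefly so batches can fill up
        return props;
    }
}
```

The same settings can also be passed to kafka-producer-perf-test.sh via its
--producer-props option to compare against the default
max.in.flight.requests.per.connection=5.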


On Wed, Mar 13, 2024 at 18:31 William Lee wrote:

> Hi Richard,
> Thanks for replying.
>
> > but I close the KafkaProducer inside the send callback.
> > ...
> > Combined with idempotence enabled and max inflight set to 5 (the
> > maximum for idempotence tracking) it gave me relatively good performance.
>
> Yes, I also found that closing the KafkaProducer inside the send callback
> can prevent extra records from being sent. But after some investigation
> into the source code of KafkaProducer and Sender, I think closing the
> Kafka producer in the callback is not 100% reliable in such cases. For
> example, if you set max.in.flight.requests.per.connection to 5 and send
> 5 batches 1, 2, 3, 4, 5, and batch No.2 fails with an exception in the
> callback and you initiate closing the Kafka producer inside the callback,
> batch No.3 might already be in flight and still be sent to the broker.
> Even though I haven't observed such results in my experiments, I am
> still not sure this is reliable, since Kafka's official documentation
> gives no guarantee about this behaviour.
>
> In the source code of KafkaProducer and Sender, only when
> max.in.flight.requests.per.connection is set to 1 is the
> "guaranteeMessageOrder" property set to true, ensuring that only one
> request will be in flight per partition.
>
> kafka/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
> at master · a0x8o/kafka
> <
> https://github.com/a0x8o/kafka/blob/master/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java#L128
> >
>
> kafka/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
> at master · a0x8o/kafka
> <
> https://github.com/a0x8o/kafka/blob/master/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L538
> >
>
> Do you have any thoughts?
>
> Thanks and regards,
> William Lee
>
> On Wed, Mar 13, 2024 at 16:38 Richard Bosch wrote:
>
> > Hi William,
> >
> > I see from your example that you close the Kafka producer in the send
> > loop, based on the content of sendException that is used in the callback
> > of the KafkaProducer send.
> > Since your send loop runs on a different thread than the one the
> > KafkaProducer uses to send, you will encounter race conditions with this
> > close logic.
> >
> > I actually had a similar requirement to yours and solved it by using a
> > sendException like you do, but I close the KafkaProducer inside the send
> > callback. The send callback is executed as part of the producer thread,
> > and closing the producer there will stop all subsequent batches from
> > being processed, as the current batch isn't finished yet. Combined with
> > idempotence enabled and max inflight set to 5 (the maximum for
> > idempotence tracking), it gave me relatively good performance.
> >
> > Kind regards,
> >
> >
> > Richard Bosch
> >
> > Developer Advocate
> >
> > Axual BV
> >
> > https://axual.com/
> >
>



Re: Does Kafka wait for an fsync to send back and ACK for a published message ?

2024-03-14 Thread Haruki Okada
Hi.

By default, Kafka returns the ack without waiting for an fsync to disk. But
you can change this behavior with the log.flush.interval.messages config.
For data durability, Kafka mainly relies on replication instead.
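
To illustrate the two behaviors in the question (plain JDK file I/O, not
Kafka's code), the sketch below appends a record and optionally calls
FileChannel.force before returning. Without force, write() typically returns
once the OS holds the bytes in the page cache; with force, the call blocks
until the data reaches the storage device. Forcing a flush through
log.flush.interval.messages (or the topic-level flush.messages) makes
roughly this latency-for-durability trade.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Plain JDK illustration (not Kafka code) of acking after a page-cache write vs. after an fsync.
class PageCacheVsFsync {
    static void append(Path file, String record, boolean fsyncBeforeAck) throws IOException {
        try (FileChannel channel = FileChannel.open(file,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.APPEND)) {
            ByteBuffer buffer = ByteBuffer.wrap(record.getBytes(StandardCharsets.UTF_8));
            while (buffer.hasRemaining()) {
                // Typically returns once the OS has copied the bytes (into the page cache).
                channel.write(buffer);
            }
            if (fsyncBeforeAck) {
                // Blocks until file content and metadata reach the storage device (an fsync).
                channel.force(true);
            }
        }
        // A broker-like caller would send its ack at this point.
    }
}
```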

> then there is potential for message loss if the node crashes before

On the crashed node, that's true. However, as long as you configure replicas
to span multiple AZs, data loss would be very rare, because a simultaneous
power failure across multiple AZs is unlikely.
FYI, Jack Vanlightly wrote a nice article about this topic:
https://jack-vanlightly.com/blog/2023/4/24/why-apache-kafka-doesnt-need-fsync-to-be-safe
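
As a sketch of what "replicas spanning multiple AZs" can mean in
configuration (broker IDs, zone names and the address are illustrative
assumptions, with three AZs assumed): tag each broker with its AZ via
broker.rack so that rack-aware assignment spreads a new topic's replicas
across zones, create the topic with replication factor 3 and
min.insync.replicas=2, and produce with acks=all. An acknowledged write then
exists on at least two in-sync replicas in different zones.

```java
import java.util.Map;
import java.util.Properties;

// Illustrative multi-AZ durability settings; IDs, zone names and addresses are assumptions.
class MultiAzDurability {
    // Broker side (one per broker): tag the broker with its AZ so that rack-aware
    // replica assignment places a partition's replicas in different zones.
    static Properties brokerProps(int brokerId, String availabilityZone) {
        Properties props = new Properties();
        props.put("broker.id", Integer.toString(brokerId));
        props.put("broker.rack", availabilityZone);
        return props;
    }

    // Topic side (topic created with replication factor 3): a write must be
    // replicated to at least 2 in-sync replicas to be accepted with acks=all.
    static Map<String, String> topicConfigs() {
        return Map.of("min.insync.replicas", "2");
    }

    // Producer side: wait for all in-sync replicas before the send is acknowledged.
    static Properties producerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("acks", "all");
        return props;
    }
}
```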

> But the upside is reduced latency as writes to the pagecache

True. That's why Kafka performs well even on HDDs.
Also, relying on the page cache is a good compromise between latency and
durability, because the data still survives an application crash (e.g. a
JVM crash).

On Thu, Mar 14, 2024 at 21:37 Sreyan Chakravarty wrote:

> I am trying to understand when does Kafka signal to the producer that the
> message was successfully accepted into Kafka.
>
> Does Kafka:
>
> 1) Write to the pagecache of the node's OS and then return back an ACK ?
>  If so, then there is potential for message loss if the node crashes before
> fsync to disk. But the upside is reduced latency as writes to the pagecache
> are very fast compared to a fsync to disk.
>
> 2) Wait for an fsync to happen on each message ?
> If so, then there is increased latency but guarantees each message is
> written to disk
>
> 3) Or is this a purely configurable option between the two ?
>
> --
> Regards,
> Sreyan Chakravarty
>



Re: Kafka Producer avoid sending more records when met exception while also keeping performance

2024-03-11 Thread Haruki Okada
Hi.

> I immediately stop sending more new records and stop the kafka
> producer, but some extra records were still sent

I still don't quite see why you need this behavior, but as long as you set
max.in.flight.requests.per.connection to greater than 1, it's impossible to
avoid, because KafkaProducer can do nothing about requests that have
already been sent out.
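
The following toy model (hypothetical names, not KafkaProducer internals)
illustrates why: with at most N unacknowledged requests per connection, by
the time the error for one batch reaches the callback, up to N - 1 later
batches may already be on the wire, and closing the producer cannot recall
them.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model: batches are sent in order, with at most maxInFlight unacknowledged
// requests per connection (hypothetical, not KafkaProducer's Sender logic).
class InFlightWindow {
    // Worst case: when the error for failedBatch comes back, the window is still full,
    // so the following (maxInFlight - 1) batches may already have been sent.
    static List<Integer> batchesPossiblyAlreadySent(int failedBatch, int maxInFlight, int totalBatches) {
        List<Integer> alreadySent = new ArrayList<>();
        int lastInWindow = Math.min(failedBatch + maxInFlight - 1, totalBatches);
        for (int batch = failedBatch + 1; batch <= lastInWindow; batch++) {
            alreadySent.add(batch);
        }
        return alreadySent;
    }
}
```

For the example discussed in this thread (batches 1 to 5, batch 2 fails,
max.in.flight.requests.per.connection=5) the model yields batches 3, 4 and
5; with max.in.flight.requests.per.connection=1 it yields none.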

By the way, with an appropriate batch.size and linger.ms configuration, you
can achieve high throughput even with
max.in.flight.requests.per.connection=1, which wouldn't be a problem unless
you have to send large amounts of data over a slow network.
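
A back-of-envelope way to see this: with one request in flight, a partition
can ship at most about one batch per round trip. The sketch below computes
that rough upper bound; it is a simplification that ignores compression,
broker-side processing time and partially filled batches, and the figures
in the note after it are illustrative.

```java
// Rough upper bound on per-partition producer throughput when only one
// request may be in flight: at most one full batch per round trip.
class SingleInFlightThroughput {
    static double maxBytesPerSecond(long batchSizeBytes, double roundTripMillis) {
        double roundTripsPerSecond = 1000.0 / roundTripMillis;
        return batchSizeBytes * roundTripsPerSecond;
    }
}
```

For example, a 1 MiB batch (1,048,576 bytes) with a 10 ms round trip gives
about 100 MiB/s, while the same batch over a 200 ms link gives only about
5 MiB/s, which is the slow-network case.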

On Mon, Mar 11, 2024 at 22:55 William Lee wrote:

> Hi all,
> I am facing a problem when I detect an exception in kafka producer
> callback, I immediately stop sending more new records and stop the kafka
> producer, but some extra records were still sent.
>
> I found a way to resolve this issue: setting
> max.in.flight.requests.per.connection to 1 and closing kafka producer when
> encountering an exception in kafka producer callback.
> Setting max.in.flight.requests.per.connection to 1 makes sure only one
> request is in flight per partition, and closing the Kafka producer in the
> producer callback puts the Sender in the "forceClose" state, thus avoiding
> sending extra records.
>
> But, as far as I know, setting max.in.flight.requests.per.connection to 1
> will decrease the performance of kafka producer. I would like to know, is
> there any other way to work around this issue without setting
> max.in.flight.requests.per.connection to 1 so that I can ensure performance
> of kafka producer?
>
> here is my demo source code, you can also find it on Github Gist:
> https://gist.github.com/52Heartz/a5d67cf266b35bafcbfa7bc2552f4576
>
> public class KafkaProducerProblemDemo {
>
>     public static void main(String[] args) {
>         Logger rootLogger = (Logger) LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME);
>         rootLogger.setLevel(Level.INFO);
>
>         String topicName = "test_topic_202403112035";
>         Map<String, String> kafkaTopicConfigs = new HashMap<>();
>         Properties props = new Properties();
>         props.put(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG, "3000");
>         props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "192.168.223.3:9094");
>         CreateTopicsResult createTopicsResult;
>         try (AdminClient adminClient = AdminClient.create(props)) {
>             NewTopic newTopic = new NewTopic(topicName, 1, (short) 1);
>             newTopic.configs(kafkaTopicConfigs);
>             kafkaTopicConfigs.put(TopicConfig.SEGMENT_BYTES_CONFIG, "1048576");
>             kafkaTopicConfigs.put(TopicConfig.RETENTION_BYTES_CONFIG, "1048576");
>             kafkaTopicConfigs.put(TopicConfig.RETENTION_MS_CONFIG, "8640");
>             createTopicsResult = adminClient.createTopics(Collections.singletonList(newTopic));
>             System.out.println(createTopicsResult.all().get());
>         } catch (Exception e) {
>             rootLogger.error("create topic error", e);
>         }
>
>         // adjust requestTimeout to ensure the request timeout is enough
>         long requestTimeout = 2000;
>         Properties kafkaProps = new Properties();
>         kafkaProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, String.valueOf(requestTimeout));
>         kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
>                 org.apache.kafka.common.serialization.StringSerializer.class.getName());
>         kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
>                 org.apache.kafka.common.serialization.ByteArraySerializer.class.getName());
>         kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.223.3:9094");
>         kafkaProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, String.valueOf(requestTimeout));
>         kafkaProps.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 2097152);
>         // force one batch per record
>         kafkaProps.put(ProducerConfig.BATCH_SIZE_CONFIG, "1");
>         kafkaProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
>         kafkaProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
>
>         try (KafkaProducer<String, byte[]> kafkaProducer = new KafkaProducer<>(kafkaProps)) {
>             AtomicBoolean isFirstRecord = new AtomicBoolean(true);
>             AtomicReference<Exception> sendException = new AtomicReference<>();
>
>             for (int i = 0; i < 2048; i++) {
>                 String content = String.valueOf(i);
>                 ProducerRecord<String, byte[]> record =
>                         new ProducerRecord<>(topicName, content.getBytes());
>
>                 if (sendException.get() != null) {
>                     // once found exception in callback, stop sending more records
>                     kafkaProducer.close();
>                     break;
>                 }
>
>                 kafkaProducer.send(record, (RecordMetadata metadata, Exception exception) -> {
>                     if (isFirstRecord.getAndSet(false)) {
>                         try {
>

Re: During cluster peak, KAFKA NetworkProcessorAvgIdlePercent is lower than 0.2

2024-01-21 Thread Haruki Okada
This is just a conjecture, but one possible reason I can imagine is:
traffic increase => request queue full => request latency increase => more
batching on the client side => overall request count decrease => CPU usage
decrease.
You should check the request-count metric.

At any rate, a full request queue is not desirable, so you should figure
out the cause and address it for stable cluster operation.
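
One way to watch the request count is over JMX. The sketch below assumes
the broker exposes remote JMX (for example, started with the JMX_PORT
environment variable set) and reads the OneMinuteRate of the
kafka.network:type=RequestMetrics,name=RequestsPerSec meters. The host, port
and request type are placeholders, and the ",*" pattern is there because
newer brokers add extra tags such as version to these MBean names. Comparing
the rates before and during the peak would show whether the request count
actually drops.

```java
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

// Reads broker request-rate meters over JMX. Assumes remote JMX is enabled on the broker.
class RequestRateProbe {
    // Matches names like kafka.network:type=RequestMetrics,name=RequestsPerSec,request=Produce
    // plus any extra key properties (e.g. a version tag on newer brokers).
    static ObjectName requestsPerSecPattern(String requestType) throws Exception {
        return new ObjectName(
                "kafka.network:type=RequestMetrics,name=RequestsPerSec,request=" + requestType + ",*");
    }

    // requestType examples: "Produce", "FetchConsumer", "FetchFollower".
    static void printRates(String host, int jmxPort, String requestType) throws Exception {
        JMXServiceURL url = new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://" + host + ":" + jmxPort + "/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            MBeanServerConnection connection = connector.getMBeanServerConnection();
            for (ObjectName name : connection.queryNames(requestsPerSecPattern(requestType), null)) {
                // Meters expose Count, MeanRate and One/Five/FifteenMinuteRate attributes.
                Object rate = connection.getAttribute(name, "OneMinuteRate");
                System.out.println(name + " OneMinuteRate=" + rate);
            }
        }
    }
}
```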

On Mon, Jan 22, 2024 at 11:53 dong yu wrote:

> I have a question: why does the overall CPU of the cluster decrease when
> the KAFKA cluster traffic increases, the request queue is full, and the
> idle rate is low?
>
> On Mon, Jan 15, 2024 at 21:56 Haruki Okada wrote:
>
>



Re: kafka cluster question

2024-01-19 Thread Haruki Okada
Hi.

Which server did you shut down in the test?
If it was 192.168.20.223, it's natural that the kafka-consumer-groups script
fails, because you passed only 192.168.20.223 to the --bootstrap-server arg.

In an HA setup, you have to pass multiple brokers (as a comma-separated
string) to --bootstrap-server so that the client can fetch initial metadata
from the other servers even when one fails.
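
The same applies to Java clients: bootstrap.servers takes a comma-separated
list. The sketch below assumes brokers also listen on 192.168.20.224:9092
and 192.168.20.225:9092 (only the config for 192.168.20.223 is shown in the
question); the CLI equivalent would be
--bootstrap-server 192.168.20.223:9092,192.168.20.224:9092,192.168.20.225:9092.

```java
import java.util.List;
import java.util.Properties;

// Builds client properties with several bootstrap brokers, so the client can still
// fetch initial cluster metadata when one of them is down.
class BootstrapConfig {
    static Properties clientProps(List<String> brokers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", String.join(",", brokers));
        return props;
    }
}
```

The resulting Properties can then be passed to AdminClient.create(props) or
to a consumer.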

On Sat, Jan 20, 2024 at 0:30 Yavuz Sert wrote:

> Hi all,
>
> I'm running some high-availability tests on Kafka v2.8.2.
> I have 3 Kafka brokers and 3 ZooKeeper instances.
> When I shut down the Kafka service on only one server, I got this
> error:
>
> [root@node-223 ~]# /root/kafka_2.12-2.8.2/bin/kafka-consumer-groups.sh --bootstrap-server 192.168.20.223:9092 --group app2 --describe
>
> Error: Executing consumer group command failed due to org.apache.kafka.common.errors.TimeoutException: Call(callName=findCoordinator, deadlineMs=1705677946526, tries=47, nextAllowedTryMs=1705677946627) timed out at 1705677946527 after 47 attempt(s)
> java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Call(callName=findCoordinator, deadlineMs=1705677946526, tries=47, nextAllowedTryMs=1705677946627) timed out at 1705677946527 after 47 attempt(s)
>     at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
>     at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
>     at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
>     at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
>     at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.$anonfun$describeConsumerGroups$1(ConsumerGroupCommand.scala:550)
>     at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
>     at scala.collection.Iterator.foreach(Iterator.scala:943)
>     at scala.collection.Iterator.foreach$(Iterator.scala:943)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>     at scala.collection.IterableLike.foreach(IterableLike.scala:74)
>     at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
>     at scala.collection.TraversableLike.map(TraversableLike.scala:286)
>     at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
>     at scala.collection.AbstractTraversable.map(Traversable.scala:108)
>     at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.describeConsumerGroups(ConsumerGroupCommand.scala:549)
>     at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.collectGroupsOffsets(ConsumerGroupCommand.scala:565)
>     at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.describeGroups(ConsumerGroupCommand.scala:368)
>     at kafka.admin.ConsumerGroupCommand$.run(ConsumerGroupCommand.scala:73)
>     at kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:60)
>     at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala)
> Caused by: org.apache.kafka.common.errors.TimeoutException: Call(callName=findCoordinator, deadlineMs=1705677946526, tries=47, nextAllowedTryMs=1705677946627) timed out at 1705677946527 after 47 attempt(s)
> *Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: findCoordinator*
>
> kafka conf (for 1 server)
> broker.id=0
> listeners=PLAINTEXT://0.0.0.0:9092
> advertised.listeners=PLAINTEXT://192.168.20.223:9092
> num.network.threads=3
> num.io.threads=8
> socket.send.buffer.bytes=102400
> socket.receive.buffer.bytes=102400
> socket.request.max.bytes=104857600
> log.dirs=/root/kafkadir
> num.partitions=1
> num.recovery.threads.per.data.dir=1
> offsets.topic.replication.factor=1
> transaction.state.log.replication.factor=1
> transaction.state.log.min.isr=1
> log.retention.hours=1
> log.segment.bytes=104857600
> log.retention.check.interval.ms=30
> delete.topic.enable=true
> zookeeper.connection.timeout.ms=18000
> zookeeper.connect=192.168.20.223:2181,192.168.20.224:2181,192.168.20.225:2181
> group.initial.rebalance.delay.ms=0
> max.request.size=104857600
> message.max.bytes=104857600
>
> How can I fix or troubleshoot the error?
>
> Thanks
>
> Yavuz
>



Re: During cluster peak, KAFKA NetworkProcessorAvgIdlePercent is lower than 0.2

2024-01-15 Thread Haruki Okada
You should investigate the cause of the request-queue-full situation first,
since I guess the low network idle ratio is a consequence of it.
(Network threads block on enqueueing when the request queue is full.)

I recommend running async-profiler to profile the broker process if
possible (in wall-clock mode).
That will help you identify the bottleneck that is consuming request-handler
time.
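
The blocking described above can be pictured with a bounded queue. Below is
a toy model (not broker code) whose capacity mirrors queued.max.requests,
whose default is 500, the same value as the queue size reported here. Once
the handler threads stop draining it, the network thread's hand-off stalls.
The real network threads block without a timeout; the timeout here only
makes the stall observable.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Toy model (not broker code): network threads enqueue parsed requests,
// request-handler (I/O) threads dequeue and process them.
class BoundedRequestQueue {
    static final int QUEUED_MAX_REQUESTS = 500; // default of queued.max.requests

    private final BlockingQueue<String> requests = new ArrayBlockingQueue<>(QUEUED_MAX_REQUESTS);

    // Network-thread side: returns false if no slot freed up within timeoutMs,
    // i.e. the network thread would have been stuck waiting instead of serving sockets.
    boolean enqueue(String request, long timeoutMs) throws InterruptedException {
        return requests.offer(request, timeoutMs, TimeUnit.MILLISECONDS);
    }

    // Request-handler side: takes the next request, blocking while the queue is empty.
    String nextRequest() throws InterruptedException {
        return requests.take();
    }
}
```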

On Mon, Jan 15, 2024 at 17:58 dong yu wrote:

> This is my problem
> 1. The request queue is always at 500.
> 2. There are 130 machines in the cluster, and the network idle rate of 30
> machines is less than 20% (0.2).
>
>
> This is my BROKER configuration
> num.io.threads=32
> num.network.threads=64
>
> How should I locate the problem? I tried increasing the parameters, but
> the effect was not obvious.
> Thanks.
>



Re: Relation between fetch.max.bytes, max.partition.fetch.bytes & max.poll.records

2023-12-09 Thread Haruki Okada
> is there any gain on number of network calls being
> made

Basically, no.
However, since the next fetch requests are sent only after previously
fetched records are processed, setting max.poll.records too low could
negatively affect the network-call frequency, depending on how you process
records: if every poll() returns only a small number of records, batch
processing (which is known to be efficient) isn't possible, so processing
previously fetched records could take longer due to insufficient
throughput.
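
A small sketch of the first point (hypothetical helper, simplified model):
poll() hands out records that were already fetched, at most
max.poll.records at a time, so the setting changes how many poll() calls it
takes to drain the buffer rather than how much a fetch request pulls from
the broker.

```java
class PollBatching {
    // Number of poll() calls needed to hand out `bufferedRecords` already-fetched
    // records when each call returns at most `maxPollRecords` (ceiling division).
    static int pollCallsToDrain(int bufferedRecords, int maxPollRecords) {
        if (maxPollRecords <= 0) {
            throw new IllegalArgumentException("max.poll.records must be positive");
        }
        return (bufferedRecords + maxPollRecords - 1) / maxPollRecords;
    }
}
```

With 500 buffered records, max.poll.records=500 drains them in one poll()
call, while max.poll.records=1 needs 500 calls, each followed by processing
of a single record, which is where the loss of batch processing comes from.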

Also posted on SO: https://stackoverflow.com/a/77633494/3225746

On Sun, Dec 10, 2023 at 3:42 Debraj Manna wrote:

> Can someone please clarify my below doubt? The same has been asked on stack
> overflow also.
>
> https://stackoverflow.com/q/77630586/785523
>
>
> On Fri, 8 Dec, 2023, 21:33 Debraj Manna,  wrote:
>
> > Thanks again.
> >
> > Another follow-up question, since max.poll.records has nothing to do with
> > fetch requests, then is there any gain on number of network calls being
> > made between consumer & broker if max.poll.records is set to 1 as against
> > let's say the default 500.
> >
> > On Wed, Dec 6, 2023 at 7:21 PM Haruki Okada  wrote:
> >
> >
>



Re: Relation between fetch.max.bytes, max.partition.fetch.bytes & max.poll.records

2023-12-06 Thread Haruki Okada
poll-idle-ratio-avg=1.0 doesn't necessarily indicate a fetch throughput
problem, since if processing is very fast, the metric will always be near 1.0.
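
The reason follows from how KIP-517 describes the metric: roughly, the
fraction of time the consumer spends idle inside poll() as opposed to time
spent in user code processing records. The helper below is a simplified
approximation of that ratio (hypothetical name, not the client's metric
code).

```java
class PollIdleRatio {
    // Simplified approximation of poll-idle-ratio-avg: share of time spent waiting
    // inside poll() compared with time spent processing records between polls.
    static double pollIdleRatio(double msInsidePoll, double msProcessingBetweenPolls) {
        return msInsidePoll / (msInsidePoll + msProcessingBetweenPolls);
    }
}
```

For instance, 100 ms spent in poll() and 0.5 ms of processing gives about
0.995, i.e. close to 1.0, even though fetching is not a bottleneck at all.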

On Mon, Dec 4, 2023 at 13:09 Debraj Manna wrote:

> Thanks for the reply.
>
> I read KIP
> <
> https://cwiki.apache.org/confluence/plugins/servlet/mobile?contentId=127406453#KIP517:Addconsumermetricstoobserveuserpollbehavior-poll-idle-ratio-avg
> >.
> Can you let me know: if I observe poll-idle-ratio-avg equal to 1.0, then
> does that mean my fetch.max.bytes or max.partition.fetch.bytes is not
> enough and I have to increase them? If not, what could cause
> poll-idle-ratio-avg to approach 1.0?
>
>
> Can you let me know what
>
> On Sat, 2 Dec, 2023, 07:05 Haruki Okada,  wrote:
>
>



Re: Relation between fetch.max.bytes, max.partition.fetch.bytes & max.poll.records

2023-12-01 Thread Haruki Okada
Hi.

`max.poll.records` has nothing to do with fetch requests (ref:
https://kafka.apache.org/35/documentation.html#consumerconfigs_max.poll.records
).

How many records are returned for a single fetch request then depends on
the partition-leader assignment (note: we assume follower fetching is not
used here).
If all partition leaders are on the same broker, 40MB (2MB * 20 partitions)
will be returned for a single fetch request.

On Thu, Nov 30, 2023 at 17:10 Debraj Manna wrote:

> The doc states that fetch.max.bytes & max.partition.fetch.bytes
>
> are not absolute maximum.  If the first record batch in the first non-empty
> > partition of the fetch is larger than this limit, the batch will still be
> > returned to ensure that the consumer can make progress.
>
>
> I am getting a bit confused.
>
> Let's say I have a configuration like below with sufficient messages in
> each partition
>
>
>- Partitions in a topic 20
>- Single message size 2MB
>- Consumers 5
>- max.poll.records 20
>- fetch.max.bytes 50 MB
>- max.partition.fetch.bytes 1 MB.
>
> The broker config message.max.bytes and max.message.bytes is set to default
> 100MB
>
> If the consumer does a poll will it receive 20 records? If yes then there
> is no significance of fetch.max.bytes & max.partition.fetch.bytes with
> max.poll.records?
>
>
>- Java Kafka Client - 3.5.1
>- Kafka Broker - 2.8.1
>



Re: Kafka 2.7.2 to 3.5.1 upgrade

2023-12-01 Thread Haruki Okada
Hi.

I'm not sure whether KafkaManager has such a bug, but you should first
check whether there are actually any under-replicated partitions, using the
`kafka-topics.sh --describe` command with the `--under-replicated-partitions`
option.

On Thu, Nov 30, 2023 at 23:41 Lud Antonie wrote:

> Hello,
>
> After upgrading from 2.7.2 to 3.5.1 some topics are missing a partition for
> one or two brokers.
> The kafka manager shows "Under replicated%" for the topic.
> Looking at the topic for some brokers (of 3) partitions are missing (in my
> case 1 partition).
> A rollback will restore the "Under replicated%" to 0 again (this is the
> wanted number).
>
> Is this a bug of kafka or the kafka manager?
>
> Best regards,
> Lud Antonie
>
>
> --
> Met vriendelijke groet / Kind regards,
>
> *Lud Antonie*
>
> 
> Kennedyplein 101, 5611 ZS, Eindhoven
> +31(0)402492700 <0031402492700>
> www.coosto.com
>  
> 
> 
>


-- 

Okada Haruki
ocadar...@gmail.com



Re: How does Kafka Consumer send JoinRequest?

2023-11-26 Thread Haruki Okada
Hi.

The JoinGroup request is sent from the polling (user) thread.
In your example, the consumer instance will be removed from the group
because it doesn't poll, and so cannot rejoin the group, within the timeout.
The partition will then be assigned to another consumer and processed there.
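
For reference, the actual consumer setting is `max.poll.interval.ms` (default 300000 ms, i.e. 5 minutes). A minimal sketch of the timing rule, assuming default (non-static) group membership: once the time since the last `poll()` reaches that limit, the consumer is treated as failed and leaves the group, so its partitions are rebalanced to other members instead of sitting idle for the whole 15 minutes. `PollTimer` is a hypothetical helper, not a client class.

```java
// Hypothetical model of the max.poll.interval.ms check: the member stays in
// the group only while the gap since the last poll() is below the limit.
final class PollTimer {
    private final long maxPollIntervalMs;
    private long lastPollMs;

    PollTimer(long maxPollIntervalMs, long nowMs) {
        this.maxPollIntervalMs = maxPollIntervalMs;
        this.lastPollMs = nowMs;
    }

    /** Called whenever the application thread calls poll(). */
    void onPoll(long nowMs) {
        lastPollMs = nowMs;
    }

    /** True once the member should be treated as failed and leave the group. */
    boolean expired(long nowMs) {
        return nowMs - lastPollMs >= maxPollIntervalMs;
    }
}
```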

On Sun, Nov 26, 2023 at 18:09, Debraj Manna wrote:

> Can someone let me know if the JoinRequest is sent by the consumer from the
> polling/user thread or from the background heart-beat thread?
>
> If JoinRequest is being sent from the polling/user thread then in this case
> if the poll user thread takes more than max.poll.interval.secs then the
> consumer will remain disconnected from the broker for that long. For
> example, if max.poll.interval.secs is 300 sec and if processing in the poll
> thread takes 15 mins then for 15 mins the partition from which this
> consumer was polling will remain idle and no message will be consumed from
> that partition. Is my understanding correct?
>
> I am using Kafka client 3.5.1 with Apache Kafka broker 2.8.1 with all
> default settings on the consumer configs.
>


-- 

Okada Haruki
ocadar...@gmail.com



Re: [Question] About Kafka producer design decision making

2023-11-14 Thread Haruki Okada
Hi.

I also guess the main reason for using Future was JDK 1.7 support, which
is no longer necessary in current Kafka versions.
Actually, there's a KIP about this:
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=100829459
but it seems to be inactive now.

> I wonder if it is recommended to use

For now, the practical way to produce asynchronously is to use the producer
callback. We can easily create a CompletableFuture from it:

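import java.util.concurrent.CompletableFuture;
import org.apache.kafka.clients.producer.RecordMetadata;

CompletableFuture<RecordMetadata> future = new CompletableFuture<>();
producer.send(record, (metadata, e) -> {
    if (e != null) {
        future.completeExceptionally(e); // send finally failed (after retries)
    } else {
        future.complete(metadata);       // send succeeded
    }
});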
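
The same adaptation can be exercised without a broker. In the sketch below, `FakeProducer` and its `send(String, BiConsumer<Long, Exception>)` method are hypothetical stand-ins for `KafkaProducer#send(record, callback)`, with a `Long` offset in place of `RecordMetadata`; the adapter completes the future exactly once from the callback.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

// Hypothetical callback-style producer standing in for KafkaProducer#send.
final class FakeProducer {
    private long nextOffset = 0;

    /** Invokes the callback with either an offset or an exception. */
    void send(String value, BiConsumer<Long, Exception> callback) {
        if (value == null) {
            callback.accept(null, new IllegalArgumentException("null value"));
        } else {
            callback.accept(nextOffset++, null);
        }
    }

    /** Adapts the callback API to a CompletableFuture. */
    static CompletableFuture<Long> sendAsync(FakeProducer producer, String value) {
        CompletableFuture<Long> future = new CompletableFuture<>();
        producer.send(value, (offset, e) -> {
            if (e != null) {
                future.completeExceptionally(e);
            } else {
                future.complete(offset);
            }
        });
        return future;
    }
}
```

Because the result is a `CompletableFuture`, follow-up steps can be chained with `thenApply`/`thenCompose` instead of blocking on `get()`.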

On Tue, Nov 14, 2023 at 18:30, 신수웅(Sean Sin) wrote:

> Dear Apache Kakfa Developers,
>
> I'm 4-year SWE in South Korea.
> I have some questions while watching Kafka Producer API.
>
> *Why Use "Future" and Not "CompletableFuture"?*
>
> In the case of "Future", blocking occurs when calling "*get()*", so I
> thought "Computable Future" would be better when doing more asynchronous
> operations.
>
> I looked at the Java API document
> <
> https://kafka.apache.org/36/javadoc/org/apache/kafka/common/KafkaFuture.html#thenApply(org.apache.kafka.common.KafkaFuture.BaseFunction)
> >
> based on the latest version, version 3.6.x.
>
> If you look at that version, you can see that the Future object provides
> the "toCompletionStage() "method, which can convert "KafkaFuture" to
> "ComputableFuture".
>
> In response to this, I think that in the initial design decision process,
> we considered compatibility issues under JDK 1.8 and the level of knowledge
> of the learning curve or developer when introducing ComputableFuture, but I
> wonder if this is correct.
>
> In addition, I wonder if it is recommended to use the "toCompletionStage()"
> method to produce more non-blocking if we assume JDK 1.8 or higher.
>
> Thanks.
> Su-Ung Shin.
>


-- 

Okada Haruki
ocadar...@gmail.com



Re: About Kafka Java Client Producer Retry And Callback

2023-11-13 Thread Haruki Okada
> will the callback be executed for each retry

The callback is triggered only once, when the produce finally succeeds or
fails after retries.

> is there any way to make Kafka producers retry locally

The easiest way would be to make produce requests fail artificially, e.g.:
- set acks=all and set the topic's min.insync.replicas to an impossibly large value
- use iptables to block network connectivity between the producer and the broker
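
A simplified, hypothetical model of that behaviour: the send is retried internally until it succeeds or the retry budget runs out, and the user callback fires once at the end. `RetryModel` and its attempt predicate are illustrative only; the real producer's retry path (in `Sender`) also involves `delivery.timeout.ms` and backoff, which are not modelled.

```java
import java.util.function.BiConsumer;
import java.util.function.IntPredicate;

// Hypothetical model: retries are internal, the callback runs exactly once.
final class RetryModel {
    /**
     * @param attempt    returns true if attempt number n (0-based) succeeds
     * @param maxRetries retries allowed after the first attempt
     * @param callback   receives (attemptsMade, errorOrNull) exactly once
     */
    static void send(IntPredicate attempt, int maxRetries,
                     BiConsumer<Integer, Exception> callback) {
        int n = 0;
        while (true) {
            boolean ok = attempt.test(n);
            n++;
            if (ok) {
                callback.accept(n, null);
                return;
            }
            if (n > maxRetries) {
                callback.accept(n, new RuntimeException("failed after " + n + " attempts"));
                return;
            }
            // a real producer would back off and re-enqueue the batch here
        }
    }
}
```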

On Sun, Nov 12, 2023 at 18:10, 王有胜 wrote:

> Hi Community, I use Kafka Java Client to send messages asynchronously.
> I wonder if the producer fails to send a message, during the retry
> period, will the callback be executed for each retry?
> I debugged the source code
> org.apache.kafka.clients.producer.internals.Sender#canRetry and
> org.apache.kafka.clients.producer.internals.Sender#reenqueueBatch
> parts, and I'm still not sure.
> Can anyone help answer this question?
> In addition, is there any way to make Kafka producers retry locally? I
> want to completely understand the code logic.
> Thanks!
>


-- 

Okada Haruki
ocadar...@gmail.com



Re: Multiple consumers for a Single Partition in Kafka

2023-10-03 Thread Haruki Okada
Hi Sree.

Yeah, the need for parallel processing within a partition often arises,
particularly when throughput is limited by external I/O latency, and
there are some solutions:

- https://github.com/line/decaton
  * provides a per-key ordering guarantee (or can process unordered if no
    ordering is necessary)
  * also refs:
    https://speakerdeck.com/line_devday2020/decaton-high-performance-task-processing-with-kafka
- https://github.com/confluentinc/parallel-consumer
  * also seems to provide a per-key ordering guarantee, or can process unordered
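
The core idea behind per-key ordering can be sketched with the JDK alone: records from one partition are dispatched to N single-threaded lanes by key hash, so records with the same key stay in order while different keys run in parallel. This is a hypothetical illustration, not the API of Decaton or parallel-consumer, and it leaves out the hard part those libraries handle: committing only offsets whose records, and all earlier ones, have completed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical key-ordered dispatcher: same key -> same single-threaded lane.
final class KeyOrderedDispatcher {
    private final List<ExecutorService> lanes = new ArrayList<>();
    // key -> values in the order they were processed (for demonstration)
    final Map<String, List<Integer>> processed = new ConcurrentHashMap<>();

    KeyOrderedDispatcher(int parallelism) {
        for (int i = 0; i < parallelism; i++) {
            lanes.add(Executors.newSingleThreadExecutor());
        }
    }

    /** Routes a record to the lane chosen by its key's hash. */
    void dispatch(String key, int value) {
        int lane = Math.floorMod(key.hashCode(), lanes.size());
        lanes.get(lane).execute(() ->
            processed.computeIfAbsent(key, k -> new CopyOnWriteArrayList<>()).add(value));
    }

    /** Stops accepting work and waits for queued records to finish. */
    void shutdownAndWait() {
        lanes.forEach(ExecutorService::shutdown);
        try {
            for (ExecutorService lane : lanes) {
                lane.awaitTermination(10, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```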


On Tue, Oct 3, 2023 at 23:11, Sree Sanjeev Kandasamy Gokulrajan <
sreesanjee...@gmail.com> wrote:

> I'm interested in knowing if we can achieve parallelism by allowing
> multiple consumers to subscribe to a single partition of a topic.
>
> To explore potential solutions, I'm considering the following approach.
>
> 1. Implementing a locking mechanism to control access to the offsets in a
> partition to avoid concurrent processing issues.
>
> My primary concern is maintaining message ordering within the partition, as
> Kafka guarantees ordering within a partition. and message ordering is not
> the primary concern for many applications, so can we have a flag to enable
> multiple consumers to a single partition ?
>
>  I appreciate any insights, advice, or experiences that the Kafka community
> can share on this topic. Thank you!
>
> Regards,
> Sree Sanjeev.
>


-- 

Okada Haruki
ocadar...@gmail.com



Re: ISR expansion vs. shrink eligibility

2023-03-28 Thread Haruki Okada
Hi.

So the question is about the difference between the leader LEO (shrink
criteria) and the leader HW (expand criteria), right?

1. Why the shrink criteria use the leader LEO
Since HW is defined as "the latest offset that has been replicated to all
ISR members", it can't be used to kick a replica out of the ISR set. (By
definition, if we used HW here, a replica would never be out of sync even
when it lags, because HW would not advance in the meantime.)

2. Why the expand criteria use HW
In the expand criteria, replicaLagTime is not taken into consideration (
https://github.com/apache/kafka/blob/e28e0bf0f2c21206abccfffb280605dd02404678/core/src/main/scala/kafka/cluster/Partition.scala#L934-L936
).
So if we used the leader LEO here, an out-of-sync replica would have to
match the leader's LEO exactly in order to join the ISR, even though the
leader keeps appending new messages, which is almost impossible.

... Then another question came to mind: "Say min.insync.replicas = 1. In
this case, the leader HW can advance on its own, so won't other replicas
ever become in-sync?" => But I found that the leader holds back the HW
increment while there is a replica that is "caught up" (i.e. catching up
within replicaLagTime):
https://github.com/apache/kafka/blob/e28e0bf0f2c21206abccfffb280605dd02404678/core/src/main/scala/kafka/cluster/Partition.scala#L1048-L1075
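
The two checks can be written side by side as a simplified, hypothetical model. The names are illustrative; the real logic lives in `Partition.scala` and, in newer versions, has additional conditions (e.g. around leader epochs). `replicaLagTimeMaxMs` corresponds to the broker setting `replica.lag.time.max.ms`.

```java
// Simplified, hypothetical model of the two ISR checks discussed above.
final class IsrModel {
    /**
     * Shrink side: a follower stays in sync if it has reached the leader's
     * LEO, or last did so within replicaLagTimeMaxMs.
     */
    static boolean isCaughtUp(long followerLeo, long leaderLeo,
                              long lastCaughtUpTimeMs, long nowMs,
                              long replicaLagTimeMaxMs) {
        return followerLeo == leaderLeo
                || nowMs - lastCaughtUpTimeMs <= replicaLagTimeMaxMs;
    }

    /**
     * Expand side: an out-of-sync follower may rejoin once its LEO reaches
     * the leader's high watermark; there is no lag-time term here.
     */
    static boolean canJoinIsr(long followerLeo, long leaderHighWatermark) {
        return followerLeo >= leaderHighWatermark;
    }
}
```

In this model a follower with LEO 95 can rejoin when the HW is 95, even if the leader's LEO has already moved on to 100.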

On Wed, Mar 29, 2023 at 2:04, Zach Thornton wrote:

> Hello!
>
> I was reading through the partition code, and I noticed that the criteria
> for expanding an ISR
> <
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/cluster/Partition.scala#L926-L958
> >
> differs
> from the criteria to shrink an ISR
> <
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/cluster/Partition.scala#L1167-L1178
> >
> .
>
> Specifically to summarize, I noticed that a replica can be considered as
> eligible for expansion if its local end offset is >= the leaders high
> watermark, but is considered "out of sync" if its local end offset != the
> leaders local end offset.  It was a bit surprising to me that the criteria
> here would be different, is there some part of the picture that I'm
> missing?
>
> Thanks in advance!
>
> Zach
>


-- 

Okada Haruki
ocadar...@gmail.com



Re: kafka producer exception due to TimeoutException: Expiring records for topic 120000ms has passed since batch creation

2022-04-05 Thread Haruki Okada
Hi, Pushkar.

As the error message shows, some messages could not be produced
successfully within 120 seconds.
Many causes can lead to this, so it's hard to suggest a solution without
more information.
For example:
- Broker-side issues:
  * Too many replicas were down, so produce requests could not succeed
  * Leader election took too long after the previous leader failed
- Producer-side issues:
  * Many TCP retransmissions due to an unstable network
  * The producer's I/O thread could not make progress due to busy CPU,
    long GC pauses, or similar
etc.

These are just examples; there are still many other possibilities.

Did you see other errors on your producer? Also, were the brokers healthy
around the time of the incident?
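
For context, 120 seconds matches the default `delivery.timeout.ms` (120000 ms), the upper bound the producer allows between a batch's creation and its final success or failure, including time spent waiting, in flight, and on retries. A minimal model of the expiry condition and of the producer's config constraint `delivery.timeout.ms >= linger.ms + request.timeout.ms`; `BatchExpiry` is a hypothetical helper, not a client class.

```java
// Hypothetical model of producer batch expiry governed by delivery.timeout.ms.
final class BatchExpiry {
    static final long DEFAULT_DELIVERY_TIMEOUT_MS = 120_000L;

    /** A batch expires once deliveryTimeoutMs has passed since it was created. */
    static boolean isExpired(long batchCreatedMs, long nowMs, long deliveryTimeoutMs) {
        return nowMs - batchCreatedMs >= deliveryTimeoutMs;
    }

    /** delivery.timeout.ms must cover linger.ms + request.timeout.ms. */
    static boolean isValidConfig(long deliveryTimeoutMs, long lingerMs, long requestTimeoutMs) {
        return deliveryTimeoutMs >= lingerMs + requestTimeoutMs;
    }
}
```

With the 2.x defaults (`linger.ms=0`, `request.timeout.ms=30000`), the 120000 ms default satisfies the constraint.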

On Tue, Apr 5, 2022 at 15:22, Pushkar Deole wrote:

> Liam,
>
> This is set to default and we have not changed these configurations. So
> from kafka docs, the default value of linger.ms for producer is 0
>
> On Tue, Apr 5, 2022 at 4:42 AM Liam Clarke-Hutchinson  >
> wrote:
>
> > Hi Pushkar,
> >
> > Could be a lot of things. What's your linger.ms configured for?
> >
> > Cheers,
> >
> > Liam
> >
> > On Tue, 5 Apr 2022 at 05:39, Pushkar Deole  wrote:
> >
> > > Hi All,
> > >
> > > We are intermittently seeing KafkaProducerException. The nested
> exception
> > > is as below:
> > >
> > > org.springframework.kafka.core.KafkaProducerException: Failed to send;
> > > nested exception is org.apache.kafka.common.errors.TimeoutException:
> > > Expiring 10 record(s) for analytics.mpe.passthrough-0:12 ms has
> > passed
> > > since batch creation\n\tat
> > > org.springframework.kafka.core.KafkaTemplate.lambda$buildCallback$4(
> > > KafkaTemplate.java:602 )
> > >
> > > Kafka version is 2.5
> > > Can someone give some ideas as to what would cause this and how can
> this
> > be
> > > resolved?
> > >
> >
>


-- 

Okada Haruki
ocadar...@gmail.com



Re: Kafka performance when it comes to throughput

2022-01-06 Thread Haruki Okada
Hi, Marisa.

Kafka is designed to make full use of system resources, so I think
calculating based on the machine's spec is a good start.

Let's say we have servers with a 10Gbps full-duplex NIC.
Also, let's say we set the topic's replication factor to 3 (so the cluster
will have at least 3 servers), and the average produced message size is 500
bytes.

Then a single machine's spec-based throughput bound can be calculated as
follows:
- Max messages/sec that a single machine can transmit = 10Gbps / 8 (bits to
bytes) / 3 (replicated to 2 follower replicas & fetched by 1 consumer group)
/ 500 bytes = ~833K.

Of course, this is just an example, so you should also take other factors
into account (e.g. HDD throughput).
Also, I think producing to / consuming from a single partition at 833K
msg/sec is hard due to client-side bottlenecks, so we may need to adjust the
partition count as well.

Still, 833K msg/sec for 500-byte messages with the above spec is not far
from my experience of running Kafka in production.
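
The same back-of-the-envelope bound, written as code so the assumptions (NIC speed, number of outbound copies, message size) can be varied. The inputs are the example values above, not measurements, and `ThroughputBound` is a hypothetical name.

```java
// Spec-based upper bound on messages/sec one broker can transmit: outbound
// NIC bandwidth shared by every copy of each message that leaves the broker.
final class ThroughputBound {
    /**
     * @param nicBitsPerSec outbound NIC bandwidth in bits/sec
     * @param copiesOut     times each produced byte leaves the broker
     *                      (e.g. 2 follower fetches + 1 consumer group = 3)
     * @param messageBytes  average message size in bytes
     */
    static long maxMessagesPerSec(double nicBitsPerSec, int copiesOut, int messageBytes) {
        double bytesPerSec = nicBitsPerSec / 8;
        return (long) (bytesPerSec / copiesOut / messageBytes);
    }

    public static void main(String[] args) {
        System.out.println(maxMessagesPerSec(10e9, 3, 500)); // 833333
    }
}
```

At that bound, the client's example of 10 million messages would need roughly 12 seconds of transmit time on one broker (10,000,000 / 833,333), before accounting for the other factors mentioned.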

On Fri, Jan 7, 2022 at 0:01, Marisa Queen wrote:

> Cheers from NYC!
>
> I'm trying to give a performance number to a potential client (from the
> financial market) who asked me the following question:
>
> *"If I have a Kafka system setup in the best way possible for performance,
> what is an approximate number that I can have in mind for the throughput of
> this system?"*
>
> The client proceeded to say:
>
> *"What I want to know specifically, is how many messages per second can I
> send from one side of my distributed system to the other side with Apache
> Kafka."*
>
> And he concluded with:
>
> *"To give you an example, let's say I have 10 million messages that I need
> to send from producers to consumers. Let's assume I have 1 topic, 1
> producer for this topic, 4 partitions for this topic and 4 consumers, one
> for each partition. What I would like to know is: How long is it going to
> take for these 10 million messages to travel all the way from the producer
> to the consumers? That's the throughput performance number I'm interested
> in."*
>
> I read in a reddit post yesterday (for some reason I can't find the post
> anymore) that Kafka is able to handle 7 trillion messages per day. The
> LinkedIn article about it, says:
>
>
> *"We maintain over 100 Kafka clusters with more than 4,000 brokers, which
> serve more than 100,000 topics and 7 million partitions. The total number
> of messages handled by LinkedIn’s Kafka deployments recently surpassed 7
> trillion per day."*
>
> The OP of the reddit post went on to say that WhatsApp is handling around
> 64 billion messages per day (740,000 msgs per sec x 24 x 60 x 60) and that
> 7
> trillion for LinkedIn is a huge number, giving a whopping 81 million
> messages per second for LinkedIn. But that doesn't matter for my question.
>
> 7 Trillion messages divided by 7 million partitions gives us 1 million
> messages per day per partition. So to calculate the throughput we do:
>
> 1 million divided by 60 divided by 60 divided by 24 => *23 messages per
> second per partition*
>
> We'll all agree that 23 messages per second per partition for throughput
> performance is very low, so I can't give this number to my potential
> client.
>
> So my question is: *What number should I give to my potential client?* Note
> that he is a stubborn and strict bank CTO, so he won't take any talk from
> me. He wants a mathematical answer using the scientific method.
>
> Has anyone been in my shoes and can shed some light on this kafka
> throughput performance topic?
>
> Cheers,
>
> M. Queen
>


-- 

Okada Haruki
ocadar...@gmail.com



Re: Help needed to migrate from one infra to another without downtime

2021-10-22 Thread Haruki Okada
Hi, Rijo.

These slides might help you create a procedure to migrate the ZooKeeper
ensemble without downtime.
https://speakerdeck.com/line_developers/split-brain-free-online-zookeeper-migration

The slides are based on ZooKeeper 3.4, so in your environment (3.5) the
procedure might be simpler thanks to dynamic reconfiguration.
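
On the failure described in the quoted message below: ZooKeeper needs a strict majority of its voting members to form a quorum, so a six-node ensemble needs four live voters. Stopping the three A-side nodes left three, which is not a majority, matching what was observed. A small sketch of the arithmetic (`Quorum` is a hypothetical helper):

```java
// Majority arithmetic for a ZooKeeper ensemble of voting members.
final class Quorum {
    /** Smallest number of live voters that forms a majority. */
    static int majority(int ensembleSize) {
        return ensembleSize / 2 + 1;
    }

    /** How many voters can fail while a quorum is still possible. */
    static int tolerableFailures(int ensembleSize) {
        return ensembleSize - majority(ensembleSize);
    }

    static boolean hasQuorum(int ensembleSize, int alive) {
        return alive >= majority(ensembleSize);
    }
}
```

This is also why odd ensemble sizes are usually recommended: six voters tolerate no more failures than five.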


Thanks,

On Thu, Oct 21, 2021 at 4:46, Ran Lupovich wrote:

> One thing that comes to my mind after reading your explanation, zk quorum
> should be odd number, you stated you have six zookeepers... I would suggest
> checking this matter, 3 , 5 , 7 etc...
>
> On Wed, Oct 20, 2021 at 22:00, Rijo Roy
> wrote:
>
> > Hi,
> >
> > Hope you are safe and well!
> >
> > Let me give a brief about my environment:
> >
> > OS: Ubuntu 18.04
> > Kafka Version: Confluent Kafka v5.5.1
> > ZooKeeper Version : 3.5.8
> > No.of Kafka Brokers: 3
> > No. of Zookeeper nodes: 3
> >
> > I am working on a project where we are aiming to move out from our
> > existing infrastructure lets call it A where Kafka and ZooKeeper clusters
> > are hosted to a better infrastructure lets call it B but with no or
> minimal
> > downtime. Once the cutover is done, we would like to terminate the old
> > infrastructure A.
> >
> > I was able to use kafka-reassign-partitions.sh as per the steps mentioned
> > in https://kafka.apache.org/documentation/#basic_ops_cluster_expansion
> to
> > move the topics-partitions to the Kafka brokers I created in B. Please
> note
> > that I have added 3 zookeeper nodes running in B into the zookeeper
> cluster
> > in A and hence they were following the ZK leader in A.
> > I was in the impression that since I had 6 nodes in the ZooKeeper
> > ensemble, stopping the A side of ZooKeeper nodes would not cause an issue
> > but I was wrong. As soon as I stopped the ZK process on the A nodes, B Zk
> > nodes failed to accept any connections from Kafka and I assume it is
> > because the leadership of ZK did not transfer to the ZK B nodes and
> failed
> > the quorum resulting in this failure. I had to remove the version-2
> folder
> > inside the B Zk nodes and starting them 1 by 1 after removing the details
> > of ZK A nodes from zookeeper.properties helped me to resolve the failure
> > and run the cluster on infrastructure B. I know I failed miserably but
> this
> > was a sandbox where I could afford the downtime but cannot in a
> production
> > setup. I request your help and guidance to make it right. Please help!
> >
> > Thanks in advance.
> >
> > Regards,Rijo S Roy
> >
> >
> >
>


-- 

Okada Haruki
ocadar...@gmail.com



Re: Kafka Scaling Ideas

2020-12-22 Thread Haruki Okada
Hm, that's an optimization for the "first layer", so if the bottleneck is in
the "second layer" (i.e. the DB write) as you mentioned, I don't think it
will make much difference.

On Tue, Dec 22, 2020 at 16:02, Yana K wrote:

> I thought about it but then we don't have much time - will it optimize
> performance?
>
> On Mon, Dec 21, 2020 at 4:16 PM Haruki Okada  wrote:
>
> > About "first layer" right?
> > Then it's better to make sure that not get() the result of
> Producer#send()
> > for each message, because in that way, it spoils the ability of
> > producer-batching.
> > Kafka producer batches messages by default and it's very efficient, so if
> > you produce in async way, it rarely becomes a bottleneck in general.
> > > Also are there any producer optimizations
> >
> > By the way, if "first layer" just filters then produces messages without
> > interacting with any other external DB, using KafkaStreams should be much
> > easier.
> >
> > On Tue, Dec 22, 2020 at 3:27, Yana K wrote:
> >
> > > Thanks!
> > >
> > > Also are there any producer optimizations anyone can think of in this
> > > scenario?
> > >
> > >
> > >
> > > On Mon, Dec 21, 2020 at 8:58 AM Joris Peeters <
> > joris.mg.peet...@gmail.com>
> > > wrote:
> > >
> > > > I'd probably just do it by experiment for your concrete data.
> > > >
> > > > Maybe generate a few million synthetic data rows, and for-each-batch
> > > insert
> > > > them into a dev DB, with an outer grid search over various candidate
> > > batch
> > > > sizes. You're looking to optimise for flat-out rows/s, so whichever
> > batch
> > > > size wins (given a fixed number of total rows) is near-optimal.
> > > > You can repeat the exercise with N simultaneous threads to inspect
> how
> > > > batch sizes and multiple partitions P would interact (which might
> well
> > be
> > > > sublinear in P in case of e.g. transactions etc).
> > > >
> > > > On Mon, Dec 21, 2020 at 4:48 PM Yana K  wrote:
> > > >
> > > > > Thanks Haruki and Joris.
> > > > >
> > > > > Haruki:
> > > > > Thanks for the detailed calculations. Really appreciate it. What
> > > tool/lib
> > > > > is used to load test kafka?
> > > > > So we've one consumer group and running 7 instances of the
> > application
> > > -
> > > > > that should be good enough - correct?
> > > > >
> > > > > Joris:
> > > > > Great point.
> > > > > DB insert is a bottleneck (and hence moved it to its own layer) -
> and
> > > we
> > > > > are batching but wondering what is the best way to calculate the
> > batch
> > > > > size.
> > > > >
> > > > > Thanks,
> > > > > Yana
> > > > >
> > > > > On Mon, Dec 21, 2020 at 1:39 AM Joris Peeters <
> > > > joris.mg.peet...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Do you know why your consumers are so slow? 12E6msg/hour is
> > > msg/s,
> > > > > > which is not very high from a Kafka point-of-view. As you're
> doing
> > > > > database
> > > > > > inserts, I suspect that is where the bottleneck lies.
> > > > > >
> > > > > > If, for example, you're doing a single-row insert in a SQL DB for
> > > every
> > > > > > message then this would incur a lot of overhead. Yes, you can
> > > somewhat
> > > > > > alleviate that by parallellising - i.e. increasing the partition
> > > count
> > > > -
> > > > > > but it is also worth looking at batch inserts, if you aren't yet.
> > > Say,
> > > > > each
> > > > > > consumer waits for 1000 messages or 5 seconds to have passed
> > > (whichever
> > > > > > comes first) and then does a single bulk insert of the msgs it
> has
> > > > > > received, followed by a manual commit.
> > > > > >
> > > > > > [A] you might already be doing this and [B] your DB of choice
> might
> > > not
> > > > > > support bulk inserts (although most do), but otherwise I'd expect
> > > this
> > > > to
> > > > > > work a lot better than increasing the partition count.

Re: Kafka Scaling Ideas

2020-12-21 Thread Haruki Okada
This is about the "first layer", right?
Then make sure you don't call get() on the result of Producer#send() for
each message, because doing so defeats producer batching.
The Kafka producer batches messages by default and that is very efficient,
so if you produce asynchronously, it rarely becomes a bottleneck in general.
> Also are there any producer optimizations

By the way, if the "first layer" just filters and then produces messages
without interacting with any external DB, using Kafka Streams should be much
easier.
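
A JDK-only sketch of the call pattern: instead of blocking on each send's result, issue all sends, keep the futures, and wait once at the end (or react in callbacks). `sendAsync` is a hypothetical stand-in that simulates an asynchronous send; with a real `KafkaProducer`, the gain comes from letting the producer batch records while none of them is being waited on individually, and `flush()` can be used when everything must be sent before moving on.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for an asynchronous send that completes later.
final class AsyncSendPattern {
    static CompletableFuture<Integer> sendAsync(int value) {
        return CompletableFuture.supplyAsync(() -> value * 2); // fake "ack"
    }

    /** Anti-pattern: waits for each send before issuing the next one. */
    static List<Integer> sendBlocking(List<Integer> values) {
        List<Integer> acks = new ArrayList<>();
        for (int v : values) {
            acks.add(sendAsync(v).join()); // one full round trip per message
        }
        return acks;
    }

    /** Preferred: issue every send first, then wait once for all of them. */
    static List<Integer> sendPipelined(List<Integer> values) {
        List<CompletableFuture<Integer>> futures = new ArrayList<>();
        for (int v : values) {
            futures.add(sendAsync(v));
        }
        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).join();
        List<Integer> acks = new ArrayList<>();
        for (CompletableFuture<Integer> f : futures) {
            acks.add(f.join());
        }
        return acks;
    }
}
```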

On Tue, Dec 22, 2020 at 3:27, Yana K wrote:

> Thanks!
>
> Also are there any producer optimizations anyone can think of in this
> scenario?
>
>
>
> On Mon, Dec 21, 2020 at 8:58 AM Joris Peeters 
> wrote:
>
> > I'd probably just do it by experiment for your concrete data.
> >
> > Maybe generate a few million synthetic data rows, and for-each-batch
> insert
> > them into a dev DB, with an outer grid search over various candidate
> batch
> > sizes. You're looking to optimise for flat-out rows/s, so whichever batch
> > size wins (given a fixed number of total rows) is near-optimal.
> > You can repeat the exercise with N simultaneous threads to inspect how
> > batch sizes and multiple partitions P would interact (which might well be
> > sublinear in P in case of e.g. transactions etc).
> >
> > On Mon, Dec 21, 2020 at 4:48 PM Yana K  wrote:
> >
> > > Thanks Haruki and Joris.
> > >
> > > Haruki:
> > > Thanks for the detailed calculations. Really appreciate it. What
> tool/lib
> > > is used to load test kafka?
> > > So we've one consumer group and running 7 instances of the application
> -
> > > that should be good enough - correct?
> > >
> > > Joris:
> > > Great point.
> > > DB insert is a bottleneck (and hence moved it to its own layer) - and
> we
> > > are batching but wondering what is the best way to calculate the batch
> > > size.
> > >
> > > Thanks,
> > > Yana
> > >
> > > On Mon, Dec 21, 2020 at 1:39 AM Joris Peeters <
> > joris.mg.peet...@gmail.com>
> > > wrote:
> > >
> > > > Do you know why your consumers are so slow? 12E6msg/hour is
> msg/s,
> > > > which is not very high from a Kafka point-of-view. As you're doing
> > > database
> > > > inserts, I suspect that is where the bottleneck lies.
> > > >
> > > > If, for example, you're doing a single-row insert in a SQL DB for
> every
> > > > message then this would incur a lot of overhead. Yes, you can
> somewhat
> > > > alleviate that by parallellising - i.e. increasing the partition
> count
> > -
> > > > but it is also worth looking at batch inserts, if you aren't yet.
> Say,
> > > each
> > > > consumer waits for 1000 messages or 5 seconds to have passed
> (whichever
> > > > comes first) and then does a single bulk insert of the msgs it has
> > > > received, followed by a manual commit.
> > > >
> > > > [A] you might already be doing this and [B] your DB of choice might
> not
> > > > support bulk inserts (although most do), but otherwise I'd expect
> this
> > to
> > > > work a lot better than increasing the partition count.
> > > >
> > > > On Mon, Dec 21, 2020 at 8:10 AM Haruki Okada 
> > > wrote:
> > > >
> > > > > About load test:
> > > > > I think it'd be better to monitor per-message process latency and
> > > > estimate
> > > > > required partition count based on it because it determines the max
> > > > > throughput per single partition.
> > > > > - Say you have to process 12 million messages/hour = 
> > messages/sec
> > > .
> > > > > - If you have 7 partitions (thus 7 parallel consumers at maximum),
> > > single
> > > > > consumer should process  / 7 = 476 messages/sec
> > > > > - It means, process latency per single message should be lower than
> > 2.1
> > > > > milliseconds (1000 / 476)
> > > > >   => If you have 14 partitions, it becomes 4.2 milliseconds
> > > > >
> > > > > So required partition count can be calculated by per-message
> process
> > > > > latency. (I think Spring-Kafka can be easily integrated with
> > prometheus
> > > > so
> > > > > you can use it to measure that)
> > > > >
> > > > > About increasing instance count:
&g

Re: Kafka Scaling Ideas

2020-12-21 Thread Haruki Okada
About load testing:
I think it's better to monitor per-message processing latency and estimate
the required partition count from it, because it determines the max
throughput of a single partition.
- Say you have to process 12 million messages/hour = ~3333 messages/sec.
- If you have 7 partitions (thus at most 7 parallel consumers), a single
consumer should process 3333 / 7 = ~476 messages/sec
- That means the processing latency per message should be lower than 2.1
milliseconds (1000 / 476)
  => If you have 14 partitions, it becomes 4.2 milliseconds

So the required partition count can be calculated from per-message
processing latency. (I think Spring-Kafka can be easily integrated with
Prometheus, so you can use it to measure that.)
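
The estimate can be turned into a small helper, assuming each partition is consumed by one thread that processes records sequentially (so one partition handles at most 1000 / latencyMs messages/sec). `PartitionEstimate` is a hypothetical name, and the latency input should come from measurements as suggested above.

```java
// Partition-count estimate from per-message processing latency, assuming
// one sequential consumer thread per partition.
final class PartitionEstimate {
    /** Messages per second implied by a per-hour volume. */
    static double perSecond(long messagesPerHour) {
        return messagesPerHour / 3600.0;
    }

    /** Max per-message latency (ms) that lets `partitions` consumers keep up. */
    static double maxLatencyMs(double messagesPerSec, int partitions) {
        double perPartitionRate = messagesPerSec / partitions;
        return 1000.0 / perPartitionRate;
    }

    /** Minimum partitions needed when each message takes latencyMs. */
    static int requiredPartitions(double messagesPerSec, double latencyMs) {
        double perPartitionRate = 1000.0 / latencyMs; // msgs/sec per consumer
        return (int) Math.ceil(messagesPerSec / perPartitionRate);
    }
}
```

For 12 million messages/hour, this gives about 2.1 ms for 7 partitions and 4.2 ms for 14, matching the numbers above; a measured latency of 4 ms would call for 14 partitions.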

About increasing the instance count:
- It depends on current system resource usage.
  * If system resources are not very busy (likely because the consumer
mostly just waits for DB writes to return), you don't need to increase
consumer instances.
  * But I think you should make sure that a single consumer instance isn't
assigned multiple partitions, so that consumption is fully parallelized
across partitions. (If I remember correctly, ConcurrentMessageListenerContainer
has a property to configure the concurrency.)

On Mon, Dec 21, 2020 at 15:51, Yana K wrote:

> So as the next step I see to increase the partition of the 2nd topic - do I
> increase the instances of the consumer from that or keep it at 7?
> Anything else (besides researching those libs)?
>
> Are there any good tools for load testing kafka?
>
> On Sun, Dec 20, 2020 at 7:23 PM Haruki Okada  wrote:
>
> > It depends on how you manually commit offsets.
> > Auto-commit does commits offsets in async manner basically, so as long as
> > you do manual-commit in the same way,  there should be no much
> difference.
> >
> > And, generally offset-commit mode doesn't make much difference in
> > performance regardless manual/auto or async/sync unless offset-commit
> > latency takes significant amount in processing time (e.g. you commit
> > offsets synchronously in every poll() loop).
> >
> > On Mon, Dec 21, 2020 at 11:08, Yana K wrote:
> >
> > > Thank you so much Marina and Haruka.
> > >
> > > Marina's response:
> > > - When you say " if you are sure there is no room for perf optimization
> > of
> > > the processing itself :" - do you mean code level optimizations? Can
> you
> > > please explain?
> > > - On the second topic you say " I'd say at least 40" - is this based on
> > 12
> > > million records / hour?
> > > -  "if you can change the incoming topic" - I don't think it is
> possible
> > :(
> > > -  "you could artificially achieve the same by adding one more step
> > > (service) in your pipeline" - this is the next thing - but I want to be
> > > sure this will help, given we've to maintain one more layer
> > >
> > > Haruka's response:
> > > - "One possible solution is creating an intermediate topic" - I already
> > did
> > > it
> > > - I'll look at Decaton - thx
> > >
> > > Is there any thoughts on the auto commit vs manual commit - if it can
> > > better the performance while consuming?
> > >
> > > Yana
> > >
> > >
> > >
> > > On Sat, Dec 19, 2020 at 7:01 PM Haruki Okada 
> > wrote:
> > >
> > > > Hi.
> > > >
> > > > Yeah, Spring-Kafka does processing messages sequentially, so the
> > consumer
> > > > throughput would be capped by database latency per single process.
> > > > One possible solution is creating an intermediate topic (or altering
> > > source
> > > > topic) with much more partitions as Marina suggested.
> > > >
> > > > I'd like to suggest another solution, that is multi-threaded
> processing
> > > per
> > > > single partition.
> > > > Decaton (https://github.com/line/decaton) is a library to achieve
> it.
> > > >
> > > > Also confluent has published a blog post about parallel-consumer (
> > > >
> > > >
> > >
> >
> https://www.confluent.io/blog/introducing-confluent-parallel-message-processing-client/
> > > > )
> > > > for that purpose, but it seems it's still in the BETA stage.
> > > >
> > > > On Sun, Dec 20, 2020 at 11:41, Marina Popova wrote:
> > > >
> > > > > The way I see it - you can only do a few things - if you are sure
> > there
> > > > is
> > > > > no room for perf optimization of the processing itself :
> > > > > 1. 

Re: Kafka Scaling Ideas

2020-12-20 Thread Haruki Okada
It depends on how you commit offsets manually.
Auto-commit basically commits offsets asynchronously, so as long as you
commit manually in the same way, there should not be much difference.

And in general, the offset-commit mode (manual/auto, async/sync) doesn't
make much difference in performance unless commit latency takes a
significant share of processing time (e.g. you commit offsets synchronously
in every poll() loop).
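
One way to keep synchronous manual commits from dominating the loop is to commit at most once per interval rather than on every `poll()`, similar in spirit to auto-commit running every `auto.commit.interval.ms` (default 5000 ms). `CommitGate` is a hypothetical helper that only makes the timing decision; the actual commit call (e.g. `commitSync()` or `commitAsync()`) is left out.

```java
// Hypothetical helper: decide whether to commit on this poll-loop iteration.
final class CommitGate {
    private final long intervalMs;
    private long lastCommitMs;

    CommitGate(long intervalMs, long startMs) {
        this.intervalMs = intervalMs;
        this.lastCommitMs = startMs;
    }

    /** Returns true (and records the commit time) if the interval has elapsed. */
    boolean shouldCommit(long nowMs) {
        if (nowMs - lastCommitMs >= intervalMs) {
            lastCommitMs = nowMs;
            return true;
        }
        return false;
    }
}
```

With a loop iterating every 100 ms and a 5000 ms interval, this commits 4 times over 20 seconds instead of on each of the roughly 200 iterations.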

On Mon, Dec 21, 2020 at 11:08, Yana K wrote:

> Thank you so much Marina and Haruka.
>
> Marina's response:
> - When you say " if you are sure there is no room for perf optimization of
> the processing itself :" - do you mean code level optimizations? Can you
> please explain?
> - On the second topic you say " I'd say at least 40" - is this based on 12
> million records / hour?
> -  "if you can change the incoming topic" - I don't think it is possible :(
> -  "you could artificially achieve the same by adding one more step
> (service) in your pipeline" - this is the next thing - but I want to be
> sure this will help, given we've to maintain one more layer
>
> Haruka's response:
> - "One possible solution is creating an intermediate topic" - I already did
> it
> - I'll look at Decaton - thx
>
> Is there any thoughts on the auto commit vs manual commit - if it can
> better the performance while consuming?
>
> Yana
>
>
>
> On Sat, Dec 19, 2020 at 7:01 PM Haruki Okada  wrote:
>
> > Hi.
> >
> > Yeah, Spring-Kafka does processing messages sequentially, so the consumer
> > throughput would be capped by database latency per single process.
> > One possible solution is creating an intermediate topic (or altering
> source
> > topic) with much more partitions as Marina suggested.
> >
> > I'd like to suggest another solution, that is multi-threaded processing
> per
> > single partition.
> > Decaton (https://github.com/line/decaton) is a library to achieve it.
> >
> > Also confluent has published a blog post about parallel-consumer (
> >
> >
> https://www.confluent.io/blog/introducing-confluent-parallel-message-processing-client/
> > )
> > for that purpose, but it seems it's still in the BETA stage.
> >
> > On Sun, Dec 20, 2020 at 11:41, Marina Popova wrote:
> >
> > > The way I see it, you can only do a few things, if you are sure there is
> > > no room for perf optimization of the processing itself:
> > > 1. Speed up your processing per consumer thread, which you already tried
> > > by splitting your logic into a 2-step pipeline instead of 1-step, and
> > > delegating the work of writing to a DB to the second step (make sure your
> > > second, intermediate Kafka topic is created with many more partitions to
> > > be able to parallelize your work much further - I'd say at least 40).
> > > 2. If you can change the incoming topic, I would create it with many more
> > > partitions as well - say at least 40 or so - to parallelize your
> > > first-step service processing more.
> > > 3. And if you can't increase partitions for the original topic, you
> > > could artificially achieve the same by adding one more step (service) in
> > > your pipeline that would just read data from the original 7-partition
> > > topic1 and push it unchanged into a new topic2 with, say, 40
> > > partitions, and then have your other services pick up from this topic2.
> > >
> > >
> > > good luck,
> > > Marina
> > >
> > > ‐‐‐ Original Message ‐‐‐
> > > On Saturday, December 19, 2020 6:46 PM, Yana K 
> > > wrote:
> > >
> > > > Hi
> > > >
> > > > I am new to the Kafka world and running into this scale problem. I
> > > > thought of reaching out to the community in case someone can help.
> > > > So the problem is I am trying to consume from a Kafka topic that can
> > > > have a peak of 12 million messages/hour. That topic is not under my
> > > > control - it has 7 partitions and carries JSON payloads.
> > > > I have written a consumer (I've used Java and the Spring-Kafka lib) that
> > > > reads that data, filters it and then loads it into a database. I ran
> > > > into a huge consumer lag that would take 10-12 hours to catch up. I have
> > > > 7 instances of my application running to match the 7 partitions and I
> > > > am using a

Re: Kafka Scaling Ideas

2020-12-19 Thread Haruki Okada
Hi.

Yeah, Spring-Kafka processes messages sequentially, so the consumer
throughput would be capped by the database latency of processing each message.
One possible solution is creating an intermediate topic (or altering the source
topic) with many more partitions, as Marina suggested.
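
To put rough numbers on that (a back-of-the-envelope sketch, not a measurement): 12 million messages/hour is about 3,333 messages/second. If each message is handled one at a time and its database write takes L ms, one partition can process at most 1000 / L messages/second, so roughly 3,333 × L / 1000 partitions (or threads) are needed to keep up. The helper below only encodes that arithmetic; the 10 ms latency used in the example is a hypothetical figure, and batching, network, and rebalance overhead are ignored.

```java
/**
 * Back-of-the-envelope sizing for a consumer that handles one record at a time.
 * Assumes a fixed, hypothetical per-record latency (e.g. one DB write) and
 * ignores batching, network and rebalance overhead.
 */
final class ConsumerSizing {
    private ConsumerSizing() {}

    /** Average records per second for a given hourly volume. */
    static double recordsPerSecond(long recordsPerHour) {
        return recordsPerHour / 3600.0;
    }

    /** Max records per second one sequential worker can sustain at the given latency. */
    static double perWorkerThroughput(double latencyMs) {
        return 1000.0 / latencyMs;
    }

    /** Minimum number of sequential workers (partitions or threads) needed to keep up. */
    static int requiredParallelism(long recordsPerHour, double latencyMs) {
        return (int) Math.ceil(recordsPerSecond(recordsPerHour) / perWorkerThroughput(latencyMs));
    }

    /** Max hourly volume a fixed number of sequential workers can absorb. */
    static long maxRecordsPerHour(int workers, double latencyMs) {
        return Math.round(workers * perWorkerThroughput(latencyMs) * 3600.0);
    }
}
```

With that hypothetical 10 ms write, `ConsumerSizing.requiredParallelism(12_000_000, 10.0)` returns 34, while `ConsumerSizing.maxRecordsPerHour(7, 10.0)` is 2,520,000, i.e. under that assumption 7 sequential partitions would fall well short of the 12 million/hour peak.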

I'd like to suggest another solution: multi-threaded processing within a
single partition.
Decaton (https://github.com/line/decaton) is a library to achieve that.

Also, Confluent has published a blog post about parallel-consumer (
https://www.confluent.io/blog/introducing-confluent-parallel-message-processing-client/)
for that purpose, but it seems it's still in the BETA stage.
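
One common way to get multi-threaded processing within a single partition while still keeping records with the same key in order is to hash each record's key onto a fixed set of single-threaded worker "lanes". The sketch below shows that idea with plain `java.util.concurrent`; it is not Decaton's or parallel-consumer's API, and `PartitionRecord`, `KeyedLaneDispatcher`, and the lane count are hypothetical names chosen for illustration. In a real consumer the records would come from `KafkaConsumer.poll()`.

```java
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical stand-in for a consumed record (a real consumer would use ConsumerRecord). */
final class PartitionRecord {
    private final String key;
    private final String value;
    private final long offset;

    PartitionRecord(String key, String value, long offset) {
        this.key = key;
        this.value = value;
        this.offset = offset;
    }

    String key() { return key; }
    String value() { return value; }
    long offset() { return offset; }
}

/**
 * Hypothetical dispatcher: sends records from one partition to N single-threaded lanes
 * chosen by key hash. Same key -> same lane -> processed in order; different keys may
 * be processed concurrently.
 */
final class KeyedLaneDispatcher implements AutoCloseable {
    private final ExecutorService[] lanes;

    KeyedLaneDispatcher(int laneCount) {
        lanes = new ExecutorService[laneCount];
        for (int i = 0; i < laneCount; i++) {
            lanes[i] = Executors.newSingleThreadExecutor();
        }
    }

    /** Lane index for a key; null keys hash to 0, floorMod keeps the index non-negative. */
    int laneFor(String key) {
        return Math.floorMod(Objects.hashCode(key), lanes.length);
    }

    /** Queues the record on its lane; the Future completes when the handler returns. */
    Future<?> dispatch(PartitionRecord rec, Consumer<PartitionRecord> handler) {
        return lanes[laneFor(rec.key())].submit(() -> handler.accept(rec));
    }

    /** Stops accepting work and waits (up to 30 s per lane) for queued records to finish. */
    @Override
    public void close() {
        for (ExecutorService lane : lanes) {
            lane.shutdown();
        }
        try {
            for (ExecutorService lane : lanes) {
                lane.awaitTermination(30, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Records with different keys can then be written to the database concurrently, so a partition is no longer limited to one write at a time. In this sketch all keyless (`null`-key) records land on one lane, so they would need a different spreading rule. Because lanes finish out of order, committing offsets safely still requires tracking the lowest offset that is not yet done.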

On Sun, Dec 20, 2020 at 11:41 Marina Popova wrote:

> The way I see it, you can only do a few things, if you are sure there is
> no room for perf optimization of the processing itself:
> 1. Speed up your processing per consumer thread, which you already tried
> by splitting your logic into a 2-step pipeline instead of 1-step, and
> delegating the work of writing to a DB to the second step (make sure your
> second, intermediate Kafka topic is created with many more partitions to be
> able to parallelize your work much further - I'd say at least 40).
> 2. If you can change the incoming topic, I would create it with many more
> partitions as well - say at least 40 or so - to parallelize your first-step
> service processing more.
> 3. And if you can't increase partitions for the original topic, you
> could artificially achieve the same by adding one more step (service) in
> your pipeline that would just read data from the original 7-partition
> topic1 and push it unchanged into a new topic2 with, say, 40
> partitions, and then have your other services pick up from this topic2.
>
>
> good luck,
> Marina
>
> ‐‐‐ Original Message ‐‐‐
> On Saturday, December 19, 2020 6:46 PM, Yana K 
> wrote:
>
> > Hi
> >
> > I am new to the Kafka world and running into this scale problem. I
> > thought of reaching out to the community in case someone can help.
> > So the problem is I am trying to consume from a Kafka topic that can
> > have a peak of 12 million messages/hour. That topic is not under my
> > control - it has 7 partitions and carries JSON payloads.
> > I have written a consumer (I've used Java and the Spring-Kafka lib) that
> > reads that data, filters it and then loads it into a database. I ran into
> > a huge consumer lag that would take 10-12 hours to catch up. I have 7
> > instances of my application running to match the 7 partitions and I am
> > using auto commit. Then I thought of splitting the write logic into a
> > separate layer. So now my architecture has a component that reads and
> > filters and produces the data to an internal topic (I've done 7
> > partitions, but as you see it's under my control). Then a consumer picks
> > up data from that topic and writes it to the database. It's better, but
> > it still takes 3-5 hours for the consumer lag to catch up.
> > Am I missing something fundamental? Are there any other ideas for
> > optimization that can help overcome this scale challenge? Any pointers
> > and articles would help too.
> >
> > Appreciate your help with this.
> >
> > Thanks
> > Yana
>
>
>

-- 

Okada Haruki
ocadar...@gmail.com



Re: multi-threaded consumer configuration like stream threads?

2020-11-23 Thread Haruki Okada
I see.

Then I think the appropriate approach depends on your delivery latency
requirements.
Just retrying until success is simpler, but it could block subsequent
messages from being processed (this also depends on the thread pool size, though).
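
A minimal sketch of the "just retry until success" option, to show why it can hold up other messages: the backoff sleeps on the worker thread itself, so that thread processes nothing else in the meantime. `BlockingRetry` and its parameters are hypothetical, and the `maxAttempts` cap only keeps the sketch finite; a true retry-until-success loop would have no upper bound. With a pool of around 10 threads, ten records stuck in such a loop would stall all processing.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical blocking retry helper, for illustration only. */
final class BlockingRetry {
    private BlockingRetry() {}

    /**
     * Calls attempt until it returns true, sleeping backoffMs between tries, for at most
     * maxAttempts tries. The calling worker thread is occupied the whole time.
     *
     * @return the number of tries it took, or -1 if every try failed or the thread was interrupted
     */
    static int retryUntilSuccess(BooleanSupplier attempt, int maxAttempts, long backoffMs) {
        for (int tryNo = 1; tryNo <= maxAttempts; tryNo++) {
            if (attempt.getAsBoolean()) {
                return tryNo;
            }
            if (tryNo < maxAttempts) {
                try {
                    Thread.sleep(backoffMs); // blocks this worker thread
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return -1;
                }
            }
        }
        return -1;
    }
}
```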

Another concern when using a dead letter topic would be retry backoff.
If you don't use backoff, production to the dead letter topic could burst
when the downstream DB experiences transient problems. On the other hand,
injecting a backoff delay requires thinking about how not to block
subsequent messages.
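
One way to add backoff without blocking subsequent messages is to re-schedule a failed attempt on a `ScheduledExecutorService` instead of sleeping, and only hand the record to a dead letter topic once the attempts are exhausted. The sketch below assumes that approach; `BackoffRetrier`, the dead-letter callback (which in practice might produce to a DLQ topic), and the delay values are hypothetical, and this is not Decaton's implementation.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;

/**
 * Hypothetical non-blocking retrier: failed attempts are re-scheduled after an
 * exponential backoff; after maxAttempts the record id goes to a dead-letter sink.
 */
final class BackoffRetrier implements AutoCloseable {
    private final ScheduledExecutorService scheduler;
    private final int maxAttempts;
    private final long baseDelayMs;
    private final long maxDelayMs;

    BackoffRetrier(int threads, int maxAttempts, long baseDelayMs, long maxDelayMs) {
        this.scheduler = Executors.newScheduledThreadPool(threads);
        this.maxAttempts = maxAttempts;
        this.baseDelayMs = baseDelayMs;
        this.maxDelayMs = maxDelayMs;
    }

    /** Delay after the n-th failed attempt: baseDelayMs * 2^(n-1), capped at maxDelayMs. */
    long delayAfterFailure(int failedAttempts) {
        int shift = Math.max(0, Math.min(failedAttempts - 1, 30)); // bounded to avoid overflow
        return Math.min(baseDelayMs << shift, maxDelayMs);
    }

    /** Completes with true on success, or false once the record was dead-lettered. */
    CompletableFuture<Boolean> submit(String recordId, BooleanSupplier task, Consumer<String> deadLetter) {
        CompletableFuture<Boolean> result = new CompletableFuture<>();
        scheduler.execute(() -> attempt(recordId, task, deadLetter, 1, result));
        return result;
    }

    private void attempt(String recordId, BooleanSupplier task, Consumer<String> deadLetter,
                         int attemptNo, CompletableFuture<Boolean> result) {
        boolean ok;
        try {
            ok = task.getAsBoolean();
        } catch (RuntimeException e) {
            ok = false; // treat an exception as a failed attempt
        }
        if (ok) {
            result.complete(true);
        } else if (attemptNo >= maxAttempts) {
            deadLetter.accept(recordId); // give up and park it for later reprocessing
            result.complete(false);
        } else {
            // Re-schedule instead of sleeping, so the pool thread is free for other records.
            scheduler.schedule(() -> attempt(recordId, task, deadLetter, attemptNo + 1, result),
                    delayAfterFailure(attemptNo), TimeUnit.MILLISECONDS);
        }
    }

    /** Close only after pending futures complete: schedule() calls after shutdown() are rejected. */
    @Override
    public void close() {
        scheduler.shutdown();
    }
}
```

Because pending retries live only in memory, they are lost if the process crashes, so the partition's offset should not be committed past a record that is still waiting for a retry.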

(FYI, Decaton provides retry-queueing with backoff out of the box. :)
https://github.com/line/decaton/blob/master/docs/retry-queueing.adoc)

On Tue, Nov 24, 2020 at 2:38 Pushkar Deole wrote:

> Thanks Haruki... right now the maximum number of such events that we would
> have is 100, since we would be supporting that many customers (accounts)
> for now, for which we are considering a simple approach of a single
> consumer and a thread pool with around 10 threads. So the question was
> regarding how to manage failed events: should those be retried until
> successful, or sent to a dead letter queue/topic from where they will be
> processed again until successful?
>
>
> On Mon, Nov 23, 2020 at 10:16 PM Haruki Okada  wrote:
>
> > Hi Pushkar.
> >
> > Just for your information, https://github.com/line/decaton is a Kafka
> > consumer framework that supports parallel processing within a single
> > partition.
> >
> > It manages the committable offset (i.e. the offset before which all
> > records have been processed) internally, so it preserves at-least-once
> > semantics even when processing in parallel.
> >
> >
> > On Tue, Nov 24, 2020 at 1:16 Pushkar Deole wrote:
> >
> > > Thanks Liam!
> > > We don't have a requirement to maintain order of processing for events,
> > > even within a partition. Essentially, these are events for various
> > > accounts (customers) that we want to support and do the necessary
> > > database provisioning for in our database. So they can be processed in
> > > parallel.
> > >
> > > I think the 2nd option would suit our requirement to have a single
> > > consumer and a bounded thread pool for processors. However, the issue we
> > > may face is committing the offsets only after processing an event, since
> > > we don't want the consumer to auto commit offsets before the provisioning
> > > is done for the customer. How can that be achieved with model #2?
> > >
> > > On Tue, Oct 27, 2020 at 2:50 PM Liam Clarke-Hutchinson <
> > > liam.cla...@adscale.co.nz> wrote:
> > >
> > > > Hi Pushkar,
> > > >
> > > > No. You'd need to combine a consumer with a thread pool or similar as
> > > > you prefer. As the docs say (from
> > > > https://kafka.apache.org/26/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
> > > > )
> > > >
> > > > > We have intentionally avoided implementing a particular threading
> > > > > model for processing. This leaves several options for implementing
> > > > > multi-threaded processing of records.
> > > > > 1. One Consumer Per Thread
> > > > > A simple option is to give each thread its own consumer instance.
> > > > > Here are the pros and cons of this approach:
> > > > >
> > > > > - *PRO*: It is the easiest to implement
> > > > > - *PRO*: It is often the fastest as no inter-thread co-ordination
> > > > >   is needed
> > > > > - *PRO*: It makes in-order processing on a per-partition basis
> > > > >   very easy to implement (each thread just processes messages in
> > > > >   the order it receives them).
> > > > > - *CON*: More consumers means more TCP connections to the cluster
> > > > >   (one per thread). In general Kafka handles connections very
> > > > >   efficiently so this is generally a small cost.
> > > > > - *CON*: Multiple consumers means more requests being sent to the
> > > > >   server and slightly less batching of data which can cause some
> > > > >   drop

Re: multi-threaded consumer configuration like stream threads?

2020-11-23 Thread Haruki Okada
Hi Pushkar.

Just for your information, https://github.com/line/decaton is a Kafka
consumer framework that supports parallel processing within a single partition.

It manages the committable offset (i.e. the offset before which all records
have been processed) internally, so it preserves at-least-once semantics
even when processing in parallel.
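
To illustrate what "committable offset" means for Pushkar's model #2, here is a minimal per-partition tracker, assuming the consumer thread calls `dispatched()` for each polled record and the worker threads call `completed()` when the DB provisioning for that record is done. This is a hypothetical sketch of the idea, not Decaton's actual code or API.

```java
import java.util.TreeSet;

/**
 * Hypothetical per-partition bookkeeping for out-of-order completion. The committable
 * offset is the lowest offset still in flight (or last dispatched + 1 when none is), so
 * every record before it is known to be processed. Committing that value gives
 * at-least-once: after a crash, in-flight records are re-delivered, possibly along with
 * some that had already completed. Methods are synchronized so worker threads can call
 * completed() concurrently.
 */
final class CommittableOffsetTracker {
    private final TreeSet<Long> inFlight = new TreeSet<>();
    private long highestDispatched = -1L;

    /** Call when a record is handed to a worker thread. */
    synchronized void dispatched(long offset) {
        inFlight.add(offset);
        highestDispatched = Math.max(highestDispatched, offset);
    }

    /** Call when a worker has finished the record, e.g. after the DB write succeeded. */
    synchronized void completed(long offset) {
        inFlight.remove(offset);
    }

    /** Next offset that is safe to commit, or -1 if nothing has been dispatched yet. */
    synchronized long committableOffset() {
        if (highestDispatched < 0) {
            return -1L;
        }
        return inFlight.isEmpty() ? highestDispatched + 1 : inFlight.first();
    }
}
```

The polling thread could then periodically commit that value, e.g. with `consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(offset)))` when it is not -1, with `enable.auto.commit=false`. `KafkaConsumer` is not thread-safe, so the worker threads should only update the tracker and leave committing to the polling thread.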


On Tue, Nov 24, 2020 at 1:16 Pushkar Deole wrote:

> Thanks Liam!
> We don't have a requirement to maintain order of processing for events, even
> within a partition. Essentially, these are events for various accounts
> (customers) that we want to support and do the necessary database provisioning
> for in our database. So they can be processed in parallel.
>
> I think the 2nd option would suit our requirement to have a single consumer
> and a bounded thread pool for processors. However, the issue we may face is
> committing the offsets only after processing an event, since we don't want
> the consumer to auto commit offsets before the provisioning is done for the
> customer. How can that be achieved with model #2?
>
> On Tue, Oct 27, 2020 at 2:50 PM Liam Clarke-Hutchinson <
> liam.cla...@adscale.co.nz> wrote:
>
> > Hi Pushkar,
> >
> > No. You'd need to combine a consumer with a thread pool or similar as you
> > prefer. As the docs say (from
> > https://kafka.apache.org/26/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
> > )
> >
> > > We have intentionally avoided implementing a particular threading model
> > > for processing. This leaves several options for implementing
> > > multi-threaded processing of records.
> > > 1. One Consumer Per Thread
> > > A simple option is to give each thread its own consumer instance. Here
> > > are the pros and cons of this approach:
> > >
> > > - *PRO*: It is the easiest to implement
> > > - *PRO*: It is often the fastest as no inter-thread co-ordination is
> > >   needed
> > > - *PRO*: It makes in-order processing on a per-partition basis very
> > >   easy to implement (each thread just processes messages in the order
> > >   it receives them).
> > > - *CON*: More consumers means more TCP connections to the cluster (one
> > >   per thread). In general Kafka handles connections very efficiently so
> > >   this is generally a small cost.
> > > - *CON*: Multiple consumers means more requests being sent to the
> > >   server and slightly less batching of data which can cause some drop
> > >   in I/O throughput.
> > > - *CON*: The number of total threads across all processes will be
> > >   limited by the total number of partitions.
> > >
> > > 2. Decouple Consumption and Processing
> > > Another alternative is to have one or more consumer threads that do all
> > > data consumption and hands off ConsumerRecords
> > > <https://kafka.apache.org/26/javadoc/org/apache/kafka/clients/consumer/ConsumerRecords.html>
> > > instances to a blocking queue consumed by a pool of processor threads
> > > that actually handle the record processing. This option likewise has
> > > pros and cons:
> > >
> > > - *PRO*: This option allows independently scaling the number of
> > >   consumers and processors. This makes it possible to have a single
> > >   consumer that feeds many processor threads, avoiding any limitation
> > >   on partitions.
> > > - *CON*: Guaranteeing order across the processors requires particular
> > >   care as the threads will execute independently an earlier chunk of
> > >   data may actually be processed after a later chunk of data just due
> > >   to the luck of thread execution timing. For processing that has no
> > >   ordering requirements this is not a problem.
> > > - *CON*: Manually committing the position becomes harder as it
> > >   requires that all threads co-ordinate to ensure that processing is
> > >   complete for that partition.
> > >
> > > There are many possible variations on this approach. For example each
> > > processor thread can have its own queue, and the consumer threads can
> > > hash into these queues using the TopicPartition to ensure in-order
> > > consumption and simplify commit.
> >
> >
> > Cheers,
> >
> > Liam Clarke-Hutchinson
> >
> > On Tue, Oct 27, 2020 at 8:04 PM Pushkar Deole 
> > wrote:
> >
> > > Hi,
> > >
> > > Is there any configuration in the Kafka consumer to specify multiple
> > > threads the way there is in Kafka Streams?
> > > Essentially, can we have a consumer with multiple threads where the
> > > threads would divide the partitions of a topic among them?
> > >
> >
>


-- 

Okada Haruki
ocadar...@gmail.com



Protocol evolution/versioning docs are missing

2020-06-09 Thread Haruki Okada
Hi, Kafka.

While reading through the protocol docs, I found that the documentation about
protocol evolution and versioning is missing from protocol.html, even though
the table of contents contains a section for it.

https://github.com/apache/kafka/blob/trunk/docs/protocol.html#L47-L50

Is there any plan to add a doc about protocol evolution?


Thanks,

-- 

Okada Haruki
ocadar...@gmail.com