Re: Scaling Apache Kafka Producers & Consumers

2020-03-26 Thread Hans Jespersen
Very good description with pictures in the book Kafka: The Definitive Guide

https://www.oreilly.com/library/view/kafka-the-definitive/9781491936153/ch04.html

-hans

> On Mar 26, 2020, at 12:00 PM, sunil chaudhari  
> wrote:
> 
> Again:
> A consumer can have one or more consumer threads.
> The analogy of 12 partitions and 4 consumers holds when each consumer has
> 3 consumer threads.
> Please don’t skip the important factor “consumer thread” in this matter.
> 
> If you run each consumer with 4 threads then you need at most 3 consumers
> for that consumer group.
> 
> If you have 12 partitions and you run 4 consumers with 4 consumer threads
> each, then 4 threads will be idle at any time T1.
> 
> I hope this is clear.
> 
> Thanks,
> Sunil.
> 
> On Thu, 26 Mar 2020 at 7:52 PM, Hans Jespersen  wrote:
> 
>>> As per my understanding, in Apache Kafka a single consumer from a
>> consumer
>>> group can consume messages from one partition only.
>> 
>> Not correct. A single consumer from a consumer group can consume from many
>> partitions. For example if you had a topic with 12 partitions and 4
>> consumers in a consumer group, each consumer in the group would consume
>> from 3 partitions.
>> 
>> -hans


Re: Reg : Slowness in Kafka

2020-03-26 Thread James Olsen
Also check your Kafka Client and Server versions.  There are serious latency 
issues when mixing different client and server versions IF your consumers 
handle multiple partitions.

> On 27/03/2020, at 12:59, Chris Larsen  wrote:
> 
> Hi Vidhya,
> 
> How many tasks are you running against the topic? How many partitions are
> on the topic?  Can you post the connector config anonymized?
> 
> Best,
> Chris
> 
> On Thu, Mar 26, 2020 at 17:58 Vidhya Sakar  wrote:
> 
>> Hi Team,
>> 
>> The Kafka consumer is reading only 8 records per second. We have
>> implemented Apache Kafka and Confluent Connect S3. The Confluent Connect
>> S3 connector collects the records and pushes them to an S3 bucket.
>> In this process, we are seeing some slowness: on average only 8 records
>> are being processed per second. I am planning to achieve higher
>> throughput, so that when there is a higher data load, it should process
>> more records.
>> 
>> Thanks in advance.
>> 
> -- 
> 
> *Chris Larsen*
> 
> Systems Engineer | Confluent
> 
> 847.274.3735
> 
> Follow us: Twitter  | blog
> 



Re: Get after put in stateStore returns null

2020-03-26 Thread Matthias J. Sax
Your code looks correct to me. If you write into the store, you should
also be able to read it back from the store.

Can you reproduce the issue using `TopologyTestDriver`? How many
partitions does your input topic have? Is your stream partitioned by
key? Note that `transform()` does not do auto-repartitioning, in
contrast to `groupByKey()`.
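
A minimal sketch of such a check with `TopologyTestDriver` (topic name,
serdes, and the store value type are illustrative assumptions):

import java.util.Properties
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.TopologyTestDriver

val props = Properties().apply {
  put(StreamsConfig.APPLICATION_ID_CONFIG, "store-test")
  put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234")
}
// `streamsBuilder` is the StreamsBuilder that registered the store and
// the flatTransformValues() call from the code below.
TopologyTestDriver(streamsBuilder.build(), props).use { driver ->
  val input = driver.createInputTopic("input-topic",
      Serdes.String().serializer(), Serdes.String().serializer())
  input.pipeInput("key1", "value1")
  input.pipeInput("key1", "value2") // second record for the same key
  // If the put worked, the store now holds the latest aggregate for key1.
  val store = driver.getKeyValueStore<String, Any>("aggregateOverflow_binReportAgg")
  println(store.get("key1"))
}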


-Matthias

On 3/25/20 3:49 AM, Jan Bols wrote:
> Hi all,
> I'm trying to aggregate a stream of messages and return a stream of
> aggregated results using Kafka Streams.
> At some point, depending on the incoming message, the old aggregate needs
> to be closed and a new aggregate needs to be created, just like a session
> that is closed due to some close event and at the same time a new session
> is started.
> 
> For this I'm using transformValues, where I store the result of an
> aggregation similar to how a groupByKey().aggregate() is done. When the old
> session needs to be closed, it's sent downstream first, followed by the new value.
> 
> The state store returns null for a given key at first retrieval and the new
> aggregation result is stored under the same key.
> However, at the second pass, the value for the same key is still null even
> though it has just been stored before.
> 
> How can this be possible?
> 
> 
> 
> I'm using transformValues in the following way:
> 
> val storeName = "aggregateOverflow_binReportAgg"
> val store = Stores.keyValueStoreBuilder<K, V>(
>     Stores.persistentKeyValueStore(storeName), serde.serde(), serde.serde())
> streamsBuilder.addStateStore(store)
> 
> ...
> 
> stream
>.flatTransformValues(ValueTransformerWithKeySupplier {
> AggregateOverflow(storeName, transformation) }, storeName)
> 
> 
> where AggregateOverflow gets the previous value from the state store and
> transforms the result into an AggregateOverflowResult.
> AggregateOverflowResult is a data class containing the current value and an
> optional overflow value, like this:
> 
> data class AggregateOverflowResult<V>(val current: V, val overflow: V?)
> 
> When the overflow value is not null, it's sent downstream first, followed
> by the current value. In each case, the current result is stored in the
> state store for later retrieval, like the following:
> 
> class AggregateOverflow<K, V, VR>(
>   private val storeName: String,
>   private val transformation: (K, V, VR?) -> AggregateOverflowResult<VR>?) :
>     ValueTransformerWithKey<K, V, Iterable<VR>> {
>   private val logger = KotlinLogging.logger{}
>   private lateinit var state: KeyValueStore<K, VR>
> 
>   init {
>     logger.debug { "$storeName: created" }
>   }
> 
>   override fun init(context: ProcessorContext) {
>     logger.debug { "$storeName: init called" }
>     this.state = context.getStateStore(storeName) as KeyValueStore<K, VR>
>   }
> 
>   override fun transform(key: K, value: V): Iterable<VR> {
>     val acc = state.get(key)
>     if (acc == null) logger.debug { "$storeName: Found empty value for $key" }
>     val result = transformation(key, value, acc)
>     state.put(key, result?.current)
>     logger.trace { "$storeName: \n Key: $key\n Value: $value\n aggregate old: $acc\n aggregate new: $result" }
>     return listOfNotNull(result?.overflow, result?.current) // prevAcc will be forwarded first if not null
>   }
> 
>   override fun close() {
>     logger.debug { "$storeName: close called" }
>   }
> }
> 
> In the log file you can see that the first invocation returns an empty
> value for the given key; you can also see that the new value is being
> serialized into the store.
> At the second invocation a few seconds later, the value for the same key is
> still null.
> 
> Any ideas why this is?
> Best regards
> Jan
> 





Re: Reg : Slowness in Kafka

2020-03-26 Thread Chris Larsen
Hi Vidhya,

How many tasks are you running against the topic? How many partitions are
on the topic?  Can you post the connector config anonymized?
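
(For reference, an anonymized config for the Confluent S3 sink usually
looks like the sketch below; the values are placeholders. `tasks.max`
bounds the parallelism across partitions, and a small `flush.size` is a
common cause of slow S3 writes.)

{
  "name": "s3-sink",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "4",
    "topics": "my-topic",
    "s3.bucket.name": "my-bucket",
    "s3.region": "us-east-1",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "flush.size": "10000"
  }
}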

Best,
Chris

On Thu, Mar 26, 2020 at 17:58 Vidhya Sakar  wrote:

> Hi Team,
>
> The Kafka consumer is reading only 8 records per second. We have
> implemented Apache Kafka and Confluent Connect S3. The Confluent Connect
> S3 connector collects the records and pushes them to an S3 bucket.
> In this process, we are seeing some slowness: on average only 8 records
> are being processed per second. I am planning to achieve higher
> throughput, so that when there is a higher data load, it should process
> more records.
>
> Thanks in advance.
>
-- 

*Chris Larsen*

Systems Engineer | Confluent

847.274.3735

Follow us: Twitter  | blog



Re: Max poll interval and timeouts

2020-03-26 Thread Matthias J. Sax
`max.poll.interval.ms` is the maximum allowed time between two calls to
`poll()`.

Hence, this config seems to be unrelated. For the background heartbeat
thread there is the `session.timeout.ms` config, but this also seems to
be unrelated.

What I don't fully understand is what you are trying to achieve:

> I ultimately want to wait for the buffer to fill up or sit and collect data
> continuously for 30-45 mins at a time.

What exactly do you mean by this? Do you want `poll()` to block until N
messages are available (or return if fewer messages are available once
some timeout, i.e., 30 minutes, hits)?

This would not work, because `poll()` has no _lower_ limit on the number
of messages to return.

What you should do instead is call `poll()` in a loop, buffer all
messages in your application, and trigger the computation when you reach
N messages or hit the desired timeout.
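
A minimal sketch of that loop (batch size, timeout, and the `process`
callback are illustrative; error handling omitted):

import java.time.Duration
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.KafkaConsumer

fun consumeInBatches(
    consumer: KafkaConsumer<String, String>,
    batchSize: Int,
    maxWait: Duration,
    process: (List<ConsumerRecord<String, String>>) -> Unit
) {
  val buffer = mutableListOf<ConsumerRecord<String, String>>()
  var deadline = System.currentTimeMillis() + maxWait.toMillis()
  while (true) {
    // Poll frequently: heartbeats run on a background thread, but
    // `max.poll.interval.ms` still applies to the poll loop itself.
    consumer.poll(Duration.ofSeconds(1)).forEach { buffer.add(it) }
    if (buffer.size >= batchSize || System.currentTimeMillis() >= deadline) {
      if (buffer.isNotEmpty()) process(buffer.toList())
      consumer.commitSync() // commit only after the batch has been handled
      buffer.clear()
      deadline = System.currentTimeMillis() + maxWait.toMillis()
    }
  }
}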

If buffering in your application is undesired, you can also get the
current offsets via `consumer.position()`, get the partition end offsets
via `consumer.endOffsets()`, compute how many messages are available
broker-side, and start to `poll()` once N is reached or the timeout hits.
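
A sketch of that broker-side computation (assumes the consumer already
has an assignment with valid positions):

import org.apache.kafka.clients.consumer.KafkaConsumer

// Backlog = sum over assigned partitions of (end offset - current position).
fun <K, V> backlog(consumer: KafkaConsumer<K, V>): Long {
  val assignment = consumer.assignment()
  val endOffsets = consumer.endOffsets(assignment)
  return assignment
      .map { tp -> endOffsets.getValue(tp) - consumer.position(tp) }
      .sum()
}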


-Matthias


On 3/25/20 3:17 AM, Steve Tian wrote:
> Hi Ryan,
> 
> Have you tried Consumer's pause/resume methods?
> 
> Steve
> 
> On Wed, Mar 25, 2020, 17:13 Kamal Chandraprakash <
> kamal.chandraprak...@gmail.com> wrote:
> 
>> With the group coordination protocol, you only have to increase
>> `max.poll.interval.ms` / `max.poll.records`.
>> Ignore the above messages. Consumer heartbeats are processed in a separate
>> thread.
>>
>> On Wed, Mar 25, 2020 at 2:35 PM Kamal Chandraprakash <
>> kamal.chandraprak...@gmail.com> wrote:
>>
>>> Yes, with `assign` you'll lose the group coordination. You can still use
>>> the `subscribe` mode and update the above-mentioned configs.
>>> Your ask is kind of a delay queue. The Kafka consumer doesn't support that
>>> feature. You have to manually `sleep` in between the poll calls.
>>>
>>> On Tue, Mar 24, 2020 at 11:56 PM Ryan Schachte <
>> coderyanschac...@gmail.com>
>>> wrote:
>>>
 Don't I lose consumer group coordination with assign?

 On Mon, Mar 23, 2020 at 11:49 PM Kamal Chandraprakash <
 kamal.chandraprak...@gmail.com> wrote:

> Hi Ryan,
>
> The maxPollInterval waits for at most the given time duration and returns
> ASAP even if a single record is available.
> If you want to collect data once every 30-45 minutes, it is better to use
> the Consumer with `assign` mode and poll for records once in 30 minutes.
>
> If you're using the consumer with `subscribe` mode, then you have to update
> the following configs:
> 1. session.timeout.ms
> 2. heartbeat.interval.ms and
> 3. group.max.session.timeout.ms in the broker configs.
>
> Increasing the session timeout will lead to delays in detecting consumer
> failures, so I would suggest going with `assign` mode.
>
>
> On Tue, Mar 24, 2020 at 4:45 AM Ryan Schachte <
 coderyanschac...@gmail.com>
> wrote:
>
>> Hey guys, I'm getting a bit overwhelmed by the different variables used
>> to help enable batching for me.
>>
>> I have some custom batching logic that processes when either N records
>> have been buffered or my max timeout has been hit. It was working
>> decently well, but I hit this error:
>>
>> This means that the time between subsequent calls to poll() was longer
>> than the configured max.poll.interval.ms, which typically implies that
>> the poll loop is spending too much time message processing.
>>
>> I ultimately want to wait for the buffer to fill up or sit and collect
>> data continuously for 30-45 mins at a time. Do I need to do anything with
>> heartbeat or session timeout as well?
>>
>> So now my question is.. Can I just bump my maxPollInterval to something
>> like:
>>
>> maxPollInterval: '270',
>>
>

>>>
>>
> 





Re: Reg : Slowness in Kafka

2020-03-26 Thread Hans Jespersen
Yes it should be going much faster than that. Something is wrong in your setup.

-hans

> On Mar 26, 2020, at 5:58 PM, Vidhya Sakar  wrote:
> 
> Hi Team,
> 
> The Kafka consumer is reading only 8 records per second. We have
> implemented Apache Kafka and Confluent Connect S3. The Confluent Connect
> S3 connector collects the records and pushes them to an S3 bucket.
> In this process, we are seeing some slowness: on average only 8 records
> are being processed per second. I am planning to achieve higher
> throughput, so that when there is a higher data load, it should process
> more records.
> 
> Thanks in advance.


Reg : Slowness in Kafka

2020-03-26 Thread Vidhya Sakar
Hi Team,

The Kafka consumer is reading only 8 records per second. We have
implemented Apache Kafka and Confluent Connect S3. The Confluent Connect
S3 connector collects the records and pushes them to an S3 bucket.
In this process, we are seeing some slowness: on average only 8 records
are being processed per second. I am planning to achieve higher
throughput, so that when there is a higher data load, it should process
more records.

Thanks in advance.


How to remove messed up segment log?

2020-03-26 Thread Richard Rossel
I was investigating a big topic, trying to find the reason why the retention
limits were not working.
I was able to read messages from 2019 even though the retention was 5 days.
Eventually I found the log segment file that contains those 2019 messages,
and checking the rest of the messages I realized it also has messages from
Dec 2020. Yep, messages from the future.

| offset: 4513685442 CreateTime: 1609420521000 keysize: -1 valuesize:
197 sequence: -1 headerKeys: [] payload: 2020-12-31T13:15:21+00:00 ...

Besides fixing how we use the CreateTime for ingesting data, I need to
remove that data.
So my two questions are:

a) What strategy does Kafka use to decide whether a log segment needs to
be deleted? Does it compare the segment's max CreateTime with the
retention limits?

b) How can I delete that whole log segment file, and of course, without
messing up the system?
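
(A hedged note, since the thread doesn't answer (b): rather than touching
segment files on disk by hand, Kafka ships a kafka-delete-records.sh tool
that truncates a partition by deleting all records below a given offset.
Topic, partition, and offset below are placeholders; to remove a record
at offset N, pass N+1.)

# delete-records.json
{
  "partitions": [
    { "topic": "my-big-topic", "partition": 0, "offset": 4513685443 }
  ],
  "version": 1
}

kafka-delete-records.sh --bootstrap-server broker:9092 \
  --offset-json-file delete-records.json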


Thanks.-
-- 
Richard Rossel
Atlanta - GA


Re: leadership election timing on kafka broker failure

2020-03-26 Thread Larry Hemenway
I've tried to read up more on this issue and look at my logs. Here is what
I think is happening when we restart the controlling broker that
also happens to be the leader of the partition in question:

1. Broker 0, the controlling broker that owns the partition we're looking
at, is restarted
2. The session times out & controller election starts (~4 seconds in trace
below)
3. After a controller is elected, it moves partition leadership (~3 seconds
in trace below)

What I would expect to minimize the time from (1) and (2) would be to
update:

- kafka broker: zookeeper.session.timeout.ms on the Kafka broker
- zookeeper: tickTime on Zookeeper (the session timeout must be >= 2 x the
tickTime, which defaults to 2 seconds)

But updating those didn't seem to have an effect on our controller election
timing. In addition, I would hope we could move partitions a bit faster
than 3 seconds, but I can't see which settings would control that from
looking at the broker configurations.
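
For reference, the two settings above would look like this (values are
illustrative defaults, not a recommendation):

# Kafka broker: server.properties
zookeeper.session.timeout.ms=6000

# Zookeeper: zoo.cfg -- sessions are bounded to 2x-20x tickTime
tickTime=2000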

I have a sample run where getting from (1) to (2) takes ~4 seconds and step
(3) completes in around 3 seconds.

From the trace I'm looking at:

1.
2020-03-25 18:44:*58,267* INFO [ReplicaFetcher replicaId=1, leaderId=0,
fetcherId=0] Error sending fetch request (sessionId=1734000127,
epoch=14558) to node 0: java.io.IOException: Connection to 0 was
disconnected before the response was read.
(org.apache.kafka.clients.FetchSessionHandler) [ReplicaFetcherThread-0-0]

2.
2020-03-25 18:45:*02,103* INFO [Controller id=1] 1 successfully elected as
the controller. Epoch incremented to 9 and epoch zk version is now 9
(kafka.controller.KafkaController) [controller-event-thread]

3.
2020-03-25 18:45:*02,762* TRACE [Controller id=1 epoch=9] Sending
become-leader LeaderAndIsr request PartitionState(controllerEpoch=9,
leader=1, leaderEpoch=13, isr=1, zkVersion=19, replicas=1,0, isNew=false)
to broker 1 for partition nsm2app-0 (state.change.logger)
[controller-event-thread]
...
2020-03-25 18:45:*05,629* TRACE [Broker id=1] Completed LeaderAndIsr
request correlationId 5 from controller 1 epoch 9 for the become-leader
transition for partition nsm2app-0 (state.change.logger)
[data-plane-kafka-request-handler-2]


So this run was ~7+ seconds, but some runs are faster - probably in cases
where the controller doesn't move - and a few are a bit slower.


On Wed, Mar 25, 2020 at 2:22 PM Larry Hemenway 
wrote:

> All,
>
> We're experimenting with what happens in the event of a Kafka broker
> failure and we're seeing it take up to ~10 seconds for leadership to switch
> over. We've been unable to figure out if there are some parameters to
> tighten this timing.
>
> Are there broker config parameters that affect this timing?
>
> Alternatively, is there some documentation that would help me understand
> the broker failure and partition election?
>
> Thanks in advance for any help.
>
> Larry
>
>


Re: Scaling Apache Kafka Producers & Consumers

2020-03-26 Thread sunil chaudhari
Again:
A consumer can have one or more consumer threads.
The analogy of 12 partitions and 4 consumers holds when each consumer has
3 consumer threads.
Please don’t skip the important factor “consumer thread” in this matter.

If you run each consumer with 4 threads then you need at most 3 consumers
for that consumer group.

If you have 12 partitions and you run 4 consumers with 4 consumer threads
each, then 4 threads will be idle at any time T1.

I hope this is clear.

Thanks,
Sunil.

On Thu, 26 Mar 2020 at 7:52 PM, Hans Jespersen  wrote:

> > As per my understanding, in Apache Kafka a single consumer from a
> consumer
> > group can consume messages from one partition only.
>
> Not correct. A single consumer from a consumer group can consume from many
> partitions. For example if you had a topic with 12 partitions and 4
> consumers in a consumer group, each consumer in the group would consume
> from 3 partitions.
>
> -hans


Re: Scaling Apache Kafka Producers & Consumers

2020-03-26 Thread Hans Jespersen
> As per my understanding, in Apache Kafka a single consumer from a consumer
> group can consume messages from one partition only.

Not correct. A single consumer from a consumer group can consume from many 
partitions. For example if you had a topic with 12 partitions and 4 consumers 
in a consumer group, each consumer in the group would consume from 3 partitions.
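
A small sketch illustrating this (group id, topic, and servers are made
up): with 12 partitions and 4 running instances of this process in the
same group, each instance typically ends up with 3 partitions.

import java.time.Duration
import java.util.Properties
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer

fun main() {
  val props = Properties().apply {
    put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    put(ConsumerConfig.GROUP_ID_CONFIG, "my-group")
    put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name)
    put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name)
  }
  KafkaConsumer<String, String>(props).use { consumer ->
    consumer.subscribe(listOf("my-topic"))
    consumer.poll(Duration.ofSeconds(5)) // joins the group and gets an assignment
    println("Assigned partitions: " + consumer.assignment())
  }
}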

-hans

Re: MirrorMaker2 not mirroring for 5 minutes when adding a topic

2020-03-26 Thread Péter Sinóros-Szabó
So it seems that MM2 is doing a lot of small steps to get the offsets and
consumer groups, am I right?
Maybe this needs some optimization or batching. Well, I am just guessing
here; I don't really know much about how it works.

Do you have any idea if it can maybe be sped up with configuration?
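
(For reference, the configurable intervals documented for MirrorMaker 2
include the refresh/emit settings below; values are illustrative, and it
is a guess rather than a known fix that tuning them helps here.)

# connect-mirror-maker.properties
clusters = primary, backup
primary->backup.refresh.topics.interval.seconds = 60
primary->backup.refresh.groups.interval.seconds = 60
primary->backup.emit.checkpoints.interval.seconds = 60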

Thanks,
Peter

On Tue, 24 Mar 2020 at 16:14, Péter Sinóros-Szabó <
peter.sinoros-sz...@transferwise.com> wrote:

> Hey,
>
> so I turned on debug logging and now I see the MM2 does a lot, so it is
> hard to tell exactly what :D
>
> So, what I see the most during that "not mirroring messages" period is
> logs about:
> - [AdminClient clientId=adminclient-31] Queueing
> Call(callName=findCoordinator, deadlineMs=1584976266682) with a timeout
> 12 ms from now.
> - [AdminClient clientId=adminclient-31] Queueing
> Call(callName=listConsumerGroupOffsets, deadlineMs=1584976267162) with a
> timeout 11 ms from now.
> - [AdminClient clientId=adminclient-31] Using older server API v1 to send
> FIND_COORDINATOR {key=abdd997ddhfb8d,key_type=0} with correlation id
> 1473 to node 1004 (org.apache.kafka.clients.NetworkClient)
> - [AdminClient clientId=adminclient-31] Using older server API v3 to send
> OFFSET_FETCH {group_id=vv586499b754vctp8,topics=null} with
> correlation id 1062 to node 1006
> - [Consumer clientId=consumer-19, groupId=null] Added READ_UNCOMMITTED
> fetch request for partition mm2-offset-syncs.backup.internal-0 at position
> FetchPosition{offset=5187531, offsetEpoch=Optional[0],
> currentLeader=LeaderAndEpoch ...
> - [Consumer clientId=consumer-19, groupId=null] Sending READ_UNCOMMITTED
> IncrementalFetchRequest(toSend=(), toForget=(),
> implied=(mm2-offset-syncs.backup.internal-0)) to broker ...
> - [Consumer clientId=consumer-19, groupId=null] Using older server API v7
> to send FETCH
> {replica_id=-1,max_wait_time=500,min_bytes=1,max_bytes=52428800, ...
> - [Consumer clientId=consumer-21, groupId=null] Sending READ_UNCOMMITTED
> IncrementalFetchRequest(toSend=(), toForget=(),
> implied=(mm2-offset-syncs.backup.internal-0)) to broker 172.x.y.z:9092 (id:
> 1004 ...
>
> I can share the full log if that helps.
>
> Peter
>
> On Fri, 20 Mar 2020 at 17:52, Ryanne Dolan  wrote:
>
>> Hmm, maybe turn on debugging info and try to figure out what Connect is
>> doing during that time.
>>
>> Ryanne
>>
>> On Fri, Mar 20, 2020 at 6:15 AM Péter Sinóros-Szabó
>>  wrote:
>>
>> > Hi,
>> >
>> > I don't have the previous logs, so I restarted MM2, that produces the
>> same
>> > results. So new logs:
>> >
>> > MM2 starts and seems to be ready, but not mirroring message:
>> > [2020-03-20 10:50:11,985] INFO [Producer
>> > clientId=connector-producer-MirrorCheckpointConnector-0] Cluster ID:
>> > 700ZEsu0ShuzPZ6lZE54_Q (org.apache.kafka.clients.Metadata)
>> > [2020-03-20 10:50:19,927] INFO
>> > WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets
>> > (org.apache.kafka.connect.runtime.WorkerSourceTask)
>> >
>> > It takes another 5 minutes of just the usual "committing offsets" kinda
>> > messages (these are all the logs MM2 prints at that time)
>> > [2020-03-20 10:50:19,927] INFO
>> > WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets
>> > (org.apache.kafka.connect.runtime.WorkerSourceTask)
>> > [2020-03-20 10:50:19,927] INFO
>> > WorkerSourceTask{id=MirrorHeartbeatConnector-0} flushing 0 outstanding
>> > messages for offset commit
>> > (org.apache.kafka.connect.runtime.WorkerSourceTask)
>> > [2020-03-20 10:50:19,936] INFO
>> > WorkerSourceTask{id=MirrorHeartbeatConnector-0} Finished commitOffsets
>> > successfully in 9 ms (org.apache.kafka.connect.runtime.WorkerSourceTask)
>> > [2020-03-20 10:50:57,634] INFO
>> WorkerSourceTask{id=MirrorSourceConnector-1}
>> > Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
>> > [2020-03-20 10:50:57,635] INFO
>> WorkerSourceTask{id=MirrorSourceConnector-1}
>> > flushing 0 outstanding messages for offset commit
>> > (org.apache.kafka.connect.runtime.WorkerSourceTask)
>> > [2020-03-20 10:50:57,636] INFO
>> WorkerSourceTask{id=MirrorSourceConnector-2}
>> > Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
>> > [2020-03-20 10:50:57,636] INFO
>> WorkerSourceTask{id=MirrorSourceConnector-2}
>> > flushing 0 outstanding messages for offset commit
>> > (org.apache.kafka.connect.runtime.WorkerSourceTask)
>> > [2020-03-20 10:50:57,638] INFO
>> WorkerSourceTask{id=MirrorSourceConnector-0}
>> > Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
>> > [2020-03-20 10:50:57,638] INFO
>> WorkerSourceTask{id=MirrorSourceConnector-0}
>> > flushing 0 outstanding messages for offset commit
>> > (org.apache.kafka.connect.runtime.WorkerSourceTask)
>> > [2020-03-20 10:50:57,638] INFO
>> WorkerSourceTask{id=MirrorSourceConnector-3}
>> > Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
>> > [2020-03-20 10:50:57,638] INFO
>> WorkerSourceTask{id=MirrorSourceConnector-3}
>> > flushing 0 outstandin

Re: Scaling Apache Kafka Producers & Consumers

2020-03-26 Thread sunil chaudhari
Hi Prasad,
Want to correct a bit. It's not one consumer per partition.
It's one consumer thread per partition.


On Thu, 26 Mar 2020 at 4:49 PM, Prasad Suhas Shembekar <
ps00516...@techmahindra.com> wrote:

> Hi,
>
> I am using Apache Kafka as a Message Broker in our application. The
> producers and consumers are running as Docker containers in Kubernetes.
> Right now, the producer publishes messages to a topic in single partition.
> While the consumer consumes it from the topic.
> As per my understanding, in Apache Kafka a single consumer from a consumer
> group can consume messages from one partition only. Meaning, if there is
> only a single partition and multiple consumers in a consumer group, only
> one consumer will consume the message and the rest will remain idle, till
> Apache Kafka does the partition rebalancing.
> As mentioned earlier, we have a single topic and single partition and
> multiple consumers in a single group. Thus we won't be able to achieve the
> horizontal scaling for message consumption.
>
> Please let me know if the above understanding is correct.
>
> I am looking out on how to create partitions dynamically in the topic, as
> and when a new consumer is added to consumer group (K8S auto scaling of
> PODS).
> Also, how to make the producer write to these different partitions created
> dynamically, without overloading few partitions.
>
> Request you to provide some inputs / suggestions on how to achieve this.
>
> Thanks & Regards,
> Prasad Shembekar
> Blue Marble
> WST-020, D Non-ODC, Mihan SEZ,
> Nagpur
> Extension: 6272148
> Direct: 0712-6672148
>
>


Re: Scaling Apache Kafka Producers & Consumers

2020-03-26 Thread M. Manna
Hey Prasad (#StayAtHomeSaveLives),

On Thu, 26 Mar 2020 at 11:19, Prasad Suhas Shembekar <
ps00516...@techmahindra.com> wrote:

> Hi,
>
> I am using Apache Kafka as a Message Broker in our application. The
> producers and consumers are running as Docker containers in Kubernetes.
> Right now, the producer publishes messages to a topic in single partition.
> While the consumer consumes it from the topic.
> As per my understanding, in Apache Kafka a single consumer from a consumer
> group can consume messages from one partition only. Meaning, if there is
> only a single partition and multiple consumers in a consumer group, only
> one consumer will consume the message and the rest will remain idle, till
> Apache Kafka does the partition rebalancing.
>

 Yes this is correct.


> As mentioned earlier, we have a single topic and single partition and
> multiple consumers in a single group. Thus we won't be able to achieve the
> horizontal scaling for message consumption.
>
> Please let me know if the above understanding is correct.
>

 Yes this is correct.

>
> I am looking out on how to create partitions dynamically in the topic, as
> and when a new consumer is added to consumer group (K8S auto scaling of
> PODS).
> Also, how to make the producer write to these different partitions created
> dynamically, without overloading few partitions.
>
> Request you to provide some inputs / suggestions on how to achieve this.
>
Before anyone could answer any specific use case-related questions,
perhaps you could read this:
https://www.confluent.io/blog/how-choose-number-topics-partitions-kafka-cluster/

I believe this could serve as a great pointer and learning experience (it
certainly did for me) before you tackle more precise cases. Feel
free to follow up and share your concerns after this.
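
For the dynamic-partitions part specifically, here is a sketch of what an
automated bump could look like with the Kafka AdminClient (topic name,
servers, and counts are placeholders; note that the partition count can
only be increased, and the key-to-partition mapping changes afterwards):

import java.util.Properties
import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.AdminClientConfig
import org.apache.kafka.clients.admin.NewPartitions

fun growTopic(topic: String, targetPartitions: Int) {
  val props = Properties().apply {
    put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  }
  AdminClient.create(props).use { admin ->
    // Raise the partition count; Kafka rejects a decrease.
    admin.createPartitions(
        mapOf(topic to NewPartitions.increaseTo(targetPartitions))
    ).all().get()
  }
}

// The default partitioner then spreads keyed records across all partitions
// by hash of the key, so no producer change is needed for plain keyed sends.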


> Thanks & Regards,
> Prasad Shembekar
> Blue Marble
> WST-020, D Non-ODC, Mihan SEZ,
> Nagpur
> Extension: 6272148
> Direct: 0712-6672148
>
>


Scaling Apache Kafka Producers & Consumers

2020-03-26 Thread Prasad Suhas Shembekar
Hi,

I am using Apache Kafka as a Message Broker in our application. The producers 
and consumers are running as Docker containers in Kubernetes.
Right now, the producer publishes messages to a topic in a single partition,
while the consumer consumes them from the topic.
As per my understanding, in Apache Kafka a single consumer from a consumer 
group can consume messages from one partition only. Meaning, if there is only a 
single partition and multiple consumers in a consumer group, only one consumer 
will consume the message and the rest will remain idle, till Apache Kafka does 
the partition rebalancing.
As mentioned earlier, we have a single topic and single partition and multiple 
consumers in a single group. Thus we won't be able to achieve the horizontal 
scaling for message consumption.

Please let me know if the above understanding is correct.

I am looking at how to create partitions dynamically in the topic as and
when a new consumer is added to the consumer group (K8S auto-scaling of PODS).
Also, how to make the producer write to these dynamically created partitions
without overloading a few of them.

Request you to provide some inputs / suggestions on how to achieve this.

Thanks & Regards,
Prasad Shembekar
Blue Marble
WST-020, D Non-ODC, Mihan SEZ,
Nagpur
Extension: 6272148
Direct: 0712-6672148

