Re: How is __consumer_offsets partitioned?

2021-09-05 Thread James Olsen
If it's of any value to you, we use the following test to check that we have a 
well-balanced set of consumer group ids.  Note that in the code, 
ConsumerGroups.ALL_GROUPS is simply a list of all our consumer group ids.  
Spreading the offset commit load evenly across the __consumer_offsets 
partitions helps level the load on your brokers.  But beware of changing your 
group ids in an active system: you'll need to migrate carefully and prime the 
new offsets in the renamed group if you wish to avoid message replay 
(depending on your config, e.g. auto.offset.reset).

import java.util.HashSet;
import java.util.Set;

import org.apache.kafka.common.utils.Utils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class ConsumerGroupsTest {

    private static final Logger LOG = LoggerFactory.getLogger(ConsumerGroupsTest.class);

    /* Broker default for offsets.topic.num.partitions, per the Kafka docs. */
    private static final int KAFKA_OFFSET_PARTITION_COUNT = 50;

    @Test
    void distinctConsumerOffsetPartitions() {
        boolean hasDuplicates = false;
        Set<Integer> usedPartitions = new HashSet<>(KAFKA_OFFSET_PARTITION_COUNT);
        for (String group : ConsumerGroups.ALL_GROUPS) {
            int partition = getConsumerOffsetPartition(group);
            // Set.add returns false when another group already maps to this partition.
            hasDuplicates = !usedPartitions.add(partition) || hasDuplicates;
        }
        /*
         * NB: It is not an absolute requirement to have no clashes. It's simply desirable.
         */
        Assertions.assertFalse(hasDuplicates,
                "Multiple Consumer Groups map to same offsets partition.  See prior log output.");
    }

    /* Absolute value of the group id's hash, modulo the offsets partition count. */
    private int getConsumerOffsetPartition(String group) {
        final int partition = Utils.abs(group.hashCode()) % KAFKA_OFFSET_PARTITION_COUNT;
        LOG.info("{} --> {}", group, partition);
        return partition;
    }

}
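
Since clashes are tolerable rather than forbidden, a looser variant is to 
count how many group ids land on each partition instead of failing on the 
first duplicate.  The following is only a sketch of that idea: the class name, 
method names and sample group ids are hypothetical, and Utils.abs is inlined 
(on the assumption that it maps Integer.MIN_VALUE to 0 and otherwise behaves 
like Math.abs) so that it runs without the Kafka jar.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of Kafka or of the test above.
final class OffsetPartitionHistogram {

    // Same formula as getConsumerOffsetPartition, with the abs step inlined.
    static int partitionFor(String group, int partitionCount) {
        int hash = group.hashCode();
        // Math.abs(Integer.MIN_VALUE) is still negative, so map it to 0 explicitly.
        int abs = (hash == Integer.MIN_VALUE) ? 0 : Math.abs(hash);
        return abs % partitionCount;
    }

    // Counts the group ids per partition; partitions with no groups are omitted.
    static Map<Integer, Integer> groupsPerPartition(List<String> groups, int partitionCount) {
        Map<Integer, Integer> counts = new TreeMap<>();
        for (String group : groups) {
            counts.merge(partitionFor(group, partitionCount), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Made-up group ids, for illustration only.
        List<String> groups = List.of("billing", "audit", "search-indexer");
        System.out.println(groupsPerPartition(groups, 50));
    }
}
```

Any partition with a count above 1 is shared by several groups, so a test 
could assert on the maximum count rather than on the absence of duplicates.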




Re: How is __consumer_offsets partitioned?

2021-09-03 Thread Luke Chen
Hi Michał,
Internally, Kafka uses the "*consumer group ID*" as the key to decide which
__consumer_offsets partition to use.
The code looks like this:

`Utils.abs(groupId.hashCode) % groupMetadataTopicPartitionCount`

You can check the `partitionFor` method in the GroupMetadataManager class.
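
For anyone who wants to try the mapping without a broker, here is a minimal
standalone sketch of the same formula in Java. It is not the Kafka source:
the class name and the sample group id are made up, `abs` is a local copy of
what `Utils.abs` is assumed to do (return 0 for `Integer.MIN_VALUE`), and
`partitionCount` is assumed to match the broker's
`offsets.topic.num.partitions` setting (50 by default).

```java
// Hypothetical standalone sketch of the group-to-partition mapping.
final class OffsetsPartitionSketch {

    // Assumed behaviour of Utils.abs: Integer.MIN_VALUE maps to 0,
    // because Math.abs(Integer.MIN_VALUE) is still negative.
    static int abs(int n) {
        return (n == Integer.MIN_VALUE) ? 0 : Math.abs(n);
    }

    // partitionCount should match the broker's offsets.topic.num.partitions.
    static int partitionFor(String groupId, int partitionCount) {
        return abs(groupId.hashCode()) % partitionCount;
    }

    public static void main(String[] args) {
        // "orders-service" is a made-up group id.
        System.out.println(partitionFor("orders-service", 50));
    }
}
```

Because only the group ID goes into the hash, all offset commits for one
group land on the same partition, whatever topics the group consumes.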

Hope that helps.

Thank you.
Luke



How is __consumer_offsets partitioned?

2021-09-03 Thread Michał Łowicki
Hey,

Could someone please point me to the code / doc where I can find
information on what is used as the key and how messages are spread across
partitions for this internal topic?

-- 
BR,
Michał Łowicki