Suriya Vijayaraghavan created KAFKA-13180:
---------------------------------------------

             Summary: Data Distribution among partitions not working as Expected
                 Key: KAFKA-13180
                 URL: https://issues.apache.org/jira/browse/KAFKA-13180
             Project: Kafka
          Issue Type: Bug
          Components: clients
    Affects Versions: 2.8.0
            Reporter: Suriya Vijayaraghavan


Hi team, we are facing a strange issue and are not sure whether anyone else has run into it. 
However, we were able to identify the code flow that causes it.

Issue:
 Using the RoundRobinPartitioner with an even number of partitions n results in 
records always being produced to only n/2 of the partitions.

Is Reproducible: yes

Scenario: A Kafka topic has 6 partitions (0,1,2,3,4,5), and we produce to it 
using the RoundRobinPartitioner.

The RoundRobinPartitioner selects a partition by index into an ArrayList of 
partition info. For our case, let's assume the partitions are populated in the 
list in the following order:

{1,2,3,4,5,0}
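To make the index-based selection concrete, here is a simplified sketch, assuming (as described above) that the partitioner keeps a counter and returns the element at index counter % size of the partition list. The class RoundRobinModel is hypothetical; it keeps a single counter and ignores per-topic state and partition availability, which the real partitioner may handle differently.

```java
import java.util.List;

// Hypothetical, simplified model of index-based round-robin selection.
// Not Kafka's RoundRobinPartitioner: one counter only, no per-topic state,
// and no distinction between available and all partitions.
final class RoundRobinModel {
    private final List<Integer> partitionOrder; // order of the partition-info list
    private int counter = 0;

    RoundRobinModel(List<Integer> partitionOrder) {
        this.partitionOrder = partitionOrder;
    }

    // Returns the partition at index counter % size, then advances the counter.
    int partition() {
        int index = Math.floorMod(counter++, partitionOrder.size());
        return partitionOrder.get(index);
    }

    public static void main(String[] args) {
        RoundRobinModel model = new RoundRobinModel(List.of(1, 2, 3, 4, 5, 0));
        for (int i = 0; i < 7; i++) {
            System.out.print(model.partition() + " ");
        }
        System.out.println();
    }
}
```

With the order above, successive calls return 1, 2, 3, 4, 5, 0 and then wrap back to 1.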

Expected behavior: records are distributed evenly across all 6 partitions.

Actual behavior: data was produced only to partitions 2, 4 and 0.

Why:
 While debugging the producer flow further, we noticed the highlighted calls 
below in the doSend method of KafkaProducer.
{quote}int partition = *partition*(record, serializedKey, serializedValue, cluster);
 tp = new TopicPartition(record.topic(), partition);
 .....
 RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
         serializedValue, headers, interceptCallback, remainingWaitMs, *true*, nowMs);
 if (result.abortForNewBatch) {
     int prevPartition = partition;
     partitioner.onNewBatch(record.topic(), cluster, prevPartition);
     partition = *partition*(record, serializedKey, serializedValue, cluster);
     tp = new TopicPartition(record.topic(), partition);
     .....
     result = accumulator.append(tp, timestamp, serializedKey,
             serializedValue, headers, interceptCallback, remainingWaitMs, *false*, nowMs);
 }
{quote}
Here, true is passed as abortOnNewBatch to accumulator.append. In 
RecordAccumulator.append, the Deque obtained for the partition is always empty 
on the first message, so a new batch would have to be created; because 
abortOnNewBatch is true, the append is aborted instead and 
result.abortForNewBatch is set.

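For the new batch, partition() is called again and a new TopicPartition object 
is created. Since every partition() call advances the round-robin counter, this 
second call returns partition 2. In this flow abortOnNewBatch is passed as 
false, so the record is added to the Deque for this TopicPartition; partition 1, 
chosen by the first call, receives nothing.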
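The abort-and-retry flow described above can be modelled as follows. This is a sketch under stated assumptions, not the producer's code: DoSendModel, append, send and drainAll are hypothetical names, onNewBatch is assumed to do nothing, a "batch" is just a non-empty ArrayDeque, and drainAll stands in for the sender emptying every deque so that each first append sees an empty Deque, as in our scenario.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the abort-and-retry flow quoted from KafkaProducer.doSend.
// Assumptions: onNewBatch is a no-op and a "batch" is simply a non-empty deque.
final class DoSendModel {
    private final List<Integer> partitionOrder;
    private final Map<Integer, Deque<String>> batches = new HashMap<>();
    private int counter = 0;

    DoSendModel(List<Integer> partitionOrder) {
        this.partitionOrder = partitionOrder;
    }

    // Round-robin choice: every call advances the shared counter.
    private int partition() {
        int index = Math.floorMod(counter++, partitionOrder.size());
        return partitionOrder.get(index);
    }

    // Looks up (or creates) the partition's deque. Returns false ("abortForNewBatch")
    // if the deque is empty and abortOnNewBatch is true; otherwise appends the record.
    private boolean append(int partition, String record, boolean abortOnNewBatch) {
        Deque<String> deque = batches.computeIfAbsent(partition, p -> new ArrayDeque<>());
        if (deque.isEmpty() && abortOnNewBatch) {
            return false;
        }
        deque.addLast(record);
        return true;
    }

    // Follows the quoted excerpt; returns the partition that received the record.
    int send(String record) {
        int partition = partition();
        if (!append(partition, record, true)) {
            // partitioner.onNewBatch(...) would run here; assumed to be a no-op
            partition = partition(); // second call: the counter advances again
            append(partition, record, false);
        }
        return partition;
    }

    // Simulates the sender draining every batch, leaving all deques empty.
    void drainAll() {
        batches.values().forEach(Deque::clear);
    }

    public static void main(String[] args) {
        DoSendModel producer = new DoSendModel(List.of(1, 2, 3, 4, 5, 0));
        for (int i = 0; i < 6; i++) {
            System.out.print(producer.send("record-" + i) + " ");
            producer.drainAll(); // every first append then sees an empty deque
        }
        System.out.println();
    }
}
```

With the partition order 1,2,3,4,5,0, the loop in main prints partitions 2, 4, 0, 2, 4, 0: the counter advances twice per record, so partitions 1, 3 and 5 are always consumed by the aborted first call.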

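However, the data is distributed properly if the total number of partitions is 
odd. A record is only added when abortOnNewBatch is passed as false (let's call 
it the second invocation), so each record consumes two counter values and the 
k-th record (counting from 0) lands on list index (2k+1) mod n. With an odd n 
these indices eventually cover every position in the list; with an even n they 
only cover the odd positions, i.e. n/2 partitions.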

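The order of invocations for an odd and an even number of partitions is as 
follows (each pair shows the partition from the first call, whose append is 
aborted, and the partition from the second call, which receives the record):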

ODD: \{1,2,3,4,0}

Iterations until all partitions are populated:
 1 - 2
 3 - 4
 0 - 1
 2 - 3
 4 - 0

Partitions with a populated Deque = \{2,4,1,3,0}

EVEN: \{1,2,3,4,5,0}

Iterations until all partitions are populated:
 1 - 2
 3 - 4
 5 - 0
 1 - 2
 3 - 4
 5 - 0
 1 - 2
 3 - 4
 5 - 0 .........

Partitions with a populated Deque = \{2,4,0}

This continues indefinitely, because partitions 1, 3 and 5 are never initiated.
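The two tables can be reproduced with a few lines of arithmetic, assuming (as above) that every first append aborts, so the k-th record (counting from 0) is added to the partition at list index (2k+1) mod n. IterationTable and populatedOrder are hypothetical names used only for this illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical reproduction of the iteration tables. Assumes every first append
// aborts, so each record consumes two counter values and is added to the partition
// chosen by the second partition() call.
final class IterationTable {

    // Prints each "first - second" pair and returns partitions in the order
    // they first receive a record.
    static List<Integer> populatedOrder(List<Integer> partitionOrder, int records) {
        int n = partitionOrder.size();
        Set<Integer> populated = new LinkedHashSet<>(); // keeps insertion order
        for (int k = 0; k < records; k++) {
            int first = partitionOrder.get((2 * k) % n);      // aborted append
            int second = partitionOrder.get((2 * k + 1) % n); // record added here
            System.out.println(first + " - " + second);
            populated.add(second);
        }
        return new ArrayList<>(populated);
    }

    public static void main(String[] args) {
        System.out.println(populatedOrder(List.of(1, 2, 3, 4, 0), 5));
        System.out.println(populatedOrder(List.of(1, 2, 3, 4, 5, 0), 9));
    }
}
```

Running main prints the same pairs as the tables, followed by [2, 4, 1, 3, 0] for the odd case and [2, 4, 0] for the even case.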



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
