[ 
https://issues.apache.org/jira/browse/KAFKA-14553?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Viczai Gábor updated KAFKA-14553:
---------------------------------
    Description: 
*Summary:*
There is an infinite loop in RecordAccumulator, if stickyBatchSize is 
configured to be 0 in BuiltinPartitioner.
(Which is the default case when using KafkaSender's default Builder.)

*Details:*
The infinite loop is caused by this while:
https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L293
and this continue particularly:
https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L316
because the partitionChanged() call in the condition always return true if 
batchSize is 0.

So program flow never reaches this point:
https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L318
Thus no span data sent to Kafka ever.

The problematic line in partitionChanged() is when it calls an update on the 
BuiltInPartitioner:
https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L242
which in fact always updates the partition because of this condition:
https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java#L218
therefore the next confdition in RecordAccumulator will evaluate to true also:
https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L243
thus returning 'true' and forcing the 'continue' in the while(true) loop.

*Suggested fix:*
I think these conditions should be changed:
https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java#L218
The equal signs should be removed from the conditions:
{code}if (producedBytes > stickyBatchSize && enableSwitch || producedBytes > 
stickyBatchSize * 2) {{code}
(Btw: line 213 also needs this modification.)

*Note:*
The problem arises because KafkaSender sets the batchSize to 0.
https://github.com/openzipkin/zipkin-reporter-java/blob/2.16.3/kafka/src/main/java/zipkin2/reporter/kafka/KafkaSender.java#L88

*Workaround:*
Simply set the batch size greater than zero.
{code:java}@Configuration
public class SenderConfiguration {

    @Bean
    KafkaSender kafkaSender() {
        Properties overrides = new Properties();
        overrides.put(ProducerConfig.BATCH_SIZE_CONFIG, 1);
        return KafkaSender.newBuilder()
            .bootstrapServers("localhost:9092")
            .topic("zipkin")
            .overrides(overrides)
            .build();
    }
}{code}

*Using:*
- Spring Boot 3.0.1
- Spring Cloud 2022.0.0

pom.xml (fragment):
{code:xml}        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-autoconfigure</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>io.micrometer</groupId>
            <artifactId>micrometer-registry-prometheus</artifactId>
        </dependency>
        <dependency>
            <groupId>io.micrometer</groupId>
            <artifactId>micrometer-tracing-bridge-brave</artifactId>
        </dependency>
        <dependency>
            <groupId>io.zipkin.reporter2</groupId>
            <artifactId>zipkin-reporter-brave</artifactId>
        </dependency>
        <dependency>
            <groupId>io.zipkin.reporter2</groupId>
            <artifactId>zipkin-sender-kafka</artifactId>
        </dependency>{code}

Everything is on default settings, except a KafkaSender is explicitely created 
as illustrated above. (No autoconfiguration available for Kafka sender.)

  was:
*Summary:*
There is an infinite loop in RecordAccumulator, if stickyBatchSize is 
configured to be 0 in BuiltinPartitioner.
(Which is the default case when using KafkaSender's default Builder.)

*Details:*
The infinite loop is caused by this while:
https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L293
and this continue particularly:
https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L316
because the partitionChanged() call in the condition always return true if 
batchSize is 0.

So program flow never reaches this point:
https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L318
Thus no span data sent to Kafka ever.

The problematic line in partitionChanged() is when it calls an update on the 
BuiltInPartitioner:
https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L242
which in fact always updates the partition because of this condition:
https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java#L218
therefore the next confdition in RecordAccumulator will evaluate to true also:
https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L243
thus returning 'true' and forcing the 'continue' in the while(true) loop.

Suggested fix:
I think these conditions should be changed:
https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java#L218
The equal signs should be removed from the conditions:
{code}if (producedBytes > stickyBatchSize && enableSwitch || producedBytes > 
stickyBatchSize * 2) {{code}
(Btw: line 213 also needs this modification.)

*Note:*
The problem arises because KafkaSender sets the batchSize to 0.
https://github.com/openzipkin/zipkin-reporter-java/blob/2.16.3/kafka/src/main/java/zipkin2/reporter/kafka/KafkaSender.java#L88

*Workaround:*
Simply set the batch size greater than zero.
{code:java}@Configuration
public class SenderConfiguration {

    @Bean
    KafkaSender kafkaSender() {
        Properties overrides = new Properties();
        overrides.put(ProducerConfig.BATCH_SIZE_CONFIG, 1);
        return KafkaSender.newBuilder()
            .bootstrapServers("localhost:9092")
            .topic("zipkin")
            .overrides(overrides)
            .build();
    }
}{code}

*Using:*
- Spring Boot 3.0.1
- Spring Cloud 2022.0.0

pom.xml (fragment):
{code:xml}        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-autoconfigure</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>io.micrometer</groupId>
            <artifactId>micrometer-registry-prometheus</artifactId>
        </dependency>
        <dependency>
            <groupId>io.micrometer</groupId>
            <artifactId>micrometer-tracing-bridge-brave</artifactId>
        </dependency>
        <dependency>
            <groupId>io.zipkin.reporter2</groupId>
            <artifactId>zipkin-reporter-brave</artifactId>
        </dependency>
        <dependency>
            <groupId>io.zipkin.reporter2</groupId>
            <artifactId>zipkin-sender-kafka</artifactId>
        </dependency>{code}

Everything is on default settings, except a KafkaSender is explicitely created 
as illustrated above. (No autoconfiguration available for Kafka sender.)


> RecordAccumulator hangs in infinite NOP loop
> --------------------------------------------
>
>                 Key: KAFKA-14553
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14553
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 3.3.1
>         Environment: - Spring Boot 3.0.1
> - Spring Cloud 2022.0.0
> Versions of dependencies are defined in boms of SB and SC:
> - micrometer-tracing-bridge-brave 1.0.0
> - zipkin-reporter-brave 2.16.3
> - zipkin-sender-kafka 2.16.3
>            Reporter: Viczai Gábor
>            Priority: Minor
>             Fix For: 3.4.0, 3.3.2
>
>
> *Summary:*
> There is an infinite loop in RecordAccumulator, if stickyBatchSize is 
> configured to be 0 in BuiltinPartitioner.
> (Which is the default case when using KafkaSender's default Builder.)
> *Details:*
> The infinite loop is caused by this while:
> https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L293
> and this continue particularly:
> https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L316
> because the partitionChanged() call in the condition always return true if 
> batchSize is 0.
> So program flow never reaches this point:
> https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L318
> Thus no span data sent to Kafka ever.
> The problematic line in partitionChanged() is when it calls an update on the 
> BuiltInPartitioner:
> https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L242
> which in fact always updates the partition because of this condition:
> https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java#L218
> therefore the next confdition in RecordAccumulator will evaluate to true also:
> https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L243
> thus returning 'true' and forcing the 'continue' in the while(true) loop.
> *Suggested fix:*
> I think these conditions should be changed:
> https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java#L218
> The equal signs should be removed from the conditions:
> {code}if (producedBytes > stickyBatchSize && enableSwitch || producedBytes > 
> stickyBatchSize * 2) {{code}
> (Btw: line 213 also needs this modification.)
> *Note:*
> The problem arises because KafkaSender sets the batchSize to 0.
> https://github.com/openzipkin/zipkin-reporter-java/blob/2.16.3/kafka/src/main/java/zipkin2/reporter/kafka/KafkaSender.java#L88
> *Workaround:*
> Simply set the batch size greater than zero.
> {code:java}@Configuration
> public class SenderConfiguration {
>     @Bean
>     KafkaSender kafkaSender() {
>         Properties overrides = new Properties();
>         overrides.put(ProducerConfig.BATCH_SIZE_CONFIG, 1);
>         return KafkaSender.newBuilder()
>             .bootstrapServers("localhost:9092")
>             .topic("zipkin")
>             .overrides(overrides)
>             .build();
>     }
> }{code}
> *Using:*
> - Spring Boot 3.0.1
> - Spring Cloud 2022.0.0
> pom.xml (fragment):
> {code:xml}        <dependency>
>             <groupId>org.springframework.boot</groupId>
>             <artifactId>spring-boot-autoconfigure</artifactId>
>         </dependency>
>         <dependency>
>             <groupId>org.springframework.boot</groupId>
>             <artifactId>spring-boot-starter-actuator</artifactId>
>         </dependency>
>         <dependency>
>             <groupId>io.micrometer</groupId>
>             <artifactId>micrometer-registry-prometheus</artifactId>
>         </dependency>
>         <dependency>
>             <groupId>io.micrometer</groupId>
>             <artifactId>micrometer-tracing-bridge-brave</artifactId>
>         </dependency>
>         <dependency>
>             <groupId>io.zipkin.reporter2</groupId>
>             <artifactId>zipkin-reporter-brave</artifactId>
>         </dependency>
>         <dependency>
>             <groupId>io.zipkin.reporter2</groupId>
>             <artifactId>zipkin-sender-kafka</artifactId>
>         </dependency>{code}
> Everything is on default settings, except a KafkaSender is explicitely 
> created as illustrated above. (No autoconfiguration available for Kafka 
> sender.)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to