Hi all,
I am facing a problem with the Kafka producer: when I detect an exception in
the producer callback, I immediately stop sending new records and close the
producer, but some extra records are still sent.

I found a way to resolve this issue: set
max.in.flight.requests.per.connection to 1 and close the Kafka producer when
an exception is encountered in the producer callback.
Setting max.in.flight.requests.per.connection to 1 ensures that only one
request is in flight per connection (and therefore per partition) at a time,
and closing the producer from within the callback puts the Sender into the
"forceClose" state, which avoids sending extra records.
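
For reference, the configuration half of this workaround can be sketched with
plain java.util.Properties and the documented producer property names. The
class name, the build() helper and the localhost:9092 address are hypothetical
and only serve the illustration; closing the producer from the callback is not
shown here.

```java
import java.util.Properties;

// Hypothetical helper: collects the producer settings used by the workaround.
final class SingleInFlightProducerConfig {

    // bootstrapServers is a placeholder; pass your own broker address.
    static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Same setting as in the demo: idempotent producer.
        props.setProperty("enable.idempotence", "true");
        // Allow at most one unacknowledged request per connection.
        props.setProperty("max.in.flight.requests.per.connection", "1");
        return props;
    }

    public static void main(String[] args) {
        // Print the resulting settings (iteration order is not guaranteed).
        System.out.println(build("localhost:9092"));
    }
}
```

The returned Properties would still need serializers and the other settings
from the demo before being passed to a KafkaProducer.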

However, as far as I know, setting max.in.flight.requests.per.connection to 1
reduces the throughput of the Kafka producer. Is there another way to work
around this issue without setting max.in.flight.requests.per.connection to 1,
so that I can preserve the producer's performance?

Here is my demo source code; you can also find it as a GitHub Gist:
https://gist.github.com/52Heartz/a5d67cf266b35bafcbfa7bc2552f4576

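// Logger/Level are assumed to be Logback classes (the cast and setLevel call suggest this).
import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.TopicConfig;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

public class KafkaProducerProblemDemo {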

    public static void main(String[] args) {
        Logger rootLogger = (Logger) LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME);
        rootLogger.setLevel(Level.INFO);

        String topicName = "test_topic_202403112035";
        Map<String, String> kafkaTopicConfigs = new HashMap<>();
        Properties props = new Properties();
        props.put(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG, "3000");
        props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "192.168.223.3:9094");
        CreateTopicsResult createTopicsResult;
        try (AdminClient adminClient = AdminClient.create(props)) {
            NewTopic newTopic = new NewTopic(topicName, 1, (short) 1);
            kafkaTopicConfigs.put(TopicConfig.SEGMENT_BYTES_CONFIG, "1048576");
            kafkaTopicConfigs.put(TopicConfig.RETENTION_BYTES_CONFIG, "1048576");
            kafkaTopicConfigs.put(TopicConfig.RETENTION_MS_CONFIG, "86400000");
            newTopic.configs(kafkaTopicConfigs);
            createTopicsResult = adminClient.createTopics(Collections.singletonList(newTopic));
            System.out.println(createTopicsResult.all().get());
        } catch (Exception e) {
            rootLogger.error("create topic error", e);
        }

        // adjust requestTimeout to ensure the request timeout is enough
        long requestTimeout = 2000;
        Properties kafkaProps = new Properties();
        kafkaProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, String.valueOf(requestTimeout));
        kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                org.apache.kafka.common.serialization.StringSerializer.class.getName());
        kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                org.apache.kafka.common.serialization.ByteArraySerializer.class.getName());
        kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.223.3:9094");
        kafkaProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, String.valueOf(requestTimeout));
        kafkaProps.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 2097152);
        // force one batch per record
        kafkaProps.put(ProducerConfig.BATCH_SIZE_CONFIG, "1");
        kafkaProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");

        kafkaProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");

        try (KafkaProducer<String, byte[]> kafkaProducer = new KafkaProducer<>(kafkaProps)) {
            AtomicBoolean isFirstRecord = new AtomicBoolean(true);
            AtomicReference<Exception> sendException = new AtomicReference<>();

            for (int i = 0; i < 2048; i++) {
                String content = String.valueOf(i);
                ProducerRecord<String, byte[]> record =
                        new ProducerRecord<>(topicName, content.getBytes());

                if (sendException.get() != null) {
                    // once an exception has been seen in the callback, stop sending more records
                    kafkaProducer.close();
                    break;
                }

                kafkaProducer.send(record, (RecordMetadata metadata, Exception exception) -> {
                    if (isFirstRecord.getAndSet(false)) {
                        try {
                            // sleep for more than twice DELIVERY_TIMEOUT_MS_CONFIG so that waiting
                            // batches expire; this simulates spending too much time in the callback
                            Thread.sleep(requestTimeout * 2 + 1000);
                        } catch (Exception e) {
                            throw new RuntimeException(e);
                        }
                    }

                    if (exception != null) {
                        rootLogger.error("send data failed, record content: {}, reason: {}",
                                content, exception.toString());
                        sendException.compareAndSet(null, exception);
                    } else {
                        rootLogger.info("send data success, offset: {}, record content: {}",
                                metadata.offset(), content);
                    }
                });

                Thread.sleep(1000);
            }
        } catch (Exception e) {
            rootLogger.error("send data error", e);
        }
    }
}
