[ https://issues.apache.org/jira/browse/KAFKA-9279?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Chris Egerton reassigned KAFKA-9279: ------------------------------------ Assignee: Chris Egerton > Silent data loss in Kafka producer > ---------------------------------- > > Key: KAFKA-9279 > URL: https://issues.apache.org/jira/browse/KAFKA-9279 > Project: Kafka > Issue Type: Bug > Components: producer > Affects Versions: 2.3.0 > Reporter: Andrew Klopper > Assignee: Chris Egerton > Priority: Major > > It appears that it is possible for a producer.commitTransaction() call to > succeed even if an individual producer.send() call has failed. The following > code demonstrates the issue: > {code:java} > package org.example.dataloss; > import java.nio.charset.StandardCharsets; > import java.util.Properties; > import java.util.Random; > import org.apache.kafka.clients.producer.KafkaProducer; > import org.apache.kafka.clients.producer.ProducerConfig; > import org.apache.kafka.clients.producer.ProducerRecord; > import org.apache.kafka.common.serialization.ByteArraySerializer; > public class Main { > public static void main(final String[] args) { > final Properties producerProps = new Properties(); > if (args.length != 2) { > System.err.println("Invalid command-line arguments"); > System.exit(1); > } > final String bootstrapServer = args[0]; > final String topic = args[1]; > producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, > bootstrapServer); > producerProps.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "500000"); > producerProps.setProperty(ProducerConfig.LINGER_MS_CONFIG, "1000"); > producerProps.setProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, > "1000000"); > producerProps.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, > "true"); > producerProps.setProperty(ProducerConfig.CLIENT_ID_CONFIG, > "dataloss_01"); > producerProps.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, > "dataloss_01"); > try (final KafkaProducer<byte[], byte[]> producer = new > KafkaProducer<>(producerProps, new ByteArraySerializer(), new > ByteArraySerializer())) { > producer.initTransactions(); > producer.beginTransaction(); > final Random random = new Random(); > final byte[] largePayload = new byte[2000000]; > random.nextBytes(largePayload); > producer.send( > new ProducerRecord<>( > topic, > "large".getBytes(StandardCharsets.UTF_8), > largePayload > ), > (metadata, e) -> { > if (e == null) { > System.out.println("INFO: Large payload succeeded"); > } else { > System.err.printf("ERROR: Large payload failed: > %s\n", e.getMessage()); > } > } > ); > producer.commitTransaction(); > System.out.println("Commit succeeded"); > } catch (final Exception e) { > System.err.printf("FATAL ERROR: %s", e.getMessage()); > } > } > } > {code} > The code prints the following output: > {code:java} > ERROR: Large payload failed: The message is 2000093 bytes when serialized > which is larger than the maximum request size you have configured with the > max.request.size configuration. > Commit succeeded{code} > -- This message was sent by Atlassian Jira (v8.20.1#820001)