Andrew Klopper created KAFKA-9279:
-------------------------------------
Summary: Silent data loss in Kafka producer
Key: KAFKA-9279
URL: https://issues.apache.org/jira/browse/KAFKA-9279
Project: Kafka
Issue Type: Bug
Components: producer
Affects Versions: 2.3.0
Reporter: Andrew Klopper
It appears that it is possible for a producer.commitTransaction() call to
succeed even if an individual producer.send() call has failed. The following
code demonstrates the issue:
{code:java}
package org.example.dataloss;
import java.nio.charset.StandardCharsets;
import java.util.Properties;
import java.util.Random;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
/**
 * Minimal reproduction program for the reported behaviour: inside a single
 * transaction it sends one record whose value (2,000,000 random bytes) is
 * larger than the configured maximum request size (1,000,000), and then
 * commits the transaction regardless of how the send turned out.
 *
 * <p>Usage: {@code Main <bootstrap-server> <topic>}.
 */
public class Main {
/**
 * Configures a transactional producer, sends one oversized record and
 * commits.
 *
 * @param args exactly two values: {@code args[0]} is used as the bootstrap
 *             server setting and {@code args[1]} as the destination topic;
 *             any other argument count prints an error and exits with
 *             status 1
 */
public static void main(final String[] args) {
final Properties producerProps = new Properties();
// Require exactly <bootstrap-server> <topic>; otherwise bail out.
if (args.length != 2) {
System.err.println("Invalid command-line arguments");
System.exit(1);
}
final String bootstrapServer = args[0];
final String topic = args[1];
producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServer);
producerProps.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "500000");
producerProps.setProperty(ProducerConfig.LINGER_MS_CONFIG, "1000");
// Maximum request size is set to 1,000,000; the payload built below is
// 2,000,000 bytes, i.e. deliberately larger than this limit.
producerProps.setProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG,
"1000000");
producerProps.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,
"true");
producerProps.setProperty(ProducerConfig.CLIENT_ID_CONFIG,
"dataloss_01");
// A transactional id is configured because the transaction APIs
// (initTransactions / beginTransaction / commitTransaction) are used below.
producerProps.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG,
"dataloss_01");
// try-with-resources: the producer is closed when this block exits,
// whether normally or via an exception.
try (final KafkaProducer<byte[], byte[]> producer = new
KafkaProducer<>(producerProps, new ByteArraySerializer(), new
ByteArraySerializer())) {
producer.initTransactions();
producer.beginTransaction();
// Build a 2,000,000-byte value filled with random bytes.
final Random random = new Random();
final byte[] largePayload = new byte[2000000];
random.nextBytes(largePayload);
// Send the single oversized record with key "large". The returned value
// of send() is discarded; the outcome is only observed via the callback.
producer.send(
new ProducerRecord<>(
topic,
"large".getBytes(StandardCharsets.UTF_8),
largePayload
),
// Completion callback: only prints whether the send succeeded or failed.
// It does not abort the transaction or record the failure anywhere.
(metadata, e) -> {
if (e == null) {
System.out.println("INFO: Large payload succeeded");
} else {
System.err.printf("ERROR: Large payload failed: %s\n",
e.getMessage());
}
}
);
// Commit is attempted unconditionally, without checking the result of
// the send above; reaching the next line means it did not throw.
producer.commitTransaction();
System.out.println("Commit succeeded");
} catch (final Exception e) {
// Any exception from the producer calls above lands here; only its
// message is printed (no stack trace, no trailing newline).
System.err.printf("FATAL ERROR: %s", e.getMessage());
}
}
}
{code}
The code prints the following output:
{code:java}
ERROR: Large payload failed: The message is 2000093 bytes when serialized which
is larger than the maximum request size you have configured with the
max.request.size configuration.
Commit succeeded{code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)