[ 
https://issues.apache.org/jira/browse/KAFKA-10334?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-10334.
-----------------------------------
    Resolution: Duplicate

> Transactions not working properly
> ---------------------------------
>
>                 Key: KAFKA-10334
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10334
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, producer 
>    Affects Versions: 2.1.0, 2.3.0
>            Reporter: Luis Araujo
>            Priority: Major
>
> I'm using transactions provided by Kafka Producer API in a Scala project 
> built with SBT. The dependency used in the project is: 
> {code:java}
> "org.apache.kafka" % "kafka-clients" % "2.1.0" {code}
> I followed the documentation and expected *commitTransaction()* to fail if 
> any message in the transaction could not be sent, as described in the 
> [documentation|https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#send-org.apache.kafka.clients.producer.ProducerRecord-org.apache.kafka.clients.producer.Callback-].
> However, when I test this behaviour with a message larger than the maximum 
> size accepted by the Kafka brokers, the transaction does not fail as 
> expected.
> I tested against a 3-broker Kafka cluster with a maximum message size of 1MB 
> (the default value):
>  - when the message is 1MB, the transaction is aborted and an exception is 
> raised when calling *commitTransaction()*
>  - when the message is bigger than 1MB, the transaction completes 
> successfully *without* the message being written. No exception is thrown.
> For example, when I produce 9 messages of 1KB each and 1 message of 1.1MB in 
> the same transaction, the transaction completes but only 9 messages are 
> written to the Kafka cluster.
> I observed this behaviour with versions 2.1.0 and 2.3.0 of both the Kafka 
> cluster and the Kafka Producer API.
> These are the configs I use to create the KafkaProducer with transactions 
> enabled:
> {code:java}
> // Config keys are constants from org.apache.kafka.clients.producer.ProducerConfig
> new Properties() {
>   {
>     put(BOOTSTRAP_SERVERS_CONFIG, "localhost:29092,localhost:29093,localhost:29094")
>     put(ACKS_CONFIG, "-1")
>     put(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1")
>     put(KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
>     put(VALUE_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer])
>     put(CLIENT_ID_CONFIG, "app")
>     put(TRANSACTIONAL_ID_CONFIG, "app")
>     put(ENABLE_IDEMPOTENCE_CONFIG, "true")
>   }
> }
> {code}
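
One possible way to guard against the silent loss described above is to record send errors explicitly instead of relying on commitTransaction() alone. The following is a minimal Scala sketch, not the reporter's code and not an official fix: the object name TransactionalSend and the helper sendAll are hypothetical, and it assumes that a failed send reports its error through the Callback, as the linked javadoc describes.

```scala
import java.util.concurrent.atomic.AtomicReference

import org.apache.kafka.clients.producer.{Callback, Producer, ProducerRecord, RecordMetadata}
import org.apache.kafka.common.KafkaException
import org.apache.kafka.common.errors.{AuthorizationException, OutOfOrderSequenceException, ProducerFencedException}

// Hypothetical helper, for illustration only.
object TransactionalSend {

  /** Sends all records in one transaction and aborts it if any send reports an error.
    * Assumes producer.initTransactions() has already been called by the caller.
    */
  def sendAll[K, V](producer: Producer[K, V], records: Seq[ProducerRecord[K, V]]): Unit = {
    // Remembers the first error reported by any send callback.
    val firstError = new AtomicReference[Exception](null)
    val recordError = new Callback {
      override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit =
        if (exception != null) firstError.compareAndSet(null, exception)
    }

    producer.beginTransaction()
    try {
      records.foreach(record => producer.send(record, recordError))
      // Block until every send has completed, so all callbacks have run.
      producer.flush()
      val error = firstError.get()
      if (error != null) {
        // Wrap the error so the abort branch below handles it.
        throw new KafkaException("At least one send failed; aborting transaction", error)
      }
      producer.commitTransaction()
    } catch {
      // Fatal errors: the producer cannot abort and should be closed by the caller.
      case e @ (_: ProducerFencedException | _: OutOfOrderSequenceException | _: AuthorizationException) =>
        throw e
      // Any other Kafka error: abort so that none of the records become visible.
      case e: KafkaException =>
        producer.abortTransaction()
        throw e
    }
  }
}
```

With a helper like this, the scenario from the report (9 small messages plus one oversized message) would be expected to end in an aborted transaction and an exception for the caller, provided the oversized send does surface its error through the callback.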



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
