Hi all,
Batching is an integral part of Kafka for achieving better TPS, especially with
the transactional design and its expensive commit API. The idea is to send
multiple records per transaction.
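To make the batching idea concrete, here is a minimal, stdlib-only Java sketch. It does not talk to Kafka at all: `TransactionBatching`, `toBatches` and `commitsNeeded` are hypothetical names, and a "commit" is only counted, not sent. It groups records into fixed-size batches, one transaction per batch, so N records at batch size B need ceil(N/B) commits instead of N.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration (no Kafka involved): group records into batches so
// that each batch would be sent inside ONE transaction, i.e. one commit per batch.
public class TransactionBatching {

    // Split records into consecutive batches of at most batchSize elements.
    static List<List<String>> toBatches(List<String> records, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<String>> batches = new ArrayList<>();
        for (int i = 0; i < records.size(); i += batchSize) {
            int end = Math.min(i + batchSize, records.size());
            batches.add(new ArrayList<>(records.subList(i, end)));
        }
        return batches;
    }

    // Commits needed with one transaction per batch: ceil(recordCount / batchSize).
    static int commitsNeeded(int recordCount, int batchSize) {
        return (recordCount + batchSize - 1) / batchSize;
    }

    public static void main(String[] args) {
        List<String> lines = List.of("r1", "r2", "r3", "r4", "r5");
        System.out.println(toBatches(lines, 2));   // [[r1, r2], [r3, r4], [r5]]
        System.out.println(commitsNeeded(5, 1));   // 5: one record per transaction
        System.out.println(commitsNeeded(5, 2));   // 3: batches of 2 records
    }
}
```

With a batch size of 1 this degenerates to the one-record-per-transaction behaviour of the route below.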
Below is the working code for a single record per transaction:
```java
from("file:input?fileName=input.txt&noop=true")
    .split(body().tokenize("\n")).streaming()
    .to("kafka:topic2?brokers=<broker-ip>:31161" +
        "&requestRequiredAcks=all" +
        "&lingerMs=10" +
        //"&synchronous=true" +   // commented out
        "&additionalProperties.enable.idempotence=true" +
        "&additionalProperties.transactional.id=newtxn-53" +
        "&additionalProperties.retries=5");
```
The above route sends a single exchange per transaction, which leads to very
low TPS. According to the camel-kafka documentation, aggregate() can be used to
increase producer performance. However, aggregate() doesn't work with a
transactional Kafka endpoint in camel-kafka.
Doc: https://camel.apache.org/components/4.10.x/kafka-component.html
Non-working code with aggregate():
```java
from("file:input?fileName=input.txt&noop=true")
    .split(body().tokenize("\n")).streaming()
        //.delay(80)
        .aggregate(constant(true), new GroupedExchangeAggregationStrategy())
            .completionSize(2)
            .completionInterval(100)
            .to("kafka:topic2?brokers=<broker-ip>:31161" +
                "&requestRequiredAcks=all" +
                "&lingerMs=10" +
                //"&synchronous=true" +
                "&additionalProperties.enable.idempotence=true" +
                "&additionalProperties.transactional.id=newtxn-10" +
                "&additionalProperties.retries=5");
```
**Exception:**
```
10] ProducerId set to 1006 with epoch 0
[com.example.FileToKafkaApp.main()] INFO org.apache.camel.impl.engine.AbstractCamelContext - Routes startup (started:1)
[com.example.FileToKafkaApp.main()] INFO org.apache.camel.impl.engine.AbstractCamelContext - Started route1 (file://input)
[com.example.FileToKafkaApp.main()] INFO org.apache.camel.impl.engine.AbstractCamelContext - Apache Camel 3.21.0 (camel-1) started in 1s523ms (build:63ms init:322ms start:1s138ms)
Camel started. Press Ctrl+C to stop.
After aggregation - Body type: org.apache.camel.processor.aggregate.AbstractListAggregationStrategy$GroupedExchangeList
After aggregation - Body content: List<Exchange>(2 elements)
After aggregation - Body type: org.apache.camel.processor.aggregate.AbstractListAggregationStrategy$GroupedExchangeList
After aggregation - Body content: List<Exchange>(2 elements)
```
**Exception occurred: TransactionalId newtxn-10: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION**
Debug:
After some debugging, I found that adding a delay (the commented-out .delay(80)
in the route above) makes the route work seamlessly. So what may be happening
is that the aggregate() route thread hands the next batch to the Kafka
producer thread(s) faster than they can finish with the existing batch, so a
new transaction is begun on the same transactional.id while the previous one
is still open.
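To illustrate the suspected race, here is a deliberately simplified, stdlib-only model. It is not the Kafka client: `ToyTxnProducer`, its `State` enum and its methods are hypothetical, loosely mirroring the states named in the error message. One shared transactional producer rejects `beginTransaction()` while a transaction is still open (the IN_TRANSACTION to IN_TRANSACTION transition from the log), whereas running each whole begin/send/commit cycle one at a time (here via `synchronized`) avoids it. That is consistent with the observation that an artificial delay makes the route work, but it is only a model of the hypothesis, not a diagnosis of camel-kafka.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: NOT org.apache.kafka.clients.producer.KafkaProducer.
// It mimics one rule of a transactional producer: a new transaction may not
// begin while the previous one (same transactional.id) is still open.
public class ToyTxnProducer {
    enum State { READY, IN_TRANSACTION }

    private State state = State.READY;
    private final List<List<String>> committed = new ArrayList<>();
    private List<String> pending;

    void beginTransaction() {
        if (state == State.IN_TRANSACTION) {
            throw new IllegalStateException(
                "Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION");
        }
        state = State.IN_TRANSACTION;
        pending = new ArrayList<>();
    }

    void send(String record) { pending.add(record); }

    void commitTransaction() {
        committed.add(pending);
        state = State.READY;
    }

    // Serialized usage: one complete begin/send/commit cycle per batch at a time.
    synchronized void sendBatchInOneTransaction(List<String> batch) {
        beginTransaction();
        batch.forEach(this::send);
        commitTransaction();
    }

    synchronized int committedTransactions() { return committed.size(); }

    public static void main(String[] args) throws InterruptedException {
        // Overlapping transactions (what the aggregated route seems to do) are rejected.
        ToyTxnProducer overlapping = new ToyTxnProducer();
        overlapping.beginTransaction();         // batch 1 still open...
        try {
            overlapping.beginTransaction();     // ...when batch 2 arrives
        } catch (IllegalStateException e) {
            System.out.println("Rejected: " + e.getMessage());
        }

        // Two threads sharing one producer succeed when each cycle is serialized.
        ToyTxnProducer serialized = new ToyTxnProducer();
        Thread t1 = new Thread(() -> serialized.sendBatchInOneTransaction(List.of("r1", "r2")));
        Thread t2 = new Thread(() -> serialized.sendBatchInOneTransaction(List.of("r3", "r4")));
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println("Committed transactions: " + serialized.committedTransactions()); // 2
    }
}
```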
Requirement:
Is this unexpected behaviour? Should we be able to batch Kafka transactions in
Camel, either this way or another, without writing custom code or a processor?