Hello,
I'm trying to send a Kafka record with headers to RabbitMQ. I have added two
headers to the Kafka record (see the function below).
public Boolean writeToTopic(ArrayList<BundleRecord> listOfKafkaRecords,
        String bootstrapServer, String topicName) {
    Producer producer = createKafkaProducer(bootstrapServer);
    listOfKafkaRecords.forEach((record) -> {
        ProducerRecord recordKafka = new ProducerRecord(topicName, null, record);
        recordKafka.headers().add(new RecordHeader("type", "MyMessage".getBytes()));
        recordKafka.headers().add(new RecordHeader("content_type", "text/plain".getBytes()));
        producer.send(recordKafka);
    });
    producer.close();
    return true;
}
The headers are visible in the Kafka topic (see the image below).
[image: Kafka topic record showing the two headers]
When I add a camel-rabbitmq-kafka-connector, the headers are lost in RabbitMQ
(see the image below).
[image: RabbitMQ message without the headers]
My Camel connector setup is:
{
  "name": "rabbitmq-test-connector",
  "config": {
    "connector.class": "org.apache.camel.kafkaconnector.rabbitmq.CamelRabbitmqSinkConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "topics": "topicname",
    "camel.component.rabbitmq.hostname": "rabbitmq-test",
    "camel.component.rabbitmq.portnumber": 5672,
    "camel.component.rabbitmq.username": "username",
    "camel.component.rabbitmq.password": "password",
    "camel.sink.path.exchangeName": "sinkExchange",
    "camel.sink.endpoint.exchangeType": "topic",
    "camel.sink.endpoint.autoDelete": "false",
    "camel.sink.endpoint.queue": "endpointqueue",
    "camel.sink.endpoint.routingKey": "key",
    "camel.sink.endpoint.vhost": "vhost"
  }
}
My question is: why are the headers not available in RabbitMQ, and what do I
have to add to get the headers into RabbitMQ?
Kind regards,
Tundzaj