[ https://issues.apache.org/jira/browse/CAMEL-14233?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Claus Ibsen updated CAMEL-14233:
--------------------------------
    Issue Type: Improvement  (was: Bug)

> camel-kafka - topic overriding not possible when using aggregation
> ------------------------------------------------------------------
>
>                 Key: CAMEL-14233
>                 URL: https://issues.apache.org/jira/browse/CAMEL-14233
>             Project: Camel
>          Issue Type: Improvement
>          Components: camel-kafka
>    Affects Versions: 2.24.2
>            Reporter: Rafał Gała
>            Priority: Minor
>             Fix For: 3.1.0
>
>
> When exchange aggregation is used, for example with
> GroupedExchangeAggregationStrategy:
>
> {code:java}
> from(..)
>     .process(some processor here that sets the KafkaConstants.TOPIC header)
>     .aggregate(new GroupedExchangeAggregationStrategy())
>     .to(kafka:...)
> {code}
> it is not possible to override the topic per exchange with the
> KafkaConstants.TOPIC header, because *createRecorder* in the KafkaProducer
> class takes the topic from the header of the aggregating Exchange. That
> header may not be set, because it may have been set only on the Exchanges
> that were aggregated.
> When creating ProducerRecords from an Iterable, the topic should be taken
> from the header of each Exchange separately:
> {code:java}
> protected Iterator<ProducerRecord> createRecorder(Exchange exchange) throws Exception {
>     String topic = endpoint.getConfiguration().getTopic();
>     if (!endpoint.getConfiguration().isBridgeEndpoint()) {
>         String headerTopic = exchange.getIn().getHeader(KafkaConstants.TOPIC, String.class);
>         boolean allowHeader = true;
>         // when we do not bridge then detect if we try to send back to ourselves
>         // which we most likely do not want to do
>         if (headerTopic != null && endpoint.getConfiguration().isCircularTopicDetection()) {
>             Endpoint from = exchange.getFromEndpoint();
>             if (from instanceof KafkaEndpoint) {
>                 String fromTopic = ((KafkaEndpoint) from).getConfiguration().getTopic();
>                 allowHeader = !headerTopic.equals(fromTopic);
>                 if (!allowHeader) {
>                     log.debug("Circular topic detected from message header."
>                             + " Cannot send to same topic as the message comes from: {}"
>                             + ". Will use endpoint configured topic: {}", from, topic);
>                 }
>             }
>         }
>         if (allowHeader && headerTopic != null) {
>             topic = headerTopic;
>         }
>     }
>     if (topic == null) {
>         // if topic property was not received from configuration or header
>         // parameters take it from the remaining URI
>         topic = URISupport.extractRemainderPath(new URI(endpoint.getEndpointUri()), true);
>     }
>     ...
>     Object msg = exchange.getIn().getBody();
>     // is the message body a list or something that contains multiple values
>     Iterator<Object> iterator = null;
>     if (msg instanceof Iterable) {
>         iterator = ((Iterable<Object>) msg).iterator();
>     } else if (msg instanceof Iterator) {
>         iterator = (Iterator<Object>) msg;
>     }
>     if (iterator != null) {
>         final Iterator<Object> msgList = iterator;
> {code}
> The msgTopic variable below should be set from the KafkaConstants.TOPIC
> header of the next exchange in the collection:
> {code}
>         final String msgTopic = topic;
>         return new Iterator<ProducerRecord>() {
>             @Override
>             public boolean hasNext() {
>                 return msgList.hasNext();
>             }
>             @Override
>             public ProducerRecord next() {
>                 // must convert each entry of the iterator into the value
>                 // according to the serializer
>                 Object next = msgList.next();
>                 Object value = tryConvertToSerializedType(exchange, next,
>                         endpoint.getConfiguration().getSerializerClass());
>                 if (hasPartitionKey && hasMessageKey) {
>                     return new ProducerRecord(msgTopic, partitionKey, null, key, value, propagatedHeaders);
>                 } else if (hasMessageKey) {
>                     return new ProducerRecord(msgTopic, null, null, key, value, propagatedHeaders);
>                 } else {
>                     return new ProducerRecord(msgTopic, null, null, null, value, propagatedHeaders);
>                 }
>             }
>             @Override
>             public void remove() {
>                 msgList.remove();
>             }
>         };
>     }
>     ...
> }
> {code}
>

--
This message was sent by Atlassian Jira (v8.3.4#803005)
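
The change requested in the issue, choosing the topic per aggregated element instead of once for the aggregating exchange, could look roughly like the sketch below. It is a dependency-free illustration, not the actual camel-kafka fix: `Element` is a hypothetical stand-in for a Camel Exchange, `TOPIC_HEADER` is an assumed placeholder for `KafkaConstants.TOPIC`, and `topicFor` and `records` are invented helper names. The circular topic detection from the quoted method is left out for brevity.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Sketch only: models per-element topic selection without Camel or Kafka classes.
class PerElementTopicSketch {

    // Assumed placeholder for the header name; the real code would use KafkaConstants.TOPIC.
    static final String TOPIC_HEADER = "kafka.TOPIC";

    // Hypothetical stand-in for an aggregated Exchange: a body plus headers.
    static final class Element {
        final Map<String, Object> headers = new HashMap<>();
        final Object body;

        Element(Object body) {
            this.body = body;
        }

        Element header(String name, Object value) {
            headers.put(name, value);
            return this;
        }
    }

    // Prefer the element's own topic header; otherwise fall back to the topic
    // that was resolved once for the aggregating exchange.
    static String topicFor(Object element, String fallbackTopic) {
        if (element instanceof Element) {
            Object headerTopic = ((Element) element).headers.get(TOPIC_HEADER);
            if (headerTopic instanceof String && !((String) headerTopic).isEmpty()) {
                return (String) headerTopic;
            }
        }
        return fallbackTopic;
    }

    // Lazily yields {topic, value} pairs, one per element of the batch.
    static Iterator<String[]> records(Iterable<?> batch, final String fallbackTopic) {
        final Iterator<?> it = batch.iterator();
        return new Iterator<String[]>() {
            @Override
            public boolean hasNext() {
                return it.hasNext();
            }

            @Override
            public String[] next() {
                Object next = it.next();
                Object value = next instanceof Element ? ((Element) next).body : next;
                // The topic is resolved here, per element, not once before the loop.
                return new String[] {topicFor(next, fallbackTopic), String.valueOf(value)};
            }
        };
    }

    public static void main(String[] args) {
        List<Object> batch = Arrays.<Object>asList(
                new Element("one").header(TOPIC_HEADER, "topic-a"),
                new Element("two"),   // no header: uses the fallback topic
                "three");             // plain body: uses the fallback topic
        Iterator<String[]> it = records(batch, "default-topic");
        while (it.hasNext()) {
            String[] record = it.next();
            System.out.println(record[0] + " <- " + record[1]);
        }
    }
}
```

In this sketch only the first element carries its own topic, so it is routed to `topic-a` while the other two fall back to `default-topic`. In the real producer the fallback would be the `topic` value computed at the top of `createRecorder`, and any per-element header would presumably need to pass the same circular topic check.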