[jira] [Updated] (CAMEL-14233) camel-kafka - topic overriding not possible when using aggregation

2020-01-02 Thread Jira


 [ 
https://issues.apache.org/jira/browse/CAMEL-14233?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rafał Gała updated CAMEL-14233:
---
Attachment: (was: metadata.patch)

> camel-kafka - topic overriding not possible when using aggregation
> --
>
> Key: CAMEL-14233
> URL: https://issues.apache.org/jira/browse/CAMEL-14233
> Project: Camel
>  Issue Type: Improvement
>  Components: camel-kafka
>Affects Versions: 2.24.2
>Reporter: Rafał Gała
>Assignee: Omar Al-Safi
>Priority: Minor
> Fix For: 3.1.0
>
> Attachments: kafka2.patch, metadata.patch
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> When exchange aggregation is used, using GroupedExchangeAggregationStrategy 
> for example:
>  
> {code:java}
> from(..)
> .process(some processor here that sets KafkaConstants.TOPIC header here)
> .aggregate(new GroupedExchangeAggregationStrategy ())
> .to(kafka:...)
> {code}
> it is not possible to override topic per exchange by using 
> KafkaConstants.TOPIC header, because in *createRecord* of KafkaProducer 
> class, the topic is chosen from header of aggregating Exchange which may not 
> be set because it may have been set only on Exchanges that were aggregated. 
> When creating ProducerRecord from Iterable, the topic should be chosen from 
> header of each Exchange separately:
> {code:java}
> protected Iterator createRecorder(Exchange exchange) throws 
> Exception {
> String topic = endpoint.getConfiguration().getTopic();
> if (!endpoint.getConfiguration().isBridgeEndpoint()) {
> String headerTopic = exchange.getIn().getHeader(KafkaConstants.TOPIC, 
> String.class);
> boolean allowHeader = true;
> // when we do not bridge then detect if we try to send back to 
> ourselves
> // which we most likely do not want to do
> if (headerTopic != null && 
> endpoint.getConfiguration().isCircularTopicDetection()) {
> Endpoint from = exchange.getFromEndpoint();
> if (from instanceof KafkaEndpoint) {
> String fromTopic = ((KafkaEndpoint) 
> from).getConfiguration().getTopic();
> allowHeader = !headerTopic.equals(fromTopic);
> if (!allowHeader) {
> log.debug("Circular topic detected from message header."
> + " Cannot send to same topic as the message 
> comes from: {}"
> + ". Will use endpoint configured topic: {}", 
> from, topic);
> }
> }
> }
> if (allowHeader && headerTopic != null) {
> topic = headerTopic;
> }
> }
> if (topic == null) {
> // if topic property was not received from configuration or header 
> parameters take it from the remaining URI
> topic = URISupport.extractRemainderPath(new 
> URI(endpoint.getEndpointUri()), true);
> }
> ...
> Object msg = exchange.getIn().getBody();
> // is the message body a list or something that contains multiple values
> Iterator iterator = null;
> if (msg instanceof Iterable) {
> iterator = ((Iterable) msg).iterator();
> } else if (msg instanceof Iterator) {
> iterator = (Iterator) msg;
> }
> if (iterator != null) {
> final Iterator msgList = iterator;
> {code}
> The msgTopic variable below should be set from KafkaConstants.TOPIC header of 
> next exchange from collection
> {code}
> final String msgTopic = topic;
> return new Iterator() {
> @Override
> public boolean hasNext() {
> return msgList.hasNext();
> }
> @Override
> public ProducerRecord next() {
> // must convert each entry of the iterator into the value 
> according to the serializer
> Object next = msgList.next();
> Object value = tryConvertToSerializedType(exchange, next, 
> endpoint.getConfiguration().getSerializerClass());
> if (hasPartitionKey && hasMessageKey) {
> return new ProducerRecord(msgTopic, partitionKey, null, 
> key, value, propagatedHeaders);
> } else if (hasMessageKey) {
> return new ProducerRecord(msgTopic, null, null, key, 
> value, propagatedHeaders);
> } else {
> return new ProducerRecord(msgTopic, null, null, null, 
> value, propagatedHeaders);
> }
> }
> @Override
> public void remove() {
> msgList.remove();
> }
> };
> }
> ...
> }
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (CAMEL-14233) camel-kafka - topic overriding not possible when using aggregation

2020-01-02 Thread Jira


 [ 
https://issues.apache.org/jira/browse/CAMEL-14233?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rafał Gała updated CAMEL-14233:
---
Attachment: metadata.patch

> camel-kafka - topic overriding not possible when using aggregation
> --
>
> Key: CAMEL-14233
> URL: https://issues.apache.org/jira/browse/CAMEL-14233
> Project: Camel
>  Issue Type: Improvement
>  Components: camel-kafka
>Affects Versions: 2.24.2
>Reporter: Rafał Gała
>Assignee: Omar Al-Safi
>Priority: Minor
> Fix For: 3.1.0
>
> Attachments: kafka2.patch, metadata.patch
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> When exchange aggregation is used, using GroupedExchangeAggregationStrategy 
> for example:
>  
> {code:java}
> from(..)
> .process(some processor here that sets KafkaConstants.TOPIC header here)
> .aggregate(new GroupedExchangeAggregationStrategy ())
> .to(kafka:...)
> {code}
> it is not possible to override topic per exchange by using 
> KafkaConstants.TOPIC header, because in *createRecord* of KafkaProducer 
> class, the topic is chosen from header of aggregating Exchange which may not 
> be set because it may have been set only on Exchanges that were aggregated. 
> When creating ProducerRecord from Iterable, the topic should be chosen from 
> header of each Exchange separately:
> {code:java}
> protected Iterator createRecorder(Exchange exchange) throws 
> Exception {
> String topic = endpoint.getConfiguration().getTopic();
> if (!endpoint.getConfiguration().isBridgeEndpoint()) {
> String headerTopic = exchange.getIn().getHeader(KafkaConstants.TOPIC, 
> String.class);
> boolean allowHeader = true;
> // when we do not bridge then detect if we try to send back to 
> ourselves
> // which we most likely do not want to do
> if (headerTopic != null && 
> endpoint.getConfiguration().isCircularTopicDetection()) {
> Endpoint from = exchange.getFromEndpoint();
> if (from instanceof KafkaEndpoint) {
> String fromTopic = ((KafkaEndpoint) 
> from).getConfiguration().getTopic();
> allowHeader = !headerTopic.equals(fromTopic);
> if (!allowHeader) {
> log.debug("Circular topic detected from message header."
> + " Cannot send to same topic as the message 
> comes from: {}"
> + ". Will use endpoint configured topic: {}", 
> from, topic);
> }
> }
> }
> if (allowHeader && headerTopic != null) {
> topic = headerTopic;
> }
> }
> if (topic == null) {
> // if topic property was not received from configuration or header 
> parameters take it from the remaining URI
> topic = URISupport.extractRemainderPath(new 
> URI(endpoint.getEndpointUri()), true);
> }
> ...
> Object msg = exchange.getIn().getBody();
> // is the message body a list or something that contains multiple values
> Iterator iterator = null;
> if (msg instanceof Iterable) {
> iterator = ((Iterable) msg).iterator();
> } else if (msg instanceof Iterator) {
> iterator = (Iterator) msg;
> }
> if (iterator != null) {
> final Iterator msgList = iterator;
> {code}
> The msgTopic variable below should be set from KafkaConstants.TOPIC header of 
> next exchange from collection
> {code}
> final String msgTopic = topic;
> return new Iterator() {
> @Override
> public boolean hasNext() {
> return msgList.hasNext();
> }
> @Override
> public ProducerRecord next() {
> // must convert each entry of the iterator into the value 
> according to the serializer
> Object next = msgList.next();
> Object value = tryConvertToSerializedType(exchange, next, 
> endpoint.getConfiguration().getSerializerClass());
> if (hasPartitionKey && hasMessageKey) {
> return new ProducerRecord(msgTopic, partitionKey, null, 
> key, value, propagatedHeaders);
> } else if (hasMessageKey) {
> return new ProducerRecord(msgTopic, null, null, key, 
> value, propagatedHeaders);
> } else {
> return new ProducerRecord(msgTopic, null, null, null, 
> value, propagatedHeaders);
> }
> }
> @Override
> public void remove() {
> msgList.remove();
> }
> };
> }
> ...
> }
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (CAMEL-14233) camel-kafka - topic overriding not possible when using aggregation

2020-01-02 Thread Jira


 [ 
https://issues.apache.org/jira/browse/CAMEL-14233?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rafał Gała updated CAMEL-14233:
---
Attachment: metadata.patch

> camel-kafka - topic overriding not possible when using aggregation
> --
>
> Key: CAMEL-14233
> URL: https://issues.apache.org/jira/browse/CAMEL-14233
> Project: Camel
>  Issue Type: Improvement
>  Components: camel-kafka
>Affects Versions: 2.24.2
>Reporter: Rafał Gała
>Assignee: Omar Al-Safi
>Priority: Minor
> Fix For: 3.1.0
>
> Attachments: kafka2.patch, metadata.patch
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> When exchange aggregation is used, using GroupedExchangeAggregationStrategy 
> for example:
>  
> {code:java}
> from(..)
> .process(some processor here that sets KafkaConstants.TOPIC header here)
> .aggregate(new GroupedExchangeAggregationStrategy ())
> .to(kafka:...)
> {code}
> it is not possible to override topic per exchange by using 
> KafkaConstants.TOPIC header, because in *createRecord* of KafkaProducer 
> class, the topic is chosen from header of aggregating Exchange which may not 
> be set because it may have been set only on Exchanges that were aggregated. 
> When creating ProducerRecord from Iterable, the topic should be chosen from 
> header of each Exchange separately:
> {code:java}
> protected Iterator createRecorder(Exchange exchange) throws 
> Exception {
> String topic = endpoint.getConfiguration().getTopic();
> if (!endpoint.getConfiguration().isBridgeEndpoint()) {
> String headerTopic = exchange.getIn().getHeader(KafkaConstants.TOPIC, 
> String.class);
> boolean allowHeader = true;
> // when we do not bridge then detect if we try to send back to 
> ourselves
> // which we most likely do not want to do
> if (headerTopic != null && 
> endpoint.getConfiguration().isCircularTopicDetection()) {
> Endpoint from = exchange.getFromEndpoint();
> if (from instanceof KafkaEndpoint) {
> String fromTopic = ((KafkaEndpoint) 
> from).getConfiguration().getTopic();
> allowHeader = !headerTopic.equals(fromTopic);
> if (!allowHeader) {
> log.debug("Circular topic detected from message header."
> + " Cannot send to same topic as the message 
> comes from: {}"
> + ". Will use endpoint configured topic: {}", 
> from, topic);
> }
> }
> }
> if (allowHeader && headerTopic != null) {
> topic = headerTopic;
> }
> }
> if (topic == null) {
> // if topic property was not received from configuration or header 
> parameters take it from the remaining URI
> topic = URISupport.extractRemainderPath(new 
> URI(endpoint.getEndpointUri()), true);
> }
> ...
> Object msg = exchange.getIn().getBody();
> // is the message body a list or something that contains multiple values
> Iterator iterator = null;
> if (msg instanceof Iterable) {
> iterator = ((Iterable) msg).iterator();
> } else if (msg instanceof Iterator) {
> iterator = (Iterator) msg;
> }
> if (iterator != null) {
> final Iterator msgList = iterator;
> {code}
> The msgTopic variable below should be set from KafkaConstants.TOPIC header of 
> next exchange from collection
> {code}
> final String msgTopic = topic;
> return new Iterator() {
> @Override
> public boolean hasNext() {
> return msgList.hasNext();
> }
> @Override
> public ProducerRecord next() {
> // must convert each entry of the iterator into the value 
> according to the serializer
> Object next = msgList.next();
> Object value = tryConvertToSerializedType(exchange, next, 
> endpoint.getConfiguration().getSerializerClass());
> if (hasPartitionKey && hasMessageKey) {
> return new ProducerRecord(msgTopic, partitionKey, null, 
> key, value, propagatedHeaders);
> } else if (hasMessageKey) {
> return new ProducerRecord(msgTopic, null, null, key, 
> value, propagatedHeaders);
> } else {
> return new ProducerRecord(msgTopic, null, null, null, 
> value, propagatedHeaders);
> }
> }
> @Override
> public void remove() {
> msgList.remove();
> }
> };
> }
> ...
> }
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (CAMEL-14233) camel-kafka - topic overriding not possible when using aggregation

2019-12-31 Thread Jira


 [ 
https://issues.apache.org/jira/browse/CAMEL-14233?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rafał Gała updated CAMEL-14233:
---
Attachment: kafka2.patch

> camel-kafka - topic overriding not possible when using aggregation
> --
>
> Key: CAMEL-14233
> URL: https://issues.apache.org/jira/browse/CAMEL-14233
> Project: Camel
>  Issue Type: Improvement
>  Components: camel-kafka
>Affects Versions: 2.24.2
>Reporter: Rafał Gała
>Assignee: Omar Al-Safi
>Priority: Minor
> Fix For: 3.1.0
>
> Attachments: kafka2.patch
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> When exchange aggregation is used, using GroupedExchangeAggregationStrategy 
> for example:
>  
> {code:java}
> from(..)
> .process(some processor here that sets KafkaConstants.TOPIC header here)
> .aggregate(new GroupedExchangeAggregationStrategy ())
> .to(kafka:...)
> {code}
> it is not possible to override topic per exchange by using 
> KafkaConstants.TOPIC header, because in *createRecord* of KafkaProducer 
> class, the topic is chosen from header of aggregating Exchange which may not 
> be set because it may have been set only on Exchanges that were aggregated. 
> When creating ProducerRecord from Iterable, the topic should be chosen from 
> header of each Exchange separately:
> {code:java}
> protected Iterator createRecorder(Exchange exchange) throws 
> Exception {
> String topic = endpoint.getConfiguration().getTopic();
> if (!endpoint.getConfiguration().isBridgeEndpoint()) {
> String headerTopic = exchange.getIn().getHeader(KafkaConstants.TOPIC, 
> String.class);
> boolean allowHeader = true;
> // when we do not bridge then detect if we try to send back to 
> ourselves
> // which we most likely do not want to do
> if (headerTopic != null && 
> endpoint.getConfiguration().isCircularTopicDetection()) {
> Endpoint from = exchange.getFromEndpoint();
> if (from instanceof KafkaEndpoint) {
> String fromTopic = ((KafkaEndpoint) 
> from).getConfiguration().getTopic();
> allowHeader = !headerTopic.equals(fromTopic);
> if (!allowHeader) {
> log.debug("Circular topic detected from message header."
> + " Cannot send to same topic as the message 
> comes from: {}"
> + ". Will use endpoint configured topic: {}", 
> from, topic);
> }
> }
> }
> if (allowHeader && headerTopic != null) {
> topic = headerTopic;
> }
> }
> if (topic == null) {
> // if topic property was not received from configuration or header 
> parameters take it from the remaining URI
> topic = URISupport.extractRemainderPath(new 
> URI(endpoint.getEndpointUri()), true);
> }
> ...
> Object msg = exchange.getIn().getBody();
> // is the message body a list or something that contains multiple values
> Iterator iterator = null;
> if (msg instanceof Iterable) {
> iterator = ((Iterable) msg).iterator();
> } else if (msg instanceof Iterator) {
> iterator = (Iterator) msg;
> }
> if (iterator != null) {
> final Iterator msgList = iterator;
> {code}
> The msgTopic variable below should be set from KafkaConstants.TOPIC header of 
> next exchange from collection
> {code}
> final String msgTopic = topic;
> return new Iterator() {
> @Override
> public boolean hasNext() {
> return msgList.hasNext();
> }
> @Override
> public ProducerRecord next() {
> // must convert each entry of the iterator into the value 
> according to the serializer
> Object next = msgList.next();
> Object value = tryConvertToSerializedType(exchange, next, 
> endpoint.getConfiguration().getSerializerClass());
> if (hasPartitionKey && hasMessageKey) {
> return new ProducerRecord(msgTopic, partitionKey, null, 
> key, value, propagatedHeaders);
> } else if (hasMessageKey) {
> return new ProducerRecord(msgTopic, null, null, key, 
> value, propagatedHeaders);
> } else {
> return new ProducerRecord(msgTopic, null, null, null, 
> value, propagatedHeaders);
> }
> }
> @Override
> public void remove() {
> msgList.remove();
> }
> };
> }
> ...
> }
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (CAMEL-14233) camel-kafka - topic overriding not possible when using aggregation

2019-12-31 Thread Jira


 [ 
https://issues.apache.org/jira/browse/CAMEL-14233?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rafał Gała updated CAMEL-14233:
---
Attachment: (was: kafka.patch)

> camel-kafka - topic overriding not possible when using aggregation
> --
>
> Key: CAMEL-14233
> URL: https://issues.apache.org/jira/browse/CAMEL-14233
> Project: Camel
>  Issue Type: Improvement
>  Components: camel-kafka
>Affects Versions: 2.24.2
>Reporter: Rafał Gała
>Assignee: Omar Al-Safi
>Priority: Minor
> Fix For: 3.1.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> When exchange aggregation is used, using GroupedExchangeAggregationStrategy 
> for example:
>  
> {code:java}
> from(..)
> .process(some processor here that sets KafkaConstants.TOPIC header here)
> .aggregate(new GroupedExchangeAggregationStrategy ())
> .to(kafka:...)
> {code}
> it is not possible to override topic per exchange by using 
> KafkaConstants.TOPIC header, because in *createRecord* of KafkaProducer 
> class, the topic is chosen from header of aggregating Exchange which may not 
> be set because it may have been set only on Exchanges that were aggregated. 
> When creating ProducerRecord from Iterable, the topic should be chosen from 
> header of each Exchange separately:
> {code:java}
> protected Iterator createRecorder(Exchange exchange) throws 
> Exception {
> String topic = endpoint.getConfiguration().getTopic();
> if (!endpoint.getConfiguration().isBridgeEndpoint()) {
> String headerTopic = exchange.getIn().getHeader(KafkaConstants.TOPIC, 
> String.class);
> boolean allowHeader = true;
> // when we do not bridge then detect if we try to send back to 
> ourselves
> // which we most likely do not want to do
> if (headerTopic != null && 
> endpoint.getConfiguration().isCircularTopicDetection()) {
> Endpoint from = exchange.getFromEndpoint();
> if (from instanceof KafkaEndpoint) {
> String fromTopic = ((KafkaEndpoint) 
> from).getConfiguration().getTopic();
> allowHeader = !headerTopic.equals(fromTopic);
> if (!allowHeader) {
> log.debug("Circular topic detected from message header."
> + " Cannot send to same topic as the message 
> comes from: {}"
> + ". Will use endpoint configured topic: {}", 
> from, topic);
> }
> }
> }
> if (allowHeader && headerTopic != null) {
> topic = headerTopic;
> }
> }
> if (topic == null) {
> // if topic property was not received from configuration or header 
> parameters take it from the remaining URI
> topic = URISupport.extractRemainderPath(new 
> URI(endpoint.getEndpointUri()), true);
> }
> ...
> Object msg = exchange.getIn().getBody();
> // is the message body a list or something that contains multiple values
> Iterator iterator = null;
> if (msg instanceof Iterable) {
> iterator = ((Iterable) msg).iterator();
> } else if (msg instanceof Iterator) {
> iterator = (Iterator) msg;
> }
> if (iterator != null) {
> final Iterator msgList = iterator;
> {code}
> The msgTopic variable below should be set from KafkaConstants.TOPIC header of 
> next exchange from collection
> {code}
> final String msgTopic = topic;
> return new Iterator() {
> @Override
> public boolean hasNext() {
> return msgList.hasNext();
> }
> @Override
> public ProducerRecord next() {
> // must convert each entry of the iterator into the value 
> according to the serializer
> Object next = msgList.next();
> Object value = tryConvertToSerializedType(exchange, next, 
> endpoint.getConfiguration().getSerializerClass());
> if (hasPartitionKey && hasMessageKey) {
> return new ProducerRecord(msgTopic, partitionKey, null, 
> key, value, propagatedHeaders);
> } else if (hasMessageKey) {
> return new ProducerRecord(msgTopic, null, null, key, 
> value, propagatedHeaders);
> } else {
> return new ProducerRecord(msgTopic, null, null, null, 
> value, propagatedHeaders);
> }
> }
> @Override
> public void remove() {
> msgList.remove();
> }
> };
> }
> ...
> }
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (CAMEL-14233) camel-kafka - topic overriding not possible when using aggregation

2019-12-31 Thread Jira


 [ 
https://issues.apache.org/jira/browse/CAMEL-14233?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rafał Gała updated CAMEL-14233:
---
Attachment: kafka.patch

> camel-kafka - topic overriding not possible when using aggregation
> --
>
> Key: CAMEL-14233
> URL: https://issues.apache.org/jira/browse/CAMEL-14233
> Project: Camel
>  Issue Type: Improvement
>  Components: camel-kafka
>Affects Versions: 2.24.2
>Reporter: Rafał Gała
>Assignee: Omar Al-Safi
>Priority: Minor
> Fix For: 3.1.0
>
> Attachments: kafka.patch
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> When exchange aggregation is used, using GroupedExchangeAggregationStrategy 
> for example:
>  
> {code:java}
> from(..)
> .process(some processor here that sets KafkaConstants.TOPIC header here)
> .aggregate(new GroupedExchangeAggregationStrategy ())
> .to(kafka:...)
> {code}
> it is not possible to override topic per exchange by using 
> KafkaConstants.TOPIC header, because in *createRecord* of KafkaProducer 
> class, the topic is chosen from header of aggregating Exchange which may not 
> be set because it may have been set only on Exchanges that were aggregated. 
> When creating ProducerRecord from Iterable, the topic should be chosen from 
> header of each Exchange separately:
> {code:java}
> protected Iterator createRecorder(Exchange exchange) throws 
> Exception {
> String topic = endpoint.getConfiguration().getTopic();
> if (!endpoint.getConfiguration().isBridgeEndpoint()) {
> String headerTopic = exchange.getIn().getHeader(KafkaConstants.TOPIC, 
> String.class);
> boolean allowHeader = true;
> // when we do not bridge then detect if we try to send back to 
> ourselves
> // which we most likely do not want to do
> if (headerTopic != null && 
> endpoint.getConfiguration().isCircularTopicDetection()) {
> Endpoint from = exchange.getFromEndpoint();
> if (from instanceof KafkaEndpoint) {
> String fromTopic = ((KafkaEndpoint) 
> from).getConfiguration().getTopic();
> allowHeader = !headerTopic.equals(fromTopic);
> if (!allowHeader) {
> log.debug("Circular topic detected from message header."
> + " Cannot send to same topic as the message 
> comes from: {}"
> + ". Will use endpoint configured topic: {}", 
> from, topic);
> }
> }
> }
> if (allowHeader && headerTopic != null) {
> topic = headerTopic;
> }
> }
> if (topic == null) {
> // if topic property was not received from configuration or header 
> parameters take it from the remaining URI
> topic = URISupport.extractRemainderPath(new 
> URI(endpoint.getEndpointUri()), true);
> }
> ...
> Object msg = exchange.getIn().getBody();
> // is the message body a list or something that contains multiple values
> Iterator iterator = null;
> if (msg instanceof Iterable) {
> iterator = ((Iterable) msg).iterator();
> } else if (msg instanceof Iterator) {
> iterator = (Iterator) msg;
> }
> if (iterator != null) {
> final Iterator msgList = iterator;
> {code}
> The msgTopic variable below should be set from KafkaConstants.TOPIC header of 
> next exchange from collection
> {code}
> final String msgTopic = topic;
> return new Iterator() {
> @Override
> public boolean hasNext() {
> return msgList.hasNext();
> }
> @Override
> public ProducerRecord next() {
> // must convert each entry of the iterator into the value 
> according to the serializer
> Object next = msgList.next();
> Object value = tryConvertToSerializedType(exchange, next, 
> endpoint.getConfiguration().getSerializerClass());
> if (hasPartitionKey && hasMessageKey) {
> return new ProducerRecord(msgTopic, partitionKey, null, 
> key, value, propagatedHeaders);
> } else if (hasMessageKey) {
> return new ProducerRecord(msgTopic, null, null, key, 
> value, propagatedHeaders);
> } else {
> return new ProducerRecord(msgTopic, null, null, null, 
> value, propagatedHeaders);
> }
> }
> @Override
> public void remove() {
> msgList.remove();
> }
> };
> }
> ...
> }
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (CAMEL-14233) camel-kafka - topic overriding not possible when using aggregation

2019-12-03 Thread Claus Ibsen (Jira)


 [ 
https://issues.apache.org/jira/browse/CAMEL-14233?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Claus Ibsen updated CAMEL-14233:

Issue Type: Improvement  (was: Bug)

> camel-kafka - topic overriding not possible when using aggregation
> --
>
> Key: CAMEL-14233
> URL: https://issues.apache.org/jira/browse/CAMEL-14233
> Project: Camel
>  Issue Type: Improvement
>  Components: camel-kafka
>Affects Versions: 2.24.2
>Reporter: Rafał Gała
>Priority: Minor
> Fix For: 3.1.0
>
>
> When exchange aggregation is used, using GroupedExchangeAggregationStrategy 
> for example:
>  
> {code:java}
> from(..)
> .process(some processor here that sets KafkaConstants.TOPIC header here)
> .aggregate(new GroupedExchangeAggregationStrategy ())
> .to(kafka:...)
> {code}
> it is not possible to override topic per exchange by using 
> KafkaConstants.TOPIC header, because in *createRecord* of KafkaProducer 
> class, the topic is chosen from header of aggregating Exchange which may not 
> be set because it may have been set only on Exchanges that were aggregated. 
> When creating ProducerRecord from Iterable, the topic should be chosen from 
> header of each Exchange separately:
> {code:java}
> protected Iterator createRecorder(Exchange exchange) throws 
> Exception {
> String topic = endpoint.getConfiguration().getTopic();
> if (!endpoint.getConfiguration().isBridgeEndpoint()) {
> String headerTopic = exchange.getIn().getHeader(KafkaConstants.TOPIC, 
> String.class);
> boolean allowHeader = true;
> // when we do not bridge then detect if we try to send back to 
> ourselves
> // which we most likely do not want to do
> if (headerTopic != null && 
> endpoint.getConfiguration().isCircularTopicDetection()) {
> Endpoint from = exchange.getFromEndpoint();
> if (from instanceof KafkaEndpoint) {
> String fromTopic = ((KafkaEndpoint) 
> from).getConfiguration().getTopic();
> allowHeader = !headerTopic.equals(fromTopic);
> if (!allowHeader) {
> log.debug("Circular topic detected from message header."
> + " Cannot send to same topic as the message 
> comes from: {}"
> + ". Will use endpoint configured topic: {}", 
> from, topic);
> }
> }
> }
> if (allowHeader && headerTopic != null) {
> topic = headerTopic;
> }
> }
> if (topic == null) {
> // if topic property was not received from configuration or header 
> parameters take it from the remaining URI
> topic = URISupport.extractRemainderPath(new 
> URI(endpoint.getEndpointUri()), true);
> }
> ...
> Object msg = exchange.getIn().getBody();
> // is the message body a list or something that contains multiple values
> Iterator iterator = null;
> if (msg instanceof Iterable) {
> iterator = ((Iterable) msg).iterator();
> } else if (msg instanceof Iterator) {
> iterator = (Iterator) msg;
> }
> if (iterator != null) {
> final Iterator msgList = iterator;
> {code}
> The msgTopic variable below should be set from KafkaConstants.TOPIC header of 
> next exchange from collection
> {code}
> final String msgTopic = topic;
> return new Iterator() {
> @Override
> public boolean hasNext() {
> return msgList.hasNext();
> }
> @Override
> public ProducerRecord next() {
> // must convert each entry of the iterator into the value 
> according to the serializer
> Object next = msgList.next();
> Object value = tryConvertToSerializedType(exchange, next, 
> endpoint.getConfiguration().getSerializerClass());
> if (hasPartitionKey && hasMessageKey) {
> return new ProducerRecord(msgTopic, partitionKey, null, 
> key, value, propagatedHeaders);
> } else if (hasMessageKey) {
> return new ProducerRecord(msgTopic, null, null, key, 
> value, propagatedHeaders);
> } else {
> return new ProducerRecord(msgTopic, null, null, null, 
> value, propagatedHeaders);
> }
> }
> @Override
> public void remove() {
> msgList.remove();
> }
> };
> }
> ...
> }
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (CAMEL-14233) camel-kafka - topic overriding not possible when using aggregation

2019-11-30 Thread Claus Ibsen (Jira)


 [ 
https://issues.apache.org/jira/browse/CAMEL-14233?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Claus Ibsen updated CAMEL-14233:

Fix Version/s: 3.1.0

> camel-kafka - topic overriding not possible when using aggregation
> --
>
> Key: CAMEL-14233
> URL: https://issues.apache.org/jira/browse/CAMEL-14233
> Project: Camel
>  Issue Type: Bug
>  Components: camel-kafka
>Affects Versions: 2.24.2
>Reporter: Rafał Gała
>Priority: Minor
> Fix For: 3.1.0
>
>
> When exchange aggregation is used, using GroupedExchangeAggregationStrategy 
> for example:
>  
> {code:java}
> from(..)
> .process(some processor here that sets KafkaConstants.TOPIC header here)
> .aggregate(new GroupedExchangeAggregationStrategy ())
> .to(kafka:...)
> {code}
> it is not possible to override topic per exchange by using 
> KafkaConstants.TOPIC header, because in *createRecord* of KafkaProducer 
> class, the topic is chosen from header of aggregating Exchange which may not 
> be set because it may have been set only on Exchanges that were aggregated. 
> When creating ProducerRecord from Iterable, the topic should be chosen from 
> header of each Exchange separately:
> {code:java}
> protected Iterator createRecorder(Exchange exchange) throws 
> Exception {
> String topic = endpoint.getConfiguration().getTopic();
> if (!endpoint.getConfiguration().isBridgeEndpoint()) {
> String headerTopic = exchange.getIn().getHeader(KafkaConstants.TOPIC, 
> String.class);
> boolean allowHeader = true;
> // when we do not bridge then detect if we try to send back to 
> ourselves
> // which we most likely do not want to do
> if (headerTopic != null && 
> endpoint.getConfiguration().isCircularTopicDetection()) {
> Endpoint from = exchange.getFromEndpoint();
> if (from instanceof KafkaEndpoint) {
> String fromTopic = ((KafkaEndpoint) 
> from).getConfiguration().getTopic();
> allowHeader = !headerTopic.equals(fromTopic);
> if (!allowHeader) {
> log.debug("Circular topic detected from message header."
> + " Cannot send to same topic as the message 
> comes from: {}"
> + ". Will use endpoint configured topic: {}", 
> from, topic);
> }
> }
> }
> if (allowHeader && headerTopic != null) {
> topic = headerTopic;
> }
> }
> if (topic == null) {
> // if topic property was not received from configuration or header 
> parameters take it from the remaining URI
> topic = URISupport.extractRemainderPath(new 
> URI(endpoint.getEndpointUri()), true);
> }
> ...
> Object msg = exchange.getIn().getBody();
> // is the message body a list or something that contains multiple values
> Iterator iterator = null;
> if (msg instanceof Iterable) {
> iterator = ((Iterable) msg).iterator();
> } else if (msg instanceof Iterator) {
> iterator = (Iterator) msg;
> }
> if (iterator != null) {
> final Iterator msgList = iterator;
> {code}
> The msgTopic variable below should be set from KafkaConstants.TOPIC header of 
> next exchange from collection
> {code}
> final String msgTopic = topic;
> return new Iterator() {
> @Override
> public boolean hasNext() {
> return msgList.hasNext();
> }
> @Override
> public ProducerRecord next() {
> // must convert each entry of the iterator into the value 
> according to the serializer
> Object next = msgList.next();
> Object value = tryConvertToSerializedType(exchange, next, 
> endpoint.getConfiguration().getSerializerClass());
> if (hasPartitionKey && hasMessageKey) {
> return new ProducerRecord(msgTopic, partitionKey, null, 
> key, value, propagatedHeaders);
> } else if (hasMessageKey) {
> return new ProducerRecord(msgTopic, null, null, key, 
> value, propagatedHeaders);
> } else {
> return new ProducerRecord(msgTopic, null, null, null, 
> value, propagatedHeaders);
> }
> }
> @Override
> public void remove() {
> msgList.remove();
> }
> };
> }
> ...
> }
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (CAMEL-14233) camel-kafka - topic overriding not possible when using aggregation

2019-11-30 Thread Claus Ibsen (Jira)


 [ 
https://issues.apache.org/jira/browse/CAMEL-14233?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Claus Ibsen updated CAMEL-14233:

Priority: Minor  (was: Major)

> camel-kafka - topic overriding not possible when using aggregation
> --
>
> Key: CAMEL-14233
> URL: https://issues.apache.org/jira/browse/CAMEL-14233
> Project: Camel
>  Issue Type: Bug
>  Components: camel-kafka
>Affects Versions: 2.24.2
>Reporter: Rafał Gała
>Priority: Minor
>
> When exchange aggregation is used, using GroupedExchangeAggregationStrategy 
> for example:
>  
> {code:java}
> from(..)
> .process(some processor here that sets KafkaConstants.TOPIC header here)
> .aggregate(new GroupedExchangeAggregationStrategy ())
> .to(kafka:...)
> {code}
> it is not possible to override topic per exchange by using 
> KafkaConstants.TOPIC header, because in *createRecord* of KafkaProducer 
> class, the topic is chosen from header of aggregating Exchange which may not 
> be set because it may have been set only on Exchanges that were aggregated. 
> When creating ProducerRecord from Iterable, the topic should be chosen from 
> header of each Exchange separately:
> {code:java}
> protected Iterator createRecorder(Exchange exchange) throws 
> Exception {
> String topic = endpoint.getConfiguration().getTopic();
> if (!endpoint.getConfiguration().isBridgeEndpoint()) {
> String headerTopic = exchange.getIn().getHeader(KafkaConstants.TOPIC, 
> String.class);
> boolean allowHeader = true;
> // when we do not bridge then detect if we try to send back to 
> ourselves
> // which we most likely do not want to do
> if (headerTopic != null && 
> endpoint.getConfiguration().isCircularTopicDetection()) {
> Endpoint from = exchange.getFromEndpoint();
> if (from instanceof KafkaEndpoint) {
> String fromTopic = ((KafkaEndpoint) 
> from).getConfiguration().getTopic();
> allowHeader = !headerTopic.equals(fromTopic);
> if (!allowHeader) {
> log.debug("Circular topic detected from message header."
> + " Cannot send to same topic as the message 
> comes from: {}"
> + ". Will use endpoint configured topic: {}", 
> from, topic);
> }
> }
> }
> if (allowHeader && headerTopic != null) {
> topic = headerTopic;
> }
> }
> if (topic == null) {
> // if topic property was not received from configuration or header 
> parameters take it from the remaining URI
> topic = URISupport.extractRemainderPath(new 
> URI(endpoint.getEndpointUri()), true);
> }
> ...
> Object msg = exchange.getIn().getBody();
> // is the message body a list or something that contains multiple values
> Iterator iterator = null;
> if (msg instanceof Iterable) {
> iterator = ((Iterable) msg).iterator();
> } else if (msg instanceof Iterator) {
> iterator = (Iterator) msg;
> }
> if (iterator != null) {
> final Iterator msgList = iterator;
> {code}
> The msgTopic variable below should be set from KafkaConstants.TOPIC header of 
> next exchange from collection
> {code}
> final String msgTopic = topic;
> return new Iterator() {
> @Override
> public boolean hasNext() {
> return msgList.hasNext();
> }
> @Override
> public ProducerRecord next() {
> // must convert each entry of the iterator into the value 
> according to the serializer
> Object next = msgList.next();
> Object value = tryConvertToSerializedType(exchange, next, 
> endpoint.getConfiguration().getSerializerClass());
> if (hasPartitionKey && hasMessageKey) {
> return new ProducerRecord(msgTopic, partitionKey, null, 
> key, value, propagatedHeaders);
> } else if (hasMessageKey) {
> return new ProducerRecord(msgTopic, null, null, key, 
> value, propagatedHeaders);
> } else {
> return new ProducerRecord(msgTopic, null, null, null, 
> value, propagatedHeaders);
> }
> }
> @Override
> public void remove() {
> msgList.remove();
> }
> };
> }
> ...
> }
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (CAMEL-14233) camel-kafka - topic overriding not possible when using aggregation

2019-11-29 Thread Jira


 [ 
https://issues.apache.org/jira/browse/CAMEL-14233?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rafał Gała updated CAMEL-14233:
---
Description: 
When exchange aggregation is used, using GroupedExchangeAggregationStrategy for 
example:

 
{code:java}
from(..)
.process(some processor here that sets KafkaConstants.TOPIC header here)
.aggregate(new GroupedExchangeAggregationStrategy ())
.to(kafka:...)
{code}
it is not possible to override topic per exchange by using KafkaConstants.TOPIC 
header, because in *createRecord* of KafkaProducer class, the topic is chosen 
from header of aggregating Exchange which may not be set because it may have 
been set only on Exchanges that were aggregated. When creating ProducerRecord 
from Iterable, the topic should be chosen from header of each Exchange 
separately:
{code:java}
protected Iterator createRecorder(Exchange exchange) throws 
Exception {
String topic = endpoint.getConfiguration().getTopic();

if (!endpoint.getConfiguration().isBridgeEndpoint()) {
String headerTopic = exchange.getIn().getHeader(KafkaConstants.TOPIC, 
String.class);
boolean allowHeader = true;

// when we do not bridge then detect if we try to send back to ourselves
// which we most likely do not want to do
if (headerTopic != null && 
endpoint.getConfiguration().isCircularTopicDetection()) {
Endpoint from = exchange.getFromEndpoint();
if (from instanceof KafkaEndpoint) {
String fromTopic = ((KafkaEndpoint) 
from).getConfiguration().getTopic();
allowHeader = !headerTopic.equals(fromTopic);
if (!allowHeader) {
log.debug("Circular topic detected from message header."
+ " Cannot send to same topic as the message comes 
from: {}"
+ ". Will use endpoint configured topic: {}", from, 
topic);
}
}
}
if (allowHeader && headerTopic != null) {
topic = headerTopic;
}
}

if (topic == null) {
// if topic property was not received from configuration or header 
parameters take it from the remaining URI
topic = URISupport.extractRemainderPath(new 
URI(endpoint.getEndpointUri()), true);
}

...

Object msg = exchange.getIn().getBody();

// is the message body a list or something that contains multiple values
Iterator iterator = null;
if (msg instanceof Iterable) {
iterator = ((Iterable) msg).iterator();
} else if (msg instanceof Iterator) {
iterator = (Iterator) msg;
}
if (iterator != null) {
final Iterator msgList = iterator;
{code}
The msgTopic variable below should be set from KafkaConstants.TOPIC header of 
next exchange from collection
{code}
final String msgTopic = topic;

return new Iterator() {
@Override
public boolean hasNext() {
return msgList.hasNext();
}

@Override
public ProducerRecord next() {

// must convert each entry of the iterator into the value 
according to the serializer
Object next = msgList.next();
Object value = tryConvertToSerializedType(exchange, next, 
endpoint.getConfiguration().getSerializerClass());

if (hasPartitionKey && hasMessageKey) {
return new ProducerRecord(msgTopic, partitionKey, null, 
key, value, propagatedHeaders);
} else if (hasMessageKey) {
return new ProducerRecord(msgTopic, null, null, key, value, 
propagatedHeaders);
} else {
return new ProducerRecord(msgTopic, null, null, null, 
value, propagatedHeaders);
}
}

@Override
public void remove() {
msgList.remove();
}
};
}

...
}

{code}
 

  was:
When exchange aggregation is used, using GroupedExchangeAggregationStrategy for 
example:

 
{code:java}
from(..)
.process(some processor here that sets KafkaConstants.TOPIC header here)
.aggregate(new GroupedExchangeAggregationStrategy ())
.to(kafka:...)
{code}
it is not possible to override topic per exchange by using KafkaConstants.TOPIC 
header, because in *createRecord* of KafkaProducer class, the topic is chosen 
from header of aggregating Exchange which may not be set because it may have 
been set only on Exchanges that were aggregated. When creating ProducerRecord 
from Iterable, the topic should be chosen from header of each Exchange 
separately:
{code:java}
protected Iterator createRecorder(Exchange exchange) throws 
Exception {
String topic = endpoint.getConfiguration().getTopic();

if (!endpoint.getConfiguration().isBridgeEndpoint()) {
String headerTopic = exchange.getIn().getHeader(KafkaConstants.TOPIC, 
String.class);
 

[jira] [Updated] (CAMEL-14233) camel-kafka - topic overriding not possible when using aggregation

2019-11-29 Thread Jira


 [ 
https://issues.apache.org/jira/browse/CAMEL-14233?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rafał Gała updated CAMEL-14233:
---
Description: 
When exchange aggregation is used, using GroupedExchangeAggregationStrategy for 
example:

 
{code:java}
from(..)
.process(some processor here that sets KafkaConstants.TOPIC header here)
.aggregate(new GroupedExchangeAggregationStrategy ())
.to(kafka:...)
{code}
it is not possible to override topic per exchange by using KafkaConstants.TOPIC 
header, because in *createRecord* of KafkaProducer class, the topic is chosen 
from header of aggregating Exchange which may not be set because it may have 
been set only on Exchanges that were aggregated. When creating ProducerRecord 
from Iterable, the topic should be chosen from header of each Exchange 
separately:
{code:java}
protected Iterator createRecorder(Exchange exchange) throws 
Exception {
String topic = endpoint.getConfiguration().getTopic();

if (!endpoint.getConfiguration().isBridgeEndpoint()) {
String headerTopic = exchange.getIn().getHeader(KafkaConstants.TOPIC, 
String.class);
boolean allowHeader = true;

// when we do not bridge then detect if we try to send back to ourselves
// which we most likely do not want to do
if (headerTopic != null && 
endpoint.getConfiguration().isCircularTopicDetection()) {
Endpoint from = exchange.getFromEndpoint();
if (from instanceof KafkaEndpoint) {
String fromTopic = ((KafkaEndpoint) 
from).getConfiguration().getTopic();
allowHeader = !headerTopic.equals(fromTopic);
if (!allowHeader) {
log.debug("Circular topic detected from message header."
+ " Cannot send to same topic as the message comes 
from: {}"
+ ". Will use endpoint configured topic: {}", from, 
topic);
}
}
}
if (allowHeader && headerTopic != null) {
topic = headerTopic;
}
}

if (topic == null) {
// if topic property was not received from configuration or header 
parameters take it from the remaining URI
topic = URISupport.extractRemainderPath(new 
URI(endpoint.getEndpointUri()), true);
}

...

Object msg = exchange.getIn().getBody();

// is the message body a list or something that contains multiple values
Iterator iterator = null;
if (msg instanceof Iterable) {
iterator = ((Iterable) msg).iterator();
} else if (msg instanceof Iterator) {
iterator = (Iterator) msg;
}
if (iterator != null) {
final Iterator msgList = iterator;

final String msgTopic = topic;

return new Iterator() {
@Override
public boolean hasNext() {
return msgList.hasNext();
}

@Override
public ProducerRecord next() {

// must convert each entry of the iterator into the value 
according to the serializer
Object next = msgList.next();
Object value = tryConvertToSerializedType(exchange, next, 
endpoint.getConfiguration().getSerializerClass());

if (hasPartitionKey && hasMessageKey) {
return new ProducerRecord(msgTopic, partitionKey, null, 
key, value, propagatedHeaders);
} else if (hasMessageKey) {
return new ProducerRecord(msgTopic, null, null, key, value, 
propagatedHeaders);
} else {
return new ProducerRecord(msgTopic, null, null, null, 
value, propagatedHeaders);
}
}

@Override
public void remove() {
msgList.remove();
}
};
}

...
}

{code}
 

  was:
When exchange aggregation is used, using GroupedExchangeAggregationStrategy for 
example:

 
{code:java}
from(..)
.process(some processor here that sets KafkaConstants.TOPIC header here)
.aggregate(new GroupedExchangeAggregationStrategy ())
.to(kafka:...)
{code}
it is not possible to override topic per exchange by using KafkaConstants.TOPIC 
header, because in *createRecord* of KafkaProducer class, the topic is chosen 
from header of aggregating Exchange which may not be set because it may have 
been set only on Exchanges that were aggregated. When creating ProducerRecord 
from Iterable, the topic should be chosen from header of each Exchange 
separately:
{code:java}
protected Iterator createRecorder(Exchange exchange) throws 
Exception {
String topic = endpoint.getConfiguration().getTopic();

if (!endpoint.getConfiguration().isBridgeEndpoint()) {
String headerTopic = exchange.getIn().getHeader(KafkaConstants.TOPIC, 
String.class);
boolean allowHeader = true;

// when we do not bridge then detect if we try to send back to ourselves
 

[jira] [Updated] (CAMEL-14233) camel-kafka - topic overriding not possible when using aggregation

2019-11-29 Thread Jira


 [ 
https://issues.apache.org/jira/browse/CAMEL-14233?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rafał Gała updated CAMEL-14233:
---
Description: 
When exchange aggregation is used, using GroupedExchangeAggregationStrategy for 
example:

 
{code:java}
from(..)
.process(some processor here that sets KafkaConstants.TOPIC header here)
.aggregate(new GroupedExchangeAggregationStrategy ())
.to(kafka:...)
{code}
it is not possible to override topic per exchange by using KafkaConstants.TOPIC 
header, because in *createRecord* of KafkaProducer class, the topic is chosen 
from header of aggregating Exchange which may not be set because it may have 
been set only on Exchanges that were aggregated. When creating ProducerRecord 
from Iterable, the topic should be chosen from header of each Exchange 
separately:
{code:java}
protected Iterator createRecorder(Exchange exchange) throws 
Exception {
String topic = endpoint.getConfiguration().getTopic();

if (!endpoint.getConfiguration().isBridgeEndpoint()) {
String headerTopic = exchange.getIn().getHeader(KafkaConstants.TOPIC, 
String.class);
boolean allowHeader = true;

// when we do not bridge then detect if we try to send back to ourselves
// which we most likely do not want to do
if (headerTopic != null && 
endpoint.getConfiguration().isCircularTopicDetection()) {
Endpoint from = exchange.getFromEndpoint();
if (from instanceof KafkaEndpoint) {
String fromTopic = ((KafkaEndpoint) 
from).getConfiguration().getTopic();
allowHeader = !headerTopic.equals(fromTopic);
if (!allowHeader) {
log.debug("Circular topic detected from message header."
+ " Cannot send to same topic as the message comes 
from: {}"
+ ". Will use endpoint configured topic: {}", from, 
topic);
}
}
}
if (allowHeader && headerTopic != null) {
topic = headerTopic;
}
}

if (topic == null) {
// if topic property was not received from configuration or header 
parameters take it from the remaining URI
topic = URISupport.extractRemainderPath(new 
URI(endpoint.getEndpointUri()), true);
}

...

Object msg = exchange.getIn().getBody();

// is the message body a list or something that contains multiple values
Iterator iterator = null;
if (msg instanceof Iterable) {
iterator = ((Iterable) msg).iterator();
} else if (msg instanceof Iterator) {
iterator = (Iterator) msg;
}
if (iterator != null) {
final Iterator msgList = iterator;

final String msgTopic = topic;

return new Iterator() {
@Override
public boolean hasNext() {
return msgList.hasNext();
}

@Override
public ProducerRecord next() {
{code}
Maybe this is a good place, to make 
{code{
// must convert each entry of the iterator into the value 
according to the serializer
Object next = msgList.next();
Object value = tryConvertToSerializedType(exchange, next, 
endpoint.getConfiguration().getSerializerClass());

if (hasPartitionKey && hasMessageKey) {
return new ProducerRecord(msgTopic, partitionKey, null, 
key, value, propagatedHeaders);
} else if (hasMessageKey) {
return new ProducerRecord(msgTopic, null, null, key, value, 
propagatedHeaders);
} else {
return new ProducerRecord(msgTopic, null, null, null, 
value, propagatedHeaders);
}
}

@Override
public void remove() {
msgList.remove();
}
};
}

...
}

{code}
 

  was:
When exchange aggregation is used, using GroupedExchangeAggregationStrategy for 
example:

 
{code:java}
from(..)
.process(some processor here that sets KafkaConstants.TOPIC header here)
.aggregate(new GroupedExchangeAggregationStrategy ())
.to(kafka:...)
{code}
it is not possible to override topic per exchange by using KafkaConstants.TOPIC 
header, because in *createRecord* of KafkaProducer class, the topic is chosen 
from header of aggregating Exchange which may not be set because it may have 
been set only on Exchanges that were aggregated. When creating ProducerRecord 
from Iterable, the topic should be chosen from header of each Exchange 
separately:
{code:java}
protected Iterator createRecorder(Exchange exchange) throws 
Exception {
String topic = endpoint.getConfiguration().getTopic();

if (!endpoint.getConfiguration().isBridgeEndpoint()) {
String headerTopic = exchange.getIn().getHeader(KafkaConstants.TOPIC, 
String.class);
boolean allowHeader = true;

// when we do not bridge 

[jira] [Updated] (CAMEL-14233) camel-kafka - topic overriding not possible when using aggregation

2019-11-29 Thread Jira


 [ 
https://issues.apache.org/jira/browse/CAMEL-14233?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rafał Gała updated CAMEL-14233:
---
Description: 
When exchange aggregation is used, using GroupedExchangeAggregationStrategy for 
example:

 
{code:java}
from(..)
.process(some processor here that sets KafkaConstants.TOPIC header here)
.aggregate(new GroupedExchangeAggregationStrategy ())
.to(kafka:...)
{code}
it is not possible to override topic per exchange by using KafkaConstants.TOPIC 
header, because in *createRecord* of KafkaProducer class, the topic is chosen 
from header of aggregating Exchange which may not be set because it may have 
been set only on Exchanges that were aggregated. When creating ProducerRecord 
from Iterable, the topic should be chosen from header of each Exchange 
separately:
{code:java}
protected Iterator createRecorder(Exchange exchange) throws 
Exception {
String topic = endpoint.getConfiguration().getTopic();

if (!endpoint.getConfiguration().isBridgeEndpoint()) {
String headerTopic = exchange.getIn().getHeader(KafkaConstants.TOPIC, 
String.class);
boolean allowHeader = true;

// when we do not bridge then detect if we try to send back to ourselves
// which we most likely do not want to do
if (headerTopic != null && 
endpoint.getConfiguration().isCircularTopicDetection()) {
Endpoint from = exchange.getFromEndpoint();
if (from instanceof KafkaEndpoint) {
String fromTopic = ((KafkaEndpoint) 
from).getConfiguration().getTopic();
allowHeader = !headerTopic.equals(fromTopic);
if (!allowHeader) {
log.debug("Circular topic detected from message header."
+ " Cannot send to same topic as the message comes 
from: {}"
+ ". Will use endpoint configured topic: {}", from, 
topic);
}
}
}
if (allowHeader && headerTopic != null) {
topic = headerTopic;
}
}

if (topic == null) {
// if topic property was not received from configuration or header 
parameters take it from the remaining URI
topic = URISupport.extractRemainderPath(new 
URI(endpoint.getEndpointUri()), true);
}

...

Object msg = exchange.getIn().getBody();

// is the message body a list or something that contains multiple values
Iterator iterator = null;
if (msg instanceof Iterable) {
iterator = ((Iterable) msg).iterator();
} else if (msg instanceof Iterator) {
iterator = (Iterator) msg;
}
if (iterator != null) {
final Iterator msgList = iterator;

final String msgTopic = topic;

{code}
Here the topic should be taken from an aggregated Exchange KafkaConstants.TOPIC 
header (it may be different for each from the collection)
{code:java}
return new Iterator() {
@Override
public boolean hasNext() {
return msgList.hasNext();
}

@Override
public ProducerRecord next() {
{code}
Maybe this is a good place, to make 
{code{
// must convert each entry of the iterator into the value 
according to the serializer
Object next = msgList.next();
Object value = tryConvertToSerializedType(exchange, next, 
endpoint.getConfiguration().getSerializerClass());

if (hasPartitionKey && hasMessageKey) {
return new ProducerRecord(msgTopic, partitionKey, null, 
key, value, propagatedHeaders);
} else if (hasMessageKey) {
return new ProducerRecord(msgTopic, null, null, key, value, 
propagatedHeaders);
} else {
return new ProducerRecord(msgTopic, null, null, null, 
value, propagatedHeaders);
}
}

@Override
public void remove() {
msgList.remove();
}
};
}

...
}

{code}
 

  was:
When exchange aggregation is used, using GroupedExchangeAggregationStrategy for 
example:

 
{code:java}
from(..)
.process(some processor here that sets KafkaConstants.TOPIC header here)
.aggregate(new GroupedExchangeAggregationStrategy ())
.to(kafka:...)
{code}
it is not possible to override topic per exchange by using KafkaConstants.TOPIC 
header, because in *createRecord* of KafkaProducer class, the topic is chosen 
from header of aggregating Exchange which may not be set because it may have 
been set only on Exchanges that were aggregated. When creating ProducerRecord 
from Iterable, the topic should be chosen from header of each Exchange 
separately:
{code:java}
protected Iterator createRecorder(Exchange exchange) throws 
Exception {
String topic = endpoint.getConfiguration().getTopic();

if (!endpoint.getConfiguration().isBridgeEndpoint()) {