[
https://issues.apache.org/jira/browse/CAMEL-12503?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16536566#comment-16536566
]
ASF GitHub Bot commented on CAMEL-12503:
----------------------------------------
oscerd closed pull request #2410: CAMEL-12503 : support for propagating camel
headers to kafka
URL: https://github.com/apache/camel/pull/2410
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc
b/components/camel-kafka/src/main/docs/kafka-component.adoc
index ac32dca95fd..bc4ed4b24c9 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -72,7 +72,7 @@ with the following path and query parameters:
|===
-==== Query Parameters (90 parameters):
+==== Query Parameters (91 parameters):
[width="100%",cols="2,5,^1,2",options="header"]
@@ -80,6 +80,7 @@ with the following path and query parameters:
| Name | Description | Default | Type
| *brokers* (common) | URL of the Kafka brokers to use. The format is
host1:port1,host2:port2, and the list can be a subset of brokers or a VIP
pointing to a subset of brokers. This option is known as bootstrap.servers in
the Kafka documentation. | | String
| *clientId* (common) | The client id is a user-specified string sent in each
request to help trace calls. It should logically identify the application
making the request. | | String
+| *headerFilterStrategy* (common) | To use a custom HeaderFilterStrategy to
filter header to and from Camel message. | | HeaderFilterStrategy
| *reconnectBackoffMaxMs* (common) | The maximum amount of time in
milliseconds to wait when reconnecting to a broker that has repeatedly failed
to connect. If provided, the backoff per host will increase exponentially for
each consecutive connection failure, up to this maximum. After calculating the
backoff increase, 20% random jitter is added to avoid connection storms. | 1000
| Integer
| *allowManualCommit* (consumer) | Whether to allow doing manual commits via
KafkaManualCommit. If this option is enabled then an instance of
KafkaManualCommit is stored on the Exchange message header, which allows end
users to access this API and perform manual offset commits via the Kafka
consumer. | false | boolean
| *autoCommitEnable* (consumer) | If true, periodically commit to ZooKeeper
the offset of messages already fetched by the consumer. This committed offset
will be used when the process fails as the position from which the new consumer
will begin. | true | Boolean
@@ -417,3 +418,25 @@ This will force a synchronous commit which will block
until the commit is acknow
If you want to use a custom implementation of `KafkaManualCommit` then you can
configure a custom `KafkaManualCommitFactory`
on the `KafkaComponent` that creates instances of your custom implementation.
+
+=== Kafka Headers propagation
+*Available as of Camel 2.22*
+
+When consuming messages from Kafka, the record headers are propagated to Camel exchange headers automatically.
+The producer behaves the same way: the Camel headers of an exchange are propagated to the Kafka message headers.
+
+Since Kafka headers allow only `byte[]` values, a Camel exchange header is propagated only if its value can be serialized to `byte[]`;
+otherwise the header is skipped.
+The following header value types are supported: `String`, `Integer`, `Long`, `Double`, `byte[]`.
+Note: all headers propagated *from* Kafka *to* the Camel exchange contain `byte[]` values.
+
+By default, all headers are filtered by `KafkaHeaderFilterStrategy`.
+This strategy filters out headers whose names start with the `Camel` or `org.apache.camel` prefixes.
+The default strategy can be overridden with the `headerFilterStrategy` URI parameter in both `to` and `from` routes:
+```
+from("kafka:my_topic?headerFilterStrategy=#myStrategy")
+...
+.to("kafka:my_topic?headerFilterStrategy=#myStrategy")
+```
+
+The `myStrategy` object must implement `HeaderFilterStrategy` and must be placed in the Camel registry, either manually or by registering it as a bean in Spring/Blueprint, as it is `CamelContext` aware.
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index 9b5ba6b0a3d..dabd4755689 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -24,6 +24,8 @@
import org.apache.camel.Exchange;
import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.spi.HeaderFilterStrategy;
+import org.apache.camel.spi.HeaderFilterStrategyAware;
import org.apache.camel.spi.Metadata;
import org.apache.camel.spi.StateRepository;
import org.apache.camel.spi.UriParam;
@@ -43,15 +45,18 @@
import org.apache.kafka.common.config.SslConfigs;
@UriParams
-public class KafkaConfiguration implements Cloneable {
+public class KafkaConfiguration implements Cloneable,
HeaderFilterStrategyAware {
//Common configuration properties
- @UriPath(label = "common") @Metadata(required = "true")
+ @UriPath(label = "common")
+ @Metadata(required = "true")
private String topic;
@UriParam(label = "common")
private String brokers;
@UriParam(label = "common")
private String clientId;
+ @UriParam(label = "common", description = "To use a custom
HeaderFilterStrategy to filter header to and from Camel message.")
+ private HeaderFilterStrategy headerFilterStrategy = new
KafkaHeaderFilterStrategy();
@UriParam(label = "consumer")
private boolean topicIsPattern;
@@ -294,10 +299,10 @@
private Double kerberosRenewWindowFactor =
SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_WINDOW_FACTOR;
@UriParam(label = "common,security", defaultValue = "DEFAULT")
//sasl.kerberos.principal.to.local.rules
- private String kerberosPrincipalToLocalRules;
+ private String kerberosPrincipalToLocalRules;
@UriParam(label = "common,security", secret = true)
//sasl.jaas.config
- private String saslJaasConfig;
+ private String saslJaasConfig;
public KafkaConfiguration() {
}
@@ -343,7 +348,7 @@ public Properties createProducerProperties() {
addPropertyIfNotNull(props, ProducerConfig.RETRY_BACKOFF_MS_CONFIG,
getRetryBackoffMs());
addPropertyIfNotNull(props, ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,
isEnableIdempotence());
addPropertyIfNotNull(props,
ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, getReconnectBackoffMaxMs());
-
+
// SSL
applySslConfiguration(props, getSslContextParameters());
addPropertyIfNotNull(props,
CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, getSecurityProtocol());
@@ -403,7 +408,7 @@ public Properties createConsumerProperties() {
addPropertyIfNotNull(props,
ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, getReconnectBackoffMs());
addPropertyIfNotNull(props, ConsumerConfig.RETRY_BACKOFF_MS_CONFIG,
getRetryBackoffMs());
addPropertyIfNotNull(props,
ConsumerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, getReconnectBackoffMaxMs());
-
+
// SSL
applySslConfiguration(props, getSslContextParameters());
addPropertyIfNotNull(props, SslConfigs.SSL_KEY_PASSWORD_CONFIG,
getSslKeyPassword());
@@ -1029,14 +1034,14 @@ public String getSaslMechanism() {
public void setSaslMechanism(String saslMechanism) {
this.saslMechanism = saslMechanism;
}
-
+
public String getSaslJaasConfig() {
return saslJaasConfig;
}
/**
* Expose the kafka sasl.jaas.config parameter
- *
+ *
* Example:
* org.apache.kafka.common.security.plain.PlainLoginModule required
username="USERNAME" password="PASSWORD";
*/
@@ -1498,7 +1503,7 @@ public String getSeekTo() {
* Set if KafkaConsumer will read from beginning or end on startup:
* beginning : read from beginning
* end : read from end
- *
+ *
* This is replacing the earlier property seekToBeginning
*/
public void setSeekTo(String seekTo) {
@@ -1559,6 +1564,7 @@ public void setRecordMetadata(boolean recordMetadata) {
public String getInterceptorClasses() {
return interceptorClasses;
}
+
/**
* Sets interceptors for producer or consumers.
* Producer interceptors have to be classes implementing {@link
org.apache.kafka.clients.producer.ProducerInterceptor}
@@ -1596,4 +1602,16 @@ public Integer getReconnectBackoffMaxMs() {
public void setReconnectBackoffMaxMs(Integer reconnectBackoffMaxMs) {
this.reconnectBackoffMaxMs = reconnectBackoffMaxMs;
}
+
+ public HeaderFilterStrategy getHeaderFilterStrategy() {
+ return headerFilterStrategy;
+ }
+
+ /**
+ * To use a custom HeaderFilterStrategy to filter header to and from Camel
message.
+ */
+ public void setHeaderFilterStrategy(HeaderFilterStrategy
headerFilterStrategy) {
+ this.headerFilterStrategy = headerFilterStrategy;
+ }
+
}
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 05111f2ec27..c585e054f90 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -27,10 +27,12 @@
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.regex.Pattern;
+import java.util.stream.StreamSupport;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.impl.DefaultConsumer;
+import org.apache.camel.spi.HeaderFilterStrategy;
import org.apache.camel.spi.StateRepository;
import org.apache.camel.util.IOHelper;
import org.apache.camel.util.ObjectHelper;
@@ -42,6 +44,7 @@
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.header.Header;
public class KafkaConsumer extends DefaultConsumer {
@@ -98,7 +101,7 @@ Properties getProps() {
@Override
protected void doStart() throws Exception {
log.info("Starting Kafka consumer on topic: {} with breakOnFirstError:
{}",
- endpoint.getConfiguration().getTopic(),
endpoint.getConfiguration().isBreakOnFirstError());
+ endpoint.getConfiguration().getTopic(),
endpoint.getConfiguration().isBreakOnFirstError());
super.doStart();
executor = endpoint.createExecutor();
@@ -276,10 +279,12 @@ protected boolean doRun() {
record = recordIterator.next();
if (log.isTraceEnabled()) {
log.trace("Partition = {}, offset = {},
key = {}, value = {}", record.partition(), record.offset(), record.key(),
- record.value());
+ record.value());
}
Exchange exchange =
endpoint.createKafkaExchange(record);
+ propagateHeaders(record, exchange,
endpoint.getConfiguration().getHeaderFilterStrategy());
+
// if not auto commit then we have additional
information on the exchange
if (!isAutoCommitEnabled()) {
exchange.getIn().setHeader(KafkaConstants.LAST_RECORD_BEFORE_COMMIT,
!recordIterator.hasNext());
@@ -287,9 +292,9 @@ protected boolean doRun() {
if
(endpoint.getConfiguration().isAllowManualCommit()) {
// allow Camel users to access the Kafka
consumer API to be able to do for example manual commits
KafkaManualCommit manual =
endpoint.getComponent().getKafkaManualCommitFactory().newInstance(exchange,
consumer, topicName, threadId,
- offsetRepository, partition,
record.offset());
+ offsetRepository, partition,
record.offset());
exchange.getIn().setHeader(KafkaConstants.MANUAL_COMMIT, manual);
-
+
}
try {
@@ -303,7 +308,7 @@ protected boolean doRun() {
if
(endpoint.getConfiguration().isBreakOnFirstError()) {
// we are failing and we should break
out
log.warn("Error during processing {}
from topic: {}. Will seek consumer to offset: {} and re-connect and start
polling again.",
- exchange, topicName,
partitionLastOffset);
+ exchange, topicName,
partitionLastOffset);
// force commit so we resume on next
poll where we failed
commitOffset(offsetRepository,
partition, partitionLastOffset, true);
// continue to next partition
@@ -423,6 +428,16 @@ public void
onPartitionsAssigned(Collection<TopicPartition> partitions) {
}
}
+ private void propagateHeaders(ConsumerRecord<Object, Object> record,
Exchange exchange, HeaderFilterStrategy headerFilterStrategy) {
+ StreamSupport.stream(record.headers().spliterator(), false)
+ .filter(header -> shouldBeFiltered(header, exchange,
headerFilterStrategy))
+ .forEach(header -> exchange.getIn().setHeader(header.key(),
header.value()));
+ }
+
+ private boolean shouldBeFiltered(Header header, Exchange exchange,
HeaderFilterStrategy headerFilterStrategy) {
+ return !headerFilterStrategy.applyFilterToCamelHeaders(header.key(),
header.value(), exchange);
+ }
+
private boolean isAutoCommitEnabled() {
return endpoint.getConfiguration().isAutoCommitEnable() != null &&
endpoint.getConfiguration().isAutoCommitEnable();
}
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaHeaderFilterStrategy.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaHeaderFilterStrategy.java
new file mode 100644
index 00000000000..3c55daadb08
--- /dev/null
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaHeaderFilterStrategy.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka;
+
+import org.apache.camel.impl.DefaultHeaderFilterStrategy;
+
+public class KafkaHeaderFilterStrategy extends DefaultHeaderFilterStrategy {
+
+ public KafkaHeaderFilterStrategy() {
+ initialize();
+ }
+
+ protected void initialize() {
+ // filter out kafka record metadata
+ getInFilter().add("org.apache.kafka.clients.producer.RecordMetadata");
+
+ // filter headers begin with "Camel" or "org.apache.camel"
+
setOutFilterPattern("(?i)(Camel|org\\.apache\\.camel)[\\.|a-z|A-z|0-9]*");
+
setInFilterPattern("(?i)(Camel|org\\.apache\\.camel)[\\.|a-z|A-z|0-9]*");
+ }
+}
diff --git
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index 3ba06583ab9..e7df82bf44b 100644
---
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -23,20 +23,26 @@
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
+import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.impl.DefaultAsyncProducer;
+import org.apache.camel.spi.HeaderFilterStrategy;
import org.apache.camel.util.URISupport;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.utils.Bytes;
public class KafkaProducer extends DefaultAsyncProducer {
@@ -142,8 +148,8 @@ protected void doStop() throws Exception {
allowHeader = !headerTopic.equals(fromTopic);
if (!allowHeader) {
log.debug("Circular topic detected from message
header."
- + " Cannot send to same topic as the message comes
from: {}"
- + ". Will use endpoint configured topic: {}",
from, topic);
+ + " Cannot send to same topic as the message
comes from: {}"
+ + ". Will use endpoint configured topic: {}",
from, topic);
}
}
}
@@ -159,24 +165,28 @@ protected void doStop() throws Exception {
// endpoint take precedence over header configuration
final Integer partitionKey =
endpoint.getConfiguration().getPartitionKey() != null
- ? endpoint.getConfiguration().getPartitionKey() :
exchange.getIn().getHeader(KafkaConstants.PARTITION_KEY, Integer.class);
+ ? endpoint.getConfiguration().getPartitionKey() :
exchange.getIn().getHeader(KafkaConstants.PARTITION_KEY, Integer.class);
final boolean hasPartitionKey = partitionKey != null;
// endpoint take precedence over header configuration
Object key = endpoint.getConfiguration().getKey() != null
- ? endpoint.getConfiguration().getKey() :
exchange.getIn().getHeader(KafkaConstants.KEY);
+ ? endpoint.getConfiguration().getKey() :
exchange.getIn().getHeader(KafkaConstants.KEY);
final Object messageKey = key != null
- ? tryConvertToSerializedType(exchange, key,
endpoint.getConfiguration().getKeySerializerClass()) : null;
+ ? tryConvertToSerializedType(exchange, key,
endpoint.getConfiguration().getKeySerializerClass()) : null;
final boolean hasMessageKey = messageKey != null;
+ // extracting headers which need to be propagated
+ HeaderFilterStrategy headerFilterStrategy =
endpoint.getConfiguration().getHeaderFilterStrategy();
+ List<Header> propagatedHeaders = getPropagatedHeaders(exchange,
headerFilterStrategy);
+
Object msg = exchange.getIn().getBody();
// is the message body a list or something that contains multiple
values
Iterator<Object> iterator = null;
if (msg instanceof Iterable) {
- iterator = ((Iterable<Object>)msg).iterator();
+ iterator = ((Iterable<Object>) msg).iterator();
} else if (msg instanceof Iterator) {
- iterator = (Iterator<Object>)msg;
+ iterator = (Iterator<Object>) msg;
}
if (iterator != null) {
final Iterator<Object> msgList = iterator;
@@ -194,11 +204,11 @@ public ProducerRecord next() {
Object value = tryConvertToSerializedType(exchange, next,
endpoint.getConfiguration().getSerializerClass());
if (hasPartitionKey && hasMessageKey) {
- return new ProducerRecord(msgTopic, partitionKey, key,
value);
+ return new ProducerRecord(msgTopic, partitionKey,
null, key, value, propagatedHeaders);
} else if (hasMessageKey) {
- return new ProducerRecord(msgTopic, key, value);
+ return new ProducerRecord(msgTopic, null, null, key,
value, propagatedHeaders);
} else {
- return new ProducerRecord(msgTopic, value);
+ return new ProducerRecord(msgTopic, null, null, null,
value, propagatedHeaders);
}
}
@@ -214,15 +224,58 @@ public void remove() {
ProducerRecord record;
if (hasPartitionKey && hasMessageKey) {
- record = new ProducerRecord(topic, partitionKey, key, value);
+ record = new ProducerRecord(topic, partitionKey, null, key, value,
propagatedHeaders);
} else if (hasMessageKey) {
- record = new ProducerRecord(topic, key, value);
+ record = new ProducerRecord(topic, null, null, key, value,
propagatedHeaders);
} else {
- record = new ProducerRecord(topic, value);
+ record = new ProducerRecord(topic, null, null, null, value,
propagatedHeaders);
}
return Collections.singletonList(record).iterator();
}
+ private List<Header> getPropagatedHeaders(Exchange exchange,
HeaderFilterStrategy headerFilterStrategy) {
+ return exchange.getIn().getHeaders().entrySet().stream()
+ .filter(entry -> shouldBeFiltered(entry, exchange,
headerFilterStrategy))
+ .map(this::getRecordHeader)
+ .filter(Objects::nonNull)
+ .collect(Collectors.toList());
+ }
+
+ private boolean shouldBeFiltered(Map.Entry<String, Object> entry, Exchange
exchange, HeaderFilterStrategy headerFilterStrategy) {
+ return
!headerFilterStrategy.applyFilterToExternalHeaders(entry.getKey(),
entry.getValue(), exchange);
+ }
+
+ private RecordHeader getRecordHeader(Map.Entry<String, Object> entry) {
+ byte[] headerValue = getHeaderValue(entry.getValue());
+ if (headerValue == null) {
+ return null;
+ }
+ return new RecordHeader(entry.getKey(), headerValue);
+ }
+
+ private byte[] getHeaderValue(Object value) {
+ if (value instanceof String) {
+ return ((String) value).getBytes();
+ } else if (value instanceof Long) {
+ ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
+ buffer.putLong((Long) value);
+ return buffer.array();
+ } else if (value instanceof Integer) {
+ ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES);
+ buffer.putInt((Integer) value);
+ return buffer.array();
+ } else if (value instanceof Double) {
+ ByteBuffer buffer = ByteBuffer.allocate(Double.BYTES);
+ buffer.putDouble((Double) value);
+ return buffer.array();
+ } else if (value instanceof byte[]) {
+ return (byte[]) value;
+ }
+ log.debug("Cannot propagate header value of type[{}], skipping... " +
+ "Supported types: String, Integer, Long, Double, byte[].",
value != null ? value.getClass() : "null");
+ return null;
+ }
+
@Override
@SuppressWarnings({"unchecked", "rawtypes"})
// Camel calls this method if the endpoint isSynchronous(), as the
KafkaEndpoint creates a SynchronousDelegateProducer for it
diff --git
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java
index d9be4d6796a..d99a98369b8 100644
---
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java
+++
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerFullTest.java
@@ -17,6 +17,7 @@
package org.apache.camel.component.kafka;
import java.io.IOException;
+import java.util.Map;
import java.util.Properties;
import java.util.stream.StreamSupport;
@@ -25,6 +26,7 @@
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.header.internals.RecordHeader;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
@@ -71,20 +73,30 @@ public void configure() throws Exception {
@Test
public void kafkaMessageIsConsumedByCamel() throws InterruptedException,
IOException {
+ String propagatedHeaderKey = "PropagatedCustomHeader";
+ byte[] propagatedHeaderValue = "propagated header value".getBytes();
+ String skippedHeaderKey = "CamelSkippedHeader";
to.expectedMessageCount(5);
to.expectedBodiesReceivedInAnyOrder("message-0", "message-1",
"message-2", "message-3", "message-4");
// The LAST_RECORD_BEFORE_COMMIT header should not be configured on
any exchange because autoCommitEnable=true
to.expectedHeaderValuesReceivedInAnyOrder(KafkaConstants.LAST_RECORD_BEFORE_COMMIT,
null, null, null, null, null);
+ to.expectedHeaderReceived(propagatedHeaderKey, propagatedHeaderValue);
for (int k = 0; k < 5; k++) {
String msg = "message-" + k;
- ProducerRecord<String, String> data = new ProducerRecord<String,
String>(TOPIC, "1", msg);
+ ProducerRecord<String, String> data = new ProducerRecord<>(TOPIC,
"1", msg);
+ data.headers().add(new RecordHeader("CamelSkippedHeader", "skipped
header value".getBytes()));
+ data.headers().add(new RecordHeader(propagatedHeaderKey,
propagatedHeaderValue));
producer.send(data);
}
to.assertIsSatisfied(3000);
assertEquals(5,
StreamSupport.stream(MockConsumerInterceptor.recordsCaptured.get(0).records(TOPIC).spliterator(),
false).count());
+
+ Map<String, Object> headers =
to.getExchanges().get(0).getIn().getHeaders();
+ assertFalse("Should not receive skipped header",
headers.containsKey(skippedHeaderKey));
+ assertTrue("Should receive propagated header",
headers.containsKey(propagatedHeaderKey));
}
@Test
diff --git
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
index 4086edeb0dd..9e8b8b82752 100644
---
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
+++
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
@@ -17,14 +17,18 @@
package org.apache.camel.component.kafka;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
import org.apache.camel.Endpoint;
import org.apache.camel.EndpointInject;
@@ -33,9 +37,14 @@
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.DefaultHeaderFilterStrategy;
+import org.apache.camel.impl.JndiRegistry;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -48,24 +57,32 @@
private static final String TOPIC_BYTES = "testBytes";
private static final String TOPIC_BYTES_IN_HEADER = "testBytesHeader";
private static final String GROUP_BYTES = "groupStrings";
+ private static final String TOPIC_PROPAGATED_HEADERS =
"testPropagatedHeaders";
private static KafkaConsumer<String, String> stringsConsumerConn;
private static KafkaConsumer<byte[], byte[]> bytesConsumerConn;
@EndpointInject(uri = "kafka:" + TOPIC_STRINGS + "?requestRequiredAcks=-1")
private Endpoint toStrings;
+
@EndpointInject(uri = "kafka:" + TOPIC_STRINGS +
"?requestRequiredAcks=-1&partitionKey=1")
private Endpoint toStrings2;
+
@EndpointInject(uri = "kafka:" + TOPIC_INTERCEPTED +
"?requestRequiredAcks=-1"
+
"&interceptorClasses=org.apache.camel.component.kafka.MockProducerInterceptor")
private Endpoint toStringsWithInterceptor;
+
@EndpointInject(uri = "mock:kafkaAck")
private MockEndpoint mockEndpoint;
+
@EndpointInject(uri = "kafka:" + TOPIC_BYTES + "?requestRequiredAcks=-1"
+
"&serializerClass=org.apache.kafka.common.serialization.ByteArraySerializer&"
+
"keySerializerClass=org.apache.kafka.common.serialization.ByteArraySerializer")
private Endpoint toBytes;
+ @EndpointInject(uri = "kafka:" + TOPIC_PROPAGATED_HEADERS +
"?requestRequiredAcks=-1")
+ private Endpoint toPropagatedHeaders;
+
@Produce(uri = "direct:startStrings")
private ProducerTemplate stringsTemplate;
@@ -78,6 +95,16 @@
@Produce(uri = "direct:startTraced")
private ProducerTemplate interceptedTemplate;
+ @Produce(uri = "direct:propagatedHeaders")
+ private ProducerTemplate propagatedHeadersTemplate;
+
+ @Override
+ protected JndiRegistry createRegistry() throws Exception {
+ JndiRegistry jndi = super.createRegistry();
+ jndi.bind("myStrategy", new MyHeaderFilterStrategy());
+ return jndi;
+ }
+
@BeforeClass
public static void before() {
Properties stringsProps = new Properties();
@@ -118,6 +145,8 @@ public void configure() throws Exception {
from("direct:startBytes").to(toBytes).to(mockEndpoint);
from("direct:startTraced").to(toStringsWithInterceptor).to(mockEndpoint);
+
+
from("direct:propagatedHeaders").to(toPropagatedHeaders).to(mockEndpoint);
}
};
}
@@ -271,6 +300,89 @@ public void producedBytesMessageIsReceivedByKafka() throws
InterruptedException,
}
}
+ @Test
+ public void propagatedHeaderIsReceivedByKafka() throws Exception {
+ String propagatedStringHeaderKey = "PROPAGATED_STRING_HEADER";
+ String propagatedStringHeaderValue = "propagated string header value";
+
+ String propagatedIntegerHeaderKey = "PROPAGATED_INTEGER_HEADER";
+ Integer propagatedIntegerHeaderValue = 54545;
+
+ String propagatedLongHeaderKey = "PROPAGATED_LONG_HEADER";
+ Long propagatedLongHeaderValue = 5454545454545L;
+
+ String propagatedDoubleHeaderKey = "PROPAGATED_DOUBLE_HEADER";
+ Double propagatedDoubleHeaderValue = 43434.545D;
+
+ String propagatedBytesHeaderKey = "PROPAGATED_BYTES_HEADER";
+ byte[] propagatedBytesHeaderValue = new byte[]{121, 34, 34, 54, 5, 3,
54, -34};
+
+ Map<String, Object> camelHeaders = new HashMap<>();
+ camelHeaders.put(propagatedStringHeaderKey,
propagatedStringHeaderValue);
+ camelHeaders.put(propagatedIntegerHeaderKey,
propagatedIntegerHeaderValue);
+ camelHeaders.put(propagatedLongHeaderKey, propagatedLongHeaderValue);
+ camelHeaders.put(propagatedDoubleHeaderKey,
propagatedDoubleHeaderValue);
+ camelHeaders.put(propagatedBytesHeaderKey, propagatedBytesHeaderValue);
+ camelHeaders.put("CustomObjectHeader", new Object());
+ camelHeaders.put("CamelFilteredHeader", "CamelFilteredHeader value");
+
+ CountDownLatch messagesLatch = new CountDownLatch(1);
+ propagatedHeadersTemplate.sendBodyAndHeaders("Some test message",
camelHeaders);
+
+ List<ConsumerRecord<String, String>> records =
pollForRecords(stringsConsumerConn, TOPIC_PROPAGATED_HEADERS, messagesLatch);
+ boolean allMessagesReceived = messagesLatch.await(10_000,
TimeUnit.MILLISECONDS);
+
+ assertTrue("Not all messages were published to the kafka topics. Not
received: " + messagesLatch.getCount(), allMessagesReceived);
+
+ ConsumerRecord<String, String> record = records.get(0);
+ Headers headers = record.headers();
+ assertNotNull("Kafka Headers should not be null.", headers);
+ // we have 5 headers and 1 header with breadcrumbId
+ assertEquals("Six propagated headers are expected.", 6,
headers.toArray().length);
+ assertEquals("Propagated string value received",
propagatedStringHeaderValue,
+ new String(getHeaderValue(propagatedStringHeaderKey,
headers)));
+ assertEquals("Propagated integer value received",
propagatedIntegerHeaderValue,
+ new
Integer(ByteBuffer.wrap(getHeaderValue(propagatedIntegerHeaderKey,
headers)).getInt()));
+ assertEquals("Propagated long value received",
propagatedLongHeaderValue,
+ new
Long(ByteBuffer.wrap(getHeaderValue(propagatedLongHeaderKey,
headers)).getLong()));
+ assertEquals("Propagated double value received",
propagatedDoubleHeaderValue,
+ new
Double(ByteBuffer.wrap(getHeaderValue(propagatedDoubleHeaderKey,
headers)).getDouble()));
+ assertArrayEquals("Propagated byte array value received",
propagatedBytesHeaderValue, getHeaderValue(propagatedBytesHeaderKey, headers));
+ }
+
+ @Test
+ public void headerFilterStrategyCouldBeOverridden() {
+ KafkaEndpoint kafkaEndpoint =
context.getEndpoint("kafka:TOPIC_PROPAGATED_HEADERS?headerFilterStrategy=#myStrategy",
KafkaEndpoint.class);
+ assertIsInstanceOf(MyHeaderFilterStrategy.class,
kafkaEndpoint.getConfiguration().getHeaderFilterStrategy());
+ }
+
+ private byte[] getHeaderValue(String headerKey, Headers headers) {
+ Header foundHeader = StreamSupport.stream(headers.spliterator(), false)
+ .filter(header -> header.key().equals(headerKey))
+ .findFirst()
+ .orElse(null);
+ assertNotNull("Header should be sent", foundHeader);
+ return foundHeader.value();
+ }
+
+ private List<ConsumerRecord<String, String>>
pollForRecords(KafkaConsumer<String, String> consumerConn,
+ String topic,
CountDownLatch messagesLatch) {
+
+ List<ConsumerRecord<String, String>> consumedRecords = new
ArrayList<>();
+ consumerConn.subscribe(Collections.singletonList(topic));
+
+ new Thread(() -> {
+ while (messagesLatch.getCount() != 0) {
+ for (ConsumerRecord<String, String> record :
consumerConn.poll(100)) {
+ consumedRecords.add(record);
+ messagesLatch.countDown();
+ }
+ }
+ }).start();
+
+ return consumedRecords;
+ }
+
private void createKafkaMessageConsumer(KafkaConsumer<String, String>
consumerConn,
String topic, String
topicInHeader, CountDownLatch messagesLatch) {
@@ -323,4 +435,7 @@ private void sendMessagesInRoute(int messages,
ProducerTemplate template, Object
}
}
+ private static class MyHeaderFilterStrategy extends
DefaultHeaderFilterStrategy {
+ }
+
}
diff --git
a/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
b/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
index f5624b1f0f8..e46ad412f79 100644
---
a/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
+++
b/platforms/spring-boot/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
@@ -19,6 +19,7 @@
import java.util.concurrent.ExecutorService;
import javax.annotation.Generated;
import org.apache.camel.component.kafka.KafkaManualCommitFactory;
+import org.apache.camel.spi.HeaderFilterStrategy;
import org.apache.camel.spi.StateRepository;
import org.apache.camel.spring.boot.ComponentConfigurationPropertiesCommon;
import org.apache.camel.util.jsse.SSLContextParameters;
@@ -751,6 +752,11 @@ public void setResolvePropertyPlaceholders(
* increase, 20% random jitter is added to avoid connection storms.
*/
private Integer reconnectBackoffMaxMs = 1000;
+ /**
+ * To use a custom HeaderFilterStrategy to filter header to and from
+ * Camel message.
+ */
+ private HeaderFilterStrategy headerFilterStrategy;
public Boolean getTopicIsPattern() {
return topicIsPattern;
@@ -1452,5 +1458,14 @@ public Integer getReconnectBackoffMaxMs() {
public void setReconnectBackoffMaxMs(Integer reconnectBackoffMaxMs) {
this.reconnectBackoffMaxMs = reconnectBackoffMaxMs;
}
+
+ public HeaderFilterStrategy getHeaderFilterStrategy() {
+ return headerFilterStrategy;
+ }
+
+ public void setHeaderFilterStrategy(
+ HeaderFilterStrategy headerFilterStrategy) {
+ this.headerFilterStrategy = headerFilterStrategy;
+ }
}
}
\ No newline at end of file
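
To illustrate which header names the default filter pattern in the diff above would catch, here is a minimal sketch using only `java.util.regex`. The pattern string is copied from `KafkaHeaderFilterStrategy` in the diff; the class name `HeaderFilterPatternSketch` and the helper `isFiltered` are hypothetical, and the sketch assumes a header is filtered when its whole name matches the pattern.

```java
import java.util.regex.Pattern;

// Hypothetical helper for illustration only; not part of camel-kafka.
final class HeaderFilterPatternSketch {

    // Pattern string copied from the setInFilterPattern/setOutFilterPattern calls in the diff.
    static final Pattern CAMEL_HEADERS =
            Pattern.compile("(?i)(Camel|org\\.apache\\.camel)[\\.|a-z|A-z|0-9]*");

    // Assumption: a header is filtered out when its entire name matches the pattern.
    static boolean isFiltered(String headerName) {
        return CAMEL_HEADERS.matcher(headerName).matches();
    }

    public static void main(String[] args) {
        String[] names = {
            "CamelSkippedHeader", "org.apache.camel.foo", "PropagatedCustomHeader", "breadcrumbId"
        };
        for (String name : names) {
            System.out.println(name + " filtered: " + isFiltered(name));
        }
    }
}
```

Under that assumption, the two names taken from the tests in the diff behave as the tests expect: `CamelSkippedHeader` is dropped and `PropagatedCustomHeader` passes through.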
> Kafka component should be able to propagate camel headers to kafka
> ------------------------------------------------------------------
>
> Key: CAMEL-12503
> URL: https://issues.apache.org/jira/browse/CAMEL-12503
> Project: Camel
> Issue Type: New Feature
> Components: camel-kafka
> Reporter: Taras Danylchuk
> Assignee: Claus Ibsen
> Priority: Major
> Fix For: 2.22.0
>
>
> Since 0.11.0 Kafka supports headers, and it would be awesome to have such a
> feature available in the camel component as well.
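
Because headers propagated from Kafka arrive on the Camel exchange as `byte[]`, a consuming route has to decode them itself. The following is a minimal sketch of that round trip. It assumes numbers are encoded with `ByteBuffer` in its default big-endian order, as in the producer code of the pull request, and it assumes UTF-8 for strings (the pull request calls `getBytes()` with the platform default charset). The class and method names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration only; not part of camel-kafka.
final class KafkaHeaderValueSketch {

    // Encoders mirror the ByteBuffer usage in the producer code of the pull request.
    static byte[] encodeInt(int value) {
        return ByteBuffer.allocate(Integer.BYTES).putInt(value).array();
    }

    static byte[] encodeLong(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    static byte[] encodeDouble(double value) {
        return ByteBuffer.allocate(Double.BYTES).putDouble(value).array();
    }

    // Assumption: UTF-8 is used explicitly instead of the platform default charset.
    static byte[] encodeString(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    // Decoders a consuming route could apply to the raw byte[] header value.
    static int decodeInt(byte[] raw) {
        return ByteBuffer.wrap(raw).getInt();
    }

    static long decodeLong(byte[] raw) {
        return ByteBuffer.wrap(raw).getLong();
    }

    static double decodeDouble(byte[] raw) {
        return ByteBuffer.wrap(raw).getDouble();
    }

    static String decodeString(byte[] raw) {
        return new String(raw, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(decodeInt(encodeInt(54545)));
        System.out.println(decodeLong(encodeLong(5454545454545L)));
        System.out.println(decodeDouble(encodeDouble(43434.545D)));
        System.out.println(decodeString(encodeString("propagated string header value")));
    }
}
```

The consumer has to know the intended type of each header in advance, since the raw bytes carry no type information.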