This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch exchange-factory
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/exchange-factory by this push:
new 8afe1de CAMEL-16222: PooledExchangeFactory experiment
8afe1de is described below
commit 8afe1dec0e9011afb6bd9f9a52fc9e37dfeb0573
Author: Claus Ibsen <[email protected]>
AuthorDate: Tue Feb 23 09:17:35 2021 +0100
CAMEL-16222: PooledExchangeFactory experiment
---
.../camel/component/kafka/KafkaConsumer.java | 36 ++++++++---
.../camel/component/kafka/KafkaEndpoint.java | 21 ------
.../camel/component/kafka/KafkaConsumerTest.java | 1 -
.../camel/component/kafka/KafkaEndpointTest.java | 75 ----------------------
4 files changed, 27 insertions(+), 106 deletions(-)
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index b911881..7078044 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -33,6 +33,7 @@ import java.util.regex.Pattern;
import java.util.stream.StreamSupport;
import org.apache.camel.Exchange;
+import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.component.kafka.serde.KafkaHeaderDeserializer;
import org.apache.camel.spi.HeaderFilterStrategy;
@@ -239,8 +240,7 @@ public class KafkaConsumer extends DefaultConsumer {
@SuppressWarnings("unchecked")
protected boolean doRun() {
- // allow to re-connect thread in case we use that to retry failed
- // messages
+ // allow to re-connect thread in case we use that to retry failed messages
boolean reConnect = false;
boolean unsubscribing = false;
@@ -320,7 +320,7 @@ public class KafkaConsumer extends DefaultConsumer {
LOG.trace("Partition = {}, offset = {}, key = {}, value = {}", record.partition(),
record.offset(), record.key(), record.value());
}
- Exchange exchange = endpoint.createKafkaExchange(record);
+ Exchange exchange = createKafkaExchange(record);
propagateHeaders(record, exchange, endpoint.getConfiguration());
@@ -355,14 +355,11 @@ public class KafkaConsumer extends DefaultConsumer {
// processing failed due to an unhandled
// exception, what should we do
if (endpoint.getConfiguration().isBreakOnFirstError()) {
- // we are failing and we should break
- // out
+ // we are failing and we should break out
LOG.warn(
"Error during processing {} from topic: {}. Will seek consumer to offset: {} and re-connect and start polling again.",
- exchange,
- topicName, partitionLastOffset, exchange.getException());
- // force commit so we resume on next
- // poll where we failed
+ exchange, topicName, partitionLastOffset, exchange.getException());
+ // force commit so we resume on next poll where we failed
commitOffset(offsetRepository, partition, partitionLastOffset, true);
// continue to next partition
breakOnErrorHit = true;
@@ -380,6 +377,9 @@ public class KafkaConsumer extends DefaultConsumer {
// offset state upon partition revoke
lastProcessedOffset.put(serializeOffsetKey(partition), partitionLastOffset);
}
+
+ // success so release the exchange
+ releaseExchange(exchange, false);
}
if (!breakOnErrorHit) {
@@ -506,6 +506,24 @@ public class KafkaConsumer extends DefaultConsumer {
}
}
+ @SuppressWarnings("rawtypes")
+ private Exchange createKafkaExchange(ConsumerRecord record) {
+ Exchange exchange = createExchange(false);
+
+ Message message = exchange.getIn();
+ message.setHeader(KafkaConstants.PARTITION, record.partition());
+ message.setHeader(KafkaConstants.TOPIC, record.topic());
+ message.setHeader(KafkaConstants.OFFSET, record.offset());
+ message.setHeader(KafkaConstants.HEADERS, record.headers());
+ message.setHeader(KafkaConstants.TIMESTAMP, record.timestamp());
+ if (record.key() != null) {
+ message.setHeader(KafkaConstants.KEY, record.key());
+ }
+ message.setBody(record.value());
+
+ return exchange;
+ }
+
private void propagateHeaders(
ConsumerRecord<Object, Object> record, Exchange exchange, KafkaConfiguration kafkaConfiguration) {
HeaderFilterStrategy headerFilterStrategy = kafkaConfiguration.getHeaderFilterStrategy();
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
index 2ffc47e..950cbfa 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
@@ -21,8 +21,6 @@ import java.util.concurrent.ExecutorService;
import org.apache.camel.Category;
import org.apache.camel.Consumer;
-import org.apache.camel.Exchange;
-import org.apache.camel.Message;
import org.apache.camel.MultipleConsumersSupport;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
@@ -33,7 +31,6 @@ import org.apache.camel.support.DefaultEndpoint;
import org.apache.camel.support.SynchronousDelegateProducer;
import org.apache.camel.util.CastUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.Deserializer;
@@ -146,24 +143,6 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
"KafkaProducer[" + configuration.getTopic() + "]", core, max);
}
- @SuppressWarnings("rawtypes")
- public Exchange createKafkaExchange(ConsumerRecord record) {
- Exchange exchange = super.createExchange();
-
- Message message = exchange.getIn();
- message.setHeader(KafkaConstants.PARTITION, record.partition());
- message.setHeader(KafkaConstants.TOPIC, record.topic());
- message.setHeader(KafkaConstants.OFFSET, record.offset());
- message.setHeader(KafkaConstants.HEADERS, record.headers());
- message.setHeader(KafkaConstants.TIMESTAMP, record.timestamp());
- if (record.key() != null) {
- message.setHeader(KafkaConstants.KEY, record.key());
- }
- message.setBody(record.value());
-
- return exchange;
- }
-
protected KafkaProducer createProducer(KafkaEndpoint endpoint) {
return new KafkaProducer(endpoint);
}
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java
index 26188c4..7bad317 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java
@@ -20,7 +20,6 @@ import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.Processor;
import org.apache.camel.spi.ExchangeFactory;
import org.junit.jupiter.api.Test;
-import org.mockito.Mockito;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java
deleted file mode 100644
index 0533013..0000000
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaEndpointTest.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.kafka;
-
-import org.apache.camel.Exchange;
-import org.apache.camel.Message;
-import org.apache.camel.impl.DefaultCamelContext;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.mockito.Mock;
-import org.mockito.junit.jupiter.MockitoSettings;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.Mockito.when;
-
-@MockitoSettings
-public class KafkaEndpointTest {
-
- private KafkaEndpoint endpoint;
-
- @Mock
- private ConsumerRecord<String, String> mockRecord;
-
- @Mock
- private KafkaComponent mockKafkaComponent;
-
- @BeforeEach
- public void setup() {
- KafkaComponent kafka = new KafkaComponent(new DefaultCamelContext());
- kafka.init();
- endpoint = new KafkaEndpoint("kafka:mytopic?brokers=localhost", kafka);
- }
-
- @Test
- public void createKafkaExchangeShouldSetHeaders() {
-
- when(mockRecord.key()).thenReturn("somekey");
- when(mockRecord.topic()).thenReturn("topic");
- when(mockRecord.partition()).thenReturn(4);
- when(mockRecord.offset()).thenReturn(56L);
- when(mockRecord.timestamp()).thenReturn(1518026587392L);
-
- Exchange exchange = endpoint.createKafkaExchange(mockRecord);
- Message inMessage = exchange.getIn();
- assertNotNull(inMessage);
- assertEquals("somekey", inMessage.getHeader(KafkaConstants.KEY));
- assertEquals("topic", inMessage.getHeader(KafkaConstants.TOPIC));
- assertEquals(4, inMessage.getHeader(KafkaConstants.PARTITION));
- assertEquals(56L, inMessage.getHeader(KafkaConstants.OFFSET));
- assertEquals(1518026587392L, inMessage.getHeader(KafkaConstants.TIMESTAMP));
- }
-
- @Test
- public void isSingletonShouldReturnTrue() {
- assertTrue(endpoint.isSingleton());
- }
-
-}