This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new 59046fe140f CAMEL-18688: add full resume support for Kafka 59046fe140f is described below commit 59046fe140ff0278dd5a243a4b61e4220b3eee20 Author: Otavio Rodolfo Piske <angusyo...@gmail.com> AuthorDate: Fri Nov 4 16:07:40 2022 +0100 CAMEL-18688: add full resume support for Kafka --- .../org/apache/camel/kafka-adapter-factory | 2 + .../camel/component/kafka/KafkaConsumer.java | 5 + .../camel/component/kafka/KafkaFetchRecords.java | 19 +- .../consumer/support/ResumeStrategyFactory.java | 92 -------- .../support/classic/AssignmentAdapterHelper.java | 49 +++++ .../ClassicRebalanceListener.java} | 31 +-- .../classic/NoOpPartitionAssignmentAdapter.java | 38 ++++ .../OffsetPartitionAssignmentAdapter.java} | 16 +- .../PartitionAssignmentAdapter.java} | 13 +- .../SeekPolicyPartitionAssignmentAdapter.java} | 15 +- .../consumer/support/resume/KafkaResumable.java | 62 ++++++ .../support/resume/KafkaResumeAdapter.java | 113 ++++++++++ .../ResumeRebalanceListener.java} | 32 ++- .../kafka/SingleNodeKafkaResumeStrategy.java | 5 +- ...KafkaConsumerAutoInstResumeRouteStrategyIT.java | 113 ++++++++++ .../KafkaConsumerWithResumeRouteStrategyIT.java | 239 --------------------- .../processor/resume/TransientResumeStrategy.java | 113 +++++----- 17 files changed, 493 insertions(+), 464 deletions(-) diff --git a/components/camel-kafka/src/generated/resources/META-INF/services/org/apache/camel/kafka-adapter-factory b/components/camel-kafka/src/generated/resources/META-INF/services/org/apache/camel/kafka-adapter-factory new file mode 100644 index 00000000000..10d19f83eab --- /dev/null +++ b/components/camel-kafka/src/generated/resources/META-INF/services/org/apache/camel/kafka-adapter-factory @@ -0,0 +1,2 @@ +# Generated by camel build tools - do NOT edit this file! 
+class=org.apache.camel.component.kafka.consumer.support.resume.KafkaResumeAdapter diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java index e1e29660b5c..dad29c73ad5 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java @@ -238,4 +238,9 @@ public class KafkaConsumer extends DefaultConsumer public List<TaskHealthState> healthStates() { return tasks.stream().map(t -> t.healthState()).collect(Collectors.toList()); } + + @Override + public String adapterFactoryService() { + return "kafka-adapter-factory"; + } } diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java index f94e3e118a5..b31a3898b5a 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java @@ -27,11 +27,10 @@ import org.apache.camel.component.kafka.consumer.CommitManager; import org.apache.camel.component.kafka.consumer.CommitManagers; import org.apache.camel.component.kafka.consumer.errorhandler.KafkaConsumerListener; import org.apache.camel.component.kafka.consumer.errorhandler.KafkaErrorStrategies; -import org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeAdapter; import org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessorFacade; -import org.apache.camel.component.kafka.consumer.support.PartitionAssignmentListener; import org.apache.camel.component.kafka.consumer.support.ProcessingResult; -import org.apache.camel.component.kafka.consumer.support.ResumeStrategyFactory; +import 
org.apache.camel.component.kafka.consumer.support.classic.ClassicRebalanceListener; +import org.apache.camel.component.kafka.consumer.support.resume.ResumeRebalanceListener; import org.apache.camel.support.BridgeExceptionHandlerToErrorHandler; import org.apache.camel.support.task.ForegroundTask; import org.apache.camel.support.task.Tasks; @@ -40,6 +39,7 @@ import org.apache.camel.util.IOHelper; import org.apache.camel.util.ReflectionHelper; import org.apache.camel.util.TimeUtils; import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient; @@ -272,11 +272,16 @@ public class KafkaFetchRecords implements Runnable { } private void subscribe() { - KafkaConsumerResumeAdapter resumeStrategy = ResumeStrategyFactory.resolveResumeAdapter(kafkaConsumer); - resumeStrategy.setConsumer(consumer); + ConsumerRebalanceListener listener; - PartitionAssignmentListener listener = new PartitionAssignmentListener( - threadId, kafkaConsumer.getEndpoint().getConfiguration(), commitManager, resumeStrategy); + if (kafkaConsumer.getResumeStrategy() == null) { + listener = new ClassicRebalanceListener( + threadId, kafkaConsumer.getEndpoint().getConfiguration(), commitManager, consumer); + } else { + listener = new ResumeRebalanceListener( + threadId, kafkaConsumer.getEndpoint().getConfiguration(), + commitManager, consumer, kafkaConsumer.getResumeStrategy()); + } if (LOG.isInfoEnabled()) { LOG.info("Subscribing {} to {}", threadId, getPrintableTopic()); diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java deleted file mode 100644 index 
77315f2e85b..00000000000 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.kafka.consumer.support; - -import org.apache.camel.component.kafka.KafkaConfiguration; -import org.apache.camel.component.kafka.KafkaConsumer; -import org.apache.camel.component.kafka.SeekPolicy; -import org.apache.camel.resume.ResumeStrategy; -import org.apache.camel.spi.StateRepository; -import org.apache.kafka.clients.consumer.Consumer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public final class ResumeStrategyFactory { - - /** - * A NO-OP resume strategy that does nothing (i.e.: no resume) - */ - private static class NoOpKafkaConsumerResumeAdapter implements KafkaConsumerResumeAdapter { - - @SuppressWarnings("unused") - @Override - public void setConsumer(Consumer<?, ?> consumer) { - // NO-OP - } - - @Override - public void setKafkaResumable(KafkaResumable kafkaResumable) { - // NO-OP - } - - @SuppressWarnings("unused") - @Override - public void resume() { - - } - } - - private static final NoOpKafkaConsumerResumeAdapter 
NO_OP_RESUME_STRATEGY = new NoOpKafkaConsumerResumeAdapter(); - private static final Logger LOG = LoggerFactory.getLogger(ResumeStrategyFactory.class); - - private ResumeStrategyFactory() { - } - - public static KafkaConsumerResumeAdapter resolveResumeAdapter(KafkaConsumer kafkaConsumer) { - // When using resumable routes, which register the strategy via service, it takes priority over everything else - ResumeStrategy resumeStrategy = kafkaConsumer.getResumeStrategy(); - if (resumeStrategy != null) { - KafkaConsumerResumeAdapter adapter = resumeStrategy.getAdapter(KafkaConsumerResumeAdapter.class); - - // The strategy should not be able to be created without an adapter, but let's be safe - assert adapter != null; - - return adapter; - } - - KafkaConfiguration configuration = kafkaConsumer.getEndpoint().getConfiguration(); - - return resolveBuiltinResumeAdapters(configuration); - } - - private static KafkaConsumerResumeAdapter resolveBuiltinResumeAdapters(KafkaConfiguration configuration) { - LOG.debug("No resume strategy was provided ... 
checking for built-ins ..."); - StateRepository<String, String> offsetRepository = configuration.getOffsetRepository(); - SeekPolicy seekTo = configuration.getSeekTo(); - - if (offsetRepository != null) { - LOG.info("Using resume from offset strategy"); - return new OffsetKafkaConsumerResumeAdapter(offsetRepository); - } else if (seekTo != null) { - LOG.info("Using resume from seek policy strategy with seeking from {}", seekTo); - return new SeekPolicyKafkaConsumerResumeAdapter(seekTo); - } - - LOG.info("Using NO-OP resume strategy"); - return NO_OP_RESUME_STRATEGY; - } -} diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/classic/AssignmentAdapterHelper.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/classic/AssignmentAdapterHelper.java new file mode 100644 index 00000000000..082e53bdfef --- /dev/null +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/classic/AssignmentAdapterHelper.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.kafka.consumer.support.classic; + +import org.apache.camel.component.kafka.KafkaConfiguration; +import org.apache.camel.component.kafka.SeekPolicy; +import org.apache.camel.spi.StateRepository; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +final class AssignmentAdapterHelper { + + private static final NoOpPartitionAssignmentAdapter NO_OP_ASSIGNMENT_ADAPTER = new NoOpPartitionAssignmentAdapter(); + private static final Logger LOG = LoggerFactory.getLogger(AssignmentAdapterHelper.class); + + private AssignmentAdapterHelper() { + } + + public static PartitionAssignmentAdapter resolveBuiltinResumeAdapters(KafkaConfiguration configuration) { + LOG.debug("No resume strategy was provided ... checking for built-ins ..."); + StateRepository<String, String> offsetRepository = configuration.getOffsetRepository(); + SeekPolicy seekTo = configuration.getSeekTo(); + + if (offsetRepository != null) { + LOG.info("Using resume from offset strategy"); + return new OffsetPartitionAssignmentAdapter(offsetRepository); + } else if (seekTo != null) { + LOG.info("Using resume from seek policy strategy with seeking from {}", seekTo); + return new SeekPolicyPartitionAssignmentAdapter(seekTo); + } + + LOG.info("Using NO-OP resume strategy"); + return NO_OP_ASSIGNMENT_ADAPTER; + } +} diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/classic/ClassicRebalanceListener.java similarity index 67% copy from components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java copy to components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/classic/ClassicRebalanceListener.java index 03046f7a10b..ba11d1a7657 100644 --- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/classic/ClassicRebalanceListener.java @@ -15,34 +15,34 @@ * limitations under the License. */ -package org.apache.camel.component.kafka.consumer.support; +package org.apache.camel.component.kafka.consumer.support.classic; import java.util.Collection; -import java.util.List; -import java.util.stream.Collectors; import org.apache.camel.component.kafka.KafkaConfiguration; import org.apache.camel.component.kafka.consumer.CommitManager; +import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.common.TopicPartition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class PartitionAssignmentListener implements ConsumerRebalanceListener { - private static final Logger LOG = LoggerFactory.getLogger(PartitionAssignmentListener.class); +public class ClassicRebalanceListener implements ConsumerRebalanceListener { + private static final Logger LOG = LoggerFactory.getLogger(ClassicRebalanceListener.class); private final String threadId; private final KafkaConfiguration configuration; - private final KafkaConsumerResumeAdapter resumeStrategy; + private final PartitionAssignmentAdapter assignmentAdapter; private final CommitManager commitManager; - public PartitionAssignmentListener(String threadId, KafkaConfiguration configuration, - CommitManager commitManager, - KafkaConsumerResumeAdapter resumeStrategy) { + public ClassicRebalanceListener(String threadId, KafkaConfiguration configuration, + CommitManager commitManager, Consumer<?, ?> consumer) { this.threadId = threadId; this.configuration = configuration; this.commitManager = commitManager; - this.resumeStrategy = resumeStrategy; + + assignmentAdapter = 
AssignmentAdapterHelper.resolveBuiltinResumeAdapters(configuration); + assignmentAdapter.setConsumer(consumer); } @Override @@ -59,19 +59,10 @@ public class PartitionAssignmentListener implements ConsumerRebalanceListener { @Override public void onPartitionsAssigned(Collection<TopicPartition> partitions) { - if (LOG.isDebugEnabled()) { partitions.forEach(p -> LOG.debug("onPartitionsAssigned: {} from {}", threadId, p.topic())); - } - List<KafkaResumable> resumables = partitions.stream() - .map(p -> new KafkaResumable(String.valueOf(p.partition()), p.topic())).collect(Collectors.toList()); - - resumables.forEach(this::doResume); - } - private void doResume(KafkaResumable r) { - resumeStrategy.setKafkaResumable(r); - resumeStrategy.resume(); + assignmentAdapter.handlePartitionAssignment(); } } diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/classic/NoOpPartitionAssignmentAdapter.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/classic/NoOpPartitionAssignmentAdapter.java new file mode 100644 index 00000000000..b97f7689e82 --- /dev/null +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/classic/NoOpPartitionAssignmentAdapter.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.component.kafka.consumer.support.classic; + +import org.apache.kafka.clients.consumer.Consumer; + +/** + * A NO-OP resume strategy that does nothing (i.e.: no resume) + */ +public class NoOpPartitionAssignmentAdapter implements PartitionAssignmentAdapter { + + @SuppressWarnings("unused") + @Override + public void setConsumer(Consumer<?, ?> consumer) { + // NO-OP + } + + @SuppressWarnings("unused") + @Override + public void handlePartitionAssignment() { + + } +} diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/OffsetKafkaConsumerResumeAdapter.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/classic/OffsetPartitionAssignmentAdapter.java similarity index 86% rename from components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/OffsetKafkaConsumerResumeAdapter.java rename to components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/classic/OffsetPartitionAssignmentAdapter.java index 3ef4f209c43..4227ad7f004 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/OffsetKafkaConsumerResumeAdapter.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/classic/OffsetPartitionAssignmentAdapter.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.camel.component.kafka.consumer.support; +package org.apache.camel.component.kafka.consumer.support.classic; import java.util.Set; @@ -30,14 +30,14 @@ import static org.apache.camel.component.kafka.consumer.support.KafkaRecordProce /** * A resume strategy that uses Kafka's offset for resuming */ -public class OffsetKafkaConsumerResumeAdapter implements KafkaConsumerResumeAdapter { +public class OffsetPartitionAssignmentAdapter implements PartitionAssignmentAdapter { - private static final Logger LOG = LoggerFactory.getLogger(OffsetKafkaConsumerResumeAdapter.class); + private static final Logger LOG = LoggerFactory.getLogger(OffsetPartitionAssignmentAdapter.class); private final StateRepository<String, String> offsetRepository; private Consumer<?, ?> consumer; - public OffsetKafkaConsumerResumeAdapter(StateRepository<String, String> offsetRepository) { + public OffsetPartitionAssignmentAdapter(StateRepository<String, String> offsetRepository) { this.offsetRepository = offsetRepository; } @@ -46,11 +46,6 @@ public class OffsetKafkaConsumerResumeAdapter implements KafkaConsumerResumeAdap this.consumer = consumer; } - @Override - public void setKafkaResumable(KafkaResumable kafkaResumable) { - // NO-OP - } - private void resumeFromOffset(final Consumer<?, ?> consumer, TopicPartition topicPartition, String offsetState) { // The state contains the last read offset, so you need to seek from the next one long offset = deserializeOffsetValue(offsetState) + 1; @@ -58,8 +53,7 @@ public class OffsetKafkaConsumerResumeAdapter implements KafkaConsumerResumeAdap consumer.seek(topicPartition, offset); } - @Override - public void resume() { + public void handlePartitionAssignment() { Set<TopicPartition> assignments = consumer.assignment(); for (TopicPartition topicPartition : assignments) { String offsetState = offsetRepository.getState(serializeOffsetKey(topicPartition)); diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaConsumerResumeAdapter.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/classic/PartitionAssignmentAdapter.java similarity index 75% rename from components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaConsumerResumeAdapter.java rename to components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/classic/PartitionAssignmentAdapter.java index d25c05662cb..3c6a92d2542 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaConsumerResumeAdapter.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/classic/PartitionAssignmentAdapter.java @@ -14,9 +14,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.camel.component.kafka.consumer.support; +package org.apache.camel.component.kafka.consumer.support.classic; -import org.apache.camel.resume.ResumeAdapter; import org.apache.kafka.clients.consumer.Consumer; /** @@ -27,7 +26,7 @@ import org.apache.kafka.clients.consumer.Consumer; * the component is set up to use more than one of them. As such, implementations are responsible for ensuring the * thread-safety of the operations within the resume method. */ -public interface KafkaConsumerResumeAdapter extends ResumeAdapter { +public interface PartitionAssignmentAdapter { /** * Sets the Kafka consumer instance for the adapter. Please note that the Kafka consumer is not safe for concurrent @@ -37,11 +36,5 @@ public interface KafkaConsumerResumeAdapter extends ResumeAdapter { */ void setConsumer(Consumer<?, ?> consumer); - /** - * Sets an optional resumable instance for the adapter. This is usually set during partition assignment. 
Garanteed - * not to be null and safe to ignore if partition and topic information are not used. - * - * @param kafkaResumable the resumable instance - */ - void setKafkaResumable(KafkaResumable kafkaResumable); + void handlePartitionAssignment(); } diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyKafkaConsumerResumeAdapter.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/classic/SeekPolicyPartitionAssignmentAdapter.java similarity index 81% rename from components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyKafkaConsumerResumeAdapter.java rename to components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/classic/SeekPolicyPartitionAssignmentAdapter.java index 0dee4e1b849..b97839199d6 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyKafkaConsumerResumeAdapter.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/classic/SeekPolicyPartitionAssignmentAdapter.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.camel.component.kafka.consumer.support; +package org.apache.camel.component.kafka.consumer.support.classic; import org.apache.camel.component.kafka.SeekPolicy; import org.apache.kafka.clients.consumer.Consumer; @@ -24,14 +24,14 @@ import org.slf4j.LoggerFactory; /** * A resume strategy that uses Camel's seekTo configuration for resuming */ -public class SeekPolicyKafkaConsumerResumeAdapter implements KafkaConsumerResumeAdapter { +public class SeekPolicyPartitionAssignmentAdapter implements PartitionAssignmentAdapter { - private static final Logger LOG = LoggerFactory.getLogger(SeekPolicyKafkaConsumerResumeAdapter.class); + private static final Logger LOG = LoggerFactory.getLogger(SeekPolicyPartitionAssignmentAdapter.class); private final SeekPolicy seekPolicy; private Consumer<?, ?> consumer; - public SeekPolicyKafkaConsumerResumeAdapter(SeekPolicy seekPolicy) { + public SeekPolicyPartitionAssignmentAdapter(SeekPolicy seekPolicy) { this.seekPolicy = seekPolicy; } @@ -41,12 +41,7 @@ public class SeekPolicyKafkaConsumerResumeAdapter implements KafkaConsumerResume } @Override - public void setKafkaResumable(KafkaResumable kafkaResumable) { - // NO-OP - } - - @Override - public void resume() { + public void handlePartitionAssignment() { if (seekPolicy == SeekPolicy.BEGINNING) { LOG.debug("Seeking from the beginning of topic"); consumer.seekToBeginning(consumer.assignment()); diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/resume/KafkaResumable.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/resume/KafkaResumable.java new file mode 100644 index 00000000000..bcbcf578d20 --- /dev/null +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/resume/KafkaResumable.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.component.kafka.consumer.support.resume; + +import org.apache.camel.Exchange; +import org.apache.camel.component.kafka.KafkaConstants; +import org.apache.camel.resume.Offset; +import org.apache.camel.resume.OffsetKey; +import org.apache.camel.resume.Resumable; +import org.apache.camel.support.resume.OffsetKeys; +import org.apache.camel.support.resume.Offsets; + +public final class KafkaResumable implements Resumable { + private final String addressable; + private final Long offset; + + private KafkaResumable(String addressable, Long offset) { + this.addressable = addressable; + this.offset = offset; + } + + @Override + public OffsetKey<?> getOffsetKey() { + return OffsetKeys.unmodifiableOf(addressable); + } + + @Override + public Offset<?> getLastOffset() { + return Offsets.of(offset); + } + + /** + * Creates a new resumable for Kafka + * + * @param exchange the exchange to create the resumable from + * @return a new KafkaResumable instance with the data from the exchange + */ + public static KafkaResumable of(Exchange exchange) { + String topic = exchange.getMessage().getHeader(KafkaConstants.TOPIC, String.class); + Integer partition = exchange.getMessage().getHeader(KafkaConstants.PARTITION, Integer.class); + Long offset = 
exchange.getMessage().getHeader(KafkaConstants.OFFSET, Long.class); + + String topicPartition = topic + "/" + partition; + + return new KafkaResumable(topicPartition, offset); + } +} diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/resume/KafkaResumeAdapter.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/resume/KafkaResumeAdapter.java new file mode 100644 index 00000000000..641e5b5af06 --- /dev/null +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/resume/KafkaResumeAdapter.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.camel.component.kafka.consumer.support.resume; + +import java.nio.ByteBuffer; + +import org.apache.camel.resume.Cacheable; +import org.apache.camel.resume.Deserializable; +import org.apache.camel.resume.Offset; +import org.apache.camel.resume.OffsetKey; +import org.apache.camel.resume.ResumeAdapter; +import org.apache.camel.resume.cache.ResumeCache; +import org.apache.camel.spi.annotations.JdkService; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.common.TopicPartition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@JdkService("kafka-adapter-factory") +public class KafkaResumeAdapter implements ResumeAdapter, Deserializable, Cacheable { + private static final Logger LOG = LoggerFactory.getLogger(KafkaResumeAdapter.class); + + private Consumer<?, ?> consumer; + private ResumeCache<TopicPartition> resumeCache; + + private boolean resume(TopicPartition topicPartition, Object value) { + consumer.seek(topicPartition, (Long) value); + + return true; + } + + @Override + public void resume() { + resumeCache.forEach(this::resume); + } + + @Override + public boolean deserialize(ByteBuffer keyBuffer, ByteBuffer valueBuffer) { + Object keyObj = deserializeKey(keyBuffer); + Object valueObj = deserializeValue(valueBuffer); + + if (keyObj instanceof String) { + String key = (String) keyObj; + + final String[] keyParts = key.split("/"); + if (keyParts != null && keyParts.length == 2) { + + String topic = keyParts[0]; + int partition = Integer.parseInt(keyParts[1]); + + if (valueObj instanceof Long) { + Long offset = (Long) valueObj; + + resumeCache.add(new TopicPartition(topic, partition), offset); + } else { + LOG.warn("The type for the key '{}' is invalid: {}", key, valueObj); + } + + } else { + LOG.warn("Unable to deserialize key '{}' because it has an invalid format and it will be discarded", + key); + } + } else { + LOG.warn("Unable to deserialize key '{}' because its type is invalid", keyObj); + } + + 
return false; + } + + @Override + public boolean add(OffsetKey<?> key, Offset<?> offset) { + Object keyObj = key.getValue(); + Long valueObject = offset.getValue(Long.class); + + if (keyObj instanceof TopicPartition) { + TopicPartition topicPartition = (TopicPartition) keyObj; + + resumeCache.add(topicPartition, valueObject); + } + + return true; + } + + @SuppressWarnings("unchecked") + @Override + public void setCache(ResumeCache<?> cache) { + this.resumeCache = (ResumeCache<TopicPartition>) cache; + } + + @Override + public ResumeCache<?> getCache() { + return resumeCache; + } + + public void setConsumer(Consumer<?, ?> consumer) { + this.consumer = consumer; + } +} diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/resume/ResumeRebalanceListener.java similarity index 67% rename from components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java rename to components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/resume/ResumeRebalanceListener.java index 03046f7a10b..a7a47b72fa1 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/resume/ResumeRebalanceListener.java @@ -15,34 +15,36 @@ * limitations under the License. 
*/ -package org.apache.camel.component.kafka.consumer.support; +package org.apache.camel.component.kafka.consumer.support.resume; import java.util.Collection; -import java.util.List; -import java.util.stream.Collectors; import org.apache.camel.component.kafka.KafkaConfiguration; import org.apache.camel.component.kafka.consumer.CommitManager; +import org.apache.camel.component.kafka.consumer.support.classic.ClassicRebalanceListener; +import org.apache.camel.resume.ResumeStrategy; +import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.common.TopicPartition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class PartitionAssignmentListener implements ConsumerRebalanceListener { - private static final Logger LOG = LoggerFactory.getLogger(PartitionAssignmentListener.class); +public class ResumeRebalanceListener implements ConsumerRebalanceListener { + private static final Logger LOG = LoggerFactory.getLogger(ClassicRebalanceListener.class); private final String threadId; private final KafkaConfiguration configuration; - private final KafkaConsumerResumeAdapter resumeStrategy; private final CommitManager commitManager; + private final KafkaResumeAdapter resumeAdapter; - public PartitionAssignmentListener(String threadId, KafkaConfiguration configuration, - CommitManager commitManager, - KafkaConsumerResumeAdapter resumeStrategy) { + public ResumeRebalanceListener(String threadId, KafkaConfiguration configuration, + CommitManager commitManager, Consumer<?, ?> consumer, ResumeStrategy resumeStrategy) { this.threadId = threadId; this.configuration = configuration; this.commitManager = commitManager; - this.resumeStrategy = resumeStrategy; + + resumeAdapter = resumeStrategy.getAdapter(KafkaResumeAdapter.class); + resumeAdapter.setConsumer(consumer); } @Override @@ -59,19 +61,11 @@ public class PartitionAssignmentListener implements ConsumerRebalanceListener { @Override 
public void onPartitionsAssigned(Collection<TopicPartition> partitions) { - if (LOG.isDebugEnabled()) { partitions.forEach(p -> LOG.debug("onPartitionsAssigned: {} from {}", threadId, p.topic())); - } - List<KafkaResumable> resumables = partitions.stream() - .map(p -> new KafkaResumable(String.valueOf(p.partition()), p.topic())).collect(Collectors.toList()); - resumables.forEach(this::doResume); + resumeAdapter.resume(); } - private void doResume(KafkaResumable r) { - resumeStrategy.setKafkaResumable(r); - resumeStrategy.resume(); - } } diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java index c3863b6508e..32ee0a4af97 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java @@ -346,7 +346,10 @@ public class SingleNodeKafkaResumeStrategy implements KafkaResumeStrategy { @Override public ResumeAdapter getAdapter() { - waitForInitialization(); + if (adapter == null) { + waitForInitialization(); + } + return adapter; } diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAutoInstResumeRouteStrategyIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAutoInstResumeRouteStrategyIT.java new file mode 100644 index 00000000000..17a69566ab2 --- /dev/null +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAutoInstResumeRouteStrategyIT.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kafka.integration; + +import java.time.Duration; +import java.util.Collections; +import java.util.Properties; + +import org.apache.camel.EndpointInject; +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.kafka.consumer.support.resume.KafkaResumable; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.processor.resume.TransientResumeStrategy; +import org.apache.camel.processor.resume.kafka.KafkaResumeStrategyConfigurationBuilder; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class KafkaConsumerAutoInstResumeRouteStrategyIT extends BaseEmbeddedKafkaTestSupport { + private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerAutoInstResumeRouteStrategyIT.class); + private static final String TOPIC = "resumable-route-auto"; + + @EndpointInject("mock:result") + private MockEndpoint result; + + public static KafkaResumeStrategyConfigurationBuilder 
getDefaultKafkaResumeStrategyConfigurationBuilder() { + return KafkaResumeStrategyConfigurationBuilder.newBuilder() + .withBootstrapServers(service.getBootstrapServers()) + .withTopic("resumable-route-auto-offsets") + .withResumeCache(TransientResumeStrategy.createSimpleCache()) + .withProducerProperty("max.block.ms", "10000") + .withMaxInitializationDuration(Duration.ofSeconds(5)) + .withProducerProperty("delivery.timeout.ms", "30000") + .withProducerProperty("session.timeout.ms", "15000") + .withProducerProperty("request.timeout.ms", "15000") + .withConsumerProperty("session.timeout.ms", "20000"); + } + + @BeforeEach + public void before() { + Properties props = getDefaultProperties(); + KafkaProducer<Object, Object> producer = new KafkaProducer<>(props); + + for (int i = 0; i < 10; i++) { + producer.send(new ProducerRecord<>(TOPIC, String.valueOf(i))); + } + } + + @Override + protected void doPreSetup() throws Exception { + super.doPreSetup(); + + } + + @Test + @Timeout(value = 30) + public void testOffsetIsBeingChecked() throws InterruptedException { + MockEndpoint mock = getMockEndpoint("mock:result"); + + mock.expectedMessageCount(10); + mock.assertIsSatisfied(); + } + + @AfterEach + public void after() { + kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC)); + } + + public void process(Exchange exchange) { + exchange.getMessage().setHeader(Exchange.OFFSET, KafkaResumable.of(exchange)); + } + + @Override + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + @Override + public void configure() { + fromF("kafka:%s?groupId=%s_GROUP&autoCommitIntervalMs=1000" + + "&autoOffsetReset=earliest&consumersCount=1", TOPIC, TOPIC) + .resumable().configuration(getDefaultKafkaResumeStrategyConfigurationBuilder()) + .process(e -> process(e)) + .routeId("resume-strategy-auto-route") + // Note: this is for manually testing the ResumableCompletion onFailure exception logging. 
Uncomment it for testing it + // .process(e -> e.setException(new RuntimeCamelException("Mock error in test"))) + .to("mock:sentMessages"); + + fromF("kafka:%s?groupId=%s_GROUP&autoCommitIntervalMs=1000", "resumable-route-auto-offsets", + "resumable-route-auto-offsets") + .to("mock:result"); + } + }; + } +} diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeRouteStrategyIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeRouteStrategyIT.java deleted file mode 100644 index c6d903cd546..00000000000 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeRouteStrategyIT.java +++ /dev/null @@ -1,239 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.camel.component.kafka.integration; - -import java.util.Collections; -import java.util.Properties; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.TimeUnit; - -import org.apache.camel.BindToRegistry; -import org.apache.camel.EndpointInject; -import org.apache.camel.Exchange; -import org.apache.camel.builder.RouteBuilder; -import org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeAdapter; -import org.apache.camel.component.kafka.consumer.support.KafkaResumable; -import org.apache.camel.component.mock.MockEndpoint; -import org.apache.camel.processor.resume.TransientResumeStrategy; -import org.apache.camel.resume.Offset; -import org.apache.camel.resume.OffsetKey; -import org.apache.camel.resume.Resumable; -import org.apache.camel.resume.ResumeAdapter; -import org.apache.camel.support.resume.Resumables; -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertTrue; - -public class KafkaConsumerWithResumeRouteStrategyIT extends BaseEmbeddedKafkaTestSupport { - private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerWithResumeRouteStrategyIT.class); - private static final String TOPIC = "resumable-route-tp"; - private static final int RANDOM_VALUE = ThreadLocalRandom.current().nextInt(1, 1000); - - @EndpointInject("mock:result") - private MockEndpoint result; - - @BindToRegistry("resumeStrategy") - private 
TestUpdateStrategy resumeStrategy; - private CountDownLatch messagesLatch; - - private static class TestUpdateStrategy extends TransientResumeStrategy { - private final CountDownLatch messagesLatch; - private boolean startCalled; - private boolean offsetNull = true; - private boolean offsetAddressableNull = true; - private boolean offsetAddressableEmpty = true; - private boolean offsetValueNull = true; - private boolean offsetValueEmpty = true; - private int lastOffset; - - public TestUpdateStrategy(ResumeAdapter resumeAdapter, CountDownLatch messagesLatch) { - super(resumeAdapter); - - this.messagesLatch = messagesLatch; - } - - @Override - public void start() { - LOG.warn("Start was called"); - startCalled = true; - } - - @Override - public void init() { - LOG.warn("Init was called"); - } - - @Override - public void updateLastOffset(Resumable offset) { - try { - if (offset != null) { - offsetNull = false; - - OffsetKey<?> addressable = offset.getOffsetKey(); - if (addressable != null) { - offsetAddressableNull = false; - offsetAddressableEmpty = addressable.getValue() == null; - - } - - Offset<?> offsetValue = offset.getLastOffset(); - if (offsetValue != null) { - offsetValueNull = false; - - if (offsetValue.getValue() != null) { - offsetValueEmpty = false; - lastOffset = (int) offsetValue.getValue(); - } - } - } - } finally { - messagesLatch.countDown(); - } - } - - public boolean isOffsetNull() { - return offsetNull; - } - - public boolean isOffsetAddressableNull() { - return offsetAddressableNull; - } - - public boolean isOffsetValueNull() { - return offsetValueNull; - } - - public boolean isOffsetAddressableEmpty() { - return offsetAddressableEmpty; - } - - public boolean isOffsetValueEmpty() { - return offsetValueEmpty; - } - - public boolean isStartCalled() { - return startCalled; - } - } - - private static class TestKafkaConsumerResumeAdapter implements KafkaConsumerResumeAdapter { - private boolean resumeCalled; - private boolean consumerIsNull = true; - 
- @Override - public void setConsumer(Consumer<?, ?> consumer) { - if (consumer != null) { - consumerIsNull = false; - } - } - - @Override - public void setKafkaResumable(KafkaResumable kafkaResumable) { - - } - - @Override - public void resume() { - resumeCalled = true; - } - - public boolean isResumeCalled() { - return resumeCalled; - } - - public boolean isConsumerIsNull() { - return consumerIsNull; - } - } - - @BeforeEach - public void before() { - Properties props = getDefaultProperties(); - KafkaProducer<Object, Object> producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props); - - for (int i = 0; i < 10; i++) { - producer.send(new ProducerRecord<>(TOPIC, String.valueOf(i))); - } - } - - @Override - protected void doPreSetup() throws Exception { - super.doPreSetup(); - - messagesLatch = new CountDownLatch(1); - resumeStrategy = new TestUpdateStrategy(new TestKafkaConsumerResumeAdapter(), messagesLatch); - } - - @Test - // @Timeout(value = 30) - public void testOffsetIsBeingChecked() throws InterruptedException { - assertTrue(messagesLatch.await(100, TimeUnit.SECONDS), "The resume was not called"); - - final TestKafkaConsumerResumeAdapter adapter = resumeStrategy.getAdapter(TestKafkaConsumerResumeAdapter.class); - assertNotNull(adapter, "The adapter should not be null"); - assertTrue(adapter.isResumeCalled(), - "The resume strategy should have been called when the partition was assigned"); - assertFalse(adapter.isConsumerIsNull(), - "The consumer passed to the strategy should not be null"); - assertTrue(resumeStrategy.isStartCalled(), - "The resume strategy should have been started"); - assertFalse(resumeStrategy.isOffsetNull(), - "The offset should not be null"); - assertFalse(resumeStrategy.isOffsetAddressableNull(), - "The offset addressable should not be null"); - assertFalse(resumeStrategy.isOffsetAddressableEmpty(), - "The offset addressable should not be empty"); - assertFalse(resumeStrategy.isOffsetValueNull(), - "The offset value should 
not be null"); - assertFalse(resumeStrategy.isOffsetValueEmpty(), - "The offset value should not be empty"); - assertEquals(RANDOM_VALUE, resumeStrategy.lastOffset, "the offsets don't match"); - } - - @AfterEach - public void after() { - kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC)); - } - - @Override - protected RouteBuilder createRouteBuilder() { - return new RouteBuilder() { - @Override - public void configure() { - from("kafka:" + TOPIC + "?groupId=" + TOPIC + "_GROUP&autoCommitIntervalMs=1000" - + "&autoOffsetReset=earliest&consumersCount=1") - .resumable().resumeStrategy("resumeStrategy", "DEBUG") - .routeId("resume-strategy-route") - .setHeader(Exchange.OFFSET, constant(Resumables.of("key", RANDOM_VALUE))) - // Note: this is for manually testing the ResumableCompletion onFailure exception logging. Uncomment it for testing it - // .process(e -> e.setException(new RuntimeCamelException("Mock error in test"))) - .to("mock:result"); - } - }; - } -} diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/TransientResumeStrategy.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/TransientResumeStrategy.java index 0cfe5d86703..89ca17daf87 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/TransientResumeStrategy.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/TransientResumeStrategy.java @@ -119,61 +119,7 @@ public class TransientResumeStrategy implements ResumeStrategy { @Override public ResumeCache<?> getResumeCache() { - return new ResumeCache<>() { - private Map<Object, Object> cache = new HashMap<>(); - - @Override - public Object computeIfAbsent(Object key, Function<? super Object, ? super Object> mapping) { - return cache.computeIfAbsent(key, mapping); - } - - @Override - public Object computeIfPresent( - Object key, BiFunction<? super Object, ? super Object, ? 
super Object> remapping) { - return cache.computeIfPresent(key, remapping); - } - - @Override - public boolean contains(Object key, Object entry) { - return Objects.equals(cache.get(key), entry); - } - - @Override - public void add(Object key, Object offsetValue) { - cache.put(key, offsetValue); - } - - @Override - public boolean isFull() { - return false; - } - - @Override - public long capacity() { - return Integer.MAX_VALUE; - } - - @Override - public <T> T get(Object key, Class<T> clazz) { - final Object o = cache.get(key); - - return clazz.cast(o); - } - - @Override - public Object get(Object key) { - return cache.get(key); - } - - @Override - public void forEach(BiFunction<? super Object, ? super Object, Boolean> action) { - for (Map.Entry e : cache.entrySet()) { - if (!action.apply(e.getKey(), e.getValue())) { - cache.remove(e.getKey()); - } - } - } - }; + return createSimpleCache(); } @Override @@ -184,4 +130,61 @@ public class TransientResumeStrategy implements ResumeStrategy { } }; } + + public static ResumeCache<Object> createSimpleCache() { + return new ResumeCache<>() { + private Map<Object, Object> cache = new HashMap<>(); + + @Override + public Object computeIfAbsent(Object key, Function<? super Object, ? super Object> mapping) { + return cache.computeIfAbsent(key, mapping); + } + + @Override + public Object computeIfPresent(Object key, BiFunction<? super Object, ? super Object, ? 
super Object> remapping) { + return cache.computeIfPresent(key, remapping); + } + + @Override + public boolean contains(Object key, Object entry) { + return Objects.equals(cache.get(key), entry); + } + + @Override + public void add(Object key, Object offsetValue) { + cache.put(key, offsetValue); + } + + @Override + public boolean isFull() { + return false; + } + + @Override + public long capacity() { + return Integer.MAX_VALUE; + } + + @Override + public <T> T get(Object key, Class<T> clazz) { + final Object o = cache.get(key); + + return clazz.cast(o); + } + + @Override + public Object get(Object key) { + return cache.get(key); + } + + @Override + public void forEach(BiFunction<? super Object, ? super Object, Boolean> action) { + for (Map.Entry e : cache.entrySet()) { + if (!action.apply(e.getKey(), e.getValue())) { + cache.remove(e.getKey()); + } + } + } + }; + } }