This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new 9d581cae407 CAMEL-18127: added adapter auto-configuration for Cassandra 9d581cae407 is described below commit 9d581cae407794f55cace2c7bb3a8734db036172 Author: Otavio Rodolfo Piske <angusyo...@gmail.com> AuthorDate: Mon Jun 6 13:16:20 2022 +0200 CAMEL-18127: added adapter auto-configuration for Cassandra --- .../org/apache/camel/catalog/components/cql.json | 3 +- .../component/caffeine/resume/CaffeineCache.java | 13 ++++ .../org/apache/camel/component/cassandra/cql.json | 3 +- .../component/cassandra/CassandraConstants.java | 3 + .../component/cassandra/CassandraConsumer.java | 15 ++++- ...sumeAdapter.java => CassandraResumeAction.java} | 16 ++--- .../consumer/support/CassandraResumeAdapter.java | 9 ++- .../support/DefaultCassandraResumeAdapter.java | 70 ++++++++++++++++++++++ .../org/apache/camel/resume/adapter.properties | 19 ++++++ .../CassandraComponentResumeStrategyIT.java | 30 +++------- .../org/apache/camel/resume/cache/ResumeCache.java | 7 +++ 11 files changed, 149 insertions(+), 39 deletions(-) diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/cql.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/cql.json index 0938f6873be..2a568f0f551 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/cql.json +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/cql.json @@ -27,7 +27,8 @@ "autowiredEnabled": { "kind": "property", "displayName": "Autowired Enabled", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching type, which t [...] }, "headers": { - "CamelCqlQuery": { "kind": "header", "displayName": "", "group": "producer", "label": "producer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The CQL query to execute.", "constantName": "org.apache.camel.component.cassandra.CassandraConstants#CQL_QUERY" } + "CamelCqlQuery": { "kind": "header", "displayName": "", "group": "producer", "label": "producer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The CQL query to execute.", "constantName": "org.apache.camel.component.cassandra.CassandraConstants#CQL_QUERY" }, + "CamelCqlResumeQuery": { "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The CQL query to execute when resuming.", "constantName": "org.apache.camel.component.cassandra.CassandraConstants#CASSANDRA_RESUME_ACTION" } }, "properties": { "beanRef": { "kind": "path", "displayName": "Bean Ref", "group": "common", "label": "", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "beanRef is defined using bean:id" }, diff --git a/components/camel-caffeine/src/main/java/org/apache/camel/component/caffeine/resume/CaffeineCache.java b/components/camel-caffeine/src/main/java/org/apache/camel/component/caffeine/resume/CaffeineCache.java index 5ec66ddc88b..30d1de9fd54 100644 --- a/components/camel-caffeine/src/main/java/org/apache/camel/component/caffeine/resume/CaffeineCache.java +++ b/components/camel-caffeine/src/main/java/org/apache/camel/component/caffeine/resume/CaffeineCache.java @@ -18,6 +18,7 @@ package org.apache.camel.component.caffeine.resume; import java.util.Optional; +import java.util.concurrent.ConcurrentMap; import java.util.function.BiFunction; import java.util.function.Function; @@ -121,4 +122,16 @@ public class CaffeineCache<K> implements ResumeCache<K> { public long capacity() { return cacheSize; } + + @Override + public void forEach(BiFunction<? super K, ? super Object, Boolean> action) { + + final ConcurrentMap<K, Object> kObjectConcurrentMap = cache.asMap(); + for (var entry : kObjectConcurrentMap.entrySet()) { + final boolean invalidate = action.apply(entry.getKey(), entry.getValue()); + if (invalidate) { + cache.invalidate(entry.getKey()); + } + } + } } diff --git a/components/camel-cassandraql/src/generated/resources/org/apache/camel/component/cassandra/cql.json b/components/camel-cassandraql/src/generated/resources/org/apache/camel/component/cassandra/cql.json index 0938f6873be..2a568f0f551 100644 --- a/components/camel-cassandraql/src/generated/resources/org/apache/camel/component/cassandra/cql.json +++ b/components/camel-cassandraql/src/generated/resources/org/apache/camel/component/cassandra/cql.json @@ -27,7 +27,8 @@ "autowiredEnabled": { "kind": "property", "displayName": "Autowired Enabled", "group": "advanced", "label": "advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired) by looking up in the registry to find if there is a single instance of matching type, which t [...] }, "headers": { - "CamelCqlQuery": { "kind": "header", "displayName": "", "group": "producer", "label": "producer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The CQL query to execute.", "constantName": "org.apache.camel.component.cassandra.CassandraConstants#CQL_QUERY" } + "CamelCqlQuery": { "kind": "header", "displayName": "", "group": "producer", "label": "producer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The CQL query to execute.", "constantName": "org.apache.camel.component.cassandra.CassandraConstants#CQL_QUERY" }, + "CamelCqlResumeQuery": { "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The CQL query to execute when resuming.", "constantName": "org.apache.camel.component.cassandra.CassandraConstants#CASSANDRA_RESUME_ACTION" } }, "properties": { "beanRef": { "kind": "path", "displayName": "Bean Ref", "group": "common", "label": "", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "beanRef is defined using bean:id" }, diff --git a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConstants.java b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConstants.java index 6dc155250c2..83323ddcf80 100644 --- a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConstants.java +++ b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConstants.java @@ -29,6 +29,9 @@ public final class CassandraConstants { @Metadata(label = "producer", description = "The CQL query to execute.", javaType = "String") public static final String CQL_QUERY = "CamelCqlQuery"; + @Metadata(label = "consumer", description = "The CQL query to execute when resuming.", javaType = "String") + public static final String CASSANDRA_RESUME_ACTION = "CamelCqlResumeQuery"; + private CassandraConstants() { } diff --git a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConsumer.java b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConsumer.java index f3bb3bf971e..9d396d4bd6b 100644 --- a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConsumer.java +++ b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConsumer.java @@ -22,10 +22,14 @@ import com.datastax.oss.driver.api.core.cql.ResultSet; import org.apache.camel.Exchange; import org.apache.camel.Message; import org.apache.camel.Processor; +import org.apache.camel.component.cassandra.consumer.support.CassandraResumeAction; import org.apache.camel.component.cassandra.consumer.support.CassandraResumeAdapter; import org.apache.camel.resume.ResumeAware; import org.apache.camel.resume.ResumeStrategy; import org.apache.camel.support.ScheduledPollConsumer; +import org.apache.camel.util.ObjectHelper; + +import static org.apache.camel.component.cassandra.CassandraConstants.CASSANDRA_RESUME_ACTION; /** * Cassandra 2 CQL3 consumer. @@ -79,20 +83,25 @@ public class CassandraConsumer extends ScheduledPollConsumer implements ResumeAw @Override protected void doStart() throws Exception { - super.doStart(); if (isPrepareStatements()) { preparedStatement = getEndpoint().prepareStatement(); } if (resumeStrategy != null) { - CqlSession session = getEndpoint().getSessionHolder().getSession(); + resumeStrategy.loadCache(); CassandraResumeAdapter resumeAdapter = resumeStrategy.getAdapter(CassandraResumeAdapter.class); if (resumeAdapter != null) { - resumeAdapter.setSession(session); + CassandraResumeAction action = (CassandraResumeAction) getEndpoint().getCamelContext().getRegistry() + .lookupByName(CASSANDRA_RESUME_ACTION); + ObjectHelper.notNull(action, "The resume action cannot be null", this); + + resumeAdapter.setResumeAction(action); resumeAdapter.resume(); } } + + super.doStart(); } @Override diff --git a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/consumer/support/CassandraResumeAdapter.java b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/consumer/support/CassandraResumeAction.java similarity index 70% copy from components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/consumer/support/CassandraResumeAdapter.java copy to components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/consumer/support/CassandraResumeAction.java index e8ccbed8efe..73c691ee5a5 100644 --- a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/consumer/support/CassandraResumeAdapter.java +++ b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/consumer/support/CassandraResumeAction.java @@ -17,18 +17,18 @@ package org.apache.camel.component.cassandra.consumer.support; -import com.datastax.oss.driver.api.core.CqlSession; -import org.apache.camel.resume.ResumeAdapter; - /** - * Provides a resume adapter for Cassandra consumers + * Provides and interface for integrations to run actions during resume */ -public interface CassandraResumeAdapter extends ResumeAdapter { +public interface CassandraResumeAction { /** - * Sets the session that allow implementations to run a one-time query on the DB + * Runs an action on an resumable (entry) * - * @param session + * @param key the resumable key + * @param value the resumable value + * + * @return true if the entry addressed should be invalidated or false otherwise */ - void setSession(CqlSession session); + boolean evalEntry(Object key, Object value); } diff --git a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/consumer/support/CassandraResumeAdapter.java b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/consumer/support/CassandraResumeAdapter.java index e8ccbed8efe..7de5f5958c9 100644 --- a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/consumer/support/CassandraResumeAdapter.java +++ b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/consumer/support/CassandraResumeAdapter.java @@ -17,7 +17,6 @@ package org.apache.camel.component.cassandra.consumer.support; -import com.datastax.oss.driver.api.core.CqlSession; import org.apache.camel.resume.ResumeAdapter; /** @@ -26,9 +25,9 @@ import org.apache.camel.resume.ResumeAdapter; public interface CassandraResumeAdapter extends ResumeAdapter { /** - * Sets the session that allow implementations to run a one-time query on the DB - * - * @param session + * Sets an action that will be executed during resume + * + * @param resumeAction the action to execute during resume */ - void setSession(CqlSession session); + void setResumeAction(CassandraResumeAction resumeAction); } diff --git a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/consumer/support/DefaultCassandraResumeAdapter.java b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/consumer/support/DefaultCassandraResumeAdapter.java new file mode 100644 index 00000000000..62a3dab489f --- /dev/null +++ b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/consumer/support/DefaultCassandraResumeAdapter.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.component.cassandra.consumer.support; + +import java.nio.ByteBuffer; + +import org.apache.camel.resume.Cacheable; +import org.apache.camel.resume.Deserializable; +import org.apache.camel.resume.Offset; +import org.apache.camel.resume.OffsetKey; +import org.apache.camel.resume.cache.ResumeCache; + +public class DefaultCassandraResumeAdapter implements CassandraResumeAdapter, Cacheable, Deserializable { + private ResumeCache<Object> cache; + private CassandraResumeAction resumeAction; + + @Override + public void setResumeAction(CassandraResumeAction resumeAction) { + this.resumeAction = resumeAction; + } + + @Override + public void resume() { + cache.forEach(resumeAction::evalEntry); + } + + private boolean add(Object key, Object offset) { + cache.add(key, offset); + + return true; + } + + @Override + public boolean add(OffsetKey<?> key, Offset<?> offset) { + return add(key.getValue(), offset.getValue()); + } + + @Override + public void setCache(ResumeCache<?> cache) { + this.cache = (ResumeCache<Object>) cache; + } + + @Override + public ResumeCache<?> getCache() { + return cache; + } + + @Override + public boolean deserialize(ByteBuffer keyBuffer, ByteBuffer valueBuffer) { + Object key = deserializeObject(keyBuffer); + Object value = deserializeObject(valueBuffer); + + return add(key, value); + } +} diff --git a/components/camel-cassandraql/src/main/resources/org/apache/camel/resume/adapter.properties b/components/camel-cassandraql/src/main/resources/org/apache/camel/resume/adapter.properties new file mode 100644 index 00000000000..59898d3e1ac --- /dev/null +++ b/components/camel-cassandraql/src/main/resources/org/apache/camel/resume/adapter.properties @@ -0,0 +1,19 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## --------------------------------------------------------------------------- + + +adapterClass=org.apache.camel.component.cassandra.consumer.support.DefaultCassandraResumeAdapter diff --git a/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/integration/CassandraComponentResumeStrategyIT.java b/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/integration/CassandraComponentResumeStrategyIT.java index b41a2a2ef33..ee8647202c0 100644 --- a/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/integration/CassandraComponentResumeStrategyIT.java +++ b/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/integration/CassandraComponentResumeStrategyIT.java @@ -20,46 +20,33 @@ package org.apache.camel.component.cassandra.integration; import java.util.List; import java.util.concurrent.TimeUnit; -import com.datastax.oss.driver.api.core.CqlSession; import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.cassandra.consumer.support.CassandraResumeAction; import org.apache.camel.component.cassandra.consumer.support.CassandraResumeAdapter; import org.apache.camel.component.mock.MockEndpoint; import org.apache.camel.processor.resume.TransientResumeStrategy; import org.junit.jupiter.api.Test; +import static org.apache.camel.component.cassandra.CassandraConstants.CASSANDRA_RESUME_ACTION; import static org.junit.jupiter.api.Assertions.assertTrue; public class CassandraComponentResumeStrategyIT extends BaseCassandra { private static class TestCassandraResumeAdapter implements CassandraResumeAdapter { - private boolean sessionCalled; - private boolean sessionNotNull; private boolean resumeCalled; + private boolean resumeActionNotNull; @Override - public void setSession(CqlSession session) { - sessionCalled = true; - sessionNotNull = session != null; + public void setResumeAction(CassandraResumeAction action) { + resumeActionNotNull = action != null; } @Override public void resume() { resumeCalled = true; } - - public boolean isSessionCalled() { - return sessionCalled; - } - - public boolean isSessionNotNull() { - return sessionNotNull; - } - - public boolean isResumeCalled() { - return resumeCalled; - } } private static final String CQL = "select login, first_name, last_name from camel_user"; @@ -79,15 +66,16 @@ public class CassandraComponentResumeStrategyIT extends BaseCassandra { mock.await(1, TimeUnit.SECONDS); assertMockEndpointsSatisfied(); - assertTrue(resumeStrategy.isSessionCalled()); - assertTrue(resumeStrategy.isSessionNotNull()); - assertTrue(resumeStrategy.isResumeCalled()); + assertTrue(resumeStrategy.resumeActionNotNull); + assertTrue(resumeStrategy.resumeCalled); } @Override protected RouteBuilder createRouteBuilder() { return new RouteBuilder() { public void configure() { + bindToRegistry(CASSANDRA_RESUME_ACTION, (CassandraResumeAction) (key, value) -> true); + fromF("cql://%s/%s?cql=%s", getUrl(), KEYSPACE_NAME, CQL) .resumable(new TransientResumeStrategy(resumeStrategy)) .to("mock:resultAll"); diff --git a/core/camel-api/src/main/java/org/apache/camel/resume/cache/ResumeCache.java b/core/camel-api/src/main/java/org/apache/camel/resume/cache/ResumeCache.java index 090492b32b4..9791287579e 100644 --- a/core/camel-api/src/main/java/org/apache/camel/resume/cache/ResumeCache.java +++ b/core/camel-api/src/main/java/org/apache/camel/resume/cache/ResumeCache.java @@ -17,6 +17,7 @@ package org.apache.camel.resume.cache; +import java.util.function.BiConsumer; import java.util.function.BiFunction; import java.util.function.Function; @@ -94,4 +95,10 @@ public interface ResumeCache<K> { * @return the offset value */ Object get(K key); + + /** + * Performs the given action for each member of the cache + * @param action the action to execute + */ + void forEach(BiFunction<? super K, ? super Object, Boolean> action); }