This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 9d581cae407 CAMEL-18127: added adapter auto-configuration for Cassandra
9d581cae407 is described below

commit 9d581cae407794f55cace2c7bb3a8734db036172
Author: Otavio Rodolfo Piske <angusyo...@gmail.com>
AuthorDate: Mon Jun 6 13:16:20 2022 +0200

    CAMEL-18127: added adapter auto-configuration for Cassandra
---
 .../org/apache/camel/catalog/components/cql.json   |  3 +-
 .../component/caffeine/resume/CaffeineCache.java   | 13 ++++
 .../org/apache/camel/component/cassandra/cql.json  |  3 +-
 .../component/cassandra/CassandraConstants.java    |  3 +
 .../component/cassandra/CassandraConsumer.java     | 15 ++++-
 ...sumeAdapter.java => CassandraResumeAction.java} | 16 ++---
 .../consumer/support/CassandraResumeAdapter.java   |  9 ++-
 .../support/DefaultCassandraResumeAdapter.java     | 70 ++++++++++++++++++++++
 .../org/apache/camel/resume/adapter.properties     | 19 ++++++
 .../CassandraComponentResumeStrategyIT.java        | 30 +++-------
 .../org/apache/camel/resume/cache/ResumeCache.java |  7 +++
 11 files changed, 149 insertions(+), 39 deletions(-)

diff --git 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/cql.json
 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/cql.json
index 0938f6873be..2a568f0f551 100644
--- 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/cql.json
+++ 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/cql.json
@@ -27,7 +27,8 @@
     "autowiredEnabled": { "kind": "property", "displayName": "Autowired 
Enabled", "group": "advanced", "label": "advanced", "required": false, "type": 
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": true, "description": "Whether autowiring is 
enabled. This is used for automatic autowiring options (the option must be 
marked as autowired) by looking up in the registry to find if there is a single 
instance of matching type, which t [...]
   },
   "headers": {
-    "CamelCqlQuery": { "kind": "header", "displayName": "", "group": 
"producer", "label": "producer", "required": false, "javaType": "String", 
"deprecated": false, "deprecationNote": "", "autowired": false, "secret": 
false, "description": "The CQL query to execute.", "constantName": 
"org.apache.camel.component.cassandra.CassandraConstants#CQL_QUERY" }
+    "CamelCqlQuery": { "kind": "header", "displayName": "", "group": 
"producer", "label": "producer", "required": false, "javaType": "String", 
"deprecated": false, "deprecationNote": "", "autowired": false, "secret": 
false, "description": "The CQL query to execute.", "constantName": 
"org.apache.camel.component.cassandra.CassandraConstants#CQL_QUERY" },
+    "CamelCqlResumeQuery": { "kind": "header", "displayName": "", "group": 
"consumer", "label": "consumer", "required": false, "javaType": "String", 
"deprecated": false, "deprecationNote": "", "autowired": false, "secret": 
false, "description": "The CQL query to execute when resuming.", 
"constantName": 
"org.apache.camel.component.cassandra.CassandraConstants#CASSANDRA_RESUME_ACTION"
 }
   },
   "properties": {
     "beanRef": { "kind": "path", "displayName": "Bean Ref", "group": "common", 
"label": "", "required": false, "type": "string", "javaType": 
"java.lang.String", "deprecated": false, "autowired": false, "secret": false, 
"description": "beanRef is defined using bean:id" },
diff --git 
a/components/camel-caffeine/src/main/java/org/apache/camel/component/caffeine/resume/CaffeineCache.java
 
b/components/camel-caffeine/src/main/java/org/apache/camel/component/caffeine/resume/CaffeineCache.java
index 5ec66ddc88b..30d1de9fd54 100644
--- 
a/components/camel-caffeine/src/main/java/org/apache/camel/component/caffeine/resume/CaffeineCache.java
+++ 
b/components/camel-caffeine/src/main/java/org/apache/camel/component/caffeine/resume/CaffeineCache.java
@@ -18,6 +18,7 @@
 package org.apache.camel.component.caffeine.resume;
 
 import java.util.Optional;
+import java.util.concurrent.ConcurrentMap;
 import java.util.function.BiFunction;
 import java.util.function.Function;
 
@@ -121,4 +122,16 @@ public class CaffeineCache<K> implements ResumeCache<K> {
     public long capacity() {
         return cacheSize;
     }
+
+    @Override
+    public void forEach(BiFunction<? super K, ? super Object, Boolean> action) 
{
+
+        final ConcurrentMap<K, Object> kObjectConcurrentMap = cache.asMap();
+        for (var entry : kObjectConcurrentMap.entrySet()) {
+            final boolean invalidate = action.apply(entry.getKey(), 
entry.getValue());
+            if (invalidate) {
+                cache.invalidate(entry.getKey());
+            }
+        }
+    }
 }
diff --git 
a/components/camel-cassandraql/src/generated/resources/org/apache/camel/component/cassandra/cql.json
 
b/components/camel-cassandraql/src/generated/resources/org/apache/camel/component/cassandra/cql.json
index 0938f6873be..2a568f0f551 100644
--- 
a/components/camel-cassandraql/src/generated/resources/org/apache/camel/component/cassandra/cql.json
+++ 
b/components/camel-cassandraql/src/generated/resources/org/apache/camel/component/cassandra/cql.json
@@ -27,7 +27,8 @@
     "autowiredEnabled": { "kind": "property", "displayName": "Autowired 
Enabled", "group": "advanced", "label": "advanced", "required": false, "type": 
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": true, "description": "Whether autowiring is 
enabled. This is used for automatic autowiring options (the option must be 
marked as autowired) by looking up in the registry to find if there is a single 
instance of matching type, which t [...]
   },
   "headers": {
-    "CamelCqlQuery": { "kind": "header", "displayName": "", "group": 
"producer", "label": "producer", "required": false, "javaType": "String", 
"deprecated": false, "deprecationNote": "", "autowired": false, "secret": 
false, "description": "The CQL query to execute.", "constantName": 
"org.apache.camel.component.cassandra.CassandraConstants#CQL_QUERY" }
+    "CamelCqlQuery": { "kind": "header", "displayName": "", "group": 
"producer", "label": "producer", "required": false, "javaType": "String", 
"deprecated": false, "deprecationNote": "", "autowired": false, "secret": 
false, "description": "The CQL query to execute.", "constantName": 
"org.apache.camel.component.cassandra.CassandraConstants#CQL_QUERY" },
+    "CamelCqlResumeQuery": { "kind": "header", "displayName": "", "group": 
"consumer", "label": "consumer", "required": false, "javaType": "String", 
"deprecated": false, "deprecationNote": "", "autowired": false, "secret": 
false, "description": "The CQL query to execute when resuming.", 
"constantName": 
"org.apache.camel.component.cassandra.CassandraConstants#CASSANDRA_RESUME_ACTION"
 }
   },
   "properties": {
     "beanRef": { "kind": "path", "displayName": "Bean Ref", "group": "common", 
"label": "", "required": false, "type": "string", "javaType": 
"java.lang.String", "deprecated": false, "autowired": false, "secret": false, 
"description": "beanRef is defined using bean:id" },
diff --git 
a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConstants.java
 
b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConstants.java
index 6dc155250c2..83323ddcf80 100644
--- 
a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConstants.java
+++ 
b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConstants.java
@@ -29,6 +29,9 @@ public final class CassandraConstants {
     @Metadata(label = "producer", description = "The CQL query to execute.", 
javaType = "String")
     public static final String CQL_QUERY = "CamelCqlQuery";
 
+    @Metadata(label = "consumer", description = "The CQL query to execute when 
resuming.", javaType = "String")
+    public static final String CASSANDRA_RESUME_ACTION = "CamelCqlResumeQuery";
+
     private CassandraConstants() {
     }
 
diff --git 
a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConsumer.java
 
b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConsumer.java
index f3bb3bf971e..9d396d4bd6b 100644
--- 
a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConsumer.java
+++ 
b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConsumer.java
@@ -22,10 +22,14 @@ import com.datastax.oss.driver.api.core.cql.ResultSet;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.Processor;
+import 
org.apache.camel.component.cassandra.consumer.support.CassandraResumeAction;
 import 
org.apache.camel.component.cassandra.consumer.support.CassandraResumeAdapter;
 import org.apache.camel.resume.ResumeAware;
 import org.apache.camel.resume.ResumeStrategy;
 import org.apache.camel.support.ScheduledPollConsumer;
+import org.apache.camel.util.ObjectHelper;
+
+import static 
org.apache.camel.component.cassandra.CassandraConstants.CASSANDRA_RESUME_ACTION;
 
 /**
  * Cassandra 2 CQL3 consumer.
@@ -79,20 +83,25 @@ public class CassandraConsumer extends 
ScheduledPollConsumer implements ResumeAw
 
     @Override
     protected void doStart() throws Exception {
-        super.doStart();
         if (isPrepareStatements()) {
             preparedStatement = getEndpoint().prepareStatement();
         }
 
         if (resumeStrategy != null) {
-            CqlSession session = getEndpoint().getSessionHolder().getSession();
+            resumeStrategy.loadCache();
 
             CassandraResumeAdapter resumeAdapter = 
resumeStrategy.getAdapter(CassandraResumeAdapter.class);
             if (resumeAdapter != null) {
-                resumeAdapter.setSession(session);
+                CassandraResumeAction action = (CassandraResumeAction) 
getEndpoint().getCamelContext().getRegistry()
+                        .lookupByName(CASSANDRA_RESUME_ACTION);
+                ObjectHelper.notNull(action, "The resume action cannot be 
null", this);
+
+                resumeAdapter.setResumeAction(action);
                 resumeAdapter.resume();
             }
         }
+
+        super.doStart();
     }
 
     @Override
diff --git 
a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/consumer/support/CassandraResumeAdapter.java
 
b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/consumer/support/CassandraResumeAction.java
similarity index 70%
copy from 
components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/consumer/support/CassandraResumeAdapter.java
copy to 
components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/consumer/support/CassandraResumeAction.java
index e8ccbed8efe..73c691ee5a5 100644
--- 
a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/consumer/support/CassandraResumeAdapter.java
+++ 
b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/consumer/support/CassandraResumeAction.java
@@ -17,18 +17,18 @@
 
 package org.apache.camel.component.cassandra.consumer.support;
 
-import com.datastax.oss.driver.api.core.CqlSession;
-import org.apache.camel.resume.ResumeAdapter;
-
 /**
- * Provides a resume adapter for Cassandra consumers
+ * Provides an interface for integrations to run actions during resume
  */
-public interface CassandraResumeAdapter extends ResumeAdapter {
+public interface CassandraResumeAction {
 
     /**
-     * Sets the session that allow implementations to run a one-time query on 
the DB
+     * Runs an action on a resumable (entry)
      * 
-     * @param session
+     * @param  key   the resumable key
+     * @param  value the resumable value
+     *
+     * @return       true if the entry addressed should be invalidated or 
false otherwise
      */
-    void setSession(CqlSession session);
+    boolean evalEntry(Object key, Object value);
 }
diff --git 
a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/consumer/support/CassandraResumeAdapter.java
 
b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/consumer/support/CassandraResumeAdapter.java
index e8ccbed8efe..7de5f5958c9 100644
--- 
a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/consumer/support/CassandraResumeAdapter.java
+++ 
b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/consumer/support/CassandraResumeAdapter.java
@@ -17,7 +17,6 @@
 
 package org.apache.camel.component.cassandra.consumer.support;
 
-import com.datastax.oss.driver.api.core.CqlSession;
 import org.apache.camel.resume.ResumeAdapter;
 
 /**
@@ -26,9 +25,9 @@ import org.apache.camel.resume.ResumeAdapter;
 public interface CassandraResumeAdapter extends ResumeAdapter {
 
     /**
-     * Sets the session that allow implementations to run a one-time query on 
the DB
-     * 
-     * @param session
+     * Sets an action that will be executed during resume
+     *
+     * @param resumeAction the action to execute during resume
      */
-    void setSession(CqlSession session);
+    void setResumeAction(CassandraResumeAction resumeAction);
 }
diff --git 
a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/consumer/support/DefaultCassandraResumeAdapter.java
 
b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/consumer/support/DefaultCassandraResumeAdapter.java
new file mode 100644
index 00000000000..62a3dab489f
--- /dev/null
+++ 
b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/consumer/support/DefaultCassandraResumeAdapter.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.cassandra.consumer.support;
+
+import java.nio.ByteBuffer;
+
+import org.apache.camel.resume.Cacheable;
+import org.apache.camel.resume.Deserializable;
+import org.apache.camel.resume.Offset;
+import org.apache.camel.resume.OffsetKey;
+import org.apache.camel.resume.cache.ResumeCache;
+
+public class DefaultCassandraResumeAdapter implements CassandraResumeAdapter, 
Cacheable, Deserializable {
+    private ResumeCache<Object> cache;
+    private CassandraResumeAction resumeAction;
+
+    @Override
+    public void setResumeAction(CassandraResumeAction resumeAction) {
+        this.resumeAction = resumeAction;
+    }
+
+    @Override
+    public void resume() {
+        cache.forEach(resumeAction::evalEntry);
+    }
+
+    private boolean add(Object key, Object offset) {
+        cache.add(key, offset);
+
+        return true;
+    }
+
+    @Override
+    public boolean add(OffsetKey<?> key, Offset<?> offset) {
+        return add(key.getValue(), offset.getValue());
+    }
+
+    @Override
+    public void setCache(ResumeCache<?> cache) {
+        this.cache = (ResumeCache<Object>) cache;
+    }
+
+    @Override
+    public ResumeCache<?> getCache() {
+        return cache;
+    }
+
+    @Override
+    public boolean deserialize(ByteBuffer keyBuffer, ByteBuffer valueBuffer) {
+        Object key = deserializeObject(keyBuffer);
+        Object value = deserializeObject(valueBuffer);
+
+        return add(key, value);
+    }
+}
diff --git 
a/components/camel-cassandraql/src/main/resources/org/apache/camel/resume/adapter.properties
 
b/components/camel-cassandraql/src/main/resources/org/apache/camel/resume/adapter.properties
new file mode 100644
index 00000000000..59898d3e1ac
--- /dev/null
+++ 
b/components/camel-cassandraql/src/main/resources/org/apache/camel/resume/adapter.properties
@@ -0,0 +1,19 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+##      http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+
+adapterClass=org.apache.camel.component.cassandra.consumer.support.DefaultCassandraResumeAdapter
diff --git 
a/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/integration/CassandraComponentResumeStrategyIT.java
 
b/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/integration/CassandraComponentResumeStrategyIT.java
index b41a2a2ef33..ee8647202c0 100644
--- 
a/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/integration/CassandraComponentResumeStrategyIT.java
+++ 
b/components/camel-cassandraql/src/test/java/org/apache/camel/component/cassandra/integration/CassandraComponentResumeStrategyIT.java
@@ -20,46 +20,33 @@ package org.apache.camel.component.cassandra.integration;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
-import com.datastax.oss.driver.api.core.CqlSession;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
+import 
org.apache.camel.component.cassandra.consumer.support.CassandraResumeAction;
 import 
org.apache.camel.component.cassandra.consumer.support.CassandraResumeAdapter;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.processor.resume.TransientResumeStrategy;
 import org.junit.jupiter.api.Test;
 
+import static 
org.apache.camel.component.cassandra.CassandraConstants.CASSANDRA_RESUME_ACTION;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class CassandraComponentResumeStrategyIT extends BaseCassandra {
 
     private static class TestCassandraResumeAdapter implements 
CassandraResumeAdapter {
-        private boolean sessionCalled;
-        private boolean sessionNotNull;
         private boolean resumeCalled;
+        private boolean resumeActionNotNull;
 
         @Override
-        public void setSession(CqlSession session) {
-            sessionCalled = true;
-            sessionNotNull = session != null;
+        public void setResumeAction(CassandraResumeAction action) {
+            resumeActionNotNull = action != null;
         }
 
         @Override
         public void resume() {
             resumeCalled = true;
         }
-
-        public boolean isSessionCalled() {
-            return sessionCalled;
-        }
-
-        public boolean isSessionNotNull() {
-            return sessionNotNull;
-        }
-
-        public boolean isResumeCalled() {
-            return resumeCalled;
-        }
     }
 
     private static final String CQL = "select login, first_name, last_name 
from camel_user";
@@ -79,15 +66,16 @@ public class CassandraComponentResumeStrategyIT extends 
BaseCassandra {
         mock.await(1, TimeUnit.SECONDS);
         assertMockEndpointsSatisfied();
 
-        assertTrue(resumeStrategy.isSessionCalled());
-        assertTrue(resumeStrategy.isSessionNotNull());
-        assertTrue(resumeStrategy.isResumeCalled());
+        assertTrue(resumeStrategy.resumeActionNotNull);
+        assertTrue(resumeStrategy.resumeCalled);
     }
 
     @Override
     protected RouteBuilder createRouteBuilder() {
         return new RouteBuilder() {
             public void configure() {
+                bindToRegistry(CASSANDRA_RESUME_ACTION, 
(CassandraResumeAction) (key, value) -> true);
+
                 fromF("cql://%s/%s?cql=%s", getUrl(), KEYSPACE_NAME, CQL)
                         .resumable(new TransientResumeStrategy(resumeStrategy))
                         .to("mock:resultAll");
diff --git 
a/core/camel-api/src/main/java/org/apache/camel/resume/cache/ResumeCache.java 
b/core/camel-api/src/main/java/org/apache/camel/resume/cache/ResumeCache.java
index 090492b32b4..9791287579e 100644
--- 
a/core/camel-api/src/main/java/org/apache/camel/resume/cache/ResumeCache.java
+++ 
b/core/camel-api/src/main/java/org/apache/camel/resume/cache/ResumeCache.java
@@ -17,6 +17,7 @@
 
 package org.apache.camel.resume.cache;
 
+import java.util.function.BiConsumer;
 import java.util.function.BiFunction;
 import java.util.function.Function;
 
@@ -94,4 +95,10 @@ public interface ResumeCache<K> {
      * @return     the offset value
      */
     Object get(K key);
+
+    /**
+     * Performs the given action for each member of the cache
+     * @param action the action to execute
+     */
+    void forEach(BiFunction<? super K, ? super Object, Boolean> action);
 }

Reply via email to