This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new 6e87bb72797 CAMEL-19286: camel-ignite - Support the REPLACE operation in the Ignite Cache Producer (#9908) 6e87bb72797 is described below commit 6e87bb72797b297cef7402908fd0ba3e470d7465 Author: Kengo Seki <sek...@apache.org> AuthorDate: Fri Apr 21 17:02:32 2023 +0900 CAMEL-19286: camel-ignite - Support the REPLACE operation in the Ignite Cache Producer (#9908) --- .../camel/component/ignite/IgniteConstants.java | 4 ++-- .../ignite/cache/IgniteCacheOperation.java | 3 ++- .../ignite/cache/IgniteCacheProducer.java | 22 +++++++++++++++++++ .../camel/component/ignite/IgniteCacheTest.java | 25 ++++++++++++++++++++++ 4 files changed, 51 insertions(+), 3 deletions(-) diff --git a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/IgniteConstants.java b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/IgniteConstants.java index 5c7dc92df2c..2ab7b75d15f 100644 --- a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/IgniteConstants.java +++ b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/IgniteConstants.java @@ -55,8 +55,8 @@ public final class IgniteConstants { "It does not allow you to dynamically change the cache against which a producer operation is performed. Use EIPs for that (e.g. recipient list, dynamic router).", javaType = "String", applicableFor = SCHEME_CACHE) public static final String IGNITE_CACHE_NAME = "CamelIgniteCacheName"; - @Metadata(label = "consumer", - description = "This header carries the old cache value when passed in the incoming cache event.", + @Metadata(description = "(producer) The old cache value to be replaced when invoking the REPLACE operation. \n" + + "(consumer) This header carries the old cache value when passed in the incoming cache event.", javaType = "Object", applicableFor = SCHEME_CACHE) public static final String IGNITE_CACHE_OLD_VALUE = "CamelIgniteCacheOldValue"; diff --git a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheOperation.java b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheOperation.java index 1ca66304be3..ce76db93250 100644 --- a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheOperation.java +++ b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheOperation.java @@ -27,6 +27,7 @@ public enum IgniteCacheOperation { SIZE, REBALANCE, QUERY, - CLEAR + CLEAR, + REPLACE, } diff --git a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheProducer.java b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheProducer.java index ebcb4f5552b..afdab7542a0 100644 --- a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheProducer.java +++ b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheProducer.java @@ -85,6 +85,10 @@ public class IgniteCacheProducer extends DefaultAsyncProducer { doRebalance(in, out); break; + case REPLACE: + doReplace(in, out); + break; + default: break; } @@ -196,6 +200,24 @@ public class IgniteCacheProducer extends DefaultAsyncProducer { out.setBody(result); } + private void doReplace(Message in, Message out) { + Object cacheKey = in.getHeader(IgniteConstants.IGNITE_CACHE_KEY); + + if (cacheKey == null) { + throw new RuntimeCamelException( + "Cache REPLACE operation requires the cache key in the CamelIgniteCacheKey header"); + } + + Object oldValue = in.getHeader(IgniteConstants.IGNITE_CACHE_OLD_VALUE); + if (oldValue == null) { + cache.replace(cacheKey, in.getBody()); + } else { + cache.replace(cacheKey, oldValue, in.getBody()); + } + + IgniteHelper.maybePropagateIncomingBody(endpoint, in, out); + } + private Object cacheKey(Message msg) { Object cacheKey = msg.getHeader(IgniteConstants.IGNITE_CACHE_KEY); if (cacheKey == null) { diff --git a/components/camel-ignite/src/test/java/org/apache/camel/component/ignite/IgniteCacheTest.java b/components/camel-ignite/src/test/java/org/apache/camel/component/ignite/IgniteCacheTest.java index b55984d89f3..6e077553b22 100644 --- a/components/camel-ignite/src/test/java/org/apache/camel/component/ignite/IgniteCacheTest.java +++ b/components/camel-ignite/src/test/java/org/apache/camel/component/ignite/IgniteCacheTest.java @@ -179,6 +179,31 @@ public class IgniteCacheTest extends AbstractIgniteTest { } + @Test + public void testReplaceEntry() { + template.requestBodyAndHeader("ignite-cache:" + resourceUid + "?operation=REPLACE", "5678", + IgniteConstants.IGNITE_CACHE_KEY, "abcd"); + IgniteCache<String, String> cache = ignite().getOrCreateCache(resourceUid); + Assertions.assertThat(cache.size(CachePeekMode.ALL)).isEqualTo(0); + + cache.put("abcd", "1234"); + template.requestBodyAndHeader("ignite-cache:" + resourceUid + "?operation=REPLACE", "5678", + IgniteConstants.IGNITE_CACHE_KEY, "abcd"); + Assertions.assertThat(cache.size(CachePeekMode.ALL)).isEqualTo(1); + Assertions.assertThat(cache.get("abcd")).isEqualTo("5678"); + + Map<String, Object> headers + = Map.of(IgniteConstants.IGNITE_CACHE_KEY, "abcd", IgniteConstants.IGNITE_CACHE_OLD_VALUE, "1234"); + template.requestBodyAndHeaders("ignite-cache:" + resourceUid + "?operation=REPLACE", "9", headers); + Assertions.assertThat(cache.size(CachePeekMode.ALL)).isEqualTo(1); + Assertions.assertThat(cache.get("abcd")).isEqualTo("5678"); + + headers = Map.of(IgniteConstants.IGNITE_CACHE_KEY, "abcd", IgniteConstants.IGNITE_CACHE_OLD_VALUE, "5678"); + template.requestBodyAndHeaders("ignite-cache:" + resourceUid + "?operation=REPLACE", "9", headers); + Assertions.assertThat(cache.size(CachePeekMode.ALL)).isEqualTo(1); + Assertions.assertThat(cache.get("abcd")).isEqualTo("9"); + } + @Test public void testClearCache() { IgniteCache<String, String> cache = ignite().getOrCreateCache(resourceUid);