This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new ca8dae067d3 CAMEL-21407: Process an unsubscribe request by first 
checking whether the message body is a control message and, if so, taking the 
subscription parameters from it; otherwise, obtain the parameters from the 
message headers. (#16128)
ca8dae067d3 is described below

commit ca8dae067d3a769061d2ada4494415694eacb9ad
Author: Steve Storck <steve...@gmail.com>
AuthorDate: Thu Oct 31 07:54:33 2024 +0000

    CAMEL-21407: Process an unsubscribe request by first checking whether the 
message body is a control message and, if so, taking the subscription 
parameters from it; otherwise, obtain the parameters from the message 
headers. (#16128)
---
 .../control/DynamicRouterControlProducer.java      | 13 +++++++---
 .../control/DynamicRouterControlProducerTest.java  | 28 ++++++++++++++++------
 2 files changed, 31 insertions(+), 10 deletions(-)

diff --git 
a/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/control/DynamicRouterControlProducer.java
 
b/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/control/DynamicRouterControlProducer.java
index 797052f56a1..178cd090339 100644
--- 
a/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/control/DynamicRouterControlProducer.java
+++ 
b/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/control/DynamicRouterControlProducer.java
@@ -164,9 +164,16 @@ public class DynamicRouterControlProducer extends 
HeaderSelectorProducer {
      */
     @InvokeOnHeader(CONTROL_ACTION_UNSUBSCRIBE)
     public void performUnsubscribe(final Message message, AsyncCallback 
callback) {
-        Map<String, Object> headers = message.getHeaders();
-        String subscriptionId = (String) 
headers.getOrDefault(CONTROL_SUBSCRIPTION_ID, 
configuration.getSubscriptionId());
-        String subscribeChannel = (String) 
headers.getOrDefault(CONTROL_SUBSCRIBE_CHANNEL, 
configuration.getSubscribeChannel());
+        String subscriptionId;
+        String subscribeChannel;
+        if (message.getBody() instanceof DynamicRouterControlMessage) {
+            DynamicRouterControlMessage controlMessage = 
message.getBody(DynamicRouterControlMessage.class);
+            subscriptionId = controlMessage.getSubscriptionId();
+            subscribeChannel = controlMessage.getSubscribeChannel();
+        } else {
+            subscriptionId = message.getHeader(CONTROL_SUBSCRIPTION_ID, 
configuration.getSubscriptionId(), String.class);
+            subscribeChannel = message.getHeader(CONTROL_SUBSCRIBE_CHANNEL, 
configuration.getSubscribeChannel(), String.class);
+        }
         boolean result = 
dynamicRouterControlService.removeSubscription(subscribeChannel, 
subscriptionId);
         message.setBody(result, boolean.class);
         callback.done(false);
diff --git 
a/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/control/DynamicRouterControlProducerTest.java
 
b/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/control/DynamicRouterControlProducerTest.java
index ea65164dfe5..f86a48d5557 100644
--- 
a/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/control/DynamicRouterControlProducerTest.java
+++ 
b/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/control/DynamicRouterControlProducerTest.java
@@ -40,7 +40,6 @@ import static 
org.apache.camel.component.dynamicrouter.control.DynamicRouterCont
 import static 
org.apache.camel.component.dynamicrouter.control.DynamicRouterControlConstants.CONTROL_ACTION_LIST;
 import static 
org.apache.camel.component.dynamicrouter.control.DynamicRouterControlConstants.CONTROL_ACTION_STATS;
 import static 
org.apache.camel.component.dynamicrouter.control.DynamicRouterControlConstants.CONTROL_ACTION_SUBSCRIBE;
-import static 
org.apache.camel.component.dynamicrouter.control.DynamicRouterControlConstants.CONTROL_ACTION_UNSUBSCRIBE;
 import static 
org.apache.camel.component.dynamicrouter.control.DynamicRouterControlConstants.CONTROL_ACTION_UPDATE;
 import static 
org.apache.camel.component.dynamicrouter.control.DynamicRouterControlConstants.CONTROL_DESTINATION_URI;
 import static 
org.apache.camel.component.dynamicrouter.control.DynamicRouterControlConstants.CONTROL_EXPRESSION_LANGUAGE;
@@ -125,14 +124,29 @@ class DynamicRouterControlProducerTest {
     }
 
     @Test
-    void performUnsubscribeAction() {
+    void performUnsubscribeActionWithControlMessage() {
         String subscriptionId = "testId";
         String subscribeChannel = "testChannel";
-        Map<String, Object> headers = Map.of(
-                CONTROL_ACTION_HEADER, CONTROL_ACTION_UNSUBSCRIBE,
-                CONTROL_SUBSCRIBE_CHANNEL, subscribeChannel,
-                CONTROL_SUBSCRIPTION_ID, subscriptionId);
-        when(message.getHeaders()).thenReturn(headers);
+        DynamicRouterControlMessage unsubscribeMsg = 
DynamicRouterControlMessage.Builder.newBuilder()
+                .subscriptionId(subscriptionId)
+                .subscribeChannel(subscribeChannel)
+                .build();
+        when(message.getBody()).thenReturn(unsubscribeMsg);
+        
when(message.getBody(DynamicRouterControlMessage.class)).thenReturn(unsubscribeMsg);
+        Mockito.doNothing().when(callback).done(false);
+        producer.performUnsubscribe(message, callback);
+        Mockito.verify(controlService, Mockito.times(1))
+                .removeSubscription(subscribeChannel, subscriptionId);
+    }
+
+    @Test
+    void performUnsubscribeActionWithHeaders() {
+        String subscriptionId = "testId";
+        String subscribeChannel = "testChannel";
+        when(message.getHeader(CONTROL_SUBSCRIPTION_ID, 
configuration.getSubscriptionId(), String.class))
+                .thenReturn(subscriptionId);
+        when(message.getHeader(CONTROL_SUBSCRIBE_CHANNEL, 
configuration.getSubscribeChannel(), String.class))
+                .thenReturn(subscribeChannel);
         Mockito.doNothing().when(callback).done(false);
         producer.performUnsubscribe(message, callback);
         Mockito.verify(controlService, Mockito.times(1))

Reply via email to