This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-4.8.x in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-4.8.x by this push:
     new 01bee4aa686 CAMEL-21407: Implemented the case of processing an unsubscribe request by first looking to see if the message body is a control message, and processing it, then obtaining the parameters from headers if they are not in the message body. (#16128)
01bee4aa686 is described below

commit 01bee4aa686c720f5ec76fb40d12336f4d3b7c3a
Author: Steve Storck <steve...@gmail.com>
AuthorDate: Thu Oct 31 07:54:33 2024 +0000

    CAMEL-21407: Implemented the case of processing an unsubscribe request by first looking to see if the message body is a control message, and processing it, then obtaining the parameters from headers if they are not in the message body. (#16128)
---
 .../control/DynamicRouterControlProducer.java      | 13 +++++++---
 .../control/DynamicRouterControlProducerTest.java  | 28 ++++++++++++++++------
 2 files changed, 31 insertions(+), 10 deletions(-)

diff --git a/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/control/DynamicRouterControlProducer.java b/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/control/DynamicRouterControlProducer.java
index 797052f56a1..178cd090339 100644
--- a/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/control/DynamicRouterControlProducer.java
+++ b/components/camel-dynamic-router/src/main/java/org/apache/camel/component/dynamicrouter/control/DynamicRouterControlProducer.java
@@ -164,9 +164,16 @@ public class DynamicRouterControlProducer extends HeaderSelectorProducer {
      */
     @InvokeOnHeader(CONTROL_ACTION_UNSUBSCRIBE)
     public void performUnsubscribe(final Message message, AsyncCallback callback) {
-        Map<String, Object> headers = message.getHeaders();
-        String subscriptionId = (String) headers.getOrDefault(CONTROL_SUBSCRIPTION_ID, configuration.getSubscriptionId());
-        String subscribeChannel = (String) headers.getOrDefault(CONTROL_SUBSCRIBE_CHANNEL, configuration.getSubscribeChannel());
+        String subscriptionId;
+        String subscribeChannel;
+        if (message.getBody() instanceof DynamicRouterControlMessage) {
+            DynamicRouterControlMessage controlMessage = message.getBody(DynamicRouterControlMessage.class);
+            subscriptionId = controlMessage.getSubscriptionId();
+            subscribeChannel = controlMessage.getSubscribeChannel();
+        } else {
+            subscriptionId = message.getHeader(CONTROL_SUBSCRIPTION_ID, configuration.getSubscriptionId(), String.class);
+            subscribeChannel = message.getHeader(CONTROL_SUBSCRIBE_CHANNEL, configuration.getSubscribeChannel(), String.class);
+        }
         boolean result = dynamicRouterControlService.removeSubscription(subscribeChannel, subscriptionId);
         message.setBody(result, boolean.class);
         callback.done(false);
diff --git a/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/control/DynamicRouterControlProducerTest.java b/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/control/DynamicRouterControlProducerTest.java
index ea65164dfe5..f86a48d5557 100644
--- a/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/control/DynamicRouterControlProducerTest.java
+++ b/components/camel-dynamic-router/src/test/java/org/apache/camel/component/dynamicrouter/control/DynamicRouterControlProducerTest.java
@@ -40,7 +40,6 @@ import static org.apache.camel.component.dynamicrouter.control.DynamicRouterCont
 import static org.apache.camel.component.dynamicrouter.control.DynamicRouterControlConstants.CONTROL_ACTION_LIST;
 import static org.apache.camel.component.dynamicrouter.control.DynamicRouterControlConstants.CONTROL_ACTION_STATS;
 import static org.apache.camel.component.dynamicrouter.control.DynamicRouterControlConstants.CONTROL_ACTION_SUBSCRIBE;
-import static org.apache.camel.component.dynamicrouter.control.DynamicRouterControlConstants.CONTROL_ACTION_UNSUBSCRIBE;
 import static org.apache.camel.component.dynamicrouter.control.DynamicRouterControlConstants.CONTROL_ACTION_UPDATE;
 import static org.apache.camel.component.dynamicrouter.control.DynamicRouterControlConstants.CONTROL_DESTINATION_URI;
 import static org.apache.camel.component.dynamicrouter.control.DynamicRouterControlConstants.CONTROL_EXPRESSION_LANGUAGE;
@@ -125,14 +124,29 @@ class DynamicRouterControlProducerTest {
     }
 
     @Test
-    void performUnsubscribeAction() {
+    void performUnsubscribeActionWithControlMessage() {
         String subscriptionId = "testId";
         String subscribeChannel = "testChannel";
-        Map<String, Object> headers = Map.of(
-                CONTROL_ACTION_HEADER, CONTROL_ACTION_UNSUBSCRIBE,
-                CONTROL_SUBSCRIBE_CHANNEL, subscribeChannel,
-                CONTROL_SUBSCRIPTION_ID, subscriptionId);
-        when(message.getHeaders()).thenReturn(headers);
+        DynamicRouterControlMessage unsubscribeMsg = DynamicRouterControlMessage.Builder.newBuilder()
+                .subscriptionId(subscriptionId)
+                .subscribeChannel(subscribeChannel)
+                .build();
+        when(message.getBody()).thenReturn(unsubscribeMsg);
+        when(message.getBody(DynamicRouterControlMessage.class)).thenReturn(unsubscribeMsg);
+        Mockito.doNothing().when(callback).done(false);
+        producer.performUnsubscribe(message, callback);
+        Mockito.verify(controlService, Mockito.times(1))
+                .removeSubscription(subscribeChannel, subscriptionId);
+    }
+
+    @Test
+    void performUnsubscribeActionWithHeaders() {
+        String subscriptionId = "testId";
+        String subscribeChannel = "testChannel";
+        when(message.getHeader(CONTROL_SUBSCRIPTION_ID, configuration.getSubscriptionId(), String.class))
+                .thenReturn(subscriptionId);
+        when(message.getHeader(CONTROL_SUBSCRIBE_CHANNEL, configuration.getSubscribeChannel(), String.class))
+                .thenReturn(subscribeChannel);
         Mockito.doNothing().when(callback).done(false);
         producer.performUnsubscribe(message, callback);
         Mockito.verify(controlService, Mockito.times(1))