This is an automated email from the ASF dual-hosted git repository.

tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git


The following commit(s) were added to refs/heads/main by this push:
     new 32f5e5f3 PROTON-2902 Adds a simple addFilter API to the SourceOptions 
type
32f5e5f3 is described below

commit 32f5e5f3c3d1a353dec6a288d09cbe3e852c9e5d
Author: Timothy Bish <[email protected]>
AuthorDate: Thu Aug 7 14:06:56 2025 -0400

    PROTON-2902 Adds a simple addFilter API to the SourceOptions type
    
    Allows for setting filters in a more compact amount of user code.
---
 .../apache/qpid/protonj2/client/SourceOptions.java |  78 ++++++++++++
 .../qpid/protonj2/client/impl/ReceiverTest.java    | 139 +++++++++++++++++++++
 2 files changed, 217 insertions(+)

diff --git 
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/SourceOptions.java
 
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/SourceOptions.java
index 0ade3a1c..dd35f943 100644
--- 
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/SourceOptions.java
+++ 
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/SourceOptions.java
@@ -19,8 +19,10 @@ package org.apache.qpid.protonj2.client;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
 
 import org.apache.qpid.protonj2.client.impl.ClientDeliveryState;
+import org.apache.qpid.protonj2.types.DescribedType;
 
 /**
  * Options type that carries configuration for link Source types.
@@ -99,6 +101,32 @@ public final class SourceOptions extends 
TerminusOptions<SourceOptions> implemen
         return self();
     }
 
+    /**
+     * Adds the given named filter into the map of filters (one will be 
created if not already set).
+     * <p>
+     * If a previous filters {@link Map} was assigned this new filter instance 
will be assigned
+     * into that existing map, it is not cleared or reallocated. The 
descriptor should either be
+     * an Symbol or UnsignedLong that aligns with the filters definition being 
used.
+     *
+     * @param name
+     *                 The name to use when adding the described filter to the 
filters {@link Map}.
+     * @param descriptor
+     *                 The descriptor used for the {@link DescribedType} that 
will carry the filter.
+     * @param filter
+     *                 The filter value to assign to the filter {@link 
DescribedType}.
+     *
+     * @return this {@link SourceOptions} instance.
+     */
+    public SourceOptions addFilter(String name, Object descriptor, Object 
filter) {
+        if (filters == null) {
+            filters = new HashMap<>();
+        }
+
+        filters.put(name, new FilterDescribedType(descriptor, filter));
+
+        return self();
+    }
+
     /**
      * @return the configured default outcome as a {@link DeliveryState} 
instance.
      */
@@ -139,4 +167,54 @@ public final class SourceOptions extends 
TerminusOptions<SourceOptions> implemen
     SourceOptions self() {
         return this;
     }
+
+    private static class FilterDescribedType implements DescribedType {
+
+        private final Object descriptor;
+        private final Object described;
+
+        public FilterDescribedType(Object descriptor, Object described) {
+            this.descriptor = descriptor;
+            this.described = described;
+        }
+
+        @Override
+        public Object getDescriptor() {
+            return descriptor;
+        }
+
+        @Override
+        public Object getDescribed() {
+            return this.described;
+        }
+
+        @Override
+        public String toString() {
+            return "FilterDescribedType{ descriptor:" + descriptor + ", 
described:" + described + " }";
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(described, descriptor);
+        }
+
+        @Override
+        public boolean equals(Object target) {
+            if (this == target) {
+                return true;
+            }
+
+            if (target == null) {
+                return false;
+            }
+
+            if (!(target instanceof DescribedType)) {
+                return false;
+            }
+
+            final DescribedType other = (DescribedType) target;
+
+            return Objects.equals(descriptor, other.getDescriptor()) && 
Objects.equals(described, other.getDescribed());
+        }
+    }
 }
diff --git 
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReceiverTest.java
 
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReceiverTest.java
index e630f484..d64ef8df 100644
--- 
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReceiverTest.java
+++ 
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReceiverTest.java
@@ -2736,6 +2736,74 @@ public class ReceiverTest extends 
ImperativeClientTestCase {
         }
     }
 
+    @Test
+    public void 
testCreateReceiverWithUserConfiguredSourceWithJMSStyleSelectorUsingSourceOptionsAddFilter()
 throws Exception {
+        final PeerJmsSelectorType peerJmsSelector = new 
PeerJmsSelectorType("myProperty=42");
+        final Map<String, Object> filtersAtPeer = new HashMap<>();
+        filtersAtPeer.put("jms-selector", peerJmsSelector);
+
+        try (ProtonTestServer peer = new ProtonTestServer()) {
+            peer.expectSASLAnonymousConnect();
+            peer.expectOpen().respond();
+            peer.expectBegin().respond();
+            peer.expectAttach().ofReceiver()
+                               .withSource().withAddress("test-queue")
+                                            .withDistributionMode("copy")
+                                            .withTimeout(128)
+                                            
.withDurable(TerminusDurability.UNSETTLED_STATE)
+                                            
.withExpiryPolicy(TerminusExpiryPolicy.CONNECTION_CLOSE)
+                                            .withDefaultOutcome(new Released())
+                                            .withCapabilities("QUEUE")
+                                            .withFilter(filtersAtPeer)
+                                            
.withOutcomes("amqp:accepted:list", "amqp:rejected:list")
+                                            .also()
+                               .withTarget().withAddress(notNullValue())
+                                            .withCapabilities("QUEUE")
+                                            
.withDurable(TerminusDurability.CONFIGURATION)
+                                            
.withExpiryPolicy(TerminusExpiryPolicy.SESSION_END)
+                                            .withTimeout(42)
+                                            .withDynamic(anyOf(nullValue(), 
equalTo(false)))
+                                            
.withDynamicNodeProperties(nullValue())
+                               .and().respond();
+            peer.expectFlow().withLinkCredit(10);
+            peer.expectDetach().respond();
+            peer.expectEnd().respond();
+            peer.expectClose().respond();
+            peer.start();
+
+            URI remoteURI = peer.getServerURI();
+
+            LOG.info("Test started, peer listening on: {}", remoteURI);
+
+            Client container = Client.create();
+            Connection connection = container.connect(remoteURI.getHost(), 
remoteURI.getPort());
+            Session session = connection.openSession();
+            ReceiverOptions receiverOptions = new ReceiverOptions();
+
+            receiverOptions.sourceOptions().capabilities("QUEUE");
+            
receiverOptions.sourceOptions().distributionMode(DistributionMode.COPY);
+            receiverOptions.sourceOptions().timeout(128);
+            
receiverOptions.sourceOptions().durabilityMode(DurabilityMode.UNSETTLED_STATE);
+            
receiverOptions.sourceOptions().expiryPolicy(ExpiryPolicy.CONNECTION_CLOSE);
+            
receiverOptions.sourceOptions().defaultOutcome(DeliveryState.released());
+            receiverOptions.sourceOptions().addFilter("jms-selector", 
UnsignedLong.valueOf(0x0000468C00000004L), "myProperty=42");
+            
receiverOptions.sourceOptions().outcomes(DeliveryState.Type.ACCEPTED, 
DeliveryState.Type.REJECTED);
+
+            receiverOptions.targetOptions().capabilities("QUEUE");
+            
receiverOptions.targetOptions().durabilityMode(DurabilityMode.CONFIGURATION);
+            
receiverOptions.targetOptions().expiryPolicy(ExpiryPolicy.SESSION_CLOSE);
+            receiverOptions.targetOptions().timeout(42);
+
+            Receiver receiver = session.openReceiver("test-queue", 
receiverOptions).openFuture().get();
+
+            receiver.close();
+            session.close();
+            connection.close();
+
+            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+        }
+    }
+
     @Test
     public void 
testCreateReceiverWithUserConfiguredSourceWithJMSStyleSelectorAndNoLocalFilter()
 throws Exception {
         final DescribedType clientJmsSelector = new 
AmqpJmsSelectorType("myProperty=42");
@@ -2813,6 +2881,77 @@ public class ReceiverTest extends 
ImperativeClientTestCase {
         }
     }
 
+    @Test
+    public void 
testCreateReceiverWithUserConfiguredSourceWithJMSStyleSelectorAndNoLocalFilterUsingAddFilter()
 throws Exception {
+        final PeerJmsSelectorType peerJmsSelector = new 
PeerJmsSelectorType("myProperty=42");
+        final PeerNoLocalFilterType peerNoLocalFilter = new 
PeerNoLocalFilterType();
+        final Map<String, Object> filtersAtPeer = new HashMap<>();
+        filtersAtPeer.put("jms-selector", peerJmsSelector);
+        filtersAtPeer.put("no-local", peerNoLocalFilter);
+
+        try (ProtonTestServer peer = new ProtonTestServer()) {
+            peer.expectSASLAnonymousConnect();
+            peer.expectOpen().respond();
+            peer.expectBegin().respond();
+            peer.expectAttach().ofReceiver()
+                               .withSource().withAddress("test-queue")
+                                            .withDistributionMode("copy")
+                                            .withTimeout(128)
+                                            
.withDurable(TerminusDurability.UNSETTLED_STATE)
+                                            
.withExpiryPolicy(TerminusExpiryPolicy.CONNECTION_CLOSE)
+                                            .withDefaultOutcome(new Released())
+                                            .withCapabilities("QUEUE")
+                                            .withFilter(filtersAtPeer)
+                                            
.withOutcomes("amqp:accepted:list", "amqp:rejected:list")
+                                            .also()
+                               .withTarget().withAddress(notNullValue())
+                                            .withCapabilities("QUEUE")
+                                            
.withDurable(TerminusDurability.CONFIGURATION)
+                                            
.withExpiryPolicy(TerminusExpiryPolicy.SESSION_END)
+                                            .withTimeout(42)
+                                            .withDynamic(anyOf(nullValue(), 
equalTo(false)))
+                                            
.withDynamicNodeProperties(nullValue())
+                               .and().respond();
+            peer.expectFlow().withLinkCredit(10);
+            peer.expectDetach().respond();
+            peer.expectEnd().respond();
+            peer.expectClose().respond();
+            peer.start();
+
+            URI remoteURI = peer.getServerURI();
+
+            LOG.info("Test started, peer listening on: {}", remoteURI);
+
+            Client container = Client.create();
+            Connection connection = container.connect(remoteURI.getHost(), 
remoteURI.getPort());
+            Session session = connection.openSession();
+            ReceiverOptions receiverOptions = new ReceiverOptions();
+
+            receiverOptions.sourceOptions().capabilities("QUEUE");
+            
receiverOptions.sourceOptions().distributionMode(DistributionMode.COPY);
+            receiverOptions.sourceOptions().timeout(128);
+            
receiverOptions.sourceOptions().durabilityMode(DurabilityMode.UNSETTLED_STATE);
+            
receiverOptions.sourceOptions().expiryPolicy(ExpiryPolicy.CONNECTION_CLOSE);
+            
receiverOptions.sourceOptions().defaultOutcome(DeliveryState.released());
+            receiverOptions.sourceOptions().addFilter("jms-selector", 
UnsignedLong.valueOf(0x0000468C00000004L), "myProperty=42");
+            receiverOptions.sourceOptions().addFilter("no-local", 
UnsignedLong.valueOf(0x0000468C00000003L), "NoLocalFilter{}");
+            
receiverOptions.sourceOptions().outcomes(DeliveryState.Type.ACCEPTED, 
DeliveryState.Type.REJECTED);
+
+            receiverOptions.targetOptions().capabilities("QUEUE");
+            
receiverOptions.targetOptions().durabilityMode(DurabilityMode.CONFIGURATION);
+            
receiverOptions.targetOptions().expiryPolicy(ExpiryPolicy.SESSION_CLOSE);
+            receiverOptions.targetOptions().timeout(42);
+
+            Receiver receiver = session.openReceiver("test-queue", 
receiverOptions).openFuture().get();
+
+            receiver.close();
+            session.close();
+            connection.close();
+
+            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+        }
+    }
+
     @Test
     public void testOpenDurableReceiver() throws Exception {
         final String address = "test-topic";


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to