This is an automated email from the ASF dual-hosted git repository.
tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git
The following commit(s) were added to refs/heads/main by this push:
new 32f5e5f3 PROTON-2902 Adds a simple addFilter API to the SourceOptions
type
32f5e5f3 is described below
commit 32f5e5f3c3d1a353dec6a288d09cbe3e852c9e5d
Author: Timothy Bish <[email protected]>
AuthorDate: Thu Aug 7 14:06:56 2025 -0400
PROTON-2902 Adds a simple addFilter API to the SourceOptions type
Allows for setting filters in a more compact amount of user code.
---
.../apache/qpid/protonj2/client/SourceOptions.java | 78 ++++++++++++
.../qpid/protonj2/client/impl/ReceiverTest.java | 139 +++++++++++++++++++++
2 files changed, 217 insertions(+)
diff --git
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/SourceOptions.java
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/SourceOptions.java
index 0ade3a1c..dd35f943 100644
---
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/SourceOptions.java
+++
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/SourceOptions.java
@@ -19,8 +19,10 @@ package org.apache.qpid.protonj2.client;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
+import java.util.Objects;
import org.apache.qpid.protonj2.client.impl.ClientDeliveryState;
+import org.apache.qpid.protonj2.types.DescribedType;
/**
* Options type that carries configuration for link Source types.
@@ -99,6 +101,32 @@ public final class SourceOptions extends
TerminusOptions<SourceOptions> implemen
return self();
}
+ /**
+ * Adds the given named filter into the map of filters (one will be
created if not already set).
+ * <p>
+ * If a previous filters {@link Map} was assigned this new filter instance
will be assigned
+ * into that existing map, it is not cleared or reallocated. The
descriptor should either be
+ * an Symbol or UnsignedLong that aligns with the filters definition being
used.
+ *
+ * @param name
+ * The name to use when adding the described filter to the
filters {@link Map}.
+ * @param descriptor
+ * The descriptor used for the {@link DescribedType} that
will carry the filter.
+ * @param filter
+ * The filter value to assign to the filter {@link
DescribedType}.
+ *
+ * @return this {@link SourceOptions} instance.
+ */
+ public SourceOptions addFilter(String name, Object descriptor, Object
filter) {
+ if (filters == null) {
+ filters = new HashMap<>();
+ }
+
+ filters.put(name, new FilterDescribedType(descriptor, filter));
+
+ return self();
+ }
+
/**
* @return the configured default outcome as a {@link DeliveryState}
instance.
*/
@@ -139,4 +167,54 @@ public final class SourceOptions extends
TerminusOptions<SourceOptions> implemen
SourceOptions self() {
return this;
}
+
+ private static class FilterDescribedType implements DescribedType {
+
+ private final Object descriptor;
+ private final Object described;
+
+ public FilterDescribedType(Object descriptor, Object described) {
+ this.descriptor = descriptor;
+ this.described = described;
+ }
+
+ @Override
+ public Object getDescriptor() {
+ return descriptor;
+ }
+
+ @Override
+ public Object getDescribed() {
+ return this.described;
+ }
+
+ @Override
+ public String toString() {
+ return "FilterDescribedType{ descriptor:" + descriptor + ",
described:" + described + " }";
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(described, descriptor);
+ }
+
+ @Override
+ public boolean equals(Object target) {
+ if (this == target) {
+ return true;
+ }
+
+ if (target == null) {
+ return false;
+ }
+
+ if (!(target instanceof DescribedType)) {
+ return false;
+ }
+
+ final DescribedType other = (DescribedType) target;
+
+ return Objects.equals(descriptor, other.getDescriptor()) &&
Objects.equals(described, other.getDescribed());
+ }
+ }
}
diff --git
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReceiverTest.java
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReceiverTest.java
index e630f484..d64ef8df 100644
---
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReceiverTest.java
+++
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReceiverTest.java
@@ -2736,6 +2736,74 @@ public class ReceiverTest extends
ImperativeClientTestCase {
}
}
+ @Test
+ public void
testCreateReceiverWithUserConfiguredSourceWithJMSStyleSelectorUsingSourceOptionsAddFilter()
throws Exception {
+ final PeerJmsSelectorType peerJmsSelector = new
PeerJmsSelectorType("myProperty=42");
+ final Map<String, Object> filtersAtPeer = new HashMap<>();
+ filtersAtPeer.put("jms-selector", peerJmsSelector);
+
+ try (ProtonTestServer peer = new ProtonTestServer()) {
+ peer.expectSASLAnonymousConnect();
+ peer.expectOpen().respond();
+ peer.expectBegin().respond();
+ peer.expectAttach().ofReceiver()
+ .withSource().withAddress("test-queue")
+ .withDistributionMode("copy")
+ .withTimeout(128)
+
.withDurable(TerminusDurability.UNSETTLED_STATE)
+
.withExpiryPolicy(TerminusExpiryPolicy.CONNECTION_CLOSE)
+ .withDefaultOutcome(new Released())
+ .withCapabilities("QUEUE")
+ .withFilter(filtersAtPeer)
+
.withOutcomes("amqp:accepted:list", "amqp:rejected:list")
+ .also()
+ .withTarget().withAddress(notNullValue())
+ .withCapabilities("QUEUE")
+
.withDurable(TerminusDurability.CONFIGURATION)
+
.withExpiryPolicy(TerminusExpiryPolicy.SESSION_END)
+ .withTimeout(42)
+ .withDynamic(anyOf(nullValue(),
equalTo(false)))
+
.withDynamicNodeProperties(nullValue())
+ .and().respond();
+ peer.expectFlow().withLinkCredit(10);
+ peer.expectDetach().respond();
+ peer.expectEnd().respond();
+ peer.expectClose().respond();
+ peer.start();
+
+ URI remoteURI = peer.getServerURI();
+
+ LOG.info("Test started, peer listening on: {}", remoteURI);
+
+ Client container = Client.create();
+ Connection connection = container.connect(remoteURI.getHost(),
remoteURI.getPort());
+ Session session = connection.openSession();
+ ReceiverOptions receiverOptions = new ReceiverOptions();
+
+ receiverOptions.sourceOptions().capabilities("QUEUE");
+
receiverOptions.sourceOptions().distributionMode(DistributionMode.COPY);
+ receiverOptions.sourceOptions().timeout(128);
+
receiverOptions.sourceOptions().durabilityMode(DurabilityMode.UNSETTLED_STATE);
+
receiverOptions.sourceOptions().expiryPolicy(ExpiryPolicy.CONNECTION_CLOSE);
+
receiverOptions.sourceOptions().defaultOutcome(DeliveryState.released());
+ receiverOptions.sourceOptions().addFilter("jms-selector",
UnsignedLong.valueOf(0x0000468C00000004L), "myProperty=42");
+
receiverOptions.sourceOptions().outcomes(DeliveryState.Type.ACCEPTED,
DeliveryState.Type.REJECTED);
+
+ receiverOptions.targetOptions().capabilities("QUEUE");
+
receiverOptions.targetOptions().durabilityMode(DurabilityMode.CONFIGURATION);
+
receiverOptions.targetOptions().expiryPolicy(ExpiryPolicy.SESSION_CLOSE);
+ receiverOptions.targetOptions().timeout(42);
+
+ Receiver receiver = session.openReceiver("test-queue",
receiverOptions).openFuture().get();
+
+ receiver.close();
+ session.close();
+ connection.close();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ }
+ }
+
@Test
public void
testCreateReceiverWithUserConfiguredSourceWithJMSStyleSelectorAndNoLocalFilter()
throws Exception {
final DescribedType clientJmsSelector = new
AmqpJmsSelectorType("myProperty=42");
@@ -2813,6 +2881,77 @@ public class ReceiverTest extends
ImperativeClientTestCase {
}
}
+ @Test
+ public void
testCreateReceiverWithUserConfiguredSourceWithJMSStyleSelectorAndNoLocalFilterUsingAddFilter()
throws Exception {
+ final PeerJmsSelectorType peerJmsSelector = new
PeerJmsSelectorType("myProperty=42");
+ final PeerNoLocalFilterType peerNoLocalFilter = new
PeerNoLocalFilterType();
+ final Map<String, Object> filtersAtPeer = new HashMap<>();
+ filtersAtPeer.put("jms-selector", peerJmsSelector);
+ filtersAtPeer.put("no-local", peerNoLocalFilter);
+
+ try (ProtonTestServer peer = new ProtonTestServer()) {
+ peer.expectSASLAnonymousConnect();
+ peer.expectOpen().respond();
+ peer.expectBegin().respond();
+ peer.expectAttach().ofReceiver()
+ .withSource().withAddress("test-queue")
+ .withDistributionMode("copy")
+ .withTimeout(128)
+
.withDurable(TerminusDurability.UNSETTLED_STATE)
+
.withExpiryPolicy(TerminusExpiryPolicy.CONNECTION_CLOSE)
+ .withDefaultOutcome(new Released())
+ .withCapabilities("QUEUE")
+ .withFilter(filtersAtPeer)
+
.withOutcomes("amqp:accepted:list", "amqp:rejected:list")
+ .also()
+ .withTarget().withAddress(notNullValue())
+ .withCapabilities("QUEUE")
+
.withDurable(TerminusDurability.CONFIGURATION)
+
.withExpiryPolicy(TerminusExpiryPolicy.SESSION_END)
+ .withTimeout(42)
+ .withDynamic(anyOf(nullValue(),
equalTo(false)))
+
.withDynamicNodeProperties(nullValue())
+ .and().respond();
+ peer.expectFlow().withLinkCredit(10);
+ peer.expectDetach().respond();
+ peer.expectEnd().respond();
+ peer.expectClose().respond();
+ peer.start();
+
+ URI remoteURI = peer.getServerURI();
+
+ LOG.info("Test started, peer listening on: {}", remoteURI);
+
+ Client container = Client.create();
+ Connection connection = container.connect(remoteURI.getHost(),
remoteURI.getPort());
+ Session session = connection.openSession();
+ ReceiverOptions receiverOptions = new ReceiverOptions();
+
+ receiverOptions.sourceOptions().capabilities("QUEUE");
+
receiverOptions.sourceOptions().distributionMode(DistributionMode.COPY);
+ receiverOptions.sourceOptions().timeout(128);
+
receiverOptions.sourceOptions().durabilityMode(DurabilityMode.UNSETTLED_STATE);
+
receiverOptions.sourceOptions().expiryPolicy(ExpiryPolicy.CONNECTION_CLOSE);
+
receiverOptions.sourceOptions().defaultOutcome(DeliveryState.released());
+ receiverOptions.sourceOptions().addFilter("jms-selector",
UnsignedLong.valueOf(0x0000468C00000004L), "myProperty=42");
+ receiverOptions.sourceOptions().addFilter("no-local",
UnsignedLong.valueOf(0x0000468C00000003L), "NoLocalFilter{}");
+
receiverOptions.sourceOptions().outcomes(DeliveryState.Type.ACCEPTED,
DeliveryState.Type.REJECTED);
+
+ receiverOptions.targetOptions().capabilities("QUEUE");
+
receiverOptions.targetOptions().durabilityMode(DurabilityMode.CONFIGURATION);
+
receiverOptions.targetOptions().expiryPolicy(ExpiryPolicy.SESSION_CLOSE);
+ receiverOptions.targetOptions().timeout(42);
+
+ Receiver receiver = session.openReceiver("test-queue",
receiverOptions).openFuture().get();
+
+ receiver.close();
+ session.close();
+ connection.close();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ }
+ }
+
@Test
public void testOpenDurableReceiver() throws Exception {
final String address = "test-topic";
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]