This is an automated email from the ASF dual-hosted git repository. tabish pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git
The following commit(s) were added to refs/heads/main by this push: new 2d3bbaf5 PROTON-2541 Further refine the internal client implementation 2d3bbaf5 is described below commit 2d3bbaf5ad300afc0b3c6f25d9ca0dfcdc905578 Author: Timothy Bish <tabish...@gmail.com> AuthorDate: Wed May 11 17:42:29 2022 -0400 PROTON-2541 Further refine the internal client implementation Provide some additional abstractions internally to reduce duplicated code amongst tracker types and sender and receiver link implementations. --- .../protonj2/client/impl/ClientConnection.java | 2 +- .../client/impl/ClientConnectionCapabilities.java | 2 +- .../client/impl/ClientExceptionSupport.java | 2 +- .../client/impl/ClientNoOpStreamTracker.java | 2 +- .../protonj2/client/impl/ClientNoOpTracker.java | 2 +- .../qpid/protonj2/client/impl/ClientReceiver.java | 16 +- .../client/impl/ClientReceiverLinkType.java | 60 +++++++ .../qpid/protonj2/client/impl/ClientSender.java | 25 +-- .../protonj2/client/impl/ClientSenderLinkType.java | 43 ++++- .../protonj2/client/impl/ClientStreamReceiver.java | 14 +- .../protonj2/client/impl/ClientStreamSender.java | 24 +-- .../protonj2/client/impl/ClientStreamTracker.java | 171 +------------------- .../{ClientTracker.java => ClientTrackable.java} | 73 ++++----- .../qpid/protonj2/client/impl/ClientTracker.java | 174 +-------------------- .../client/impl/ClientTransactionContext.java | 2 +- .../protonj2/client/impl/StreamSenderTest.java | 4 +- 16 files changed, 157 insertions(+), 459 deletions(-) diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientConnection.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientConnection.java index 9715ecf6..c6765dfc 100644 --- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientConnection.java +++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientConnection.java @@ -78,7 +78,7 @@ import org.slf4j.LoggerFactory; /** * A {@link Connection} implementation that uses the Proton engine for AMQP protocol support. */ -public class ClientConnection implements Connection { +public final class ClientConnection implements Connection { private static final Logger LOG = LoggerFactory.getLogger(ClientConnection.class); diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientConnectionCapabilities.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientConnectionCapabilities.java index 3570edbd..df35d023 100644 --- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientConnectionCapabilities.java +++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientConnectionCapabilities.java @@ -27,7 +27,7 @@ import org.apache.qpid.protonj2.types.Symbol; * Tracks available known capabilities for the connection to allow the client * to know what features are supported on the current connection. */ -public class ClientConnectionCapabilities { +public final class ClientConnectionCapabilities { private boolean anonymousRelaySupported; private boolean delayedDeliverySupported; diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientExceptionSupport.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientExceptionSupport.java index 60756c11..94c76cd1 100644 --- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientExceptionSupport.java +++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientExceptionSupport.java @@ -42,7 +42,7 @@ import org.apache.qpid.protonj2.types.transport.ConnectionError; import org.apache.qpid.protonj2.types.transport.ErrorCondition; import org.apache.qpid.protonj2.types.transport.LinkError; -class ClientExceptionSupport { +final class ClientExceptionSupport { /** * Checks the given cause to determine if it's already an ClientIOException type and diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientNoOpStreamTracker.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientNoOpStreamTracker.java index 493f475f..c7417c36 100644 --- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientNoOpStreamTracker.java +++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientNoOpStreamTracker.java @@ -29,7 +29,7 @@ import org.apache.qpid.protonj2.client.futures.ClientFutureFactory; * A dummy Tracker instance that always indicates remote settlement and * acceptance for {@link StreamSender} instances. */ -public class ClientNoOpStreamTracker implements StreamTracker { +public final class ClientNoOpStreamTracker implements StreamTracker { private final ClientStreamSender sender; diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientNoOpTracker.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientNoOpTracker.java index 0d942ed1..b4d5c2e0 100644 --- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientNoOpTracker.java +++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientNoOpTracker.java @@ -29,7 +29,7 @@ import org.apache.qpid.protonj2.client.futures.ClientFutureFactory; * A dummy Tracker instance that always indicates remote settlement and * acceptance. */ -public class ClientNoOpTracker implements Tracker { +public final class ClientNoOpTracker implements Tracker { private final ClientSender sender; diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientReceiver.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientReceiver.java index b13e1fd3..e025d35b 100644 --- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientReceiver.java +++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientReceiver.java @@ -17,7 +17,6 @@ package org.apache.qpid.protonj2.client.impl; import java.util.concurrent.Future; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -40,7 +39,7 @@ import org.slf4j.LoggerFactory; /** * Client {@link Receiver} implementation. */ -public final class ClientReceiver extends ClientLinkType<Receiver, org.apache.qpid.protonj2.engine.Receiver> implements Receiver { +public final class ClientReceiver extends ClientReceiverLinkType<Receiver> implements Receiver { private static final Logger LOG = LoggerFactory.getLogger(ClientReceiver.class); @@ -48,17 +47,12 @@ public final class ClientReceiver extends ClientLinkType<Receiver, org.apache.qp private ScheduledFuture<?> drainingTimeout; private final ReceiverOptions options; - private final ScheduledExecutorService executor; private final FifoDeliveryQueue messageQueue; - private org.apache.qpid.protonj2.engine.Receiver protonReceiver; - ClientReceiver(ClientSession session, ReceiverOptions options, String receiverId, org.apache.qpid.protonj2.engine.Receiver receiver) { - super(session, receiverId, options); + super(session, receiverId, options, receiver); this.options = options; - this.executor = session.getScheduler(); - this.protonReceiver = receiver.setLinkedResource(this); if (options.creditWindow() > 0) { protonReceiver.addCredit(options.creditWindow()); @@ -177,6 +171,7 @@ public final class ClientReceiver extends ClientLinkType<Receiver, org.apache.qp //----- Internal API for the ClientReceiver and other Client objects + @Override void disposition(IncomingDelivery delivery, DeliveryState state, boolean settle) throws ClientException { checkClosedOrFailed(); asyncApplyDisposition(delivery, state, settle); @@ -192,11 +187,6 @@ public final class ClientReceiver extends ClientLinkType<Receiver, org.apache.qp return this; } - @Override - protected org.apache.qpid.protonj2.engine.Receiver protonLink() { - return protonReceiver; - } - //----- Handlers for proton receiver events private void handleDeliveryReceived(IncomingDelivery delivery) { diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientReceiverLinkType.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientReceiverLinkType.java new file mode 100644 index 00000000..a2a00fa7 --- /dev/null +++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientReceiverLinkType.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.qpid.protonj2.client.impl; + +import org.apache.qpid.protonj2.client.Link; +import org.apache.qpid.protonj2.client.LinkOptions; +import org.apache.qpid.protonj2.client.exceptions.ClientException; +import org.apache.qpid.protonj2.engine.IncomingDelivery; +import org.apache.qpid.protonj2.engine.Receiver; +import org.apache.qpid.protonj2.types.transport.DeliveryState; + +/** + * Base class for client link types that wrap a proton receiver to provide + * delivery dispatch in some manner. + */ +public abstract class ClientReceiverLinkType<ReceiverType extends Link<ReceiverType>> extends ClientLinkType<ReceiverType, Receiver> { + + protected Receiver protonReceiver; + + protected ClientReceiverLinkType(ClientSession session, String linkId, LinkOptions<?> options, Receiver protonReceiver) { + super(session, linkId, options); + + this.protonReceiver = protonReceiver; + } + + @Override + protected org.apache.qpid.protonj2.engine.Receiver protonLink() { + return protonReceiver; + } + + /** + * Apply the given disposition and settlement state to the given incoming delivery instance. + * + * @param delivery + * The incoming delivery that will be acted upon + * @param state + * The delivery state to apply to the given incoming delivery + * @param settle + * The settlement state to apply to the given incoming delivery + * + * @throws ClientException if an error occurs while applying the disposition to the delivery. + */ + abstract void disposition(IncomingDelivery delivery, DeliveryState state, boolean settle) throws ClientException; + +} diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSender.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSender.java index f449d400..7ff4849b 100644 --- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSender.java +++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSender.java @@ -42,22 +42,17 @@ import org.slf4j.LoggerFactory; /** * Proton based AMQP Sender */ -final class ClientSender extends ClientSenderLinkType<Sender> implements Sender { +public final class ClientSender extends ClientSenderLinkType<Sender> implements Sender { private static final Logger LOG = LoggerFactory.getLogger(ClientSender.class); - private final boolean sendsSettled; private final Deque<ClientOutgoingEnvelope> blocked = new ArrayDeque<>(); private final SenderOptions options; - private org.apache.qpid.protonj2.engine.Sender protonSender; - ClientSender(ClientSession session, SenderOptions options, String senderId, org.apache.qpid.protonj2.engine.Sender protonSender) { - super(session, senderId, options); + super(session, senderId, options, protonSender); this.options = new SenderOptions(options); - this.protonSender = protonSender.setLinkedResource(this); - this.sendsSettled = protonSender.getSenderSettleMode() == SenderSettleMode.SETTLED; } @Override @@ -90,6 +85,7 @@ final class ClientSender extends ClientSenderLinkType<Sender> implements Sender return this.options; } + @Override void disposition(OutgoingDelivery delivery, DeliveryState state, boolean settled) throws ClientException { checkClosedOrFailed(); executor.execute(() -> { @@ -97,14 +93,6 @@ final class ClientSender extends ClientSenderLinkType<Sender> implements Sender }); } - org.apache.qpid.protonj2.engine.Sender getProtonSender() { - return protonSender; - } - - boolean isSendingSettled() { - return sendsSettled; - } - //----- Handlers for proton receiver events private void handleCreditStateUpdated(org.apache.qpid.protonj2.engine.Sender sender) { @@ -141,11 +129,6 @@ final class ClientSender extends ClientSenderLinkType<Sender> implements Sender return this; } - @Override - protected org.apache.qpid.protonj2.engine.Sender protonLink() { - return protonSender; - } - private void addToTailOfBlockedQueue(ClientOutgoingEnvelope send) { if (options.sendTimeout() > 0 && send.sendTimeout() == null) { send.sendTimeout(executor.schedule(() -> { @@ -352,7 +335,7 @@ final class ClientSender extends ClientSenderLinkType<Sender> implements Sender @Override public void send(DeliveryState state, boolean settled) { if (delivery == null) { - delivery = sender.getProtonSender().next(); + delivery = sender.protonLink().next(); delivery.setLinkedResource(sender.createTracker(delivery)); } diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSenderLinkType.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSenderLinkType.java index afc1ef51..c1edd783 100644 --- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSenderLinkType.java +++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSenderLinkType.java @@ -19,9 +19,13 @@ package org.apache.qpid.protonj2.client.impl; import org.apache.qpid.protonj2.client.Link; import org.apache.qpid.protonj2.client.LinkOptions; +import org.apache.qpid.protonj2.client.exceptions.ClientException; import org.apache.qpid.protonj2.client.exceptions.ClientUnsupportedOperationException; import org.apache.qpid.protonj2.engine.LinkState; +import org.apache.qpid.protonj2.engine.OutgoingDelivery; import org.apache.qpid.protonj2.engine.Sender; +import org.apache.qpid.protonj2.types.transport.DeliveryState; +import org.apache.qpid.protonj2.types.transport.SenderSettleMode; /** * Base type for all the proton client sender types which provides a few extra @@ -29,17 +33,50 @@ import org.apache.qpid.protonj2.engine.Sender; */ public abstract class ClientSenderLinkType<LinkType extends Link<LinkType>> extends ClientLinkType<LinkType, Sender> { - protected ClientSenderLinkType(ClientSession session, String linkId, LinkOptions<?> options) { + private final boolean sendsSettled; + + protected Sender protonSender; + + protected ClientSenderLinkType(ClientSession session, String linkId, LinkOptions<?> options, Sender protonSender) { super(session, linkId, options); + + this.protonSender = protonSender; + this.protonSender = protonSender.setLinkedResource(self()); + this.sendsSettled = protonSender.getSenderSettleMode() == SenderSettleMode.SETTLED; } final boolean isAnonymous() { - return protonLink().<org.apache.qpid.protonj2.types.messaging.Target>getTarget().getAddress() == null; + return protonSender.<org.apache.qpid.protonj2.types.messaging.Target>getTarget().getAddress() == null; + } + + final boolean isSendingSettled() { + return sendsSettled; + } + + @Override + final protected Sender protonLink() { + return protonSender; } final void handleAnonymousRelayNotSupported() { - if (isAnonymous() && protonLink().getState() == LinkState.IDLE) { + if (isAnonymous() && protonSender.getState() == LinkState.IDLE) { immediateLinkShutdown(new ClientUnsupportedOperationException("Anonymous relay support not available from this connection")); } } + + /** + * Provides a common API point for sender link types to allow resources to process and then + * apply dispositions to outgoing delivery types. + * + * @param delivery + * The outgoing delivery to apply the disposition to + * @param state + * The delivery state to apply to the outgoing delivery. + * @param settled + * Should the outgoing delivery be settled as part of the disposition. + * + * @throws ClientException if an error occurs while applying the dispostion. + */ + abstract void disposition(OutgoingDelivery delivery, DeliveryState state, boolean settled) throws ClientException; + } diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamReceiver.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamReceiver.java index 8b249f9c..9ab35c81 100644 --- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamReceiver.java +++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamReceiver.java @@ -33,7 +33,6 @@ import org.apache.qpid.protonj2.client.exceptions.ClientOperationTimedOutExcepti import org.apache.qpid.protonj2.client.exceptions.ClientResourceRemotelyClosedException; import org.apache.qpid.protonj2.client.futures.ClientFuture; import org.apache.qpid.protonj2.engine.IncomingDelivery; -import org.apache.qpid.protonj2.engine.Receiver; import org.apache.qpid.protonj2.types.messaging.Released; import org.apache.qpid.protonj2.types.transport.DeliveryState; import org.slf4j.Logger; @@ -42,7 +41,7 @@ import org.slf4j.LoggerFactory; /** * Client implementation of a {@link StreamReceiver}. */ -public final class ClientStreamReceiver extends ClientLinkType<StreamReceiver, org.apache.qpid.protonj2.engine.Receiver> implements StreamReceiver { +public final class ClientStreamReceiver extends ClientReceiverLinkType<StreamReceiver> implements StreamReceiver { private static final Logger LOG = LoggerFactory.getLogger(ClientReceiver.class); @@ -51,13 +50,10 @@ public final class ClientStreamReceiver extends ClientLinkType<StreamReceiver, o private final StreamReceiverOptions options; private final Map<ClientFuture<StreamDelivery>, ScheduledFuture<?>> receiveRequests = new LinkedHashMap<>(); - private org.apache.qpid.protonj2.engine.Receiver protonReceiver; - ClientStreamReceiver(ClientSession session, StreamReceiverOptions options, String receiverId, org.apache.qpid.protonj2.engine.Receiver receiver) { - super(session, receiverId, options); + super(session, receiverId, options, receiver); this.options = options; - this.protonReceiver = receiver.setLinkedResource(this); if (options.creditWindow() > 0) { protonReceiver.addCredit(options.creditWindow()); @@ -259,6 +255,7 @@ public final class ClientStreamReceiver extends ClientLinkType<StreamReceiver, o //----- Private implementation details + @Override void disposition(IncomingDelivery delivery, DeliveryState state, boolean settle) throws ClientException { checkClosedOrFailed(); asyncApplyDisposition(delivery, state, settle); @@ -341,11 +338,6 @@ public final class ClientStreamReceiver extends ClientLinkType<StreamReceiver, o return this; } - @Override - protected Receiver protonLink() { - return protonReceiver; - } - @Override protected void linkSpecificLocalOpenHandler() { protonReceiver.deliveryStateUpdatedHandler(this::handleDeliveryStateRemotelyUpdated) diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamSender.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamSender.java index 2513098e..511f9028 100644 --- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamSender.java +++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamSender.java @@ -36,7 +36,6 @@ import org.apache.qpid.protonj2.client.exceptions.ClientSendTimedOutException; import org.apache.qpid.protonj2.client.futures.ClientFuture; import org.apache.qpid.protonj2.client.futures.ClientSynchronization; import org.apache.qpid.protonj2.engine.OutgoingDelivery; -import org.apache.qpid.protonj2.engine.Sender; import org.apache.qpid.protonj2.engine.util.StringUtils; import org.apache.qpid.protonj2.types.messaging.DeliveryAnnotations; import org.apache.qpid.protonj2.types.transport.DeliveryState; @@ -47,22 +46,17 @@ import org.slf4j.LoggerFactory; /** * Client implementation of a {@link StreamSender}. */ -public final class ClientStreamSender extends ClientLinkType<StreamSender, Sender> implements StreamSender { +public final class ClientStreamSender extends ClientSenderLinkType<StreamSender> implements StreamSender { private static final Logger LOG = LoggerFactory.getLogger(ClientStreamSender.class); private final StreamSenderOptions options; - private final boolean sendsSettled; private final Deque<ClientOutgoingEnvelope> blocked = new ArrayDeque<>(); - private org.apache.qpid.protonj2.engine.Sender protonSender; - ClientStreamSender(ClientSession session, StreamSenderOptions options, String senderId, org.apache.qpid.protonj2.engine.Sender protonSender) { - super(session, senderId, options); + super(session, senderId, options, protonSender); this.options = new StreamSenderOptions(options); - this.protonSender = protonSender.setLinkedResource(this); - this.sendsSettled = protonSender.getSenderSettleMode() == SenderSettleMode.SETTLED; } @Override @@ -127,10 +121,6 @@ public final class ClientStreamSender extends ClientLinkType<StreamSender, Sende //----- Internal API - boolean isSendingSettled() { - return sendsSettled; - } - StreamSenderOptions options() { return this.options; } @@ -140,15 +130,6 @@ public final class ClientStreamSender extends ClientLinkType<StreamSender, Sende return this; } - Sender getProtonSender() { - return protonSender; - } - - @Override - protected Sender protonLink() { - return protonSender; - } - private void addToTailOfBlockedQueue(ClientOutgoingEnvelope send) { if (options.sendTimeout() > 0 && send.sendTimeout() == null) { send.sendTimeout(executor.schedule(() -> { @@ -227,6 +208,7 @@ public final class ClientStreamSender extends ClientLinkType<StreamSender, Sende return new ClientNoOpStreamTracker(this); } + @Override void disposition(OutgoingDelivery delivery, DeliveryState state, boolean settled) throws ClientException { checkClosedOrFailed(); executor.execute(() -> { diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamTracker.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamTracker.java index 0d5c3736..f6b17fbc 100644 --- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamTracker.java +++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamTracker.java @@ -16,42 +16,18 @@ */ package org.apache.qpid.protonj2.client.impl; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import org.apache.qpid.protonj2.client.DeliveryState; import org.apache.qpid.protonj2.client.StreamSender; import org.apache.qpid.protonj2.client.StreamTracker; -import org.apache.qpid.protonj2.client.exceptions.ClientDeliveryStateException; -import org.apache.qpid.protonj2.client.exceptions.ClientException; -import org.apache.qpid.protonj2.client.exceptions.ClientOperationTimedOutException; -import org.apache.qpid.protonj2.client.futures.ClientFuture; import org.apache.qpid.protonj2.engine.OutgoingDelivery; /** * {@link StreamTracker} implementation that relies on the ClientTracker to handle the * basic {@link OutgoingDelivery} management. */ -public final class ClientStreamTracker implements StreamTracker { - - private final ClientStreamSender sender; - private final OutgoingDelivery delivery; - - private final ClientFuture<StreamTracker> remoteSettlementFuture; - - private volatile boolean remotelySettled; - private volatile DeliveryState remoteDeliveryState; +public final class ClientStreamTracker extends ClientTrackable<ClientStreamSender, StreamTracker> implements StreamTracker { ClientStreamTracker(ClientStreamSender sender, OutgoingDelivery delivery) { - this.sender = sender; - this.delivery = delivery; - this.delivery.deliveryStateUpdatedHandler(this::processDeliveryUpdated); - this.remoteSettlementFuture = sender.session().getFutureFactory().createFuture(); - } - - OutgoingDelivery delivery() { - return delivery; + super(sender, delivery); } @Override @@ -60,148 +36,7 @@ public final class ClientStreamTracker implements StreamTracker { } @Override - public synchronized DeliveryState state() { - return ClientDeliveryState.fromProtonType(delivery.getState()); - } - - @Override - public DeliveryState remoteState() { - return remoteDeliveryState; - } - - @Override - public boolean remoteSettled() { - return remotelySettled; - } - - @Override - public StreamTracker disposition(DeliveryState state, boolean settle) throws ClientException { - try { - sender.disposition(delivery, ClientDeliveryState.asProtonType(state), settle); - } finally { - if (settle) { - remoteSettlementFuture.complete(this); - } - } - + protected StreamTracker self() { return this; } - - @Override - public StreamTracker settle() throws ClientException { - try { - sender.disposition(delivery, null, true); - } finally { - remoteSettlementFuture.complete(this); - } - - return this; - } - - @Override - public synchronized boolean settled() { - return delivery.isSettled(); - } - - @Override - public ClientFuture<StreamTracker> settlementFuture() { - if (delivery.isSettled()) { - remoteSettlementFuture.complete(this); - } - - return remoteSettlementFuture; - } - - @Override - public StreamTracker awaitSettlement() throws ClientException { - try { - if (settled()) { - return this; - } else { - return settlementFuture().get(); - } - } catch (ExecutionException exe) { - throw ClientExceptionSupport.createNonFatalOrPassthrough(exe.getCause()); - } catch (InterruptedException e) { - Thread.interrupted(); - throw new ClientException("Wait for settlement was interrupted", e); - } - } - - @Override - public StreamTracker awaitSettlement(long timeout, TimeUnit unit) throws ClientException { - try { - if (settled()) { - return this; - } else { - return settlementFuture().get(timeout, unit); - } - } catch (InterruptedException ie) { - Thread.interrupted(); - throw new ClientException("Wait for settlement was interrupted", ie); - } catch (ExecutionException exe) { - throw ClientExceptionSupport.createNonFatalOrPassthrough(exe.getCause()); - } catch (TimeoutException te) { - throw new ClientOperationTimedOutException("Timed out waiting for remote settlement", te); - } - } - - @Override - public StreamTracker awaitAccepted() throws ClientException { - try { - if (settled() && !remoteSettled()) { - return this; - } else { - settlementFuture().get(); - if (remoteState() != null && remoteState().isAccepted()) { - return this; - } else { - throw new ClientDeliveryStateException("Remote did not accept the sent message", remoteState()); - } - } - } catch (ExecutionException exe) { - throw ClientExceptionSupport.createNonFatalOrPassthrough(exe.getCause()); - } catch (InterruptedException ie) { - Thread.interrupted(); - throw new ClientException("Wait for Accepted outcome was interrupted", ie); - } - } - - @Override - public StreamTracker awaitAccepted(long timeout, TimeUnit unit) throws ClientException { - try { - if (settled() && !remoteSettled()) { - return this; - } else { - settlementFuture().get(timeout, unit); - if (remoteState() != null && remoteState().isAccepted()) { - return this; - } else { - throw new ClientDeliveryStateException("Remote did not accept the sent message", remoteState()); - } - } - } catch (InterruptedException ie) { - Thread.interrupted(); - throw new ClientException("Wait for Accepted outcome was interrupted", ie); - } catch (ExecutionException exe) { - throw ClientExceptionSupport.createNonFatalOrPassthrough(exe.getCause()); - } catch (TimeoutException te) { - throw new ClientOperationTimedOutException("Timed out waiting for remote Accepted outcome", te); - } - } - - //----- Internal Event hooks for delivery updates - - private void processDeliveryUpdated(OutgoingDelivery delivery) { - remotelySettled = delivery.isRemotelySettled(); - remoteDeliveryState = ClientDeliveryState.fromProtonType(delivery.getRemoteState()); - - if (delivery.isRemotelySettled()) { - remoteSettlementFuture.complete(this); - } - - if (sender.options().autoSettle() && delivery.isRemotelySettled()) { - delivery.settle(); - } - } } diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientTracker.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientTrackable.java similarity index 78% copy from protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientTracker.java copy to protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientTrackable.java index 862bf6a4..0fb08837 100644 --- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientTracker.java +++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientTrackable.java @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.qpid.protonj2.client.impl; import java.util.concurrent.ExecutionException; @@ -21,8 +22,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.apache.qpid.protonj2.client.DeliveryState; -import org.apache.qpid.protonj2.client.Sender; -import org.apache.qpid.protonj2.client.Tracker; import org.apache.qpid.protonj2.client.exceptions.ClientDeliveryStateException; import org.apache.qpid.protonj2.client.exceptions.ClientException; import org.apache.qpid.protonj2.client.exceptions.ClientOperationTimedOutException; @@ -30,14 +29,14 @@ import org.apache.qpid.protonj2.client.futures.ClientFuture; import org.apache.qpid.protonj2.engine.OutgoingDelivery; /** - * Client outgoing delivery tracker object. + * Base type used to provide some common plumbing for Tracker types */ -class ClientTracker implements Tracker { +public abstract class ClientTrackable<SenderType extends ClientSenderLinkType<?>, TrackerType> { - private final ClientSender sender; - private final OutgoingDelivery delivery; + protected final SenderType sender; + protected final OutgoingDelivery delivery; - private final ClientFuture<Tracker> remoteSettlementFuture; + private final ClientFuture<TrackerType> remoteSettlementFuture; private volatile boolean remotelySettled; private volatile DeliveryState remoteDeliveryState; @@ -50,80 +49,69 @@ class ClientTracker implements Tracker { * @param delivery * The proton outgoing delivery object that backs this tracker. */ - ClientTracker(ClientSender sender, OutgoingDelivery delivery) { + ClientTrackable(SenderType sender, OutgoingDelivery delivery) { this.sender = sender; this.delivery = delivery; this.delivery.deliveryStateUpdatedHandler(this::processDeliveryUpdated); this.remoteSettlementFuture = sender.session().getFutureFactory().createFuture(); } + protected abstract TrackerType self(); + OutgoingDelivery delivery() { return delivery; } - @Override - public Sender sender() { - return sender; - } - - @Override public synchronized DeliveryState state() { return ClientDeliveryState.fromProtonType(delivery.getState()); } - @Override public DeliveryState remoteState() { return remoteDeliveryState; } - @Override public boolean remoteSettled() { return remotelySettled; } - @Override - public Tracker disposition(DeliveryState state, boolean settle) throws ClientException { + public TrackerType disposition(DeliveryState state, boolean settle) throws ClientException { try { sender.disposition(delivery, ClientDeliveryState.asProtonType(state), settle); } finally { if (settle) { - remoteSettlementFuture.complete(this); + remoteSettlementFuture.complete(self()); } } - return this; + return self(); } - @Override - public Tracker settle() throws ClientException { + public TrackerType settle() throws ClientException { try { sender.disposition(delivery, null, true); } finally { - remoteSettlementFuture.complete(this); + remoteSettlementFuture.complete(self()); } - return this; + return self(); } - @Override public synchronized boolean settled() { return delivery.isSettled(); } - @Override - public ClientFuture<Tracker> settlementFuture() { + public ClientFuture<TrackerType> settlementFuture() { if (delivery.isSettled()) { - remoteSettlementFuture.complete(this); + remoteSettlementFuture.complete(self()); } return remoteSettlementFuture; } - @Override - public Tracker awaitSettlement() throws ClientException { + public TrackerType awaitSettlement() throws ClientException { try { if (settled()) { - return this; + return self(); } else { return settlementFuture().get(); } @@ -135,11 +123,10 @@ class ClientTracker implements Tracker { } } - @Override - public Tracker awaitSettlement(long timeout, TimeUnit unit) throws ClientException { + public TrackerType awaitSettlement(long timeout, TimeUnit unit) throws ClientException { try { if (settled()) { - return this; + return self(); } else { return settlementFuture().get(timeout, unit); } @@ -153,15 +140,14 @@ class ClientTracker implements Tracker { } } - @Override - public Tracker awaitAccepted() throws ClientException { + public TrackerType awaitAccepted() throws ClientException { try { if (settled() && !remoteSettled()) { - return this; + return self(); } else { settlementFuture().get(); if (remoteState() != null && remoteState().isAccepted()) { - return this; + return self(); } else { throw new ClientDeliveryStateException("Remote did not accept the sent message", remoteState()); } @@ -174,15 +160,14 @@ class ClientTracker implements Tracker { } } - @Override - public Tracker awaitAccepted(long timeout, TimeUnit unit) throws ClientException { + public TrackerType awaitAccepted(long timeout, TimeUnit unit) throws ClientException { try { if (settled() && !remoteSettled()) { - return this; + return self(); } else { settlementFuture().get(timeout, unit); if (remoteState() != null && remoteState().isAccepted()) { - return this; + return self(); } else { throw new ClientDeliveryStateException("Remote did not accept the sent message", remoteState()); } @@ -204,10 +189,10 @@ class ClientTracker implements Tracker { remoteDeliveryState = ClientDeliveryState.fromProtonType(delivery.getRemoteState()); if (delivery.isRemotelySettled()) { - remoteSettlementFuture.complete(this); + remoteSettlementFuture.complete(self()); } - if (sender.options().autoSettle() && delivery.isRemotelySettled()) { + if (sender.options.autoSettle() && delivery.isRemotelySettled()) { delivery.settle(); } } diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientTracker.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientTracker.java index 862bf6a4..23b9911f 100644 --- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientTracker.java +++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientTracker.java @@ -16,31 +16,13 @@ */ package org.apache.qpid.protonj2.client.impl; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import org.apache.qpid.protonj2.client.DeliveryState; -import org.apache.qpid.protonj2.client.Sender; import org.apache.qpid.protonj2.client.Tracker; -import org.apache.qpid.protonj2.client.exceptions.ClientDeliveryStateException; -import org.apache.qpid.protonj2.client.exceptions.ClientException; -import org.apache.qpid.protonj2.client.exceptions.ClientOperationTimedOutException; -import org.apache.qpid.protonj2.client.futures.ClientFuture; import org.apache.qpid.protonj2.engine.OutgoingDelivery; /** * Client outgoing delivery tracker object. */ -class ClientTracker implements Tracker { - - private final ClientSender sender; - private final OutgoingDelivery delivery; - - private final ClientFuture<Tracker> remoteSettlementFuture; - - private volatile boolean remotelySettled; - private volatile DeliveryState remoteDeliveryState; +public final class ClientTracker extends ClientTrackable<ClientSender, Tracker> implements Tracker { /** * Create an instance of a client outgoing delivery tracker. @@ -51,164 +33,16 @@ class ClientTracker implements Tracker { * The proton outgoing delivery object that backs this tracker. */ ClientTracker(ClientSender sender, OutgoingDelivery delivery) { - this.sender = sender; - this.delivery = delivery; - this.delivery.deliveryStateUpdatedHandler(this::processDeliveryUpdated); - this.remoteSettlementFuture = sender.session().getFutureFactory().createFuture(); - } - - OutgoingDelivery delivery() { - return delivery; + super(sender, delivery); } @Override - public Sender sender() { + public ClientSender sender() { return sender; } @Override - public synchronized DeliveryState state() { - return ClientDeliveryState.fromProtonType(delivery.getState()); - } - - @Override - public DeliveryState remoteState() { - return remoteDeliveryState; - } - - @Override - public boolean remoteSettled() { - return remotelySettled; - } - - @Override - public Tracker disposition(DeliveryState state, boolean settle) throws ClientException { - try { - sender.disposition(delivery, ClientDeliveryState.asProtonType(state), settle); - } finally { - if (settle) { - remoteSettlementFuture.complete(this); - } - } - + protected Tracker self() { return this; } - - @Override - public Tracker settle() throws ClientException { - try { - sender.disposition(delivery, null, true); - } finally { - remoteSettlementFuture.complete(this); - } - - return this; - } - - @Override - public synchronized boolean settled() { - return delivery.isSettled(); - } - - @Override - public ClientFuture<Tracker> settlementFuture() { - if (delivery.isSettled()) { - remoteSettlementFuture.complete(this); - } - - return remoteSettlementFuture; - } - - @Override - public Tracker awaitSettlement() throws ClientException { - try { - if (settled()) { - return this; - } else { - return settlementFuture().get(); - } - } catch (ExecutionException exe) { - throw ClientExceptionSupport.createNonFatalOrPassthrough(exe.getCause()); - } catch (InterruptedException e) { - Thread.interrupted(); - throw new ClientException("Wait for settlement was interrupted", e); - } - } - - @Override - public Tracker awaitSettlement(long timeout, TimeUnit unit) throws ClientException { - try { - if (settled()) { - return this; - } else { - return settlementFuture().get(timeout, unit); - } - } catch (InterruptedException ie) { - Thread.interrupted(); - throw new ClientException("Wait for settlement was interrupted", ie); - } catch (ExecutionException exe) { - throw ClientExceptionSupport.createNonFatalOrPassthrough(exe.getCause()); - } catch (TimeoutException te) { - throw new ClientOperationTimedOutException("Timed out waiting for remote settlement", te); - } - } - - @Override - public Tracker awaitAccepted() throws ClientException { - try { - if (settled() && !remoteSettled()) { - return this; - } else { - settlementFuture().get(); - if (remoteState() != null && remoteState().isAccepted()) { - return this; - } else { - throw new ClientDeliveryStateException("Remote did not accept the sent message", remoteState()); - } - } - } catch (ExecutionException exe) { - throw ClientExceptionSupport.createNonFatalOrPassthrough(exe.getCause()); - } catch (InterruptedException ie) { - Thread.interrupted(); - throw new ClientException("Wait for Accepted outcome was interrupted", ie); - } - } - - @Override - public Tracker awaitAccepted(long timeout, TimeUnit unit) throws ClientException { - try { - if (settled() && !remoteSettled()) { - return this; - } else { - settlementFuture().get(timeout, unit); - if (remoteState() != null && remoteState().isAccepted()) { - return this; - } else { - throw new ClientDeliveryStateException("Remote did not accept the sent message", remoteState()); - } - } - } catch (InterruptedException ie) { - Thread.interrupted(); - throw new ClientException("Wait for Accepted outcome was interrupted", ie); - } catch (ExecutionException exe) { - throw ClientExceptionSupport.createNonFatalOrPassthrough(exe.getCause()); - } catch (TimeoutException te) { - throw new ClientOperationTimedOutException("Timed out waiting for remote Accepted outcome", te); - } - } - - //----- Internal Event hooks for delivery updates - - private void processDeliveryUpdated(OutgoingDelivery delivery) { - remotelySettled = delivery.isRemotelySettled(); - remoteDeliveryState = ClientDeliveryState.fromProtonType(delivery.getRemoteState()); - - if (delivery.isRemotelySettled()) { - remoteSettlementFuture.complete(this); - } - - if (sender.options().autoSettle() && delivery.isRemotelySettled()) { - delivery.settle(); - } - } } diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientTransactionContext.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientTransactionContext.java index 0ca1bf38..6f58d242 100644 --- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientTransactionContext.java +++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientTransactionContext.java @@ -28,7 +28,7 @@ import org.apache.qpid.protonj2.types.transport.DeliveryState; * to mask from the senders and receivers the work of deciding transaction * specific behaviors. */ -public interface ClientTransactionContext { +interface ClientTransactionContext { public interface Sendable { diff --git a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamSenderTest.java b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamSenderTest.java index 338cc178..5d0b7a84 100644 --- a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamSenderTest.java +++ b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamSenderTest.java @@ -245,7 +245,7 @@ public class StreamSenderTest extends ImperativeClientTestCase { senderOptions.targetOptions().capabilities("queue"); ClientStreamSender sender = (ClientStreamSender) connection.openStreamSender("test-queue", senderOptions); - assertEquals(StreamSenderOptions.DEFAULT_PENDING_WRITES_BUFFER_SIZE, sender.getProtonSender().getSession().getOutgoingCapacity()); + assertEquals(StreamSenderOptions.DEFAULT_PENDING_WRITES_BUFFER_SIZE, sender.protonLink().getSession().getOutgoingCapacity()); sender.openFuture().get(); sender.close(); @@ -282,7 +282,7 @@ public class StreamSenderTest extends ImperativeClientTestCase { senderOptions.targetOptions().capabilities("queue"); ClientStreamSender sender = (ClientStreamSender) connection.openStreamSender("test-queue", senderOptions); - assertEquals(PENDING_WRITES_BUFFER_SIZE, sender.getProtonSender().getSession().getOutgoingCapacity()); + assertEquals(PENDING_WRITES_BUFFER_SIZE, sender.protonLink().getSession().getOutgoingCapacity()); sender.openFuture().get(); sender.close(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org