This is an automated email from the ASF dual-hosted git repository.

tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git
The following commit(s) were added to refs/heads/main by this push:
     new 11426b3a PROTON-2599 Fix race that was causing some intermittent test failures
11426b3a is described below

commit 11426b3ab3f182243bf627115da6f831ea4cca12
Author: Timothy Bish <tabish...@gmail.com>
AuthorDate: Thu Sep 8 18:26:53 2022 -0400

    PROTON-2599 Fix race that was causing some intermittent test failures
---
 .../qpid/protonj2/client/impl/ClientTrackable.java | 24 +++++++++++++++++-----
 1 file changed, 19 insertions(+), 5 deletions(-)

diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientTrackable.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientTrackable.java
index 1bf4a4b8..61615852 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientTrackable.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientTrackable.java
@@ -20,6 +20,8 @@ package org.apache.qpid.protonj2.client.impl;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
 import org.apache.qpid.protonj2.client.DeliveryState;
 import org.apache.qpid.protonj2.client.exceptions.ClientDeliveryStateException;
@@ -30,14 +32,24 @@ import org.apache.qpid.protonj2.engine.OutgoingDelivery;
 
 /**
  * Base type used to provide some common plumbing for Tracker types
+ *
+ * @param <SenderType> The client sender type that created this tracker
+ * @param <TrackerType> The actual type of tracker that is being implemented
  */
 public abstract class ClientTrackable<SenderType extends ClientSenderLinkType<?>, TrackerType> {
 
     protected final SenderType sender;
     protected final OutgoingDelivery delivery;
 
+    @SuppressWarnings("rawtypes")
+    protected static final AtomicIntegerFieldUpdater<ClientTrackable> REMOTELY_SETTLED_UPDATER =
+            AtomicIntegerFieldUpdater.newUpdater(ClientTrackable.class, "remotelySettled");
+    @SuppressWarnings("rawtypes")
+    protected static final AtomicReferenceFieldUpdater<ClientTrackable, DeliveryState> REMOTEL_DELIVERY_STATE_UPDATER =
+            AtomicReferenceFieldUpdater.newUpdater(ClientTrackable.class, DeliveryState.class, "remoteDeliveryState");
+
     private ClientFuture<TrackerType> remoteSettlementFuture;
-    private volatile boolean remotelySettled;
+    private volatile int remotelySettled;
     private volatile DeliveryState remoteDeliveryState;
 
     /**
@@ -69,7 +81,7 @@ public abstract class ClientTrackable<SenderType extends ClientSenderLinkType<?>
     }
 
     public boolean remoteSettled() {
-        return remotelySettled;
+        return remotelySettled > 0;
     }
 
     public TrackerType disposition(DeliveryState state, boolean settle) throws ClientException {
@@ -199,10 +211,10 @@ public abstract class ClientTrackable<SenderType extends ClientSenderLinkType<?>
     //----- Internal Event hooks for delivery updates
 
     private void processDeliveryUpdated(OutgoingDelivery delivery) {
-        remotelySettled = delivery.isRemotelySettled();
-        remoteDeliveryState = ClientDeliveryState.fromProtonType(delivery.getRemoteState());
-
         if (delivery.isRemotelySettled()) {
+            REMOTELY_SETTLED_UPDATER.lazySet(this, 1);
+            REMOTEL_DELIVERY_STATE_UPDATER.lazySet(this, ClientDeliveryState.fromProtonType(delivery.getRemoteState()));
+
             if (sender.options.autoSettle()) {
                 delivery.settle();
             }
@@ -212,6 +224,8 @@ public abstract class ClientTrackable<SenderType extends ClientSenderLinkType<?>
                     remoteSettlementFuture.complete(self());
                 }
             }
+        } else {
+            REMOTEL_DELIVERY_STATE_UPDATER.set(this, ClientDeliveryState.fromProtonType(delivery.getRemoteState()));
         }
     }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org