This is an automated email from the ASF dual-hosted git repository.

reta pushed a commit to branch 3.5.x-fixes
in repository https://gitbox.apache.org/repos/asf/cxf.git

commit bb007ff994a557e2c257ad80f8033a068c18772b
Author: Andriy Redko <drr...@gmail.com>
AuthorDate: Thu Oct 26 21:27:10 2023 -0400

    [JAX-WS] The client out interceptor chain is not called in case of oneway operation (#1472)
    
    * [JAX-WS] The client out interceptor chain is not called in case of oneway operation
    
    * Remove setting thread local bus
    
    (cherry picked from commit 529f86b93feab01fd15de4593111c75fe58ab52d)
    
    # Conflicts:
    #       integration/tracing/tracing-micrometer/src/main/java/org/apache/cxf/tracing/micrometer/AbstractObservationClientInterceptor.java
    (cherry picked from commit cc02e858026cbff95415f56e2a9e690bfb032151)
---
 .../java/org/apache/cxf/endpoint/ClientImpl.java   | 117 ++++++++++++++++++++-
 .../apache/cxf/interceptor/OneWayInterceptor.java  |  29 +++++
 .../brave/AbstractBraveClientInterceptor.java      |   3 +-
 .../AbstractOpenTelemetryClientInterceptor.java    |   3 +-
 .../AbstractOpenTracingClientInterceptor.java      |   3 +-
 5 files changed, 149 insertions(+), 6 deletions(-)

diff --git a/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java b/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java
index 53d3a432b8..2bee3982ae 100644
--- a/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java
+++ b/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java
@@ -36,6 +36,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
 import java.util.logging.Level;
 import java.util.logging.Logger;
+import java.util.stream.Collectors;
 
 import javax.xml.namespace.QName;
 import javax.xml.ws.handler.MessageContext;
@@ -55,6 +56,7 @@ import org.apache.cxf.interceptor.Fault;
 import org.apache.cxf.interceptor.Interceptor;
 import org.apache.cxf.interceptor.InterceptorChain;
 import org.apache.cxf.interceptor.InterceptorProvider;
+import org.apache.cxf.interceptor.OneWayInterceptor;
 import org.apache.cxf.message.Exchange;
 import org.apache.cxf.message.ExchangeImpl;
 import org.apache.cxf.message.Message;
@@ -622,9 +624,14 @@ public class ClientImpl
             }
         }
 
-        // Wait for a response if we need to
-        if (oi != null && !oi.getOperationInfo().isOneWay()) {
-            waitResponse(exchange);
+        if (oi != null) { 
+            if (!oi.getOperationInfo().isOneWay()) {
+                // Wait for a response if we need to
+                waitResponse(exchange);
+            } else {
+                // Trigger the interceptors chain
+                onewayOnly(exchange);
+            }
         }
 
         // leave the input stream open for the caller
@@ -672,6 +679,92 @@ public class ClientImpl
 
         return null;
     }
+
+    protected void onewayOnly(Exchange exhange) {
+        if (bus == null) {
+            throw new IllegalStateException("Message received on a Client that 
has been closed or destroyed.");
+        }
+
+        Message original = exhange.getInMessage();
+        if (original == null) {
+            // This is one way call and we may not even have incoming message
+            original = exhange.getOutMessage();
+        }
+
+        if (original == null) {
+            original = new MessageImpl();
+        }
+
+        Endpoint endpoint = exhange.getEndpoint();
+        if (endpoint == null) {
+            // in this case correlation will occur outside the transport,
+            // however there's a possibility that the endpoint may have been
+            // rebased in the meantime, so that the response will be mediated
+            // via a set of in interceptors provided by a *different* endpoint
+            //
+            endpoint = getConduitSelector().getEndpoint();
+            original.getExchange().put(Endpoint.class, endpoint);
+        }
+
+        final Message message = endpoint.getBinding().createMessage(original);
+        message.getExchange().setInMessage(message);
+        message.put(Message.REQUESTOR_ROLE, Boolean.TRUE);
+        message.put(Message.INBOUND_MESSAGE, Boolean.TRUE);
+        PhaseManager pm = bus.getExtension(PhaseManager.class);
+
+        List<Interceptor<? extends Message>> i1 = 
filterOneway(bus.getInInterceptors());
+        if (LOG.isLoggable(Level.FINE)) {
+            LOG.fine("Oneway interceptors contributed by bus: " + i1);
+        }
+        List<Interceptor<? extends Message>> i2 = 
filterOneway(getInInterceptors());
+        if (LOG.isLoggable(Level.FINE)) {
+            LOG.fine("Oneway interceptors contributed by client: " + i2);
+        }
+        List<Interceptor<? extends Message>> i3 = 
filterOneway(endpoint.getInInterceptors());
+        if (LOG.isLoggable(Level.FINE)) {
+            LOG.fine("Oneway interceptors contributed by endpoint: " + i3);
+        }
+        List<Interceptor<? extends Message>> i4 = 
filterOneway(endpoint.getBinding().getInInterceptors());
+        if (LOG.isLoggable(Level.FINE)) {
+            LOG.fine("Oneway interceptors contributed by binding: " + i4);
+        }
+
+        PhaseInterceptorChain chain;
+        if (endpoint.getService().getDataBinding() instanceof 
InterceptorProvider) {
+            InterceptorProvider p = 
(InterceptorProvider)endpoint.getService().getDataBinding();
+            List<Interceptor<? extends Message>> interceptors = 
filterOneway(p.getInInterceptors());
+            if (LOG.isLoggable(Level.FINE)) {
+                LOG.fine("Oneway interceptors contributed by databinding: " + 
interceptors);
+            }
+            chain = inboundChainCache.get(pm.getInPhases(), i1, i2, i3, i4, 
interceptors);
+        } else {
+            chain = inboundChainCache.get(pm.getInPhases(), i1, i2, i3, i4);
+        }
+        message.setInterceptorChain(chain);
+
+        modifyOnewayChain(chain, message);
+        modifyOnewayChain(chain, message.getExchange().getOutMessage());
+
+        final String startingAfterInterceptorID = (String) message.get(
+            InterceptorChain.STARTING_AFTER_INTERCEPTOR_ID);
+        final String startingInterceptorID = (String) message.get(
+            InterceptorChain.STARTING_AT_INTERCEPTOR_ID);
+        if (startingAfterInterceptorID != null) {
+            chain.doInterceptStartingAfter(message, 
startingAfterInterceptorID);
+        } else if (startingInterceptorID != null) {
+            chain.doInterceptStartingAt(message, startingInterceptorID);
+        } else {
+            chain.doIntercept(message);
+        }
+    }
+    
+    private List<Interceptor<? extends Message>> 
filterOneway(List<Interceptor<? extends Message>> interceptors) {
+        return interceptors
+            .stream()
+            .filter(OneWayInterceptor.class::isInstance)
+            .collect(Collectors.toList());
+    }
+
     protected Exception getException(Exchange exchange) {
         if (exchange.getInFaultMessage() != null) {
             return exchange.getInFaultMessage().getContent(Exception.class);
@@ -1015,6 +1108,24 @@ public class ClientImpl
         }
     }
 
+    protected void modifyOnewayChain(InterceptorChain chain, Message ctx) {
+        if (ctx == null) {
+            return;
+        }
+        Collection<InterceptorProvider> providers
+            = 
CastUtils.cast((Collection<?>)ctx.get(Message.INTERCEPTOR_PROVIDERS));
+        if (providers != null) {
+            for (InterceptorProvider p : providers) {
+                chain.add(filterOneway(p.getInInterceptors()));
+            }
+        }
+        Collection<Interceptor<? extends Message>> is
+            = CastUtils.cast((Collection<?>)ctx.get(Message.IN_INTERCEPTORS));
+        if (is != null) {
+            chain.add(is);
+        }
+    }
+
     protected void setEndpoint(Endpoint e) {
         getConduitSelector().setEndpoint(e);
     }
diff --git a/core/src/main/java/org/apache/cxf/interceptor/OneWayInterceptor.java b/core/src/main/java/org/apache/cxf/interceptor/OneWayInterceptor.java
new file mode 100644
index 0000000000..2513b06f5e
--- /dev/null
+++ b/core/src/main/java/org/apache/cxf/interceptor/OneWayInterceptor.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.interceptor;
+
+import org.apache.cxf.message.Message;
+
+/**
+ * Base interface for client interceptors that are compatible with one way 
+ * message processing (primarily, JAX-WS).
+ */
+public interface OneWayInterceptor<T extends Message> extends Interceptor<T> {
+}
diff --git a/integration/tracing/tracing-brave/src/main/java/org/apache/cxf/tracing/brave/AbstractBraveClientInterceptor.java b/integration/tracing/tracing-brave/src/main/java/org/apache/cxf/tracing/brave/AbstractBraveClientInterceptor.java
index f785c72536..6bc37bd60a 100644
--- a/integration/tracing/tracing-brave/src/main/java/org/apache/cxf/tracing/brave/AbstractBraveClientInterceptor.java
+++ b/integration/tracing/tracing-brave/src/main/java/org/apache/cxf/tracing/brave/AbstractBraveClientInterceptor.java
@@ -23,11 +23,12 @@ import java.util.Collections;
 import java.util.Set;
 
 import brave.http.HttpTracing;
+import org.apache.cxf.interceptor.OneWayInterceptor;
 import org.apache.cxf.message.Message;
 import org.apache.cxf.phase.PhaseInterceptor;
 
 public abstract class AbstractBraveClientInterceptor extends 
AbstractBraveClientProvider
-        implements PhaseInterceptor<Message> {
+        implements PhaseInterceptor<Message>, OneWayInterceptor<Message> {
 
     private String phase;
 
diff --git a/integration/tracing/tracing-opentelemetry/src/main/java/org/apache/cxf/tracing/opentelemetry/AbstractOpenTelemetryClientInterceptor.java b/integration/tracing/tracing-opentelemetry/src/main/java/org/apache/cxf/tracing/opentelemetry/AbstractOpenTelemetryClientInterceptor.java
index 28eb3431f4..0363fd259f 100644
--- a/integration/tracing/tracing-opentelemetry/src/main/java/org/apache/cxf/tracing/opentelemetry/AbstractOpenTelemetryClientInterceptor.java
+++ b/integration/tracing/tracing-opentelemetry/src/main/java/org/apache/cxf/tracing/opentelemetry/AbstractOpenTelemetryClientInterceptor.java
@@ -22,6 +22,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Set;
 
+import org.apache.cxf.interceptor.OneWayInterceptor;
 import org.apache.cxf.message.Message;
 import org.apache.cxf.phase.PhaseInterceptor;
 
@@ -29,7 +30,7 @@ import io.opentelemetry.api.OpenTelemetry;
 import io.opentelemetry.api.trace.Tracer;
 
 public abstract class AbstractOpenTelemetryClientInterceptor extends 
AbstractOpenTelemetryClientProvider
-    implements PhaseInterceptor<Message> {
+    implements PhaseInterceptor<Message>, OneWayInterceptor<Message> {
 
     private String phase;
 
diff --git a/integration/tracing/tracing-opentracing/src/main/java/org/apache/cxf/tracing/opentracing/AbstractOpenTracingClientInterceptor.java b/integration/tracing/tracing-opentracing/src/main/java/org/apache/cxf/tracing/opentracing/AbstractOpenTracingClientInterceptor.java
index e758b3e799..12fe25c417 100644
--- a/integration/tracing/tracing-opentracing/src/main/java/org/apache/cxf/tracing/opentracing/AbstractOpenTracingClientInterceptor.java
+++ b/integration/tracing/tracing-opentracing/src/main/java/org/apache/cxf/tracing/opentracing/AbstractOpenTracingClientInterceptor.java
@@ -22,13 +22,14 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Set;
 
+import org.apache.cxf.interceptor.OneWayInterceptor;
 import org.apache.cxf.message.Message;
 import org.apache.cxf.phase.PhaseInterceptor;
 
 import io.opentracing.Tracer;
 
 public abstract class AbstractOpenTracingClientInterceptor extends 
AbstractOpenTracingClientProvider
-        implements PhaseInterceptor<Message> {
+        implements PhaseInterceptor<Message>, OneWayInterceptor<Message> {
 
     private String phase;
 

Reply via email to