This is an automated email from the ASF dual-hosted git repository. reta pushed a commit to branch 3.5.x-fixes in repository https://gitbox.apache.org/repos/asf/cxf.git
commit bb007ff994a557e2c257ad80f8033a068c18772b Author: Andriy Redko <drr...@gmail.com> AuthorDate: Thu Oct 26 21:27:10 2023 -0400 [JAX-WS] The client out interceptor chain is not called in case of oneway operation (#1472) * [JAX-WS] The client out interceptor chain is not called in case of oneway operation * Remove setting thread local bus (cherry picked from commit 529f86b93feab01fd15de4593111c75fe58ab52d) # Conflicts: # integration/tracing/tracing-micrometer/src/main/java/org/apache/cxf/tracing/micrometer/AbstractObservationClientInterceptor.java (cherry picked from commit cc02e858026cbff95415f56e2a9e690bfb032151) --- .../java/org/apache/cxf/endpoint/ClientImpl.java | 117 ++++++++++++++++++++- .../apache/cxf/interceptor/OneWayInterceptor.java | 29 +++++ .../brave/AbstractBraveClientInterceptor.java | 3 +- .../AbstractOpenTelemetryClientInterceptor.java | 3 +- .../AbstractOpenTracingClientInterceptor.java | 3 +- 5 files changed, 149 insertions(+), 6 deletions(-) diff --git a/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java b/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java index 53d3a432b8..2bee3982ae 100644 --- a/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java +++ b/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java @@ -36,6 +36,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; import java.util.logging.Level; import java.util.logging.Logger; +import java.util.stream.Collectors; import javax.xml.namespace.QName; import javax.xml.ws.handler.MessageContext; @@ -55,6 +56,7 @@ import org.apache.cxf.interceptor.Fault; import org.apache.cxf.interceptor.Interceptor; import org.apache.cxf.interceptor.InterceptorChain; import org.apache.cxf.interceptor.InterceptorProvider; +import org.apache.cxf.interceptor.OneWayInterceptor; import org.apache.cxf.message.Exchange; import org.apache.cxf.message.ExchangeImpl; import org.apache.cxf.message.Message; @@ -622,9 +624,14 @@ public class ClientImpl } } - // Wait for a response if we need to - if (oi != null && !oi.getOperationInfo().isOneWay()) { - waitResponse(exchange); + if (oi != null) { + if (!oi.getOperationInfo().isOneWay()) { + // Wait for a response if we need to + waitResponse(exchange); + } else { + // Trigger the interceptors chain + onewayOnly(exchange); + } } // leave the input stream open for the caller @@ -672,6 +679,92 @@ public class ClientImpl return null; } + + protected void onewayOnly(Exchange exhange) { + if (bus == null) { + throw new IllegalStateException("Message received on a Client that has been closed or destroyed."); + } + + Message original = exhange.getInMessage(); + if (original == null) { + // This is one way call and we may not even have incoming message + original = exhange.getOutMessage(); + } + + if (original == null) { + original = new MessageImpl(); + } + + Endpoint endpoint = exhange.getEndpoint(); + if (endpoint == null) { + // in this case correlation will occur outside the transport, + // however there's a possibility that the endpoint may have been + // rebased in the meantime, so that the response will be mediated + // via a set of in interceptors provided by a *different* endpoint + // + endpoint = getConduitSelector().getEndpoint(); + original.getExchange().put(Endpoint.class, endpoint); + } + + final Message message = endpoint.getBinding().createMessage(original); + message.getExchange().setInMessage(message); + message.put(Message.REQUESTOR_ROLE, Boolean.TRUE); + message.put(Message.INBOUND_MESSAGE, Boolean.TRUE); + PhaseManager pm = bus.getExtension(PhaseManager.class); + + List<Interceptor<? extends Message>> i1 = filterOneway(bus.getInInterceptors()); + if (LOG.isLoggable(Level.FINE)) { + LOG.fine("Oneway interceptors contributed by bus: " + i1); + } + List<Interceptor<? extends Message>> i2 = filterOneway(getInInterceptors()); + if (LOG.isLoggable(Level.FINE)) { + LOG.fine("Oneway interceptors contributed by client: " + i2); + } + List<Interceptor<? extends Message>> i3 = filterOneway(endpoint.getInInterceptors()); + if (LOG.isLoggable(Level.FINE)) { + LOG.fine("Oneway interceptors contributed by endpoint: " + i3); + } + List<Interceptor<? extends Message>> i4 = filterOneway(endpoint.getBinding().getInInterceptors()); + if (LOG.isLoggable(Level.FINE)) { + LOG.fine("Oneway interceptors contributed by binding: " + i4); + } + + PhaseInterceptorChain chain; + if (endpoint.getService().getDataBinding() instanceof InterceptorProvider) { + InterceptorProvider p = (InterceptorProvider)endpoint.getService().getDataBinding(); + List<Interceptor<? extends Message>> interceptors = filterOneway(p.getInInterceptors()); + if (LOG.isLoggable(Level.FINE)) { + LOG.fine("Oneway interceptors contributed by databinding: " + interceptors); + } + chain = inboundChainCache.get(pm.getInPhases(), i1, i2, i3, i4, interceptors); + } else { + chain = inboundChainCache.get(pm.getInPhases(), i1, i2, i3, i4); + } + message.setInterceptorChain(chain); + + modifyOnewayChain(chain, message); + modifyOnewayChain(chain, message.getExchange().getOutMessage()); + + final String startingAfterInterceptorID = (String) message.get( + InterceptorChain.STARTING_AFTER_INTERCEPTOR_ID); + final String startingInterceptorID = (String) message.get( + InterceptorChain.STARTING_AT_INTERCEPTOR_ID); + if (startingAfterInterceptorID != null) { + chain.doInterceptStartingAfter(message, startingAfterInterceptorID); + } else if (startingInterceptorID != null) { + chain.doInterceptStartingAt(message, startingInterceptorID); + } else { + chain.doIntercept(message); + } + } + + private List<Interceptor<? extends Message>> filterOneway(List<Interceptor<? extends Message>> interceptors) { + return interceptors + .stream() + .filter(OneWayInterceptor.class::isInstance) + .collect(Collectors.toList()); + } + protected Exception getException(Exchange exchange) { if (exchange.getInFaultMessage() != null) { return exchange.getInFaultMessage().getContent(Exception.class); @@ -1015,6 +1108,24 @@ public class ClientImpl } } + protected void modifyOnewayChain(InterceptorChain chain, Message ctx) { + if (ctx == null) { + return; + } + Collection<InterceptorProvider> providers + = CastUtils.cast((Collection<?>)ctx.get(Message.INTERCEPTOR_PROVIDERS)); + if (providers != null) { + for (InterceptorProvider p : providers) { + chain.add(filterOneway(p.getInInterceptors())); + } + } + Collection<Interceptor<? extends Message>> is + = CastUtils.cast((Collection<?>)ctx.get(Message.IN_INTERCEPTORS)); + if (is != null) { + chain.add(is); + } + } + protected void setEndpoint(Endpoint e) { getConduitSelector().setEndpoint(e); } diff --git a/core/src/main/java/org/apache/cxf/interceptor/OneWayInterceptor.java b/core/src/main/java/org/apache/cxf/interceptor/OneWayInterceptor.java new file mode 100644 index 0000000000..2513b06f5e --- /dev/null +++ b/core/src/main/java/org/apache/cxf/interceptor/OneWayInterceptor.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cxf.interceptor; + +import org.apache.cxf.message.Message; + +/** + * Base interface for client interceptors that are compatible with one way + * message processing (primarily, JAX-WS). + */ +public interface OneWayInterceptor<T extends Message> extends Interceptor<T> { +} diff --git a/integration/tracing/tracing-brave/src/main/java/org/apache/cxf/tracing/brave/AbstractBraveClientInterceptor.java b/integration/tracing/tracing-brave/src/main/java/org/apache/cxf/tracing/brave/AbstractBraveClientInterceptor.java index f785c72536..6bc37bd60a 100644 --- a/integration/tracing/tracing-brave/src/main/java/org/apache/cxf/tracing/brave/AbstractBraveClientInterceptor.java +++ b/integration/tracing/tracing-brave/src/main/java/org/apache/cxf/tracing/brave/AbstractBraveClientInterceptor.java @@ -23,11 +23,12 @@ import java.util.Collections; import java.util.Set; import brave.http.HttpTracing; +import org.apache.cxf.interceptor.OneWayInterceptor; import org.apache.cxf.message.Message; import org.apache.cxf.phase.PhaseInterceptor; public abstract class AbstractBraveClientInterceptor extends AbstractBraveClientProvider - implements PhaseInterceptor<Message> { + implements PhaseInterceptor<Message>, OneWayInterceptor<Message> { private String phase; diff --git a/integration/tracing/tracing-opentelemetry/src/main/java/org/apache/cxf/tracing/opentelemetry/AbstractOpenTelemetryClientInterceptor.java b/integration/tracing/tracing-opentelemetry/src/main/java/org/apache/cxf/tracing/opentelemetry/AbstractOpenTelemetryClientInterceptor.java index 28eb3431f4..0363fd259f 100644 --- a/integration/tracing/tracing-opentelemetry/src/main/java/org/apache/cxf/tracing/opentelemetry/AbstractOpenTelemetryClientInterceptor.java +++ b/integration/tracing/tracing-opentelemetry/src/main/java/org/apache/cxf/tracing/opentelemetry/AbstractOpenTelemetryClientInterceptor.java @@ -22,6 +22,7 @@ import java.util.Collection; import java.util.Collections; import java.util.Set; +import org.apache.cxf.interceptor.OneWayInterceptor; import org.apache.cxf.message.Message; import org.apache.cxf.phase.PhaseInterceptor; @@ -29,7 +30,7 @@ import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.api.trace.Tracer; public abstract class AbstractOpenTelemetryClientInterceptor extends AbstractOpenTelemetryClientProvider - implements PhaseInterceptor<Message> { + implements PhaseInterceptor<Message>, OneWayInterceptor<Message> { private String phase; diff --git a/integration/tracing/tracing-opentracing/src/main/java/org/apache/cxf/tracing/opentracing/AbstractOpenTracingClientInterceptor.java b/integration/tracing/tracing-opentracing/src/main/java/org/apache/cxf/tracing/opentracing/AbstractOpenTracingClientInterceptor.java index e758b3e799..12fe25c417 100644 --- a/integration/tracing/tracing-opentracing/src/main/java/org/apache/cxf/tracing/opentracing/AbstractOpenTracingClientInterceptor.java +++ b/integration/tracing/tracing-opentracing/src/main/java/org/apache/cxf/tracing/opentracing/AbstractOpenTracingClientInterceptor.java @@ -22,13 +22,14 @@ import java.util.Collection; import java.util.Collections; import java.util.Set; +import org.apache.cxf.interceptor.OneWayInterceptor; import org.apache.cxf.message.Message; import org.apache.cxf.phase.PhaseInterceptor; import io.opentracing.Tracer; public abstract class AbstractOpenTracingClientInterceptor extends AbstractOpenTracingClientProvider - implements PhaseInterceptor<Message> { + implements PhaseInterceptor<Message>, OneWayInterceptor<Message> { private String phase;