This is an automated email from the ASF dual-hosted git repository. dkulp pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/cxf.git
The following commit(s) were added to refs/heads/master by this push: new 6abad18 [CXF-7591, CXF-7710] More updates to the response context handling 6abad18 is described below commit 6abad1864187457984b0dff2622eafb4b5df5305 Author: Daniel Kulp <dk...@apache.org> AuthorDate: Fri Apr 27 12:31:09 2018 -0400 [CXF-7591, CXF-7710] More updates to the response context handling --- .../main/java/org/apache/cxf/endpoint/Client.java | 28 ++++ .../java/org/apache/cxf/endpoint/ClientImpl.java | 185 +++++++++++++-------- 2 files changed, 147 insertions(+), 66 deletions(-) diff --git a/core/src/main/java/org/apache/cxf/endpoint/Client.java b/core/src/main/java/org/apache/cxf/endpoint/Client.java index 27ec1b3..c06b7f6 100644 --- a/core/src/main/java/org/apache/cxf/endpoint/Client.java +++ b/core/src/main/java/org/apache/cxf/endpoint/Client.java @@ -239,6 +239,34 @@ public interface Client extends InterceptorProvider, MessageObserver, ConduitSel * @return true if the request context is a thread local */ boolean isThreadLocalRequestContext(); + + + /** + * Wrappers the contexts in a way that allows the contexts + * to be cleared and released in an try-with-resources block + */ + interface Contexts extends AutoCloseable { + Map<String, Object> getRequestContext(); + Map<String, Object> getResponseContext(); + } + + default Contexts getContexts() { + return new Contexts() { + @Override + public void close() throws Exception { + getRequestContext().clear(); + getResponseContext().clear(); + } + @Override + public Map<String, Object> getRequestContext() { + return Client.this.getRequestContext(); + } + @Override + public Map<String, Object> getResponseContext() { + return Client.this.getResponseContext(); + } + }; + } Endpoint getEndpoint(); diff --git a/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java b/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java index 6beecac..799c26e 100644 --- a/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java +++ b/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java @@ -96,11 +96,11 @@ public class ClientImpl protected Map<String, Object> currentRequestContext = new ConcurrentHashMap<String, Object>(8, 0.75f, 4); protected Thread latestContextThread; - protected Map<Thread, Map<String, Object>> requestContext - = Collections.synchronizedMap(new WeakHashMap<Thread, Map<String, Object>>()); + protected Map<Thread, EchoContext> requestContext + = Collections.synchronizedMap(new WeakHashMap<Thread, EchoContext>()); - protected Map<Thread, Map<String, Object>> responseContext - = Collections.synchronizedMap(new WeakHashMap<Thread, Map<String, Object>>()); + protected Map<Thread, ResponseContext> responseContext + = Collections.synchronizedMap(new WeakHashMap<Thread, ResponseContext>()); protected Executor executor; @@ -237,12 +237,33 @@ public class ClientImpl return getConduitSelector().getEndpoint(); } - + public void releaseThreadContexts() { + final Thread t = Thread.currentThread(); + requestContext.remove(t); + responseContext.remove(t); + } + + public Contexts getContexts() { + return new Contexts() { + @Override + public void close() throws Exception { + releaseThreadContexts(); + } + @Override + public Map<String, Object> getRequestContext() { + return ClientImpl.this.getRequestContext(); + } + @Override + public Map<String, Object> getResponseContext() { + return ClientImpl.this.getResponseContext(); + } + }; + } public Map<String, Object> getRequestContext() { if (isThreadLocalRequestContext()) { final Thread t = Thread.currentThread(); if (!requestContext.containsKey(t)) { - Map<String, Object> freshRequestContext = new EchoContext(currentRequestContext); + EchoContext freshRequestContext = new EchoContext(currentRequestContext, requestContext); requestContext.put(t, freshRequestContext); } latestContextThread = t; @@ -251,28 +272,30 @@ public class ClientImpl return currentRequestContext; } public Map<String, Object> getResponseContext() { - if (!responseContext.containsKey(Thread.currentThread())) { - final Thread t = Thread.currentThread(); - responseContext.put(t, new HashMap<String, Object>() { - private static final long serialVersionUID = 1L; - @Override - public void clear() { - super.clear(); - try { - for (Map.Entry<Thread, Map<String, Object>> ent : responseContext.entrySet()) { - if (ent.getValue() == this) { - responseContext.remove(ent.getKey()); - return; - } - } - } catch (Throwable t) { - //ignore - } - } - }); + final Thread t = Thread.currentThread(); + ResponseContext ret = responseContext.get(t); + if (ret == null) { + ret = new ResponseContext(); + responseContext.put(t, ret); } - return responseContext.get(Thread.currentThread()); - + return ret; + } + protected Map<String, Object> newResponseContext() { + final Thread t = Thread.currentThread(); + ResponseContext ret = new ResponseContext(); + responseContext.put(t, ret); + return ret; + } + protected Map<String, Object> reloadResponseContext(Map<String, Object> o) { + final Thread t = Thread.currentThread(); + ResponseContext ctx = responseContext.get(t); + if (ctx == null) { + ctx = new ResponseContext(o); + responseContext.put(t, ctx); + } else if (o != ctx) { + ctx.reload(o); + } + return ctx; } public boolean isThreadLocalRequestContext() { Object o = currentRequestContext.get(THREAD_LOCAL_REQUEST_CONTEXT); @@ -335,31 +358,12 @@ public class ClientImpl Object[] params, Exchange exchange) throws Exception { Map<String, Object> context = new HashMap<>(); - Map<String, Object> resp = new HashMap<>(); - Map<String, Object> req = new HashMap<>(getRequestContext()); - context.put(RESPONSE_CONTEXT, resp); - context.put(REQUEST_CONTEXT, req); - try { - return invoke(oi, params, context, exchange); - } finally { - if (responseContext != null) { - responseContext.put(Thread.currentThread(), resp); - } - } + return invoke(oi, params, context, exchange); } public Object[] invoke(BindingOperationInfo oi, Object[] params, Map<String, Object> context) throws Exception { - try { - return invoke(oi, params, context, (Exchange)null); - } finally { - if (context != null) { - Map<String, Object> resp = CastUtils.cast((Map<?, ?>)context.get(RESPONSE_CONTEXT)); - if (resp != null && responseContext != null) { - responseContext.put(Thread.currentThread(), resp); - } - } - } + return invoke(oi, params, context, (Exchange)null); } public void invoke(ClientCallback callback, @@ -449,6 +453,7 @@ public class ClientImpl Exchange exchange) throws Exception { Bus origBus = BusFactory.getAndSetThreadDefaultBus(bus); ClassLoaderHolder origLoader = null; + Map<String, Object> resContext = null; try { ClassLoader loader = bus.getExtension(ClassLoader.class); if (loader != null) { @@ -467,7 +472,6 @@ public class ClientImpl // Make sure INVOCATION CONTEXT, REQUEST_CONTEXT and RESPONSE_CONTEXT are present // on message Map<String, Object> reqContext = null; - Map<String, Object> resContext = null; if (context == null) { context = new HashMap<>(); } @@ -478,7 +482,7 @@ public class ClientImpl context.put(REQUEST_CONTEXT, reqContext); } if (resContext == null) { - resContext = new HashMap<>(); + resContext = newResponseContext(); context.put(RESPONSE_CONTEXT, resContext); } @@ -514,7 +518,7 @@ public class ClientImpl // handle the right response List<Object> resList = null; Message inMsg = message.getExchange().getInMessage(); - Map<String, Object> ctx = responseContext.get(Thread.currentThread()); + Map<String, Object> ctx = getResponseContext(); resList = CastUtils.cast(inMsg.getContent(List.class)); Object[] result = resList == null ? null : resList.toArray(); callback.handleResponse(ctx, result); @@ -541,6 +545,9 @@ public class ClientImpl } return processResult(message, exchange, oi, resContext); } finally { + if (callback == null) { + reloadResponseContext(resContext); + } if (origLoader != null) { origLoader.reset(); } @@ -639,7 +646,7 @@ public class ClientImpl resContext.putAll(inMsg); // remove the recursive reference if present resContext.remove(Message.INVOCATION_CONTEXT); - responseContext.put(Thread.currentThread(), resContext); + reloadResponseContext(resContext); } resList = CastUtils.cast(inMsg.getContent(List.class)); } @@ -814,9 +821,8 @@ public class ClientImpl Message.INVOCATION_CONTEXT)); resCtx = CastUtils.cast((Map<?, ?>) resCtx .get(RESPONSE_CONTEXT)); - if (resCtx != null) { - responseContext.put(Thread.currentThread(), resCtx); - } + resCtx = reloadResponseContext(resCtx); + // remove callback so that it won't be invoked twice callback = message.getExchange().remove(ClientCallback.class); if (callback != null) { @@ -843,15 +849,14 @@ public class ClientImpl .getOutMessage() .get(Message.INVOCATION_CONTEXT)); resCtx = CastUtils.cast((Map<?, ?>)resCtx.get(RESPONSE_CONTEXT)); - if (resCtx != null && responseContext != null) { - responseContext.put(Thread.currentThread(), resCtx); - } try { Object obj[] = processResult(message, message.getExchange(), null, resCtx); + resCtx = reloadResponseContext(resCtx); callback.handleResponse(resCtx, obj); } catch (Throwable ex) { + resCtx = reloadResponseContext(resCtx); callback.handleException(resCtx, ex); } } @@ -1052,28 +1057,40 @@ public class ClientImpl } - /* - * modification are echoed back to the shared map - */ public class EchoContext extends ConcurrentHashMap<String, Object> { private static final long serialVersionUID = 1L; - public EchoContext(Map<String, Object> sharedMap) { + + final Map<Thread, EchoContext> context; + public EchoContext(Map<String, Object> sharedMap, Map<Thread, EchoContext> ctx) { + super(8, 0.75f, 4); + if (sharedMap != null) { + super.putAll(sharedMap); + } + context = ctx; + } + + public EchoContext(Map<Thread, EchoContext> ctx) { super(8, 0.75f, 4); - putAll(sharedMap); + context = ctx; } public void reload() { + reload(context.get(latestContextThread)); + } + public void reload(Map<String, Object> content) { super.clear(); - super.putAll(requestContext.get(latestContextThread)); + if (content != null) { + putAll(content); + } } @Override public void clear() { super.clear(); try { - for (Map.Entry<Thread, Map<String, Object>> ent : requestContext.entrySet()) { + for (Map.Entry<Thread, EchoContext> ent : context.entrySet()) { if (ent.getValue() == this) { - requestContext.remove(ent.getKey()); + context.remove(ent.getKey()); return; } } @@ -1083,6 +1100,42 @@ public class ClientImpl } } + /** + * Class to handle the response contexts. The clear is overloaded to remove + * this context from the threadLocal caches in the ClientImpl + */ + class ResponseContext extends HashMap<String, Object> { + private static final long serialVersionUID = 1L; + + ResponseContext(Map<String, Object> origMap) { + super(origMap); + } + + ResponseContext() { + } + + public void reload(Map<String, Object> content) { + super.clear(); + if (content != null) { + putAll(content); + } + } + + @Override + public void clear() { + super.clear(); + try { + for (Map.Entry<Thread, ResponseContext> ent : responseContext.entrySet()) { + if (ent.getValue() == this) { + responseContext.remove(ent.getKey()); + return; + } + } + } catch (Throwable t) { + //ignore + } + } + } public void setExecutor(Executor executor) { if (!SynchronousExecutor.isA(executor)) { -- To stop receiving notification emails like this one, please contact dk...@apache.org.