Enable async request-reply by enhancing the CamelInvocationHandler so that it
is serializable, so that camel-spring Proxies can be passed as callback objects
-------------------------------------------------------------------------------------------------------------------------------------------------------------
Key: CAMEL-2026
URL: https://issues.apache.org/activemq/browse/CAMEL-2026
Project: Apache Camel
Issue Type: Improvement
Components: camel-core, camel-spring
Affects Versions: 2.0.0
Reporter: Scott Clasen
With a minor alteration, the CamelInvocationHandler could be made
serializable. camel-spring bean proxies could then be passed as callback
objects in method calls on other camel-spring proxies, enabling async
request-reply via Spring remoting.
I have achieved this by wrapping the CamelInvocationHandler, subclassing the
CamelProxyFactoryBean, and attaching a processor to routes that can receive
BeanInvocations that carry a callback object. Integrated into the codebase,
this could be much more straightforward and would not require a processor to
be attached.
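To make the callback idea concrete, here is a minimal JDK-only sketch of the
calling pattern. The names QuoteService, ReplyCallback and CallbackDemo are
hypothetical and not part of Camel. A lambda on a worker thread stands in for
the remote service, whereas in the proposal both interfaces would be exposed
as camel-spring proxies.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical callback interface; in the proposal the caller would pass a
// camel-spring proxy implementing it.
interface ReplyCallback {
    void onReply(String reply);
}

// Hypothetical service interface whose method takes the callback as an argument.
interface QuoteService {
    void requestQuote(String symbol, ReplyCallback callback);
}

final class CallbackDemo {

    static String roundTrip(String symbol) throws Exception {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            // Stand-in for the remote side: it returns immediately and
            // replies later, on another thread, through the callback.
            QuoteService service =
                    (s, cb) -> worker.submit(() -> cb.onReply("quote for " + s));

            // The caller hands over a callback and waits for the reply.
            CompletableFuture<String> reply = new CompletableFuture<>();
            service.requestQuote(symbol, reply::complete);
            return reply.get(5, TimeUnit.SECONDS);
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(roundTrip("CAMEL"));
    }
}
```

The request method returns void and the answer arrives through the callback,
which is why the callback proxy has to survive being serialized and sent to
the other VM.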
=================CamelInvocationHandlerWrapper
import java.io.Serializable;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;

import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
import org.apache.camel.Producer;
import org.apache.camel.component.bean.CamelInvocationHandler;
import org.apache.camel.component.bean.MethodInfoCache;
import org.apache.camel.util.CamelContextHelper;

/**
 * Serializable wrapper for Camel/Spring remoting proxies.
 */
public class CamelInvocationHandlerWrapper implements InvocationHandler, Serializable {

    private static final long serialVersionUID = 7635312279175935612L;

    /*
     * FindBugs will complain that inner does not get set at deserialization
     * time. This is OK: you need a CamelRemotingProcessor in your inbound
     * Camel route, and it will reset the handler.
     */
    private transient CamelInvocationHandler inner;
    private String serviceUrl;

    /**
     * Create a CamelInvocationHandlerWrapper.
     *
     * @param handler the handler to use in this VM.
     * @param serviceUrl the serviceUrl to use to rebuild the
     *        CamelInvocationHandler if we are serialized and used in a
     *        different VM.
     */
    public CamelInvocationHandlerWrapper(CamelInvocationHandler handler, String serviceUrl) {
        this.inner = handler;
        this.serviceUrl = serviceUrl;
    }

    /**
     * {@inheritDoc}
     */
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        if (method.getDeclaringClass().equals(Object.class)) {
            // Methods declared on Object are answered by the wrapper itself.
            return method.invoke(this, args);
        }
        if (inner == null) {
            throw new IllegalStateException("The inner CamelInvocationHandler is null;"
                    + " perhaps there was no CamelRemotingProcessor on your inbound route?");
        }
        return inner.invoke(proxy, method, args);
    }

    /**
     * Package private so the CamelRemotingProcessor can rewire when we are
     * passed remotely.
     *
     * @param context the current Camel context.
     * @throws Exception if we can't build an endpoint for the service URL or
     *         create a producer.
     */
    void rebuildInvocationHandler(CamelContext context) throws Exception {
        Endpoint endpoint = CamelContextHelper.getMandatoryEndpoint(context, serviceUrl);
        Producer producer = endpoint.createProducer();
        producer.start();
        inner = new CamelInvocationHandler(endpoint, producer,
                new MethodInfoCache(endpoint.getCamelContext()));
    }

    void setServiceUrl(String serviceUrl) {
        this.serviceUrl = serviceUrl;
    }
}
================Proxy Factory
import java.lang.reflect.Proxy;

import org.apache.camel.component.bean.CamelInvocationHandler;
import org.apache.camel.spring.remoting.CamelProxyFactoryBean;

/**
 * ProxyFactory that wraps a Camel proxy in a serializable form.
 */
public class CamelProxyWrapperFactoryBean extends CamelProxyFactoryBean {

    /**
     * Override the CamelProxyFactoryBean to return a different proxy that
     * uses a CamelInvocationHandlerWrapper.
     *
     * @return a Proxy backed by a CamelInvocationHandlerWrapper for the
     *         specified interface.
     * @throws Exception if we can't create the proxy.
     */
    @Override
    public Object getObject() throws Exception {
        Object proxy = super.getObject();
        // Reuse the handler that the parent factory already built.
        CamelInvocationHandler handler =
                (CamelInvocationHandler) Proxy.getInvocationHandler(proxy);
        return Proxy.newProxyInstance(getObjectType().getClassLoader(),
                new Class[]{getObjectType()},
                new CamelInvocationHandlerWrapper(handler, getServiceUrl()));
    }
}
========processor that "rewires" the transient CamelInvocationHandler by using
the service url
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.component.bean.BeanInvocation;

public class CamelRemotingProcessor implements Processor {

    /**
     * Rebuild the CamelInvocationHandler if we were passed a Proxy that has
     * a CamelInvocationHandlerWrapper as its handler.
     *
     * @param invocation the BeanInvocation whose args we check for the Proxy.
     * @param context the current Camel context.
     * @throws Exception if the handler cannot be rebuilt.
     */
    public void rewireProxy(BeanInvocation invocation, CamelContext context) throws Exception {
        Object[] args = invocation.getArgs();
        if (args == null) {
            return;
        }
        for (Object arg : args) {
            // Skip null arguments; arg.getClass() would otherwise throw.
            if (arg != null && Proxy.isProxyClass(arg.getClass())) {
                InvocationHandler handler = Proxy.getInvocationHandler(arg);
                if (handler instanceof CamelInvocationHandlerWrapper) {
                    CamelInvocationHandlerWrapper wrapper =
                            (CamelInvocationHandlerWrapper) handler;
                    wrapper.rebuildInvocationHandler(context);
                }
            }
        }
    }

    /**
     * {@inheritDoc}
     */
    public void process(Exchange exchange) throws Exception {
        CamelContext context = exchange.getContext();
        BeanInvocation invocation = exchange.getIn().getBody(BeanInvocation.class);
        // Bodies that are not BeanInvocations convert to null; leave them alone.
        if (invocation != null) {
            rewireProxy(invocation, context);
        }
    }
}
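The wrapper and processor above rely on a transient delegate that is lost
during serialization and rebuilt from a serialized URL on the receiving side.
The following is a rough JDK-only sketch of that mechanism. Greeter,
RewirableHandler and RewireDemo are hypothetical stand-ins, with a lambda in
place of the CamelInvocationHandler and an in-memory stream in place of the
wire, so it illustrates the pattern rather than the Camel code itself.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

// Hypothetical business interface standing in for a callback interface.
interface Greeter {
    String greet(String name);
}

// Stand-in for CamelInvocationHandlerWrapper: only serviceUrl is serialized.
final class RewirableHandler implements InvocationHandler, Serializable {
    private static final long serialVersionUID = 1L;

    private transient InvocationHandler inner; // lost on serialization
    private final String serviceUrl;           // enough to rebuild inner

    RewirableHandler(InvocationHandler inner, String serviceUrl) {
        this.inner = inner;
        this.serviceUrl = serviceUrl;
    }

    boolean isWired() { return inner != null; }

    String getServiceUrl() { return serviceUrl; }

    // Plays the role of rebuildInvocationHandler(CamelContext).
    void rewire(InvocationHandler rebuilt) { this.inner = rebuilt; }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(this, args);
        }
        if (inner == null) {
            throw new IllegalStateException("handler has not been rewired");
        }
        return inner.invoke(proxy, method, args);
    }
}

final class RewireDemo {

    static Greeter newProxy(String url) {
        InvocationHandler local = (p, m, a) -> "hello " + a[0] + " via " + url;
        return (Greeter) Proxy.newProxyInstance(Greeter.class.getClassLoader(),
                new Class<?>[]{Greeter.class}, new RewirableHandler(local, url));
    }

    // Simulates sending the proxy to another VM.
    static Object copyBySerialization(Object o) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return in.readObject();
        }
    }

    static String roundTrip() throws Exception {
        Greeter copy = (Greeter) copyBySerialization(newProxy("direct:greeter"));
        RewirableHandler handler = (RewirableHandler) Proxy.getInvocationHandler(copy);
        if (handler.isWired()) {
            throw new IllegalStateException("transient delegate unexpectedly survived");
        }
        // What the processor does on the inbound route: rebuild from the URL.
        String url = handler.getServiceUrl();
        handler.rewire((p, m, a) -> "hello " + a[0] + " via " + url);
        return copy.greet("camel");
    }

    public static void main(String[] args) throws Exception {
        System.out.println(roundTrip());
    }
}
```

Calling the copied proxy before the rewire step would hit the
IllegalStateException branch, which mirrors the missing-processor case in the
wrapper.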