Repository: aries-rsa Updated Branches: refs/heads/master aaafdf780 -> 1009ba59f
fastbin should throw a service exception for unknown methods. When the client tries to call a method that the implementation does not have (e.g. incompatible class files), a ServiceException should be thrown. Project: http://git-wip-us.apache.org/repos/asf/aries-rsa/repo Commit: http://git-wip-us.apache.org/repos/asf/aries-rsa/commit/d779ff10 Tree: http://git-wip-us.apache.org/repos/asf/aries-rsa/tree/d779ff10 Diff: http://git-wip-us.apache.org/repos/asf/aries-rsa/diff/d779ff10 Branch: refs/heads/master Commit: d779ff10e12ec39e47996f294c6c658f2a4cd391 Parents: aaafdf7 Author: Johannes Utzig <jut...@apache.org> Authored: Tue Feb 21 10:02:37 2017 +0100 Committer: Johannes Utzig <jut...@apache.org> Committed: Tue Apr 11 13:04:49 2017 +0200 ---------------------------------------------------------------------- .../provider/fastbin/tcp/ServerInvokerImpl.java | 143 +++++++++---------- .../provider/fastbin/TransportFailureTest.java | 2 +- 2 files changed, 72 insertions(+), 73 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/d779ff10/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/ServerInvokerImpl.java ---------------------------------------------------------------------- diff --git a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/ServerInvokerImpl.java b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/ServerInvokerImpl.java index 1dd58f9..d971140 100644 --- a/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/ServerInvokerImpl.java +++ b/provider/fastbin/src/main/java/org/apache/aries/rsa/provider/fastbin/tcp/ServerInvokerImpl.java @@ -246,80 +246,22 @@ public class ServerInvokerImpl implements ServerInvoker, Dispatched { final ServiceFactoryHolder holder = holders.get(service); Runnable task = null; if(holder==null) { - LOGGER.warn("The requested service {"+service+"} is not available"); - 
task = new Runnable() { - public void run() { - - final DataByteArrayOutputStream baos = new DataByteArrayOutputStream(); - try { - baos.writeInt(0); // make space for the size field. - baos.writeVarLong(correlation); - } catch (IOException e) { // should not happen - LOGGER.error("Failed to write to buffer",e); - throw new RuntimeException(e); - } - - // Lets decode the remaining args on the target's executor - // to take cpu load off the - BlockingInvocationStrategy strategy = new BlockingInvocationStrategy(); - strategy.service(ObjectSerializationStrategy.INSTANCE, getClass().getClassLoader(), null, new ServiceException("The requested service {"+service+"} is not available"), bais, baos, new Runnable() { - - public void run() { - final Buffer command = baos.toBuffer(); - - // Update the size field. - BufferEditor editor = command.buffer().bigEndianEditor(); - editor.writeInt(command.length); - - queue().execute(new Runnable() { - public void run() { - transport.offer(command); - } - }); - } - }); - } - }; + String message = "The requested service {"+service+"} is not available"; + LOGGER.warn(message); + task = new SendTask(bais, correlation, transport, message); } final Object svc = holder==null ? null : holder.factory.get(); - if(holder!=null) - { - final MethodData methodData = holder.getMethodData(encoded_method); - - - task = new Runnable() { - public void run() { - - final DataByteArrayOutputStream baos = new DataByteArrayOutputStream(); - try { - baos.writeInt(0); // make space for the size field. 
- baos.writeVarLong(correlation); - } catch (IOException e) { // should not happen - LOGGER.error("Failed to write to buffer",e); - throw new RuntimeException(e); - } - - // Lets decode the remaining args on the target's executor - // to take cpu load off the - methodData.invocationStrategy.service(methodData.serializationStrategy, holder.loader, methodData.method, svc, bais, baos, new Runnable() { - public void run() { - holder.factory.unget(); - final Buffer command = baos.toBuffer(); - - // Update the size field. - BufferEditor editor = command.buffer().bigEndianEditor(); - editor.writeInt(command.length); - - queue().execute(new Runnable() { - public void run() { - transport.offer(command); - } - }); - } - }); - } - }; - + if(holder!=null) { + try { + final MethodData methodData = holder.getMethodData(encoded_method); + task = new SendTask(svc, bais, holder, correlation, methodData, transport); + } + catch (ReflectiveOperationException reflectionEx) { + final String methodName = encoded_method.utf8().toString(); + String message = "The requested method {"+methodName+"} is not available"; + LOGGER.warn(message); + task = new SendTask(bais, correlation, transport, message); + } } Executor executor; @@ -378,4 +320,61 @@ public class ServerInvokerImpl implements ServerInvoker, Dispatched { } } + private final class SendTask implements Runnable { + private Object svc; + private DataByteArrayInputStream bais; + private ServiceFactoryHolder holder; + private long correlation; + private MethodData methodData; + private Transport transport; + + + private SendTask(Object svc, DataByteArrayInputStream bais, ServiceFactoryHolder holder, long correlation, MethodData methodData, Transport transport) { + this.svc = svc; + this.bais = bais; + this.holder = holder; + this.correlation = correlation; + this.methodData = methodData; + this.transport = transport; + } + + private SendTask(DataByteArrayInputStream bais, long correlation, Transport transport, String errorMessage) { + 
this(new ServiceException(errorMessage), bais, null, correlation, new MethodData(new BlockingInvocationStrategy(), ObjectSerializationStrategy.INSTANCE, null),transport); + } + + public void run() { + + final DataByteArrayOutputStream baos = new DataByteArrayOutputStream(); + try { + baos.writeInt(0); // make space for the size field. + baos.writeVarLong(correlation); + } catch (IOException e) { // should not happen + LOGGER.error("Failed to write to buffer",e); + throw new RuntimeException(e); + } + + // Lets decode the remaining args on the target's executor + // to take cpu load off the + + ClassLoader loader = holder==null ? getClass().getClassLoader() : holder.loader; + methodData.invocationStrategy.service(methodData.serializationStrategy, loader, methodData.method, svc, bais, baos, new Runnable() { + public void run() { + if(holder!=null) + holder.factory.unget(); + final Buffer command = baos.toBuffer(); + + // Update the size field. + BufferEditor editor = command.buffer().bigEndianEditor(); + editor.writeInt(command.length); + + queue().execute(new Runnable() { + public void run() { + transport.offer(command); + } + }); + } + }); + } + } + } http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/d779ff10/provider/fastbin/src/test/java/org/apache/aries/rsa/provider/fastbin/TransportFailureTest.java ---------------------------------------------------------------------- diff --git a/provider/fastbin/src/test/java/org/apache/aries/rsa/provider/fastbin/TransportFailureTest.java b/provider/fastbin/src/test/java/org/apache/aries/rsa/provider/fastbin/TransportFailureTest.java index 3682d45..4b175fd 100644 --- a/provider/fastbin/src/test/java/org/apache/aries/rsa/provider/fastbin/TransportFailureTest.java +++ b/provider/fastbin/src/test/java/org/apache/aries/rsa/provider/fastbin/TransportFailureTest.java @@ -105,7 +105,7 @@ public class TransportFailureTest { Thread.sleep(SLEEP_TIME); // Big introspection call to access the transport channel and close it, 
simulating // a disconnect on the client side. - ((SocketChannel) get(get(get(get(get(callback, "val$helper"), "onComplete"), "this$1"), "val$transport"), "channel")).close(); + ((SocketChannel) get(get(get(get(get(callback, "val$helper"), "onComplete"), "this$1"), "transport"), "channel")).close(); } catch (Throwable e) { e.printStackTrace(); }