create abstract end point handler
Project: http://git-wip-us.apache.org/repos/asf/tomee/repo Commit: http://git-wip-us.apache.org/repos/asf/tomee/commit/8e2aad5b Tree: http://git-wip-us.apache.org/repos/asf/tomee/tree/8e2aad5b Diff: http://git-wip-us.apache.org/repos/asf/tomee/diff/8e2aad5b Branch: refs/heads/master Commit: 8e2aad5b018d1435d212bebe39662bd58e338d57 Parents: e44832a Author: Otavio Santana <otaviopolianasant...@gmail.com> Authored: Fri Dec 15 07:53:06 2017 -0300 Committer: Otavio Santana <otaviopolianasant...@gmail.com> Committed: Fri Dec 15 07:53:06 2017 -0300 ---------------------------------------------------------------------- .../core/mdb/AbstractEndpointHandler.java | 201 +++++++++++++++++++ .../openejb/core/mdb/EndpointHandler.java | 6 - .../openejb/core/mdb/PoolEndpointHandler.java | 6 - 3 files changed, 201 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tomee/blob/8e2aad5b/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/AbstractEndpointHandler.java ---------------------------------------------------------------------- diff --git a/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/AbstractEndpointHandler.java b/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/AbstractEndpointHandler.java new file mode 100644 index 0000000..3df2438 --- /dev/null +++ b/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/AbstractEndpointHandler.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.openejb.core.mdb; + +import org.apache.openejb.ApplicationException; +import org.apache.openejb.SystemException; +import org.apache.openejb.resource.activemq.jms2.DelegateMessage; +import org.apache.openejb.resource.activemq.jms2.JMS2; + +import javax.ejb.EJBException; +import javax.jms.Message; +import javax.resource.spi.ApplicationServerInternalException; +import javax.resource.spi.UnavailableException; +import java.lang.reflect.Method; +import java.util.Arrays; + +abstract class AbstractEndpointHandler { + + protected State state = State.NONE; + protected volatile Boolean isAmq; + + protected Object instance; + + //final + protected BaseMdbContainer container; + + + public abstract void beforeDelivery(final Method method) throws ApplicationServerInternalException; + + protected abstract void recreateInstance(final boolean exceptionAlreadyThrown) throws UnavailableException; + + public abstract void release(); + + + public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable { + + final String methodName = method.getName(); + final Class<?>[] parameterTypes = method.getParameterTypes(); + + if (method.getDeclaringClass() == Object.class) { + if ("toString".equals(methodName) && parameterTypes.length == 0) { + return toString(); + } else if ("equals".equals(methodName) && parameterTypes.length == 1) { + return equals(args[0]); + } else if ("hashCode".equals(methodName) && parameterTypes.length == 0) { + return hashCode(); + } else { + throw new UnsupportedOperationException("Unkown method: " + method); + } + } + + if ("beforeDelivery".equals(methodName) && Arrays.deepEquals(new Class[]{Method.class}, parameterTypes)) { + beforeDelivery((Method) args[0]); + return null; + } else if ("afterDelivery".equals(methodName) && parameterTypes.length == 0) { + afterDelivery(); + return null; + } else if ("release".equals(methodName) && parameterTypes.length == 0) { + release(); + return null; + } else { + final Object value = deliverMessage(method, args); + return value; + } + } + + public Object deliverMessage(final Method method, final Object[] args) throws Throwable { + + boolean callBeforeAfter = false; + + // verify current state + switch (state) { + case NONE: + try { + beforeDelivery(method); + } catch (final ApplicationServerInternalException e) { + throw (EJBException) new EJBException().initCause(e.getCause()); + } + callBeforeAfter = true; + state = State.METHOD_CALLED; + break; + case BEFORE_CALLED: + state = State.METHOD_CALLED; + break; + case RELEASED: + throw new IllegalStateException("Message endpoint factory has been released"); + case METHOD_CALLED: + case SYSTEM_EXCEPTION: + throw new IllegalStateException("The last message delivery must be completed with an afterDeliver before another message can be delivered"); + } + + Throwable throwable = null; + Object value = null; + try { + // deliver the message + value = container.invoke(instance, method, null, wrapMessageForAmq5(args)); + } catch (final SystemException se) { + throwable = se.getRootCause() != null ? se.getRootCause() : se; + state = State.SYSTEM_EXCEPTION; + } catch (final ApplicationException ae) { + throwable = ae.getRootCause() != null ? ae.getRootCause() : ae; + } finally { + // if the adapter is not using before/after, we must call afterDelivery to clean up + if (callBeforeAfter) { + try { + afterDelivery(); + } catch (final ApplicationServerInternalException e) { + throwable = throwable == null ? e.getCause() : throwable; + } catch (final UnavailableException e) { + throwable = throwable == null ? e : throwable; + } + } + } + + if (throwable != null) { + throwable.printStackTrace(); + if (isValidException(method, throwable)) { + throw throwable; + } else { + throw new EJBException().initCause(throwable); + } + } + return value; + } + + public void afterDelivery() throws ApplicationServerInternalException, UnavailableException { + switch (state) { + case RELEASED: + throw new IllegalStateException("Message endpoint factory has been released"); + case NONE: + throw new IllegalStateException("afterDelivery may only be called if message delivery began with a beforeDelivery call"); + } + + + // call afterDelivery on the container + boolean exceptionThrown = false; + try { + container.afterDelivery(instance); + } catch (final SystemException se) { + exceptionThrown = true; + + final Throwable throwable = se.getRootCause() != null ? se.getRootCause() : se; + throwable.printStackTrace(); + throw new ApplicationServerInternalException(throwable); + } finally { + if (state == State.SYSTEM_EXCEPTION) { + recreateInstance(exceptionThrown); + } + // we are now in the default NONE state + state = State.NONE; + } + } + + // workaround for AMQ 5/JMS 2 support + private Object[] wrapMessageForAmq5(final Object[] args) { + if (args == null || args.length != 1 || DelegateMessage.class.isInstance(args[0])) { + return args; + } + + if (isAmq == null) { + synchronized (this) { + if (isAmq == null) { + isAmq = args[0].getClass().getName().startsWith("org.apache.activemq."); + } + } + } + if (isAmq) { + args[0] = JMS2.wrap(Message.class.cast(args[0])); + } + return args; + } + + private boolean isValidException(final Method method, final Throwable throwable) { + if (throwable instanceof RuntimeException || throwable instanceof Error) { + return true; + } + + final Class<?>[] exceptionTypes = method.getExceptionTypes(); + for (final Class<?> exceptionType : exceptionTypes) { + if (exceptionType.isInstance(throwable)) { + return true; + } + } + return false; + } + +} http://git-wip-us.apache.org/repos/asf/tomee/blob/8e2aad5b/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/EndpointHandler.java ---------------------------------------------------------------------- diff --git a/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/EndpointHandler.java b/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/EndpointHandler.java index 21ce0f4..026e94c 100644 --- a/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/EndpointHandler.java +++ b/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/EndpointHandler.java @@ -55,10 +55,6 @@ public class EndpointHandler implements InvocationHandler, MessageEndpoint { public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable { -// System.out.println("\n" + -// "***************************************\n" + -// "Endpoint invoked " + method + "\n" + -// "***************************************\n\n"); final String methodName = method.getName(); final Class<?>[] parameterTypes = method.getParameterTypes(); @@ -75,7 +71,6 @@ public class EndpointHandler implements InvocationHandler, MessageEndpoint { } } -// try { if ("beforeDelivery".equals(methodName) && Arrays.deepEquals(new Class[]{Method.class}, parameterTypes)) { beforeDelivery((Method) args[0]); return null; @@ -89,7 +84,6 @@ public class EndpointHandler implements InvocationHandler, MessageEndpoint { final Object value = deliverMessage(method, args); return value; } -// } finally { logTx(); } } public void beforeDelivery(final Method method) throws ApplicationServerInternalException { http://git-wip-us.apache.org/repos/asf/tomee/blob/8e2aad5b/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/PoolEndpointHandler.java ---------------------------------------------------------------------- diff --git a/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/PoolEndpointHandler.java b/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/PoolEndpointHandler.java index cc1cc13..f507a35 100644 --- a/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/PoolEndpointHandler.java +++ b/container/openejb-core/src/main/java/org/apache/openejb/core/mdb/PoolEndpointHandler.java @@ -57,10 +57,6 @@ public class PoolEndpointHandler implements InvocationHandler, MessageEndpoint { } public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable { -// System.out.println("\n" + -// "***************************************\n" + -// "Endpoint invoked " + method + "\n" + -// "***************************************\n\n"); final String methodName = method.getName(); final Class<?>[] parameterTypes = method.getParameterTypes(); @@ -77,7 +73,6 @@ public class PoolEndpointHandler implements InvocationHandler, MessageEndpoint { } } -// try { if ("beforeDelivery".equals(methodName) && Arrays.deepEquals(new Class[]{Method.class}, parameterTypes)) { beforeDelivery((Method) args[0]); return null; @@ -91,7 +86,6 @@ public class PoolEndpointHandler implements InvocationHandler, MessageEndpoint { final Object value = deliverMessage(method, args); return value; } -// } finally { logTx(); } } public void beforeDelivery(final Method method) throws ApplicationServerInternalException {