RE: ABR Communication Pattern
Well, customer is king. With all the advantages, the customer says it is an overhead to maintain state info. So they want a way to send extra responses from inside the service implementation code :)

My boss already sent me some sample code on how to create an outgoing interceptor chain to do this. Let's see how things turn out.

Regards
Mayank

-----Original Message-----
From: Daniel Kulp [mailto:[EMAIL PROTECTED]
Sent: Wednesday, January 16, 2008 20:13
To: cxf-user@incubator.apache.org
Cc: Mayank Thakore
Subject: Re: ABR Communication Pattern

On Tuesday 15 January 2008, Mayank Thakore wrote:
> Actually that is what I had tried initially. I added an interceptor at
> the end and used a flag on the message to decide on reprocessing. But
> it didn't work after 2 and half days of effort so I gave in to
> modifying cxf code.

Most likely, the JMSDestination would need to add the JMSMessage to the CXF message and your interceptor would re-dispatch that instead of the CXF message. That still requires changes to the JMSDestination though.

> It would be great if some of the interceptors could "undo" the change
> they did to the message.

The problem is that in most cases, actions are not undoable. With HTTP, we "stream" the data directly to/from the wire. No byte[] things in between like we need to for JMS. Thus, once something is read, it's not undoable.

Actually, this brings up another idea that would help performance. You COULD take the message and feed it back into the chain, but call the doInterceptStartingAfter(...) method to make the chain start at an interceptor much later in the phase, like after all the objects are unmarshalled, method is determined, etc...

> Then, we would be able to move the message up
> and down certain portions of the chain. (It might make the chain
> heavier or slow down processing, but we could always add an annotation
> or configuration to put "undo" on/off.)
Re: ABR Communication Pattern
On Tuesday 15 January 2008, Mayank Thakore wrote:
> Actually that is what I had tried initially. I added an interceptor at
> the end and used a flag on the message to decide on reprocessing. But
> it didn't work after 2 and half days of effort so I gave in to
> modifying cxf code.

Most likely, the JMSDestination would need to add the JMSMessage to the CXF message and your interceptor would re-dispatch that instead of the CXF message. That still requires changes to the JMSDestination though.

> It would be great if some of the interceptors could "undo" the change
> they did to the message.

The problem is that in most cases, actions are not undoable. With HTTP, we "stream" the data directly to/from the wire. No byte[] things in between like we need to for JMS. Thus, once something is read, it's not undoable.

Actually, this brings up another idea that would help performance. You COULD take the message and feed it back into the chain, but call the doInterceptStartingAfter(...) method to make the chain start at an interceptor much later in the phase, like after all the objects are unmarshalled, method is determined, etc...

> Then, we would be able to move the message up
> and down certain portions of the chain. (It might make the chain
> heavier or slow down processing, but we could always add an annotation
> or configuration to put "undo" on/off.)
>
> Combined with the pause/resume functionality, this would open up some
> interesting possibilities.

Definitely, just needs quite a bit of work to flush out all the little issues. :-)

Dan

> Regards
> Mayank
>
> -----Original Message-----
> From: Daniel Kulp [mailto:[EMAIL PROTECTED]
> Sent: Wednesday, January 16, 2008 02:14
> To: cxf-user@incubator.apache.org
> Cc: Mayank Thakore
> Subject: Re: ABR Communication Pattern
>
> Hmm... interesting solution. I like it. :-)
>
> One potential issue is the use of the ThreadLocal, but that's
> definitely not an issue right now.
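The doInterceptStartingAfter(...) idea in Dan's reply can be pictured with a minimal sketch in plain Java. It does not use CXF at all: Step, step() and runStartingAfter() are hypothetical stand-ins invented for illustration, and the step names are made up. The only point is that a second pass can skip the steps that already consumed the input and begin after a named step.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of an interceptor chain; NOT the CXF API.
final class ChainRestartDemo {

    // Hypothetical, much-simplified interceptor: an id plus a handler.
    interface Step {
        String id();
        void handle(List<String> trace);
    }

    // Builds a step that only records its own id in the trace.
    static Step step(String id) {
        return new Step() {
            public String id() { return id; }
            public void handle(List<String> trace) { trace.add(id); }
        };
    }

    // Runs every step that comes after the step with the given id.
    static void runStartingAfter(List<Step> chain, String afterId, List<String> trace) {
        boolean started = false;
        for (Step s : chain) {
            if (started) {
                s.handle(trace);
            } else if (s.id().equals(afterId)) {
                started = true;
            }
        }
    }

    // One full pass, then a second pass that starts after "unmarshal".
    static List<String> runExample() {
        List<Step> chain = List.of(step("read"), step("unmarshal"), step("invoke"), step("send"));
        List<String> trace = new ArrayList<>();
        chain.forEach(s -> s.handle(trace));
        runStartingAfter(chain, "unmarshal", trace);
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(runExample());
    }
}
```

In this toy model the second pass repeats only "invoke" and "send", which is the effect described above: the objects are already unmarshalled, so the expensive and non-repeatable steps are not run again.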
RE: ABR Communication Pattern
Actually that is what I had tried initially. I added an interceptor at the end and used a flag on the message to decide on reprocessing. But it didn't work after 2 and half days of effort so I gave in to modifying cxf code.

It would be great if some of the interceptors could "undo" the change they did to the message. Then, we would be able to move the message up and down certain portions of the chain. (It might make the chain heavier or slow down processing, but we could always add an annotation or configuration to put "undo" on/off.)

Combined with the pause/resume functionality, this would open up some interesting possibilities.

Regards
Mayank

-----Original Message-----
From: Daniel Kulp [mailto:[EMAIL PROTECTED]
Sent: Wednesday, January 16, 2008 02:14
To: cxf-user@incubator.apache.org
Cc: Mayank Thakore
Subject: Re: ABR Communication Pattern

Hmm... interesting solution. I like it. :-)

One potential issue is the use of the ThreadLocal, but that's definitely not an issue right now. The API's technically allow an interceptor to "pause" the chain and then resume it on a separate thread. (for example, to wait for a resource to become available) Nothing does that right now and doing so is going to take a bunch of testing to fix everything else that will break. :-)

To "support" that, the variable would need to be stored as a property on the Exchange. Then an interceptor would need to be added to the "end" of the outgoing chains to check for the flag and, if present, restart the message processing from the beginning. I guess that would be the more "correct" way to do it, but also a LOT more complex. For now, your solution is great.
Re: ABR Communication Pattern
Hmm... interesting solution. I like it. :-)

One potential issue is the use of the ThreadLocal, but that's definitely not an issue right now. The API's technically allow an interceptor to "pause" the chain and then resume it on a separate thread. (for example, to wait for a resource to become available) Nothing does that right now and doing so is going to take a bunch of testing to fix everything else that will break. :-)

To "support" that, the variable would need to be stored as a property on the Exchange. Then an interceptor would need to be added to the "end" of the outgoing chains to check for the flag and, if present, restart the message processing from the beginning. I guess that would be the more "correct" way to do it, but also a LOT more complex. For now, your solution is great.

Dan

On Tuesday 15 January 2008, Mayank Thakore wrote:
> I LOVE OPEN SOURCE
>
> It worked :)
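Dan's ThreadLocal concern can be shown with a small sketch, again in plain Java rather than CXF. The Exchange class here is a hypothetical stand-in (a bag of properties tied to one request), and the property name "abr.resend" is made up. The sketch sets the flag both ways on one thread and then reads it from a second thread, as would happen if a chain were paused and resumed elsewhere.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Illustration only; Exchange below is NOT the CXF Exchange class.
final class ExchangeFlagDemo {

    // Hypothetical per-request exchange: properties travel with the request.
    static final class Exchange {
        private final Map<String, Object> properties = new ConcurrentHashMap<>();
        void put(String key, Object value) { properties.put(key, value); }
        boolean isSet(String key) { return Boolean.TRUE.equals(properties.get(key)); }
    }

    static final String ABR_FLAG = "abr.resend"; // made-up property name
    static final ThreadLocal<Boolean> THREAD_FLAG = ThreadLocal.withInitial(() -> Boolean.FALSE);

    // Sets both flags on the calling thread and reads them on another thread.
    // Returns { flag seen via ThreadLocal, flag seen via Exchange }.
    static boolean[] readFlagsOnOtherThread() {
        Exchange exchange = new Exchange();
        THREAD_FLAG.set(Boolean.TRUE);
        exchange.put(ABR_FLAG, Boolean.TRUE);

        ExecutorService resumer = Executors.newSingleThreadExecutor();
        try {
            Future<boolean[]> result = resumer.submit(
                    () -> new boolean[] { THREAD_FLAG.get(), exchange.isSet(ABR_FLAG) });
            return result.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            resumer.shutdown();
            THREAD_FLAG.remove(); // leave the calling thread clean
        }
    }

    public static void main(String[] args) {
        boolean[] seen = readFlagsOnOtherThread();
        System.out.println("ThreadLocal flag seen on other thread: " + seen[0]);
        System.out.println("Exchange flag seen on other thread: " + seen[1]);
    }
}
```

The second thread gets the ThreadLocal's initial value (false) but still sees the flag stored on the exchange, which is why a per-exchange property would be the safer home for the flag once pause/resume is actually used.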
Re: ABR Communication Pattern
I LOVE OPEN SOURCE

It worked :)

You just need to call the following from the service implementation when another response needs to be sent (as long as you keep calling this, your method will get reinvoked to handle this incoming request; so you can send as many responses to each request as you want):

JMSDestination.abrStatus.set(Boolean.TRUE);

modified code from JMSDestination:
----------------------------------------------------------------------

public static ThreadLocal<Boolean> abrStatus = new ThreadLocal<Boolean>() {
    @Override
    protected synchronized Boolean initialValue() {
        return Boolean.FALSE;
    }
};

protected class JMSExecutor implements Runnable {
    javax.jms.Message message;

    JMSExecutor(javax.jms.Message m) {
        message = m;
    }

    public void run() {
        long abrCount = 0;
        do {
            getLogger().log(Level.INFO, "run the incoming message in the threadpool");
            // Reset the flag before each pass; the service implementation
            // sets it again if it wants to send another response.
            JMSDestination.abrStatus.set(Boolean.FALSE);
            getLogger().log(Level.INFO,
                    "abr count for message(" + Thread.currentThread() + "): " + abrCount);
            abrCount++;
            try {
                incoming(message);
            } catch (IOException ex) {
                // TODO: Decide what to do if we receive the exception.
                getLogger().log(Level.WARNING, "Failed to process incoming message : ", ex);
                break;
            }
        } while (JMSDestination.abrStatus.get().booleanValue());
    }
}

----------------------------------------------------------------------

Regards
Mayank

On 1/15/08, Mayank Thakore <[EMAIL PROTECTED]> wrote:
> Well, it didn't work. The interceptors mutate the message and won't process
> the mutated one.
>
> So I am going to try changing the JMSDestination.incoming code to resend the
> message as many times as required.
>
> Any thoughts?
>
> Regards
> Mayank
>
> -----Original Message-----
> From: Mayank Thakore [mailto:[EMAIL PROTECTED]
> Sent: Sunday, January 13, 2008 23:09
> To: cxf-user@incubator.apache.org
> Subject: ABR Communication Pattern
>
> Hi,
>
> I was trying to achieve Asynchronous Batch Response (ABR) pattern with
> CXF for JMS transport. ABR means user can send multiple responses to a
> single request.
>
> Please read this and let me know what you think.
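The re-dispatch loop from the modified JMSExecutor can be tried in isolation with the sketch below. It is a plain-Java model, not the patched CXF class: AbrLoopDemo, dispatch() and the "getAlarms" request are hypothetical names chosen for the example, and the handler lambda plays the role of the service implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Stand-alone model of the do/while re-dispatch loop; NOT CXF code.
final class AbrLoopDemo {

    // Per-thread "send another response" flag, false by default.
    static final ThreadLocal<Boolean> ABR_STATUS = ThreadLocal.withInitial(() -> Boolean.FALSE);

    // Hands the same request to the handler until the handler stops
    // setting the flag. Returns how many times the handler ran.
    static int dispatch(String request, Consumer<String> handler) {
        int invocations = 0;
        do {
            ABR_STATUS.set(Boolean.FALSE); // reset before every pass
            handler.accept(request);
            invocations++;
        } while (ABR_STATUS.get());
        return invocations;
    }

    // Example "service": produces three responses for a single request.
    static List<String> runExample() {
        List<String> responses = new ArrayList<>();
        dispatch("getAlarms", request -> {
            responses.add(request + " batch " + (responses.size() + 1));
            if (responses.size() < 3) {
                ABR_STATUS.set(Boolean.TRUE); // ask to be invoked again
            }
        });
        return responses;
    }

    public static void main(String[] args) {
        runExample().forEach(System.out::println);
    }
}
```

Because the flag is cleared at the top of every pass, a handler that never sets it is invoked exactly once, so ordinary request/response services are unaffected by the loop.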
RE: ABR Communication Pattern
Well, it didn't work. The interceptors mutate the message and won't process the mutated one.

So I am going to try changing the JMSDestination.incoming code to resend the message as many times as required.

Any thoughts?

Regards
Mayank

-----Original Message-----
From: Mayank Thakore [mailto:[EMAIL PROTECTED]
Sent: Sunday, January 13, 2008 23:09
To: cxf-user@incubator.apache.org
Subject: ABR Communication Pattern

Hi,

I was trying to achieve Asynchronous Batch Response (ABR) pattern with CXF for JMS transport. ABR means user can send multiple responses to a single request.

Please read this and let me know what you think.
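As a generic illustration of why a processing step may refuse to run twice on the same message, the sketch below reads a payload stream two times. This is an assumption about the kind of mutation involved, not a diagnosis of the CXF interceptors in this thread; ConsumedStreamDemo and its methods are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Shows that a consumed stream yields nothing on a second read.
final class ConsumedStreamDemo {

    // Drains the stream and returns its content as text.
    static String readAll(InputStream in) {
        try {
            return new String(in.readAllBytes(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads the same stream twice: { first read, second read }.
    static String[] readTwice(String payload) {
        InputStream body = new ByteArrayInputStream(payload.getBytes(StandardCharsets.UTF_8));
        String first = readAll(body);
        String second = readAll(body); // the stream is already drained here
        return new String[] { first, second };
    }

    public static void main(String[] args) {
        String[] reads = readTwice("<request/>");
        System.out.println("first read:  '" + reads[0] + "'");
        System.out.println("second read: '" + reads[1] + "'");
    }
}
```

The second read comes back empty, so any step that expects the original payload would need the source message (or a buffered copy) to be handed back to it.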
ABR Communication Pattern
Hi,

I was trying to achieve Asynchronous Batch Response (ABR) pattern with CXF for JMS transport. ABR means user can send multiple responses to a single request.

Please read this and let me know what you think.

Hazard info: I haven't finished the client for this yet, so I don't know whether it is working. It does print the cxf logs correctly.

Below is my interceptor code. It attaches to the end of the interceptor chain and executes the previous two interceptors which invoke the service implementation and send the out message respectively.

========================================
package ws.v1.tmf854;

import java.util.ListIterator;

import org.apache.cxf.interceptor.Fault;
import org.apache.cxf.interceptor.Interceptor;
import org.apache.cxf.message.Message;
import org.apache.cxf.phase.AbstractPhaseInterceptor;
import org.apache.cxf.phase.Phase;

public class ABRInterceptor extends AbstractPhaseInterceptor<Message> {

    public ABRInterceptor() {
        super(Phase.POST_INVOKE);
        // Run after the interceptor that sends the outgoing message.
        getAfter().add(
                org.apache.cxf.interceptor.OutgoingChainInterceptor.class.getName());
    }

    @Override
    public void handleMessage(Message message) throws Fault {
        System.out.println("ABRInterceptor invoked");
        if (ABRSession.isEnabled()) {
            executeReRuns(message);
        }
    }

    @SuppressWarnings("unchecked")
    private void executeReRuns(Message message) {
        ListIterator<Interceptor<? extends Message>> iterator = prepareReRunIterator(message);
        while (ABRSession.isEnabled()) {
            ABRSession.disable();
            // Step back over the two interceptors before this one...
            for (int i = 0; i < 2; i++) {
                iterator.previous();
            }
            // ...and run them again (invoke the service, send the response).
            for (int i = 0; i < 2; i++) {
                Interceptor currentInterceptor = iterator.next();
                currentInterceptor.handleMessage(message);
            }
        }
    }

    // Returns an iterator positioned just before the last interceptor in the chain.
    private ListIterator<Interceptor<? extends Message>> prepareReRunIterator(
            Message message) {
        ListIterator<Interceptor<? extends Message>> iterator =
                message.getInterceptorChain().getIterator();
        while (iterator.hasNext()) {
            iterator.next();
        }
        iterator.previous();
        return iterator;
    }
}
========================================

Below is the session control device. It uses a thread local variable to remember if ABR session is enabled or disabled.
========================================
package ws.v1.tmf854;

public class ABRSession {

    // One flag per thread, disabled by default.
    private static ThreadLocal<Boolean> status = new ThreadLocal<Boolean>() {
        @Override
        protected synchronized Boolean initialValue() {
            return Boolean.FALSE;
        }
    };

    public static synchronized boolean isEnabled() {
        return status.get().booleanValue();
    }

    public static synchronized void disable() {
        System.out.println("Disabling ABRSession");
        status.set(Boolean.FALSE);
    }

    public static synchronized void enable() {
        System.out.println("Enabling ABRSession");
        status.set(Boolean.TRUE);
    }
}
========================================

So, if user wants ABR mode, they just need to invoke ABRSession.enable() inside the service implementation.

Here is the server main for completeness:

========================================
package ws.v1.tmf854;

import javax.xml.ws.Endpoint;

import org.apache.cxf.jaxws.EndpointImpl;

/**
 * This class was generated by Apache CXF (incubator) 2.0.3-incubator
 * Sat Jan 12 11:10:39 IST 2008
 * Generated source version: 2.0.3-incubator
 */
public class AlarmRetrieval_AlarmRetrievalJms_Server {

    protected AlarmRetrieval_AlarmRetrievalJms_Server() throws Exception {
        System.out.println("Starting Server");
        Object implementor = new AlarmRetrievalImpl();
        String address = "jms://";
        EndpointImpl ep = (EndpointImpl) Endpoint.publish(address, implementor);
        // Register the ABR interceptor on the endpoint's incoming chain.
        ep.getServer().getEndpoint().getInInterceptors().add(new ABRInterceptor());
    }

    public static void main(String args[]) throws Exception {
        new AlarmRetrieval_AlarmRetrievalJms_Server();
        System.out.println("Server ready...");
    }
}
========================================

So, what do you think?

Thanks for any and all comments. Feel free to be critical.

Mayank
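The iterator arithmetic in ABRInterceptor (park the cursor just before the last element, step back two, replay two) can be checked on its own with a list of strings. This is a minimal sketch, not CXF code: ReplayLastTwoDemo and the step names are made up, with "abr" standing in for the ABR interceptor at the end of the chain.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.ListIterator;

// Models the cursor movement used to re-run the two preceding steps.
final class ReplayLastTwoDemo {

    // Returns the two elements that precede the last one, in chain order,
    // visiting them the same way the interceptor does.
    static List<String> replayTwoBeforeLast(List<String> chain) {
        if (chain.size() < 3) {
            throw new IllegalArgumentException("need at least three steps");
        }
        // Cursor sits just before the last element (the ABR step itself).
        ListIterator<String> it = chain.listIterator(chain.size() - 1);
        it.previous();
        it.previous();
        List<String> replayed = new ArrayList<>();
        replayed.add(it.next());
        replayed.add(it.next());
        return replayed; // the cursor is back in front of the last element
    }

    public static void main(String[] args) {
        System.out.println(replayTwoBeforeLast(List.of("read", "invoke", "send", "abr")));
    }
}
```

After each replay the cursor ends up where it started, which is what lets the while loop in executeReRuns repeat the same two steps for as long as the session stays enabled.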