Re: ABR Communication Pattern

2008-01-16 Thread Daniel Kulp
On Tuesday 15 January 2008, Mayank Thakore wrote:
 Actually that is what I had tried initially. I added an interceptor at
 the end and used a flag on the message to decide on reprocessing. But
 it didn't work after two and a half days of effort, so I gave in to
 modifying CXF code.

Most likely, the JMSDestination would need to add the JMSMessage to the 
CXF message and your interceptor would re-dispatch that instead of the 
CXF message.   That still requires changes to the JMSDestination though.

 It would be great if some of the interceptors could undo the change
 they did to the message. 

The problem is that in most cases, actions are not undoable. With HTTP,
we stream the data directly to/from the wire. There are no intermediate
byte[] buffers like the ones we need for JMS. Thus, once something has
been read, it cannot be un-read.
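
To illustrate the point about streaming, here is a minimal, self-contained
sketch in plain Java (not CXF code). The class and method names
(StreamReplayDemo, oneShot, drain) are made up for this example. A stream
that behaves like a socket can be consumed only once, while a payload that
already sits in a byte[] can be replayed as often as needed.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

class StreamReplayDemo {

    /** Drains a stream into a String, the way an interceptor consumes a payload. */
    static String drain(InputStream in) {
        StringBuilder sb = new StringBuilder();
        try {
            int b;
            while ((b = in.read()) != -1) {
                sb.append((char) b); // payload is ASCII in this sketch
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return sb.toString();
    }

    /** Wraps a stream so it behaves like a socket: no mark/reset support. */
    static InputStream oneShot(InputStream delegate) {
        return new InputStream() {
            @Override
            public int read() throws IOException {
                return delegate.read();
            }
        };
    }

    public static void main(String[] args) {
        byte[] payload = "<request/>".getBytes(StandardCharsets.US_ASCII);

        // HTTP-like: data comes straight off the wire and can be read only once.
        InputStream wire = oneShot(new ByteArrayInputStream(payload));
        System.out.println("first read:  " + drain(wire));
        System.out.println("second read: " + drain(wire)); // nothing left to read

        // JMS-like: the payload is already a byte[], so a fresh stream is cheap.
        System.out.println("replay 1: " + drain(new ByteArrayInputStream(payload)));
        System.out.println("replay 2: " + drain(new ByteArrayInputStream(payload)));
    }
}
```

This only models the read side. Real interceptors also mutate message
state, which is a separate reason why their work is hard to undo.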

Actually, this brings up another idea that would help performance:
you COULD take the message and feed it back into the chain, but call the
doInterceptStartingAfter(...) method to make the chain start at an
interceptor much later in the phase, for example after all the objects
have been unmarshalled and the method has been determined.
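
The idea of restarting a chain partway through can be sketched with a toy
model. Everything below (ToyChainDemo, Step, ToyChain and the step names)
is hypothetical and only mimics the concept. It is not CXF's interceptor
chain, and the real doInterceptStartingAfter(...) has its own signature
and semantics.

```java
import java.util.ArrayList;
import java.util.List;

class ToyChainDemo {

    /** Hypothetical stand-in for an interceptor: an id plus an action. */
    interface Step {
        String id();
        void handle(List<String> log);
    }

    /** Creates a step that simply records its own id when it runs. */
    static Step step(String id) {
        return new Step() {
            public String id() { return id; }
            public void handle(List<String> log) { log.add(id); }
        };
    }

    /** Toy chain that can run from the start or from just after a named step. */
    static class ToyChain {
        private final List<Step> steps = new ArrayList<>();

        ToyChain add(Step s) {
            steps.add(s);
            return this;
        }

        /** Runs every step in order. */
        void doIntercept(List<String> log) {
            for (Step s : steps) {
                s.handle(log);
            }
        }

        /** Skips everything up to and including afterId, then runs the rest. */
        void doInterceptStartingAfter(List<String> log, String afterId) {
            boolean started = false;
            for (Step s : steps) {
                if (started) {
                    s.handle(log);
                } else if (s.id().equals(afterId)) {
                    started = true;
                }
            }
        }
    }

    /** One full pass, then one partial pass that skips the expensive early steps. */
    static List<String> run() {
        ToyChain chain = new ToyChain()
            .add(step("read"))
            .add(step("unmarshal"))
            .add(step("pick-method"))
            .add(step("invoke"))
            .add(step("send"));
        List<String> log = new ArrayList<>();
        chain.doIntercept(log);
        chain.doInterceptStartingAfter(log, "pick-method");
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The second pass re-runs only "invoke" and "send", which is the saving the
suggestion is after: the data has already been read and unmarshalled.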

 Then, we would be able to move the message up 
 and down certain portions of the chain. (It might make the chain
 heavier or slow down processing, but we could always add an annotation
 or configuration to put undo on/off.)

 Combined with the pause/resume functionality, this would open up some
 interesting possibilities.

Definitely, it just needs quite a bit of work to flesh out all the little
issues.   :-)

Dan




RE: ABR Communication Pattern

2008-01-16 Thread Mayank Thakore
Well, the customer is king. Even with all the advantages, the customer says
it is an overhead to maintain state info. So they want a way to send extra
responses from inside the service implementation code :)

My boss already sent me some sample code on how to create an outgoing
interceptor chain to do this. Let's see how things turn out.

Regards
Mayank


Re: ABR Communication Pattern

2008-01-15 Thread Mayank Thakore
I LOVE OPEN SOURCE

It worked :)

You just need to call the following from the service implementation when
another response needs to be sent. As long as you keep calling this, your
method will get re-invoked to handle the same incoming request, so you can
send as many responses to each request as you want.

    JMSDestination.abrStatus.set(Boolean.TRUE);



modified code from JMSDestination:
-

    public static ThreadLocal<Boolean> abrStatus = new ThreadLocal<Boolean>() {
        @Override
        protected Boolean initialValue() {
            return Boolean.FALSE;
        }
    };

    protected class JMSExecutor implements Runnable {
        javax.jms.Message message;

        JMSExecutor(javax.jms.Message m) {
            message = m;
        }

        public void run() {
            long abrCount = 0;
            do {
                getLogger().log(Level.INFO,
                    "run the incoming message in the threadpool");
                // Reset the flag before each dispatch. The service
                // implementation sets it again to request another run.
                JMSDestination.abrStatus.set(Boolean.FALSE);
                getLogger().log(Level.INFO, "abr count for message("
                    + Thread.currentThread() + "): " + abrCount);
                abrCount++;
                try {
                    incoming(message);
                } catch (IOException ex) {
                    // TODO: Decide what to do if we receive the exception.
                    getLogger().log(Level.WARNING,
                        "Failed to process incoming message : ", ex);
                    break;
                }
            } while (JMSDestination.abrStatus.get().booleanValue());
        }

    }


-
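
The control flow of the loop above can be tried out without CXF or JMS.
The following is a minimal sketch with made-up names (AbrLoopDemo, again,
service, dispatch). It stands in for the pattern only: the dispatcher
clears a per-thread flag, invokes the service, and repeats for as long as
the service sets the flag again.

```java
import java.util.ArrayList;
import java.util.List;

class AbrLoopDemo {

    /** Per-thread "send another response" flag, playing the role of abrStatus. */
    static final ThreadLocal<Boolean> again =
        ThreadLocal.withInitial(() -> Boolean.FALSE);

    /** Hypothetical service: asks to be re-invoked until three responses exist. */
    static void service(String request, List<String> responses) {
        responses.add(request + "#" + (responses.size() + 1));
        if (responses.size() < 3) {
            again.set(Boolean.TRUE);
        }
    }

    /** Mirrors the do/while shape: reset the flag, dispatch, repeat if set. */
    static List<String> dispatch(String request) {
        List<String> responses = new ArrayList<>();
        do {
            again.set(Boolean.FALSE);
            service(request, responses);
        } while (again.get());
        return responses;
    }

    public static void main(String[] args) {
        System.out.println(dispatch("getAlarms"));
    }
}
```

Because the flag is reset at the top of every iteration, a service that
forgets to set it simply produces one response, which keeps the default
request/response behaviour unchanged.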

Regards
Mayank



Re: ABR Communication Pattern

2008-01-15 Thread Daniel Kulp

Hmm... interesting solution. I like it.  :-)

One potential issue is the use of the ThreadLocal, but that's definitely
not an issue right now. The APIs technically allow an interceptor to
pause the chain and then resume it on a separate thread (for example,
to wait for a resource to become available). Nothing does that right
now, and doing so is going to take a bunch of testing to fix everything
else that will break.  :-)

To support that, the variable would need to be stored as a property on
the Exchange. Then an interceptor would need to be added to the end of
the outgoing chains to check for the flag and, if present, restart the
message processing from the beginning. I guess that would be the more
correct way to do it, but also a LOT more complex. For now, your
solution is great.
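
The ThreadLocal concern can be shown in isolation. The sketch below is
plain Java with hypothetical names (FlagCarrierDemo, ToyExchange and the
"abr.again" key). ToyExchange is only a stand-in for a property map that
travels with the request and is not the CXF Exchange class. A flag stored
in a ThreadLocal is invisible once processing resumes on another thread,
while a flag stored with the request is still there.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class FlagCarrierDemo {

    /** Flag bound to whichever thread set it. */
    static final ThreadLocal<Boolean> threadFlag =
        ThreadLocal.withInitial(() -> Boolean.FALSE);

    /** Hypothetical stand-in for an exchange: properties that follow the request. */
    static class ToyExchange {
        final Map<String, Object> props = new ConcurrentHashMap<>();
    }

    /**
     * Sets both flags on the calling thread, then reads them on a second
     * thread, as if the chain had been paused and resumed elsewhere.
     * Returns {seenViaThreadLocal, seenViaExchange}.
     */
    static boolean[] readOnOtherThread() {
        ToyExchange exchange = new ToyExchange();
        threadFlag.set(Boolean.TRUE);
        exchange.props.put("abr.again", Boolean.TRUE);

        boolean[] seen = new boolean[2];
        Thread resumed = new Thread(() -> {
            seen[0] = threadFlag.get(); // fresh initial value on this thread
            seen[1] = Boolean.TRUE.equals(exchange.props.get("abr.again"));
        });
        resumed.start();
        try {
            resumed.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        threadFlag.remove(); // avoid leaking the flag into later work on this thread
        return seen;
    }

    public static void main(String[] args) {
        boolean[] seen = readOnOtherThread();
        System.out.println("ThreadLocal flag seen: " + seen[0]);
        System.out.println("Exchange flag seen:    " + seen[1]);
    }
}
```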


Dan



RE: ABR Communication Pattern

2008-01-15 Thread Mayank Thakore
Actually that is what I had tried initially. I added an interceptor at the
end and used a flag on the message to decide on reprocessing. But it didn't
work after two and a half days of effort, so I gave in to modifying CXF code.

It would be great if some of the interceptors could undo the changes they
made to the message. Then we would be able to move the message up and down
certain portions of the chain. (It might make the chain heavier or slow down
processing, but we could always add an annotation or configuration option to
turn undo on or off.)

Combined with the pause/resume functionality, this would open up some
interesting possibilities.

Regards
Mayank


RE: ABR Communication Pattern

2008-01-14 Thread Mayank Thakore
Well, it didn't work. The interceptors mutate the message and won't process
the mutated one.

So I am going to try changing the JMSDestination.incoming code to resend the
message as many times as required.

Any thoughts?

Regards
Mayank

-Original Message-
From: Mayank Thakore [mailto:[EMAIL PROTECTED] 
Sent: Sunday, January 13, 2008 23:09
To: cxf-user@incubator.apache.org
Subject: ABR Communication Pattern

Hi,

I was trying to achieve the Asynchronous Batch Response (ABR) pattern with
CXF for the JMS transport. ABR means a user can send multiple responses to
a single request.

Please read this and let me know what you think.

Hazard info: I haven't finished the client for this yet, so I don't know
whether it is working. It does print the CXF logs correctly.

Below is my interceptor code. It attaches to the end of the
interceptor chain and executes the previous two interceptors which
invoke the service implementation and send the out message
respectively.
==

package ws.v1.tmf854;

import java.util.ListIterator;

import org.apache.cxf.interceptor.Fault;
import org.apache.cxf.interceptor.Interceptor;
import org.apache.cxf.message.Message;
import org.apache.cxf.phase.AbstractPhaseInterceptor;
import org.apache.cxf.phase.Phase;

public class ABRInterceptor extends AbstractPhaseInterceptor<Message> {

    public ABRInterceptor() {
        super(Phase.POST_INVOKE);
        // Run after the interceptor that sends the outgoing message.
        getAfter().add(
            org.apache.cxf.interceptor.OutgoingChainInterceptor.class
                .getName());
    }

    @Override
    public void handleMessage(Message message) throws Fault {
        System.out.println("ABRInterceptor invoked");
        if (ABRSession.isEnabled()) {
            executeReRuns(message);
        }
    }

    private void executeReRuns(Message message) {
        ListIterator<Interceptor<? extends Message>> iterator =
            prepareReRunIterator(message);
        while (ABRSession.isEnabled()) {
            ABRSession.disable();
            // Step back over the two previous interceptors...
            for (int i = 0; i < 2; i++) {
                iterator.previous();
            }
            // ...and run them again (invoke the service, send the response).
            for (int i = 0; i < 2; i++) {
                Interceptor currentInterceptor = iterator.next();
                currentInterceptor.handleMessage(message);
            }
        }
    }

    private ListIterator<Interceptor<? extends Message>> prepareReRunIterator(
            Message message) {
        ListIterator<Interceptor<? extends Message>> iterator =
            message.getInterceptorChain().getIterator();
        // Move to the end of the chain, then back over the last interceptor.
        while (iterator.hasNext()) {
            iterator.next();
        }
        iterator.previous();
        return iterator;
    }

}



==
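
The iterator arithmetic in executeReRuns is easier to follow on a plain
list. This is a small sketch using java.util.ListIterator over strings.
The class name RewindDemo and the chain contents are made up for this
example, and it assumes, as the interceptor does, that the re-run
interceptor is the last element and that the two elements before it are
the ones to repeat.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.ListIterator;

class RewindDemo {

    /** Returns the elements "executed" by re-running the two entries before the last one. */
    static List<String> rerunPreviousTwo(List<String> chain, int reruns) {
        ListIterator<String> it = chain.listIterator();
        // Same positioning as prepareReRunIterator: go to the end,
        // then step back over the last element.
        while (it.hasNext()) {
            it.next();
        }
        it.previous();

        List<String> executed = new ArrayList<>();
        for (int r = 0; r < reruns; r++) {
            // Rewind two positions...
            for (int i = 0; i < 2; i++) {
                it.previous();
            }
            // ...and walk forward again, "executing" each element.
            for (int i = 0; i < 2; i++) {
                executed.add(it.next());
            }
        }
        return executed;
    }

    public static void main(String[] args) {
        List<String> chain = Arrays.asList("unmarshal", "invoke", "send", "abr");
        System.out.println(rerunPreviousTwo(chain, 2));
    }
}
```

After each rewind-and-replay the cursor is back in front of the last
element, so the loop can repeat any number of times. The list needs at
least three elements, otherwise previous() throws NoSuchElementException.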



Below is the session control device. It uses a thread local variable
to remember if ABR session is enabled or disabled.
==

package ws.v1.tmf854;

public class ABRSession {

    // One flag per thread: true while another response has been requested.
    private static ThreadLocal<Boolean> status = new ThreadLocal<Boolean>() {
        @Override
        protected Boolean initialValue() {
            return Boolean.FALSE;
        }
    };

    public static boolean isEnabled() {
        return status.get().booleanValue();
    }

    public static void disable() {
        System.out.println("Disabling ABRSession");
        status.set(Boolean.FALSE);
    }

    public static void enable() {
        System.out.println("Enabling ABRSession");
        status.set(Boolean.TRUE);
    }

}



==

So, if user wants ABR mode, they just need to invoke
ABRSession.enable() inside the service implementation.

Here is the server main for completeness:
==

package ws.v1.tmf854;

import javax.xml.ws.Endpoint;

import org.apache.cxf.jaxws.EndpointImpl;

/**
 * This class was generated by Apache CXF (incubator) 2.0.3-incubator
 * Sat Jan 12 11:10:39 IST 2008
 * Generated source version: 2.0.3-incubator
 */

public class AlarmRetrieval_AlarmRetrievalJms_Server {

    protected AlarmRetrieval_AlarmRetrievalJms_Server() throws Exception {
        System.out.println("Starting Server");
        Object implementor = new AlarmRetrievalImpl();
        String address = "jms://";
        EndpointImpl ep = (EndpointImpl) Endpoint.publish(address, implementor);
        ep.getServer().getEndpoint().getInInterceptors().add(