Hi Norman,
Thx for inputs. comment/confirmation inside.
Eric
On 24/12/11 11:53, Norman Maurer wrote:
Hi Eric,
comments inside....
Am 24.12.2011 um 10:05 schrieb Eric Charles<[email protected]>:
Hi Stephano,
Opening the discussion to learn more :)
- Why are you considering that 2 threads is a criteria to use standard
synchronization rather than some atomic fields.
If you only have a small count of concurrent threads its not slower to use
synchronization as the context switching will not happen often..
Not slower, but not faster.
- I can understand you replace a concurrent by a non-concurrent queue. However,
you now have a blocking queue. Is there an impact due to this blocking aspect?
Nope there is not as we not use the blocking methods. We could even replace it
with a LinkedList.
So why not replace with a LinkedList to make things crystal clear :)
- You defined isAsync as volatile and sometimes encapsulate access to isAsync
in a synchronized block, sometime not. Why using 2 different thread-safety
strategies in this class?
If you only need to access a "status flag" ina concurrent way then its more cheap to just
use a volatile for it. If you need to update more then one field in a "atomic" way you
need synchronized. Updating a volatile in a synchronized is not a problem...
Sure. I will further look at the usage context (the 2 user threads) to
have a better idea.
Thx again Norman,
Eric
Thx,
Eric
Hope it helps,
Norman
On 21/12/11 15:47, [email protected] wrote:
Author: bago
Date: Wed Dec 21 14:47:25 2011
New Revision: 1221748
URL: http://svn.apache.org/viewvc?rev=1221748&view=rev
Log:
An attempt to refactor AbstractProtocolTransport to be thread safe. I moved
back to standard synchronization as we only have max 2 threads competing for
the queue so it doesn't make sense to use a non blocking queue. Norman, please
overview, and feel free to revert if you don't like the solution (i thought it
was better to simply commit instead of opening a JIRA to show you this).
Modified:
james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java
Modified:
james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java
URL:
http://svn.apache.org/viewvc/james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java?rev=1221748&r1=1221747&r2=1221748&view=diff
==============================================================================
---
james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java
(original)
+++
james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java
Wed Dec 21 14:47:25 2011
@@ -22,9 +22,8 @@ package org.apache.james.protocols.api;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.util.List;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
-
+import java.util.Queue;
+import java.util.concurrent.LinkedBlockingQueue;
import org.apache.james.protocols.api.FutureResponse.ResponseListener;
@@ -42,18 +41,34 @@ public abstract class AbstractProtocolTr
// TODO: Should we limit the size ?
- private final ConcurrentLinkedQueue<Response> responses = new
ConcurrentLinkedQueue<Response>();
- private final AtomicBoolean write = new AtomicBoolean(false);
+ private final Queue<Response> responses = new
LinkedBlockingQueue<Response>();
+ private volatile boolean isAsync = false;
/**
* @see
org.apache.james.protocols.api.ProtocolTransport#writeResponse(org.apache.james.protocols.api.Response,
org.apache.james.protocols.api.ProtocolSession)
*/
public final void writeResponse(Response response, final ProtocolSession
session) {
- // just add the response to the queue. We will trigger the write
operation later
- responses.add(response);
-
- // trigger the write
- writeQueuedResponses(session);
+ // if we already in asynchrnous mode we simply enqueue the response
+ // we do this synchronously because we may have a dequeuer thread
working on
+ // isAsync and responses.
+ boolean enqueued = false;
+ synchronized(this) {
+ if (isAsync == true) {
+ responses.offer(response);
+ enqueued = true;
+ }
+ }
+
+ // if we didn't enqueue then we check if the response is writable or
we have to
+ // set us "asynchrnous" and wait for response to be ready.
+ if (!enqueued) {
+ if (isResponseWritable(response)) {
+ writeResponseToClient(response, session);
+ } else {
+ addDequeuerListener(response, session);
+ isAsync = true;
+ }
+ }
}
/**
@@ -65,50 +80,46 @@ public abstract class AbstractProtocolTr
* @param session
*/
private void writeQueuedResponses(final ProtocolSession session) {
- Response queuedResponse = null;
- if (write.compareAndSet(false, true)){
- boolean listenerAdded = false;
- // dequeue Responses until non is left
- while ((queuedResponse = responses.poll()) != null) {
-
- // check if we need to take special care of FutureResponses
- if (queuedResponse instanceof FutureResponse) {
- FutureResponse futureResponse =(FutureResponse)
queuedResponse;
- if (futureResponse.isReady()) {
- // future is ready so we can write it without blocking
the IO-Thread
- writeResponseToClient(queuedResponse, session);
- } else {
-
- // future is not ready so we need to write it via a
ResponseListener otherwise we MAY block the IO-Thread
- futureResponse.addListener(new ResponseListener() {
-
- public void onResponse(FutureResponse response) {
- writeResponseToClient(response, session);
- if (write.compareAndSet(true, false)) {
- writeQueuedResponses(session);
- }
- }
- });
- listenerAdded = true;
- // just break here as we will trigger the dequeue later
- break;
- }
-
- } else {
- // the Response is not a FutureResponse, so just write it
back the the remote peer
- writeResponseToClient(queuedResponse, session);
+ // dequeue Responses until non is left
+ while (true) {
+
+ Response queuedResponse = null;
+
+ // synchrnously we check responses and if it is empty we move back
to non asynch
+ // behaviour
+ synchronized(this) {
+ queuedResponse = responses.poll();
+ if (queuedResponse == null) {
+ isAsync = false;
+ break;
}
-
}
- // Check if a ResponseListener was added before. If not we can
allow to write
- // responses again. Otherwise the writing will get triggered from
the listener
- if (listenerAdded == false) {
- write.set(false);
+
+ // if we have something in the queue we continue writing until we
+ // find something asynchronous.
+ if (isResponseWritable(queuedResponse)) {
+ writeResponseToClient(queuedResponse, session);
+ } else {
+ addDequeuerListener(queuedResponse, session);
+ // no changes to isAsync here, because in this method we are
always already async.
+ break;
}
}
-
-
+ }
+
+ private boolean isResponseWritable(Response response) {
+ return !(response instanceof FutureResponse) || ((FutureResponse)
response).isReady();
+ }
+
+ private void addDequeuerListener(Response response, final ProtocolSession
session) {
+ ((FutureResponse) response).addListener(new ResponseListener() {
+
+ public void onResponse(FutureResponse response) {
+ writeResponseToClient(response, session);
+ writeQueuedResponses(session);
+ }
+ });
}
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
--
Eric http://about.echarles.net
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
--
Eric http://about.echarles.net
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]